Massive Data Process Homework 2

仔细阅读MapReduce这篇论文,回答下面的问题。

分布式排序

在MapReduce执行的过程中,中间的shuffle过程是一个非常重要的过程。这个过程中其中一个使用的方法就是进行分布式的排序。依据你自己的理解,阐述分布式排序方法。只需要用伪代码写出在各个节点中的程序代码即可。

解: 采用分布式并行排序算法,算法步骤如下:

  • 对于n个分布式计算节点0,1,2,,,,n-1
  • 对每个节点,按采样率p随机抽取$x_i$条数据, $x_i = p * X_i$, 其中$X_i$表示编号为i的节点当前存储的数据数
  • 定义节点0为Primary节点,其余节点将各自的采样数据$x_i$发送给节点0,节点0根据采样数据的统计信息将数据划分为大致相等的n个数值范围,并将划分信息反馈给各个节点(即根据分布情况各个节点分别处理一定范围的数据)
  • 各个节点收到划分结果后对不属于本节点的数据发送给对应的节点,并进行排序
  • 各节点收到属于该范围的所有节点之后,对数据进行排序操作
  • 最终各个节点存储的是所有数据排序后的分割结果

MapReduce程序完成矩阵相乘的运算

一个非常大的矩阵需要使用MapReduce来完成相乘的运算,写出MapReduce的伪代码。

算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
map(key, value, matrix):
/*
* matrix = "M" / "N"
* 矩阵M*N, M矩阵m*n, N矩阵n*p
* key: (i, j) 行列标签对
* value = a(i, j) 矩阵对应位置的数值
*/
i = key[0];
j = key[1];
if (matrix == "M")
for (k = 0 to m - 1)
EmitIntermediate((i, k), (value, j, matrix));
else:
for (k = 0 to n - 1)
EmitIntermediate((k, j), (value, i, matrix));

reduce((i, j), Value):
/
* key = i,j
* Value格式为: (value, pos, matrix),=即(数值,pos, M/N)
*/
result = 0;
for (pos = 0 to n-1)
result += Value[pos]["M"].value * Value[pos]["N"].value;
Emit(result);

分布式计算MapReduce

请仔细阅读下面这段关于MapReduce的伪代码。这段代码最终将会输出某一个大型文档集合中的出现次数最多的三个单词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Map(String DocID, Text content) {
HashMap<word, count> wordmap=new HashMap(……);
for each word in content{
if word not in wordmap
wordmap.put(word,1);
else
wordmap.get(word).count++;
}
Emit("donemap",wordmap);
}

void Reduce(String key, Iterator<HashMap> maps) {
HashMap<word, count> allwords = new HashMap(……);
List<word, count> wordlist = new List(……);

for map in maps{
for each (word, count) in map
if word not in allwords
allwords.put(word,count)
else
allwords.get(word)+=count;
}

for each (word, count) in allwords
wordlist.add(word,count);
sort(wordlist) on count;
Emit(wordlist.get(0));
Emit(wordlist.get(1));
Emit(wordlist.get(2));
}

小华同学运行上述的代码,发现部分reducers会出现OutOfMemoryException的错误。请结合代码分析其原因(不要指出语法错误)。

在该MAP函数中,map输出的(key,value)对的key均为”donemap”,故而在最终的reduce环节中,所有的数据中间(key, value)对会全部由某一个reducer进行处理,数据规模较大时必然超出内存

针对上述错误,你有什么修改方案?请简要说明你的修改方案。

词频统计阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Map(String DocID, Text content) {
HashMap<word, count> wordmap=new HashMap(……);
for each word in content{
if word not in wordmap
wordmap.put(word,1);
else
wordmap.get(word).count++;
}
Emit(wordmap);
}

void Reduce(String key, Iterator<IntWriteable> counts) {
int sum = 0;
for count in counts
sum += count;
Emit((word, sum))
}

寻找Top 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Map(key, value) {
/*
* key: 将统计结果分成若干组,每组M条数据(自行切割)
* value: (word, count)
*/
wordlist = value to list;
sort(wordlist) on count;
for i = 0 to 3
Emit("Final", (wordlist[i]));
}

void Reduce(String key, Iterator<HashMap> maps) {
HashMap<word, count> allwords = new HashMap(……);
List<word, count> wordlist = new List(……);

for map in maps{
for each (word, count) in map
if word not in allwords
allwords.put(word,count)
else
allwords.get(word)+=count;
}

for each (word, count) in allwords
wordlist.add(word,count);
sort(wordlist) on count;
Emit(wordlist.get(0));
Emit(wordlist.get(1));
Emit(wordlist.get(2));
}

改进方案是采用两轮MapReduce操作的做法,第一轮操作统计单词出现次数,输出(word,count)对,第二轮操作根据第一轮的统计结果找出频数的Top 3

第二轮操作时先将第一轮的统计结果分割成多个部分进行map,每个Map阶段先对本节点的数据按单词数count排序,然后输出前3个(key 一致), 然后reduce阶段将各节点输出的”前3”<word, count>对进行排序,输出前3个结果即为所求

打赏