Lab_1_MapReduce
开个头,之后慢慢补。
背景
Part I
完成 mapreduce 的 Sequential 版本,主要是 doMap 和 doReduce 函数,它们分别在 common_map.go 和 common_reduce.go 中定义。
程序从 test_test.go 进入,先产生 input 文件,根据参数调用测试函数;
调用 master.go 的 Sequential 函数,Sequential 函数顺序地执行 nMap 个 mapTask,一个输入文件一个 mapper,每个 mapper 将产生的数据随机放入 nReduce 个文件中,然后顺序执行 nReduce 个 reduceTask,每个 reducer 读取分配给它的数据文件,排序数据再调用 reduce 函数。
排序因为目标数据是数字字符串,所以策略是先转换成数字再做比较:
1 | k1, _ := strconv.Atoi(kv1.Key);k2, _ := strconv.Atoi(kv2.Key);return k1 < k2; |
例子 1 1 个 mapper1 个 reducer
mapper 将数据输入到中间文件中,后 reducer 读出来
例子 2 5 个 mapper3 个 reducer
1.Makeinput 函数产生 5 个 input 文件,分别包括 0 - 19999、20000 - 39999、40000 - 59999、60000 - 79999、80000 - 99999;
2.mapper0 处理文件 0,将 0 - 19999 随机分散到 3 个文件中,文件名显示该 mapper 号和其指定的 reduce 号,其他 mapper 同理;
3.reducer0 处理分配给它的数据文件,即 0-0、1-0、2-0、3-0、4-0,分别包括 0 - 19999、20000 - 39999、40000 - 59999、60000 - 79999、80000 - 99999 的数据,reducer 读取这些数据然后整合成一个 res 文件,其他 reducer 同理;
结果
Part II
问题,reduce 函数需要统计 key 相同的项,所以 doReduce 函数在排序后,需要将相邻的 key 相同的项组织成一个[]string。修改后的 reduce 部分:
1 | l := len(kvs);values := []string{kvs[0].Value};for i := 1; i < l; i++{if kvs[i].Key == kvs[i - 1].Key{ values = append(values, kvs[i].Value);} else{ enc.Encode(KeyValue{kvs[i - 1].Key, reduceF(kvs[i - 1].Key, values)}); values = nil; values = append(values, kvs[i].Value);}}enc.Encode(KeyValue{kvs[l - 1].Key, reduceF(kvs[l - 1].Key, values)}); |
上题排序函数因为是给数字字符串排序所以采取的策略是”先转换成数字再比较数字大小排序”,而本题需要给 word 排序,也即按字母序排序:
1 | return strings.Compare(kv1.Key, kv2.Key) < 0; |
Part III
part II 的 distributed 版本,使用 golang 的 RPC 和 channel 模拟现实环境。
master 和 worker 通过 RPC 交互,master.go 中的 run()调用 schedule.go 中的 schedule()来分派任务给 worker,schedule()先 doMap 然后 doReduce 最后调用 master_splitmerge.go 中的 merge()来将中间文件组织成最终结果。
根据需要 schedule 需要等待 worker 来才能执行 task,那么这些 worker 从哪里来?
首先看本题的测试函数 TestBasic,它调用了 worker.go 中的 RunWorker()来生成 worker
RunWorker()内部 new 一个 wk 并初始化后,调用了 wk.register(MasterAddress),这个方法使用 RPC 方式调用 Master.Register 向 master 传这个 wk 的 name 以告知该 wk 空闲准备接受 task,同时 wk.register(MasterAddress)函数在 rpc server 注册了 wk,wk.name 就是其地址,这样 schedule 可以使用 RPC 来远程调用 wc.DoTask();(但是有个问题,官方文档说不可注册同类型的多个对象,但是这里注册的多个 worker 都是相同的类型,为什么?)
run 函数内部先调用 schedule(map)进行 map 处理,本题有 100 个文件,也就是说有 100 个 task,而 worker 只有 2 个(这里的 worker 和 mapper 不是一回事),每个 task 都需要等待空闲的 worker,在处理完毕后即释放 worker;
Part IV: Handling worker failures
处理 worker 发生的错误。通过检测 RPC 是否超时可以判断 worker 是否出现错误,当 master 通过 RPC 分配给 worker 的任务失败,这个任务需要重新分配给其他 worker。
超时错误发生在 RPC 调用,本程序的 RPC 调用由 common_rpc.go 中的 call()函数负责,若发生超时它将返回 false。
1 | // schedule starts and waits for all tasks in the given phase (Map or Reduce). |
在发生错误时重新调用 deal 来寻找另一个 worker 来解决这个 task,这个 task 解决掉后才 sem <- 1。
Part V: Inverted index generation
实验 2 使用 mapreduce 来统计文件中各单词的出现次数,这是个经典的例子但是基本没有实际价值,接下来实验 5 要使用 mapreduce 实现一个倒排索引;
倒排索引广泛应用于文献查找领域,基本需要实现输入一个 key 返回这个 key 的出处,所以不仅要统计单词的出现次数,还要记录单词的出现位置(文件名);