Lab_1_MapReduce

开个头,之后慢慢补。

背景

6.824
MapReduce 介绍

Part I

完成 mapreduce 的 Sequential 版本,主要是 doMap 和 doReduce 函数,它们分别在 common_map.go 和 common_reduce.go 中定义。
程序从 test_test.go 进入,先产生 input 文件,根据参数调用测试函数;
调用 master.go 的 Sequential 函数,Sequential 函数顺序地执行 nMap 个 mapTask,一个输入文件一个 mapper,每个 mapper 将产生的数据随机放入 nReduce 个文件中,然后顺序执行 nReduce 个 reduceTask,每个 reducer 读取分配给它的数据文件,排序数据再调用 reduce 函数。
排序因为目标数据是数字字符串,所以策略是先转换成数字再做比较:

1
k1, _ := strconv.Atoi(kv1.Key);k2, _ := strconv.Atoi(kv2.Key);return k1 < k2;

例子 1 1 个 mapper1 个 reducer
mapper 将数据输入到中间文件中,后 reducer 读出来

例子 2 5 个 mapper3 个 reducer
1.Makeinput 函数产生 5 个 input 文件,分别包括 0 - 19999、20000 - 39999、40000 - 59999、60000 - 79999、80000 - 99999;
lab1_part1_1
2.mapper0 处理文件 0,将 0 - 19999 随机分散到 3 个文件中,文件名显示该 mapper 号和其指定的 reduce 号,其他 mapper 同理;
lab1_part1_2
3.reducer0 处理分配给它的数据文件,即 0-0、1-0、2-0、3-0、4-0,分别包括 0 - 19999、20000 - 39999、40000 - 59999、60000 - 79999、80000 - 99999 的数据,reducer 读取这些数据然后整合成一个 res 文件,其他 reducer 同理;
lab1_part1_3

结果

Part II

问题,reduce 函数需要统计 key 相同的项,所以 doReduce 函数在排序后,需要将相邻的 key 相同的项组织成一个[]string。修改后的 reduce 部分:

1
l := len(kvs);values := []string{kvs[0].Value};for i := 1; i < l; i++{if kvs[i].Key == kvs[i - 1].Key{    values = append(values, kvs[i].Value);} else{    enc.Encode(KeyValue{kvs[i - 1].Key, reduceF(kvs[i - 1].Key, values)});    values = nil;    values = append(values, kvs[i].Value);}}enc.Encode(KeyValue{kvs[l - 1].Key, reduceF(kvs[l - 1].Key, values)});

上题排序函数因为是给数字字符串排序所以采取的策略是”先转换成数字再比较数字大小排序”,而本题需要给 word 排序,也即按字母序排序:

1
return strings.Compare(kv1.Key, kv2.Key) < 0;

Part III

part II 的 distributed 版本,使用 golang 的 RPC 和 channel 模拟现实环境。
master 和 worker 通过 RPC 交互,master.go 中的 run()调用 schedule.go 中的 schedule()来分派任务给 worker,schedule()先 doMap 然后 doReduce 最后调用 master_splitmerge.go 中的 merge()来将中间文件组织成最终结果。
根据需要 schedule 需要等待 worker 来才能执行 task,那么这些 worker 从哪里来?
首先看本题的测试函数 TestBasic,它调用了 worker.go 中的 RunWorker()来生成 worker
RunWorker()内部 new 一个 wk 并初始化后,调用了 wk.register(MasterAddress),这个方法使用 RPC 方式调用 Master.Register 向 master 传这个 wk 的 name 以告知该 wk 空闲准备接受 task,同时 wk.register(MasterAddress)函数在 rpc server 注册了 wk,wk.name 就是其地址,这样 schedule 可以使用 RPC 来远程调用 wc.DoTask();(但是有个问题,官方文档说不可注册同类型的多个对象,但是这里注册的多个 worker 都是相同的类型,为什么?)
run 函数内部先调用 schedule(map)进行 map 处理,本题有 100 个文件,也就是说有 100 个 task,而 worker 只有 2 个(这里的 worker 和 mapper 不是一回事),每个 task 都需要等待空闲的 worker,在处理完毕后即释放 worker;

Part IV: Handling worker failures

处理 worker 发生的错误。通过检测 RPC 是否超时可以判断 worker 是否出现错误,当 master 通过 RPC 分配给 worker 的任务失败,这个任务需要重新分配给其他 worker。
超时错误发生在 RPC 调用,本程序的 RPC 调用由 common_rpc.go 中的 call()函数负责,若发生超时它将返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// schedule starts and waits for all tasks in the given phase (Map or Reduce).
func (mr *Master) schedule(phase jobPhase) {
var ntasks int
var nios int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mr.files)
nios = mr.nReduce
case reducePhase:
ntasks = mr.nReduce
nios = len(mr.files)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

// All ntasks tasks have to be scheduled on workers, and only once all of
// them have been completed successfully should the function return.
// Remember that workers may fail, and that any given worker may finish
// multiple tasks.
//
// TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
//

   /*
       schedule只需要将输入文件名mr.files[task]和任务task告诉worker,worker知道应该从哪个文件输入哪个文件输出

       worker上线后调用Master.Register并通过mr.registerChannel来传递新worker的信息,schedule需要读取这个channel
       master通过调用Worker.DoTask(DoTaskArgs)来通知worker新任务,Master结构体保存了当前运行job的信息,但是它不知道Map和Reduce函数的内部情况,那是worker自己的事
       worker处理结束后需要重新调用mr.registerChannel <- args.Worker,表示这个worker重新空闲
   */
   /*
   for task := 0; task < ntasks; task++{
       worker := <-mr.registerChannel;
       args := DoTaskArgs{mr.jobName, mr.files[task], phase, task, nios};
       ok := call(worker, "Worker.DoTask", args, new(struct{}))
       if ok == false {
           fmt.Printf("DoTask: RPC %s register error\n", worker);
       }
       mr.registerChannel <- worker;
   }
   */
   sem := make(chan int);
   var deal func(int) = func(task int) {
       worker := <-mr.registerChannel;
       args := DoTaskArgs{mr.jobName, mr.files[task], phase, task, nios};
       // 通知worker工作
       okDo := call(worker, "Worker.DoTask", args, new(struct{}));
       if okDo == false {
           fmt.Printf("DoTask: RPC %s register error\n", worker);
           // 重新调用一次
           deal(task);
       }
       // 通知master有新worker空闲了
       argsmr := RegisterArgs{worker};
       okRg := call(mr.address, "Master.Register", argsmr, new(struct{}));
       if okRg == false {
           fmt.Printf("Register: RPC %s register error", worker);
       }
       // 不能使用下面这行!!!
       // mr.registerChannel <- worker;
       if okDo == true {
           sem <- 1;
       }
   };
   for task := 0; task < ntasks; task++ {
       go deal(task);
   }
   for i := 0; i < ntasks - 1; i++ {
       <-sem;
   }
fmt.Printf("Schedule: %v phase done\n", phase)
}

在发生错误时重新调用 deal 来寻找另一个 worker 来解决这个 task,这个 task 解决掉后才 sem <- 1。

Part V: Inverted index generation

实验 2 使用 mapreduce 来统计文件中各单词的出现次数,这是个经典的例子但是基本没有实际价值,接下来实验 5 要使用 mapreduce 实现一个倒排索引;
倒排索引广泛应用于文献查找领域,基本需要实现输入一个 key 返回这个 key 的出处,所以不仅要统计单词的出现次数,还要记录单词的出现位置(文件名);

Result

lab1_result