MIT6.824 2020 Lab1 MapReduce 实现

准备工作

实验地址:http://nil.csail.mit.edu/6.824/2020/labs/lab-mr.html

论文地址:mapreduce

实验环境可以在实验地址里面找到具体的搭建方式。

系统总览

MapReduce 系统是由一个 master 进程和多个 worker 进程组成。

Master 负责任务状态的记录以及任务的分发。

Worker 负责不断向 master 请求任务,并根据任务的类型(map/reduce)进行处理,最后将任务结果发送给 master。

系统框架图如下:

process

系统流程图如下:

程序基本逻辑

Master

  • master 一开始只能分发 map 任务。
  • 当所有 map 任务执行完毕后,master 才开始分发 reduce 任务。
  • 当所有 map 和 reduce 任务执行完毕,master 退出。
  • 对于分发出去的任务,需要进行超时控制,即超时的任务需要重新分发处理。在完成分发任务的同时,对该任务运行一条检测任务超时的 go routine checkTaskTimeout

Worker

Worker 调用 GetTask RPC 接口不断向 master 请求任务。当接收到任务,根据任务的类型分类处理。处理完后,调用 CompleteTask 接口告知 master 任务执行完毕。

  • 如果是 map 任务,输入是单个文件,通过 mapf 处理后,使用 ihash(key) % nReduce 决定写入到哪个中间文件,输出是 nReduce 个中间文件。
  • 如果是 reduce 任务,输入是多个 map 输出的中间文件,通过 reducef 处理后,输出是单个文件。
  • 如果没有可执行的任务,则等待一下,继续轮询。

代码结构

Master 的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// master.go

type Task struct {
// Pending, Running and Completed
phase string
taskID int
taskType string
// for map, input path has only one element
inputPaths []string
// for reduce, output path has only one element
outputPaths []string
}

type Master struct {
nReduce int
mapTasks []*Task
reduceTasks []*Task
incompletedMapTaskCount int
incompletedReduceTaskCount int
reduceInitialized bool
mux sync.Mutex
}

Worker 主要是处理逻辑,官方实验文档以及 main/mrsequential.go 里面有例子。

RPC 的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// rpc.go

type GetTaskRequest struct{}

type GetTaskResponse struct {
TaskType string
TaskID int
TaskInputs []string
NReduce int
}

type CompleteTaskRequest struct {
TaskType string
TaskID int
TaskOutputs []string
}

type CompleteTaskResponse struct{}

以上是代码的结构,具体的代码细节在 github 上面。

踩过的坑

这里最主要的应该是对于并发的控制,以及 crash 的处理。

并发控制加锁可以完成。

crash 的处理是靠 master 的超时机制,以及在 worker 处理的时候,生成一个临时文件,在处理结束后再 rename 成最终的文件。

为了方便 debug,推荐使用 github.com/sirupsen/logrus 这个库。可以将 debug 等级设为 debug level 来输出自己的 debug 信息。

1
2
3
4
5
6
7
8
9
10
import (
log "github.com/sirupsen/logrus"
)

log.SetOutput(os.Stdout)
// log.SetLevel(log.DebugLevel)
log.SetLevel(log.WarnLevel)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})

最终结果

这里的 FATAL 是 master 检测到所有任务完成后退出,worker 连接不上 master 而抛出的错误,是预期的。

结语

这个实验虽然只是一个小玩具,但还是有收获的。特别是看过论文后进行实验,对 mapreduce 一些细节的实现有更深的了解。有兴趣的同学可以自己完成一遍。