6.5840 Lab 1 - MapReduce

开新坑刷 MIT 6.5840 Distributed Systems (Spring 2025),原 6.824。

本文是第一个实验的学习记录。

实验指导:6.5840 Lab 1: MapReduce

这个实验需要实现一个分布式的 MapReduce,包括 Coordinator 和 Worker。现实中的 MapReduce 一般是多机器跑的,这个实验里的是单机运行,用多进程来模拟「分布式」。Coordinator 和 Worker 之间通过 RPC 实现通信。

这里值得注意的是,在 MapReduce 的论文中 Reduce Worker 要用 RPC 找 Map Worker 拉取 intermediate output 作为 Reduce 的 input,在这个实验中是没有这个环节的,因为 Coordinator 和 Worker 在这个实验中通过共享一个目录和遵循同一个文件命名规则,避免了 RPC 拉文件这个环节。

做这个实验我其实没有看 video,只读了 MapReduce 的论文,这篇论文对完成这个实验是很重要的,尤其是 1-3 章。第 3 章讲了实现细节和容错方法,有些部分虽然在实验中没有涉及,但是通过阅读第三章可以窥见实现一个工业级的 MapReduce 需要面临 / 解决哪些问题,很值得一读。

Overview

在开始写 code 之前我们需要理解一些细节。

首先,一个 Worker 并不专门做某一种类型的工作(Map / Reduce)。这是看这张架构图最初可能会有的一个误区。这里需要理解:Worker 自己是不知道自己要做 Map 还是 Reduce 的,必须要等 Coordinator 分配一个工作给它,它才知道。

这个分配的过程是 Worker 主动问 Coordinator 要一个任务,Coordinator 按照当前的任务执行状态分配一个需要执行的任务给该 Worker。所以看一看出 assign map 和 assign reduce 是 Coordinator 给 Worker 的 RPC Reply。

Worker 要做的事情其实就是一个类似于一个消息循环 / 时间循环,更准确地说是轮询循环。它用一个死循环不停地问 Coordinator 下面要干什么,接着根据 Coordinator 分配的任务类型来决定怎么处理这个任务。

Map Phase 和 Reduce Phase 并不同时存在:只有在所有的 Map 任务都做完后,Coordinator 才会开始分配 Reduce 任务。那么一定会存在一些时候,有一些 Worker 没事干,等待其他 Worker 完成最后的 Map 任务。

Coordinator 需要维护哪些任务正在执行中,哪些任务还没开始做。超时未完成的任务要 re-schedule,这是 MapReduce 的容错机制之一。这个实验并不要求真正实现论文中的 Backup Tasks,看上去为了防止过于激进地 re-schedule,实验指导中说明了只需要对超时的 task 做 re-schedule,不要做非必要的 re-schedule(有一个专门的 test case 检查这件事情)。

RPC Contract

在这个 lab 里,Coordinator 和 Worker 合作的最核心的协议就是两个函数:GetTask 和 CompleteTask:Worker 调用 GetTask 向 Coordinator 请求一个具体的 task,然后执行;在这个 task 执行完毕后,Worker 使用 CompleteTask 告知 Coordinator 这个任务完成了。

对于 Map 任务,Coordinator 需要告诉 Worker 输入文件是什么以及 Reduce 任务的数量,以便一个 Map 任务生成 NReduce 个中间文件。

对于 Reduce 任务,Coordinator 需要告知 Reduce Worker 要取的中间文件的路径。这个路径本应该是 Map Worker 在告知任务完成的时候传给 Coordinator,Coordinator 再告知 Reduce Worker 「文件在哪台机器上的哪个路径下」,Worker 通过 RPC 来拉去这些文件。在这个实验中,由于单机运行加上共享文件目录,Ruduce Worker 只需要知道自己做的是哪个 Reduce 任务(iReduce)以及 Map 任务的总数,就可以构造出这些文件的文件名。

对于 Wait 和 Exit 任务,Coordinator 不需要告知 Worker 额外的信息。Coordinator 发现该分发的 Map / Reduce 任务已经全部分发,并且没有需要 re-schedule 的任务时,如果这个时候有 Worker 来 Get Task,会领取到 Wait 或者 Exit。等待 Map 全部完成、等待 Reduce 全部完成时,Coordinator 会要求空闲的 Worker Wait。全部完成后,Coordinator 会分发一个 Exit 任务,Worker 接收到 Exit 后退出进程。

起初,为了打 log 调试我还增加了一个 GetWorkerId:在 Worker 刚刚启动的时候,会通过这个接口让 Coordinator 分配一个 Worker ID。后来我发现用 PID 就可以。不过我还是保留了这个接口,因为在 Worker 和 Coordinator 同时启动的时候,Worker 如果成功地收到了 Coordinator 分配的 ID,说明 Coordinator 已经启动好了,接下来 Worker 就可以向 Coordinator 要任务了。这有点像是 TCP 中的握手,如果没有这个「握手」,可能 Coordinator 还没有准备好(比如 Coordinator 还没有开始监听 RPC 使用的 HTTP 端口),此时 Worker GetTask 发现连接不上 Coordinator,情况就会变得复杂:Worker 需要 Retry,需要分辨到底是 Coordinator 没准备好还是挂掉了,并且要做出反应,等等。有了这个「握手」,在代码中可以清晰地体现出,这个 Retry 只发生在 Worker 刚启动过的时候,当后面的 RPC 调用基于这个「握手」可以假定 Coordinator 已经是准备好的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 
// RPC for worker registration.
//

type GetWorkerIdArgs struct {
}

type GetWorkerIdReply struct {
WorkerId int
}

//
// RPC for task assignment.
//

type TaskType int

const (
TaskType_None TaskType = iota
TaskType_Map
TaskType_Reduce
TaskType_Wait
TaskType_Exit
)

type GetTaskArgs struct {
}

type GetTaskReply struct {
TaskId int
TaskType TaskType

// One-of-like fields for different task types.
MapSpec struct {
InputFile string
NReduce int
}

ReduceSpec struct {
IReduce int
NMap int
}
}

//
// RPC for task completion notification.
//

type CompleteTaskArgs struct {
TaskId int
WorkerId int
// We don't need to explictly report the output files,
// as the reduce worker can deduce them based on the task id.
}

type CompleteTaskReply struct {
}

Worker

Worker 框架

接下来我们来实现 Worker。Worker 要做的事情大致是这样的:先找 Coordinator 握手,然后进入轮询循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//
// Worker id for this worker.
// Initialized to -1 to indicate unregistered state.
//

var selfId int = -1

//
// Map and Reduce functions for this worker, provided by the application.
//

var mapFunc func(string, string) []KeyValue = nil
var reduceFunc func(string, []string) string = nil

//
// main/mrworker.go calls this function.
//
func Worker(
mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

mapFunc, reduceFunc = mapf, reducef

if registerSelf() != nil {
os.Exit(1)
}

if handleTasks() != nil {
os.Exit(1)
}

os.Exit(0)
}

func registerSelf() error {
for i := 0; i < 10 /* retry threshold */ ; i++ {
args := GetWorkerIdArgs{}
reply := GetWorkerIdReply{}

ok := call("Coordinator.GetWorkerId", &args, &reply)

if ok {
selfId = reply.WorkerId
return nil
}

// 1. The failure might be due to coordinator not ready yet.
// We therefore retry after a short wait.
// 2. Once the call still fails after several retries (here the threshold is 10),
// we assume that the coordinator has crashed, and we exit the worker.
time.Sleep(1 * time.Second);
}

return errors.New("Registration failed.")
}

func handleTasks() error {
for {
args := GetTaskArgs{}
reply := GetTaskReply{}

ok := call("Coordinator.GetTask", &args, &reply)

if !ok {
// Assume coordinator has crashed. Exit worker.
return errors.New("Failed to grab a new task")
}

switch reply.TaskType {
case TaskType_Map:
// Once the map handler fails, the coordinator will re-schedule
// as soon as the task is timeout.
// So here we just continue to grab new tasks, no matter it succeeds or fails.
mapHandler(reply.TaskId, reply.MapSpec.InputFile, reply.MapSpec.NReduce)

case TaskType_Reduce:
reduceHandler(reply.TaskId, reply.ReduceSpec.IReduce, reply.ReduceSpec.NMap)

case TaskType_Wait:
time.Sleep(500 * time.Millisecond)

case TaskType_Exit:
return nil

default:
return fmt.Errorf("Unknown task type %v", reply.TaskType)
}
}
}

Map Handler

Map Handler 需要做这三步:读取输入、执行 Map Function、输出 nReduce 个中间文件。

读取

读取输入时,输入文件很小,可以直接 read 到内存里。真实世界中,这个过程往往是流式读取的,Splitting 是在 Map Task 启动之前由框架来完成的,每个 Map Task 负责处理一个分块,大约 16 - 64 MB 左右(如今随着硬件的发展这个块可以更大,比如 128 MB 左右),在 Map Task 中采用流式读取。

这里有个有意思的问题:起初我在想 16-64 MB 对于内存来讲并不是不可接受,为什么已经做了 Splitting 却不能把它直接 read 进内存里,我就去问 Gemini。Gemini 大概给了两个原因:当年 Google 采用的机器可能内存只有 2G / 4G,几十兆的内存是相当昂贵的;为了压榨 CPU,一台物理机上可能要同时跑很多 MapReduce 程序,这个时候会有并发放大;除此以外还需要考虑数据膨胀、GC 压力等,所以目前仍然认为直接 read 进内存是不可接受的,并且流式读取可以兼容很大的分块,比如 512 MB。

中间文件

输出 nReduce 个中间文件的时候,并不是一次性输出的,也是通过 append 的方式。这就意味着不能把 nReduce 个文件的内容先放在内存里。在论文中,中间文件是先存在 Worker 的磁盘上的,在 Reduce 阶段 Worker 之间通过 RPC 拉文件。这里 Map Handler 要提前打开这 nReduce 个文件,在全部写完之后再关闭这些文件。

Atomic Rename

在 Map Task 执行的过程中,这些输出文件全部是临时的,在 Map Task 执行结束后再 atomic rename 成最终的中间文件。

这个 atomic 指的是:操作要么完全成功,要么完全失败,不存在「移动了一半数据」或者「文件名改了但内容没过去」这种中间状态;对于外部观察者(比如其他的 Reader),在重命名发生的瞬间,他们看到的要么是旧文件,要么是新文件。在本地可以通过 os.CreateTempFile + os.Rename 的机制来实现。

这里有两个值得注意的地方:

  • os.CreateTempFile 的文件名要和最终的文件名区别开来。实验指导中推荐的中间文件名称是 mr-{taskId}-{iReduce},那么临时文件可以命名成 mr-temp-{taskId}-{iReduce}
  • 光这样做仍然是不够的,考虑这样一种情况:Worker A 做某个任务超时了,Coordinator re-schedule 了这个任务给 Worker B。由于 taskId 没变,此时 Worker A 和 Worker B 创建的临时文件都叫 mr-temp-{taskId}-{iReduce},这里会产生冲突。这里我们可以给文件生成一个随机后缀来解决这个问题。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//
// Handler for map tasks.
//

func mapHandler(taskId int, inputFile string, nReduce int) error {
// Read input file.
contentBytes, err := os.ReadFile(inputFile)
if err != nil {
return err
}

content := string(contentBytes)

// Execute the map function.
intermediateKVs := mapFunc(inputFile, content)

// Prepare nReduce temporary files for each reduce partition.
intermediateFiles, err := openTempFiles(taskId, nReduce)
if err != nil {
return err
}

// Files will be closed at `closeAndRenameTempFiles`.
// However, in case of any early return, we still place a defer here.
// For FDs that has been closed, re-closing won't have any effect.
defer closeTempFiles(intermediateFiles)

// Write intermediate KVs to corresponding temporary partition files.
for _, kv := range intermediateKVs {
partitionId := ihash(kv.Key) % nReduce
f := intermediateFiles[partitionId]
jsonEncoder := json.NewEncoder(f)
err = jsonEncoder.Encode(&kv)
if err != nil {
return err
}
}

// Rename temporary files to final intermediate files.
closeAndRenameTempFiles(taskId, intermediateFiles);

err = notifyTaskCompletion(taskId);
if err != nil {
return err
}

return nil
}

func openTempFiles(taskId int, nReduce int) ([]*os.File, error) {
tempFiles := make([]*os.File, nReduce)
for i := 0; i < nReduce; i++ {
// Generate random suffix to avoid name collision
// when one task is re-executed.
filename := fmt.Sprintf("mr-temp-%d-%d-*", taskId, i)
f, err := os.CreateTemp(".", filename)
if err != nil {
return nil, err
}

tempFiles[i] = f
}

return tempFiles, nil
}

func closeTempFiles(tempFiles []*os.File) {
for _, f := range tempFiles {
if f != nil {
f.Close()
}
}
}

func closeAndRenameTempFiles(taskId int, tempFiles []*os.File) error {
nReduce := len(tempFiles)
for i := 0; i < nReduce; i++ {
tempFilename := tempFiles[i].Name()
finalFilename := fmt.Sprintf("mr-%d-%d", taskId, i)
tempFiles[i].Close() // Close the temporary file before renaming.
err := os.Rename(tempFilename, finalFilename)
if err != nil {
return err
}
}

return nil
}

func notifyTaskCompletion(taskId int) error {
args := CompleteTaskArgs{
WorkerId: selfId,
TaskId: taskId,
}
reply := CompleteTaskReply{}

ok := call("Coordinator.CompleteTask", &args, &reply)
if !ok {
return errors.New("Failed to notify task completion")
}

return nil
}

Reduce Handler

Reduce Handler 要做的事情是对 Map Task 中间文件生成的 KV 进行排序、对 key 相同的 KV 做聚合,最终把聚合好的结果写入输出文件。

专注在 RPC 和容错,实验依旧用直接把中间内存读入文件的方式代替了流式处理。这里的输出文件也一样需要用到 Atomic Rename,本地文件的 Atomic Rename 和 Map 中提到的流程一样,由 kernel 中的 inode switch 机制来保证原子性。

排序

首先要理解为什么要排序。Map 做的事情其实的本质是 group by,一个 Map 任务会把哈希值相同的 key 输出到同一个中间文件里。Reduce 的本质是 aggregate,但是我们是要对 key 做 aggragate,而不是对 hash(key) 做 aggregate。Reduce 能读到的中间文件中所有 key 的 hash(key) 都是一致的,但存在不止一个 key,我们要做的就是再次「group by」,这里就需要用到排序(有时候可能还会用到外排序)。我们把 key 相同的 KV 排在一个连续的区间里,这样方便后面作为一个 batch 扔给 Reduce。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//
// Handler for reduce tasks.
//

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func selectValues(a []KeyValue) []string {
values := make([]string, len(a))
for i, kv := range a {
values[i] = kv.Value
}
return values
}

func reduceHandler(taskId int, iReduce int, nMap int) error {
// Read and sort intermediate KVs.
// Here, for a specified iReduce, there are always:
// - nMap intermediate files.
// - Multiple keys to handle. (A sort is therefore needed.)
kvs, err := iReduceKVs(iReduce, nMap)
if err != nil {
return err
}

sort.Sort(ByKey(kvs))

// Create output temporary file.
outputTempFile, err := os.CreateTemp(".", fmt.Sprintf("mr-out-%d-*", iReduce))
if err != nil {
return err
}
defer outputTempFile.Close()

// Perform reduce operation for each key.
var off int = 0
var nEntries int = len(kvs)

for off < nEntries {
key := kvs[off].Key
s, e := off, off + 1 // [s, e) is the range for this key
for e < nEntries && kvs[e].Key == key {
e++
}
off = e
result := reduceFunc(key, selectValues(kvs[s:e]))
fmt.Fprintf(outputTempFile, "%v %v\n", key, result);
}

// Close and rename temporary output file.
tempFilename := outputTempFile.Name()
finalFilename := fmt.Sprintf("mr-out-%d", iReduce)
outputTempFile.Close()
os.Rename(tempFilename, finalFilename)

// Notify coordinator about task completion.
err = notifyTaskCompletion(taskId);
if err != nil {
return err
}

return nil
}

func iReduceKVs(iReduce int, nMap int) ([]KeyValue, error) {
kvs := []KeyValue{}

for i := 0; i < nMap; i++ {
filename := fmt.Sprintf("mr-%d-%d", i, iReduce)
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()

jsonDecoder := json.NewDecoder(f)
for {
var kv KeyValue
err = jsonDecoder.Decode(&kv)
if err == io.EOF {
break
}

if err != nil {
return nil, err
}

kvs = append(kvs, kv)
}
}

return kvs, nil
}

Coordinator

Coordinator 要做的事情是:处理握手、分配任务以及维护任务的执行状态(处理已完成的任务和需要超时重做的任务)。

我们在实现 Coordinator 的时候要注意它是会被并发调用的,所以要处理 race condition。

状态管理

这里我在 Coordinator 里维护了两个队列:

  • NotStartedQueue - 维护还没有开始做的任务;
  • InFlightQueue - 维护已经分发出去但是没有收到运行结果的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
type InFlightTask struct {
TaskId int
StartTime int64
}

type TaskStatus int

const (
TaskStatus_NotStarted TaskStatus = iota
TaskStatus_InFlight
TaskStatus_Finished
)

type Coordinator struct {
NMap int
NReduce int

// NMap + NReduce tasks only, indexed by taskId.
// Other types of tasks (e.g., waiting tasks) are not recorded here.
Tasks []GetTaskReply
TaskStatuses []TaskStatus
SlotLocks []sync.Mutex

FinishedMapCount int
FinishedReduceCount int
FinishedCountLock sync.Mutex

// Queue of not-started tasks.
NotStartedQueue *list.List
NotStartedQueueLock sync.Mutex

// Queue of in-flight tasks.
InFlightQueue *list.List
InFlightQueueLock sync.Mutex

nextWorkerId int
workerIdLock sync.Mutex
}

在每次做任务分发前(即 Coordinator 要回应一个 GetTask 请求之前),首先要为 InFlightQueue 维护这样一个不变的状态:InFlightQueue 里只存在未超时的任务,超时的任务需要重新回到 NotStartedQueue。

  • 然后,再开始从 NotStartedQueue 中取出队头元素,分配给 Worker。InFlightQueue 中的元素永远是根据分发时间入队的,所以它的头部的元素永远是「最有可能超时的任务」。
  • 如果此时 NotStartedQueue 中没有元素了,就检查任务是不是完全做完了,如果还没有做完就返回一个 TaskType_Wait,让当前的 Worker 等待其他 Worker,否则返回 TaskType_Exit 命令 Worker 进程关闭。这里的「完全做完了」指的是 Reduce 任务完全做完了。

这里要注意要给队列上锁,因为他们会被并发访问到。并且这里要注意避免死锁:如果有锁嵌套的话,进入顺序必须一致;或者干脆避免嵌套的锁。

两个阶段

任务执行需要被分成 Map Phase 和 Reduce Phase,在所有的 Map Phase 的任务做完之前,Reduce Phase 的任务是不能入队的。我们需要维护已经完成的 Map 任务的数量,每次有新的 Map 任务被完成的时候给这个数量自增,发现它达到 NMap 的时候就把所有 Reduce Tasks 推入 NotStartedQueue,新阶段开始。

代码实现

GetTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//
// RPC: GetTask
//

func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
// Handle in-flight tasks that have timed out.
c.handleTimeoutTasks();

// Try to assign a not-started task.
{
c.NotStartedQueueLock.Lock()

if c.NotStartedQueue.Len() > 0 {
// Assign a not-started task.
e := c.NotStartedQueue.Front()
taskId := e.Value.(int)
c.NotStartedQueue.Remove(e)
c.NotStartedQueueLock.Unlock()

// Enqueue the task to in-flight queue.
c.InFlightQueueLock.Lock()
c.InFlightQueue.PushBack(InFlightTask{
TaskId: taskId,
StartTime: time.Now().UnixNano(),
})
c.InFlightQueueLock.Unlock()

// Mark the task as in-flight.
c.SlotLocks[taskId].Lock()
c.TaskStatuses[taskId] = TaskStatus_InFlight
c.SlotLocks[taskId].Unlock()

*reply = c.Tasks[taskId]
return nil
}

c.NotStartedQueueLock.Unlock()
}

// No timeout tasks, and NotStartedQueue is empty.
{
c.FinishedCountLock.Lock()

if c.FinishedReduceCount < c.NReduce {
// Job is not done yet. Ask worker to wait.
reply.TaskType = TaskType_Wait
} else {
// All tasks are finished. Ask worker to exit.
reply.TaskType = TaskType_Exit
}

c.FinishedCountLock.Unlock()
return nil
}
}

func (c *Coordinator) handleTimeoutTasks() {
// This function handles timed-out in-flight tasks.
// - For a timed-out task, if it's not finished yet,
// remove it from in-flight queue and enqueue it back to not-started queue.
// - For a timedout task, if it's already finished, just remove it from in-flight queue.

c.InFlightQueueLock.Lock()

for c.InFlightQueue.Len() > 0 {
e := c.InFlightQueue.Front()
task := e.Value.(InFlightTask)
now := time.Now().UnixNano()
if task.StartTime + int64(time.Second * 10) >= now {
break
}

taskId := task.TaskId
c.InFlightQueue.Remove(e)

// Temporarily unlock in-flight lock to avoid deadlock / unnecessary nested locks.
c.InFlightQueueLock.Unlock()

// Mark the task as not-started.
c.SlotLocks[taskId].Lock()
if c.TaskStatuses[taskId] == TaskStatus_Finished {
c.SlotLocks[taskId].Unlock()
} else {
c.TaskStatuses[taskId] = TaskStatus_NotStarted
c.SlotLocks[taskId].Unlock()

// Re-enqueue the task to not-started queue.
c.NotStartedQueueLock.Lock()
c.NotStartedQueue.PushBack(taskId)
c.NotStartedQueueLock.Unlock()
}

// Re-acquire the in-flight lock for the next iteration.
c.InFlightQueueLock.Lock()
}

c.InFlightQueueLock.Unlock()
}

CompleteTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//
// RPC: CompleteTask
//

func (c *Coordinator) CompleteTask(args *CompleteTaskArgs, reply *CompleteTaskReply) error {
taskId := args.TaskId

c.SlotLocks[taskId].Lock()
if c.TaskStatuses[taskId] == TaskStatus_Finished {
c.SlotLocks[taskId].Unlock()
return nil
}

// 1. Mark the task as finished.
c.TaskStatuses[taskId] = TaskStatus_Finished
c.SlotLocks[taskId].Unlock()

// 2. Maintain counting.
c.FinishedCountLock.Lock()
if taskId < c.NMap {
c.FinishedMapCount++
if (c.FinishedMapCount == c.NMap) {
// Enqueue reduce tasks once all map tasks are finished.
c.FinishedCountLock.Unlock()
c.enqueueReduceTasks()
} else {
c.FinishedCountLock.Unlock()
}
} else {
c.FinishedReduceCount++
c.FinishedCountLock.Unlock()
}

return nil
}

func (c* Coordinator) enqueueReduceTasks() {
c.NotStartedQueueLock.Lock()
defer c.NotStartedQueueLock.Unlock()

for i := 0; i < c.NReduce; i++ {
c.NotStartedQueue.PushBack(c.NMap + i)
}
}

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (c *Coordinator) init(files []string, nReduce int) {
c.NMap = len(files)
c.NReduce = nReduce

c.Tasks = make([]GetTaskReply, c.NMap + c.NReduce)
for i := 0; i < c.NMap; i++ {
c.Tasks[i].TaskId = i;
c.Tasks[i].TaskType = TaskType_Map
c.Tasks[i].MapSpec.InputFile = files[i]
c.Tasks[i].MapSpec.NReduce = nReduce
}

for i := 0; i < c.NReduce; i++ {
c.Tasks[c.NMap + i].TaskId = c.NMap + i
c.Tasks[c.NMap + i].TaskType = TaskType_Reduce
c.Tasks[c.NMap + i].ReduceSpec.IReduce = i
c.Tasks[c.NMap + i].ReduceSpec.NMap = c.NMap
}

c.TaskStatuses = make([]TaskStatus, c.NMap + c.NReduce)
for i := 0; i < c.NMap + c.NReduce; i++ {
c.TaskStatuses[i] = TaskStatus_NotStarted
}

c.SlotLocks = make([]sync.Mutex, c.NMap + c.NReduce)
c.FinishedMapCount = 0
c.FinishedReduceCount = 0

c.NotStartedQueue = list.New()
for i := 0; i < c.NMap; i++ {
// Initially, only map tasks are in the not-started queue.
c.NotStartedQueue.PushBack(i)
}

c.InFlightQueue = list.New()
c.nextWorkerId = 0
}

//
// RPC: GetWorkerId
//

func (c *Coordinator) GetWorkerId(args *GetTaskArgs, reply *GetWorkerIdReply) error {
c.workerIdLock.Lock()
reply.WorkerId = c.nextWorkerId
c.nextWorkerId++
c.workerIdLock.Unlock()
return nil
}

//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
c.FinishedCountLock.Lock()
defer c.FinishedCountLock.Unlock()
return c.FinishedReduceCount == c.NReduce
}

//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
c.init(files, nReduce)
c.server()
return &c
}

测试结果