type GetTaskReply struct {
    TaskId   int
    TaskType TaskType

    // One-of-like fields for different task types.
    MapSpec struct {
        InputFile string
        NReduce   int
    }
    ReduceSpec struct {
        IReduce int
        NMap    int
    }
}
//
// RPC for task completion notification.
//
type CompleteTaskArgs struct {
    TaskId   int
    WorkerId int
    // We don't need to explicitly report the output files,
    // as the reduce worker can deduce them based on the task id.
}
func registerSelf() error {
    for i := 0; i < 10 /* retry threshold */; i++ {
        args := GetWorkerIdArgs{}
        reply := GetWorkerIdReply{}
        ok := call("Coordinator.GetWorkerId", &args, &reply)
        if ok {
            selfId = reply.WorkerId
            return nil
        }
        // 1. The failure might be because the coordinator is not ready yet,
        //    so we retry after a short wait.
        // 2. If the call still fails after several retries (here the threshold is 10),
        //    we assume that the coordinator has crashed, and we exit the worker.
        time.Sleep(1 * time.Second)
    }
    return errors.New("registration failed")
}
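call is the lab's stock RPC helper. Note that the retry loop above relies on it returning false when dialing fails (the starter version calls log.Fatal on a dial error), so it presumably looks roughly like this; the exact body is an assumption:

func call(rpcname string, args interface{}, reply interface{}) bool {
    // coordinatorSock() is the lab-provided helper that names the unix socket.
    sockname := coordinatorSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        // Coordinator not ready yet, or already gone: let the caller decide.
        return false
    }
    defer c.Close()
    return c.Call(rpcname, args, reply) == nil
}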
func handleTasks() error {
    for {
        args := GetTaskArgs{}
        reply := GetTaskReply{}
        ok := call("Coordinator.GetTask", &args, &reply)
        if !ok {
            // Assume the coordinator has crashed. Exit the worker.
            return errors.New("failed to grab a new task")
        }

        switch reply.TaskType {
        case TaskType_Map:
            // If the map handler fails, the coordinator will re-schedule the task
            // as soon as it times out. So we simply continue to grab new tasks,
            // regardless of whether the handler succeeds or fails.
            mapHandler(reply.TaskId, reply.MapSpec.InputFile, reply.MapSpec.NReduce)
        case TaskType_Reduce:
            reduceHandler(reply.TaskId, reply.ReduceSpec.IReduce, reply.ReduceSpec.NMap)
        case TaskType_Wait:
            time.Sleep(500 * time.Millisecond)
        case TaskType_Exit:
            return nil
        default:
            return fmt.Errorf("unknown task type %v", reply.TaskType)
        }
    }
}
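For context, here is one plausible way registerSelf and handleTasks get wired into the lab's Worker() entry point. Stashing the plugin's map/reduce functions in package-level variables (mapFunc, reduceFunc) is my assumption, not necessarily the original design:

var (
    selfId     int
    mapFunc    func(string, string) []KeyValue
    reduceFunc func(string, []string) string
)

func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
    mapFunc, reduceFunc = mapf, reducef

    // Register to obtain a worker id; give up if the coordinator never answers.
    if err := registerSelf(); err != nil {
        log.Fatal(err)
    }
    // Loop until the coordinator tells us to exit (or appears to have crashed).
    if err := handleTasks(); err != nil {
        log.Println(err)
    }
}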
// Execute the map function.
intermediateKVs := mapFunc(inputFile, content)

// Prepare nReduce temporary files, one for each reduce partition.
intermediateFiles, err := openTempFiles(taskId, nReduce)
if err != nil {
    return err
}
// Files will be closed in `closeAndRenameTempFiles`.
// However, in case of any early return, we still place a defer here.
// For FDs that have already been closed, re-closing has no effect.
defer closeTempFiles(intermediateFiles)

// Write intermediate KVs to the corresponding temporary partition files.
for _, kv := range intermediateKVs {
    partitionId := ihash(kv.Key) % nReduce
    f := intermediateFiles[partitionId]
    jsonEncoder := json.NewEncoder(f)
    err = jsonEncoder.Encode(&kv)
    if err != nil {
        return err
    }
}

// Rename temporary files to final intermediate files.
closeAndRenameTempFiles(taskId, intermediateFiles)
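The three helpers referenced above (openTempFiles, closeTempFiles, closeAndRenameTempFiles) aren't shown in this excerpt. A minimal sketch of what they could look like, assuming the usual mr-&lt;mapTask&gt;-&lt;partition&gt; intermediate-file naming and os.CreateTemp in the working directory:

// Hypothetical helpers assumed by the map handler above; not the exact implementation.
func openTempFiles(taskId int, nReduce int) ([]*os.File, error) {
    files := make([]*os.File, 0, nReduce)
    for i := 0; i < nReduce; i++ {
        // Create the temp file in the current directory so the later rename
        // stays on the same filesystem.
        f, err := os.CreateTemp(".", fmt.Sprintf("mr-%d-%d-*", taskId, i))
        if err != nil {
            closeTempFiles(files)
            return nil, err
        }
        files = append(files, f)
    }
    return files, nil
}

func closeTempFiles(files []*os.File) {
    for _, f := range files {
        f.Close() // Re-closing an already-closed file is harmless here.
    }
}

func closeAndRenameTempFiles(taskId int, files []*os.File) {
    for i, f := range files {
        tempName := f.Name()
        f.Close()
        // Publish the intermediate file as mr-<mapTask>-<reducePartition>.
        os.Rename(tempName, fmt.Sprintf("mr-%d-%d", taskId, i))
    }
}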
// Perform the reduce operation for each key.
off := 0
nEntries := len(kvs)
for off < nEntries {
    key := kvs[off].Key
    s, e := off, off+1 // [s, e) is the range of entries for this key
    for e < nEntries && kvs[e].Key == key {
        e++
    }
    off = e
    result := reduceFunc(key, selectValues(kvs[s:e]))
    fmt.Fprintf(outputTempFile, "%v %v\n", key, result)
}

// Close and rename the temporary output file.
tempFilename := outputTempFile.Name()
finalFilename := fmt.Sprintf("mr-out-%d", iReduce)
outputTempFile.Close()
os.Rename(tempFilename, finalFilename)

// Notify the coordinator about task completion.
err = notifyTaskCompletion(taskId)
if err != nil {
    return err
}
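Two small helpers used above, selectValues and notifyTaskCompletion, are likewise not shown. Plausible sketches, assuming the lab's KeyValue type and a Coordinator.CompleteTask RPC with an (assumed) empty CompleteTaskReply:

// selectValues extracts just the values from one key's run of KV pairs.
func selectValues(kvs []KeyValue) []string {
    values := make([]string, 0, len(kvs))
    for _, kv := range kvs {
        values = append(values, kv.Value)
    }
    return values
}

// notifyTaskCompletion reports a finished task to the coordinator via RPC.
func notifyTaskCompletion(taskId int) error {
    args := CompleteTaskArgs{TaskId: taskId, WorkerId: selfId}
    reply := CompleteTaskReply{}
    if ok := call("Coordinator.CompleteTask", &args, &reply); !ok {
        return errors.New("failed to notify task completion")
    }
    return nil
}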
    // NMap + NReduce tasks only, indexed by taskId.
    // Other types of tasks (e.g., waiting tasks) are not recorded here.
    Tasks        []GetTaskReply
    TaskStatuses []TaskStatus
    SlotLocks    []sync.Mutex

    FinishedMapCount    int
    FinishedReduceCount int
    FinishedCountLock   sync.Mutex

    // Queue of not-started tasks.
    NotStartedQueue     *list.List
    NotStartedQueueLock sync.Mutex

    // Queue of in-flight tasks.
    InFlightQueue     *list.List
    InFlightQueueLock sync.Mutex
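The in-flight queue holds entries of an InFlightTask type that this excerpt doesn't define; judging from how it's used below, it's presumably just a task id plus a start timestamp:

// Presumed shape of the in-flight queue entries (not shown in the original excerpt).
type InFlightTask struct {
    TaskId    int
    StartTime int64 // Unix nanoseconds at the moment the task was handed out.
}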
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
    // Handle in-flight tasks that have timed out.
    c.handleTimeoutTasks()

    // Try to assign a not-started task.
    {
        c.NotStartedQueueLock.Lock()
        if c.NotStartedQueue.Len() > 0 {
            // Assign a not-started task.
            e := c.NotStartedQueue.Front()
            taskId := e.Value.(int)
            c.NotStartedQueue.Remove(e)
            c.NotStartedQueueLock.Unlock()

            // Enqueue the task to the in-flight queue.
            c.InFlightQueueLock.Lock()
            c.InFlightQueue.PushBack(InFlightTask{
                TaskId:    taskId,
                StartTime: time.Now().UnixNano(),
            })
            c.InFlightQueueLock.Unlock()

            // Mark the task as in-flight.
            c.SlotLocks[taskId].Lock()
            c.TaskStatuses[taskId] = TaskStatus_InFlight
            c.SlotLocks[taskId].Unlock()

            *reply = c.Tasks[taskId]
            return nil
        }
        c.NotStartedQueueLock.Unlock()
    }

    // No timed-out tasks to reassign, and NotStartedQueue is empty.
    {
        c.FinishedCountLock.Lock()
        if c.FinishedReduceCount < c.NReduce {
            // The job is not done yet. Ask the worker to wait.
            reply.TaskType = TaskType_Wait
        } else {
            // All tasks are finished. Ask the worker to exit.
            reply.TaskType = TaskType_Exit
        }
        c.FinishedCountLock.Unlock()
        return nil
    }
}
func (c *Coordinator) handleTimeoutTasks() {
    // This function handles timed-out in-flight tasks.
    // - If a timed-out task is not finished yet, remove it from the in-flight queue
    //   and enqueue it back to the not-started queue.
    // - If a timed-out task is already finished, just remove it from the in-flight queue.

    c.InFlightQueueLock.Lock()
    for c.InFlightQueue.Len() > 0 {
        e := c.InFlightQueue.Front()
        task := e.Value.(InFlightTask)
        now := time.Now().UnixNano()
        if task.StartTime+int64(10*time.Second) >= now {
            // Entries are in start-time order, so the rest have not timed out either.
            break
        }
        taskId := task.TaskId
        c.InFlightQueue.Remove(e)
        c.InFlightQueueLock.Unlock()

        // Mark the task as not-started.
        c.SlotLocks[taskId].Lock()
        if c.TaskStatuses[taskId] == TaskStatus_Finished {
            c.SlotLocks[taskId].Unlock()
        } else {
            c.TaskStatuses[taskId] = TaskStatus_NotStarted
            c.SlotLocks[taskId].Unlock()

            // Re-enqueue the task to the not-started queue.
            c.NotStartedQueueLock.Lock()
            c.NotStartedQueue.PushBack(taskId)
            c.NotStartedQueueLock.Unlock()
        }

        // Re-acquire the in-flight lock for the next iteration.
        c.InFlightQueueLock.Lock()
    }
    c.InFlightQueueLock.Unlock()
}
c.NotStartedQueue = list.New()
for i := 0; i < c.NMap; i++ {
    // Initially, only map tasks are in the not-started queue.
    c.NotStartedQueue.PushBack(i)
}
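init seeds only the map task ids (0 .. NMap-1) into the not-started queue, so reduce tasks must be released once the last map task completes. The completion RPC handler isn't shown in this excerpt; here is a plausible sketch, assuming reduce tasks occupy ids NMap .. NMap+NReduce-1 and an (assumed) empty CompleteTaskReply:

// Plausible completion handler (assumed, not the original code): mark the task
// finished, count it once, and release reduce tasks after the last map finishes.
func (c *Coordinator) CompleteTask(args *CompleteTaskArgs, reply *CompleteTaskReply) error {
    taskId := args.TaskId

    c.SlotLocks[taskId].Lock()
    alreadyFinished := c.TaskStatuses[taskId] == TaskStatus_Finished
    c.TaskStatuses[taskId] = TaskStatus_Finished
    c.SlotLocks[taskId].Unlock()
    if alreadyFinished {
        // A re-executed task may report completion twice; count it only once.
        return nil
    }

    c.FinishedCountLock.Lock()
    defer c.FinishedCountLock.Unlock()
    if taskId < c.NMap {
        c.FinishedMapCount++
        if c.FinishedMapCount == c.NMap {
            // All maps are done: reduce tasks become runnable.
            c.NotStartedQueueLock.Lock()
            for i := 0; i < c.NReduce; i++ {
                c.NotStartedQueue.PushBack(c.NMap + i)
            }
            c.NotStartedQueueLock.Unlock()
        }
    } else {
        c.FinishedReduceCount++
    }
    return nil
}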
//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
    c.FinishedCountLock.Lock()
    defer c.FinishedCountLock.Unlock()
    return c.FinishedReduceCount == c.NReduce
}
//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}
    c.init(files, nReduce)
    c.server()
    return &c
}
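server() is the lab's stock boilerplate that registers the coordinator for RPC and serves HTTP over a unix-domain socket; roughly (not copied from this post):

// Start an RPC server that listens on the lab's unix socket.
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, err := net.Listen("unix", sockname)
    if err != nil {
        log.Fatal("listen error:", err)
    }
    go http.Serve(l, nil)
}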