inished == c.nFiles {
c.reducePhase()
} else if c.state == reduceType && c.finished == c.nReduce {
close(c.reduceChan)
c.state = done
}
}
return nil
}
// reducePhase moves the coordinator from the map stage into the reduce stage.
//
// It closes the map channel, sets the state to reduceType, replaces c.tasks
// with c.nReduce fresh tasks (ids 0..nReduce-1), resets the finished counter
// and fills a new buffered reduce channel with every one of those tasks.
//
// NOTE(review): this mutates shared coordinator fields without locking;
// presumably the caller already holds c.mu — confirm against the call site.
func (c *Coordinator) reducePhase() {
	// No more map tasks will ever be handed out.
	close(c.mapChan)
	c.state = reduceType

	total := c.nReduce
	c.tasks = make([]*task, total)
	c.finished = 0

	// The buffer is sized to hold every reduce task, so the sends below
	// never block.
	c.reduceChan = make(chan *task, total)
	for id := range c.tasks {
		t := &task{id: id}
		c.tasks[id] = t
		c.reduceChan <- t
	}
}
worker处会调用call("Coordinator.CallTaskDone", &args, &reply)
来报告某任务的完成
首先判断c.state != args.Tp
,即报告完成的任务类型和当前系统状态不匹配,可能发生在该情况:work-1请求了map-1任务,但是work-1运行太缓慢导致Coordinator监测到map-1任务超时,于是把map-1任务分配给了work-2。当所有map任务完成时,Coordinator进入了reduce阶段,这时work-1才报告map-1任务完成,与当前系统状态不匹配,故会直接返回
若该任务尚未完成,则将该任务标记为已完成,c.finished++
。
- 如果当前为map阶段并且所有map任务已完成
c.state == mapType && c.finished == c.nFiles
,则进入reduce阶段:
- 关闭map channel
- 将系统状态设为reduce
- 重置c.tasks为一系列reduce任务
- 创建长度为c.nReduce的reduce channel
- 放入任务
- 如果当前为reduce阶段并且所有reduce任务已完成
c.state == reduceType && c.finished == c.nReduce
,则进入done阶段:
- 关闭reduce channel
- 将系统状态设为done
3.2.5 监测超时任务goroutine
// watch is the coordinator's timeout monitor. It is written as an endless
// loop and is presumably meant to run in its own goroutine.
//
// Once per second it takes c.mu and scans c.tasks. Any task that is in the
// executing state and whose start time is more than timeout ago is reset to
// spare and pushed back onto the channel that matches the current state
// (mapChan for mapType, reduceChan for reduceType) so that it can be handed
// out again. The loop ends as soon as the coordinator state is done.
func (c *Coordinator) watch() {
	for {
		time.Sleep(time.Second)

		c.mu.Lock()
		if c.state == done {
			// Release the lock before leaving; returning with it held
			// would block every later c.mu.Lock() forever.
			c.mu.Unlock()
			return
		}
		for _, t := range c.tasks {
			if t.state != executing || time.Since(t.start) <= timeout {
				continue
			}
			// The task timed out: make it available again.
			t.state = spare
			// NOTE(review): this send happens while c.mu is held; it
			// relies on the channel buffer having room for the task —
			// confirm the channel capacities cover all tasks.
			switch c.state {
			case mapType:
				c.mapChan <- t
			case reduceType:
				c.reduceChan <- t
			}
		}
		c.mu.Unlock()
	}
}
如果当前系统状态为done了,可以退出协程了
循环任务列表,如果该任务状态是正在执行但是超时了time.Since(task.start) > timeout
(time.Since可以计算系统当前时间距离start过去了多少时间),将该任务状态重置为空闲状态,并且根据系统当前状态,把该任务重新放入对应的channel中
3.3 Worker
3.3.1 主流程
// Worker is the worker main loop. It repeatedly asks the coordinator for a
// task through the "Coordinator.CallTask" RPC and acts on the reply type:
//
//   - mapType:    run executeMap with the supplied file, nReduce and task id
//   - reduceType: run executeReduce with the supplied file count and task id
//   - waitting:   sleep two seconds, then ask again
//   - done:       terminate the process with exit code 0
//
// When the RPC itself fails, the worker sleeps two seconds and retries.
// After a map or reduce task has run, completion is reported through
// "Coordinator.CallTaskDone" (its result is not inspected) and the worker
// sleeps two seconds before requesting the next task.
//
// mapf and reducef are the user-supplied map and reduce functions; they are
// passed through to executeMap and executeReduce respectively.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	const pause = time.Second * 2

	for {
		req := CallTaskArgs{}
		resp := CallTaskReply{}

		// Request failed: back off and try again.
		if !call("Coordinator.CallTask", &req, &resp) {
			time.Sleep(pause)
			continue
		}

		switch resp.Tp {
		case mapType:
			executeMap(resp.FileName, resp.NReduce, resp.TaskID, mapf)
		case reduceType:
			executeReduce(resp.NFiles, resp.TaskID, reducef)
		case waitting:
			// Nothing to hand out right now.
			time.Sleep(pause)
			continue
		case done:
			os.Exit(0)
		}

		// Tell the coordinator which task (and of which type) just finished.
		doneArgs := CallTaskDoneArgs{resp.TaskID, resp.Tp}
		doneReply := CallTaskDoneReply{}
		call("Coordinator.CallTaskDone", &doneArgs, &doneReply)
		time.Sleep(pause)
	}
}
首先向Coordinator发送请求任务rpc:
- map任务:执行
- reduce任务:执行
- waitting:当前Coordinator没有空闲任务,sleep一段时间再请求
- done:所有任务已完成,退出
任务执行完成后,报告任务完成
3.3.2 执行map任务
func executeMap(fileName string, nReduce, taskID int, mapf func(string, string) []KeyValue) {
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", fileName)
}
file.Close()
kva := mapf(fileName, string(content))
// 上面的代码参考mrsequential.go
files := []*os.File{}
tmpFileNames := []string{}
encoders := []*json.En