设为首页 加入收藏

TOP

mit 6.824 lab1分析(四)
2023-07-23 13:30:33 】 浏览:73
Tags:mit 6.824lab1分析
inished == c.nFiles { c.reducePhase() } else if c.state == reduceType && c.finished == c.nReduce { close(c.reduceChan) c.state = done } } return nil } func (c *Coordinator) reducePhase() { //fmt.Printf("reduce phase\n") close(c.mapChan) c.state = reduceType c.tasks = make([]*task, c.nReduce) c.finished = 0 c.reduceChan = make(chan *task, c.nReduce) for i := 0; i < c.nReduce; i++ { c.tasks[i] = &task{id: i} c.reduceChan <- c.tasks[i] } }

worker处会调用call("Coordinator.CallTaskDone", &args, &reply)来报告某任务的完成

首先判断c.state != args.Tp,即报告完成的任务类型和当前系统状态不匹配,可能发生在该情况:work-1请求了map-1任务,但是work-1运行太缓慢导致Coordinator监测到map-1任务超时,于是把map-1任务分配给了work-2。当所有map任务完成时,Coordinator进入了reduce阶段,这时work-1才报告map-1任务完成,与当前系统状态不匹配,故会直接返回

若该任务未完成,则将该任务标记未已完成,c.finished++

  1. 如果当前为map阶段并且所有map任务已完成c.state == mapType && c.finished == c.nFiles,则进入reduce阶段:
    1. 关闭map channel
    2. 将系统状态设为reduce
    3. 重置c.tasks为一系列reduce任务
    4. 创建长度为c.nReduce的reduce channel
    5. 放入任务
  2. 如果当前为reduce阶段并且所有map任务已完成c.state == reduceType && c.finished == c.nReduce,则进入done阶段:
    1. 关闭reduce channel
    2. 将系统状态设为done

3.2.5 监测超时任务goroutine

func (c *Coordinator) watch() {
	for {
		time.Sleep(time.Second)
		c.mu.Lock()
		if c.state == done {
			return
		}
		for _, task := range c.tasks {
			if task.state == executing && time.Since(task.start) > timeout {
				task.state = spare
				switch c.state {
				case mapType:
					c.mapChan <- task
				case reduceType:
					c.reduceChan <- task
				}
			}
		}
		c.mu.Unlock()
	}
}

如果当前系统状态为done了,可以退出协程了

循环任务列表,如果该任务状态是正在执行但是超时了time.Since(task.start) > timeout(time.Since可以计算系统当前时间距离start过去了多少时间),将该任务状态重置为空闲状态,并且根据系统当前状态,把该任务重新放入对应的channel中

3.3 Worker

3.3.1 主流程

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		args := CallTaskArgs{}
		reply := CallTaskReply{}
		ok := call("Coordinator.CallTask", &args, &reply)
		//now := time.Now()
		if ok {
			switch reply.Tp {
			case mapType:
				executeMap(reply.FileName, reply.NReduce, reply.TaskID, mapf)
			case reduceType:
				executeReduce(reply.NFiles, reply.TaskID, reducef)
			case waitting:
				time.Sleep(time.Second * 2)
				continue
			case done:
				os.Exit(0)
			}
		} else {
			time.Sleep(time.Second * 2)
			continue
		}
		//fmt.Printf("finish task: %v %v use %v\n", reply.TaskID, rs(reply.Tp), time.Since(now).Seconds())
		a := CallTaskDoneArgs{reply.TaskID, reply.Tp}
		r := CallTaskDoneReply{}
		call("Coordinator.CallTaskDone", &a, &r)
		time.Sleep(time.Second * 2)
	}
}

首先向Coordinator发送请求任务rpc:

  1. map任务:执行
  2. reduce任务:执行
  3. waitting:当前Coordinator没有空闲任务,sleep一段时间再请求
  4. done:所有任务已完成,退出

任务执行完成后,报告任务完成

3.3.2 执行map任务

func executeMap(fileName string, nReduce, taskID int, mapf func(string, string) []KeyValue) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	file.Close()
	kva := mapf(fileName, string(content))
    // 上面的代码参考mrsequential.go
	files := []*os.File{}
	tmpFileNames := []string{}
	encoders := []*json.En
首页 上一页 1 2 3 4 下一页 尾页 4/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Go语言入门3(数组) 下一篇变量、常量

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目