设为首页 加入收藏

TOP

mit 6.824 lab1分析(二)
2023-07-23 13:30:33 】 浏览:75
Tags:mit 6.824lab1分析
但此时worker-1节点才刚运行完map-1任务并报告给coordinator,coordinator检测到当前是reduce阶段,但收到报告完成的rpc是map类型,不会对其进行任何操作。

type CallTaskDoneArgs struct {
	TaskID int
	tp     taskType
}
type CallTaskDoneReply struct {
}

3.2 Coordinator

3.2.1 结构体设计

type taskState int

const (
	spare taskState = iota
	executing
	finish
)

type task struct {
	fileName string
	id       int
	state    taskState
	start    time.Time
}

首先设计一个task struct,该结构体代表一个任务

  • filename:在map阶段,用于coordinator告知worker要读取的初始文件
  • id: 该任务的id,传给worker,作用在RPC设计中提及
  • state:任务有三个状态:空闲、执行中、已完成。若空闲则可以分配给worker;若执行中,则监视该任务是否超时
  • start:任务刚开始执行的时间
type Coordinator struct {
	// Your definitions here.
	mu         sync.Mutex
	state      taskType
	tasks      []*task
	mapChan    chan *task
	reduceChan chan *task
	nReduce    int
	nFiles     int
	finished   int
}

接着设计主要Coordinator结构体

  • state:当前系统的状态,map阶段(分配map任务)、reduce阶段(分配reduce任务)、全部完成done(可以结束系统运行)
  • tasks: *task的切片,维护了一组任务
  • mapChanreduceChan:用于分发map、reduce任务的channel。map阶段,若有空闲map任务,则放至channel中,当有worker请求任务时,则可取出来。reduce阶段同理
  • finished:当前已完成任务的数量。map阶段,若finished == nFiles,则表示所有map任务完成,可以进入reduce阶段。reduce阶段同理,进入done

3.2.2 初始化

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.mapPhase(files, nReduce)
	go c.watch()
	c.server()
	return &c
}

func (c *Coordinator) mapPhase(files []string, nReduce int) {
	c.state = mapType                 //设置系统状态为map阶段
	c.nReduce = nReduce        
	c.nFiles = len(files)
	c.tasks = make([]*task, c.nFiles)
	c.mapChan = make(chan *task, c.nFiles) // c.nFiles长度的map channel
	for i := 0; i < c.nFiles; i++ {
		c.tasks[i] = &task{fileName: files[i], id: i}
		c.mapChan <- c.tasks[i]            // 刚开始所有任务都是空闲状态,放入channel中
	}
}

系统刚开始时即map阶段,mapPhase初始化Coordinator结构体。然后启动c.watch()协程,用于监视任务是否超时,放后面讲

3.2.3 分配任务

func (c *Coordinator) CallTask(args *CallTaskArgs, reply *CallTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state == done {
		reply.Tp = done
	} else if c.state == mapType {
		switch len(c.mapChan) > 0 {
		case true:
			task := <-c.mapChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	} else {
		switch len(c.reduceChan) > 0 {
		case true:
			task := <-c.reduceChan
			c.setReply(task, reply)
		case false:
			reply.Tp = waitting
		}
	}
	return nil
}

func (c *Coordinator) setReply(t *task, reply *CallTaskReply) {
	if t.state == finish {
		reply.Tp = waitting
		return
	}
	reply.Tp = c.state
	reply.TaskID = t.id
	reply.NReduce = c.nReduce
	reply.NFiles = c.nFiles
	reply.FileName = t.fileName
	t.state = executing
	t.start = time.Now()
}

分配任务的主要函数,worker处会调用call("Coordinator.CallTask", &args, &reply)

  1. 若当前系统状态为done,则返回done,告知worker可以退出了
  2. 若当前系统状态为map阶段:如果当前有任务可以分配len(c.mapChan) > 0,则取出一个task,调用c.setReply(task, reply),将任务的相关信息填入reply中,并把task的当前状态设为执行中,开始时间设为time.Now()。如果没有可分配的任务,则设reply.Tp = waitting,让worker等待一会再请求任务
  3. 若当前系统状态为reduce阶段:同上

3.2.4 任务完成

func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state != args.Tp || c.state == done {
		return nil
	}
	if c.tasks[args.TaskID].state != finish {
		c.tasks[args.TaskID].state = finish
		c.finished++
		//fmt.Printf("task %v done\n", args.TaskID)
		if c.state == mapType && c.f
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Go语言入门3(数组) 下一篇变量、常量

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目