广东购物网站建设价格,搜索引擎不友好的网站特征,济南网站建设(选 聚搜网络),天眼查询个人 企业查询目录总体概要核心结构体coordinator思路#xff1a;任务池管理RPC函数worker思路:实现细节总体概要
程序主要由mrcoordinator.go、mrworker.go为启动模块。
mrcoordinator.go: 启动rpc服务#xff0c;循环等待m.Done()为true时退出。mrwoker.go:调用mr.worker(mapf, reduce…
目录总体概要核心结构体coordinator思路任务池管理RPC函数worker思路:实现细节总体概要
程序主要由mrcoordinator.go、mrworker.go为启动模块。
mrcoordinator.go: 启动rpc服务循环等待m.Done()为true时退出。mrwoker.go:调用mr.worker(mapf, reducef)函数执行map/reduce任务。
核心结构体
Coordinator协调者持有任务池能够查看任务的完成情况。任务的状态主要分为三种
“working”正在执行“success”执行成功“offline”任务未开始 或 任务掉线
//任务池保存目前所有任务状态
type TaskPool struct {MapTasks []MapTaskMapSuccessNum int //map任务完成数ReduceTasks []ReduceTaskReduceSuccessNum int //reduce任务完成数mutex sync.Mutex
}type MapTask struct {id intFileName stringstatus string //任务状态: working、success offlinemutex sync.Mutex
}type ReduceTask struct {id intstatus string //任务状态: working 、success、offlinemutex sync.Mutex
}coordinator思路
任务池管理
调用CreateTaskPool函数初始化任务池将所有任务分成map0,map1…reduce0,reduce1…。
c.taskPool CreateTaskPool(files, nReduce)创建Add…Task()函数用于添加相应的任务将任务的状态变成working。
//添加Map任务 如果成功返回(序号,文件名true)。 失败返回(0,,false)
func (p TaskPool) AddMapTask() (idx int, fileName string, ok bool)
//添加Reduce任务 成功返回(reduce任务序列号map任务总数true)
func (p *TaskPool) AddReduceTask() (idx int, mapTaskNum int, ok bool)RPC函数
任务请求由worker调用args暂时没用返回reply为worker被分配的任务。
//RPC请求任务
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error成功执行通知worker在成功执行已分配的任务后会通过rpc告诉coordinator
//RPC通知执行成功
func (c *Coordinator) SuccessExecuteInfo(args *SuccessExecuteArgs, reply *SuccessExecuteReply) errorworker思路:
worker不断重复一个for循环:
CallRequestTask() //通过rpc获取任务HandleMapTask()/HandleReduceTask() //处理对应的任务CallSuccessExecute(task.Id, task.TaskType) //通知coordinator任务已经完成
实现细节
问题1由于reduce任务必须要在map任务之后去执行所以需要解决在所有map任务都属于working或success状态时map任务没有全部完成但是所有的map任务都有人在做或已经完成新来一个worker该怎么办。
解决方案在加入map任务时若发现处于上面状态返回特殊的返回值如任务的内容fileName为空这样worker通过返回值就知道worker属于冗余状态worker便会休眠两秒在两秒之后再去请求任务。
问题2worker在获取任务之后挂掉了怎么办 解决方案在coordinator分配任务给worker时同时开启一个goroutine用来检测worker是否在10s内完成任务。如果没有完成任务的标志还是working将任务强行下线。
//添加任务的同时创建goroutine检测10s是否完成任务go func(p TaskPool, id int) {time.Sleep(10 * time.Second)p.MapTasks[id].mutex.Lock()if p.MapTasks[id].status working {p.MapTasks[id].status offline}p.MapTasks[id].mutex.Unlock()}(p, id)问题3worker挂掉之后新的worker接手任务之前的任务怎么办要保证任务的正确结果。 解决方案检测任务文件是否存在如果存在则删除后面再重新创建。
//检测:上次任务的遗留。判断是否存在如果存在则删除 mr_reply.id_[0...nReduce-1]for i : 0; i reply.NReduce; i {writeFileName : fmt.Sprintf(mr_%s_%s, strconv.Itoa(reply.Id), strconv.Itoa(i))if FileIsExists(writeFileName) {err : os.Remove(writeFileName)if err ! nil {panic(err)}}}问题4单个worker如何解决全部map、reduce任务。 解决方案worker跑在一个for循环上for循环是否执行由一个bool型的变量Continue来决定。Continue的值由rpc通知coordinator任务完成时返回。如果整个任务没有完成则返回true否则返回false。
var Continue bool truefor Continue {Continue falsetask : CallRequestTask() //rpc请求任务if task.TaskType map {//map任务HandleMapTask(mapf, task)Continue CallSuccessExecute(task.Id, task.TaskType)} else if task.TaskType reduce {//reduce任务HandleReduceTask(reducef, task)Continue CallSuccessExecute(task.Id, task.TaskType)} else {//map or reduce存在working状态time.Sleep(2 * time.Second)Continue true}}