当前位置: 首页 > news >正文

上海网站建设海淘科技wordpress默认分类链接

上海网站建设海淘科技,wordpress默认分类链接,网络销售怎么做才能有业务,线上推广费用预算文章目录 写在前面总体思路分析代码实现参考链接 写在前面 具体上次写6824的第一篇文章已经过去了快一个月#xff0c;上次学习了MapReduce论文相关理论后一直没有继续写代码实现#xff0c;自己一边要搞论文没有整块时间实现#xff0c;这两天抽写了相关代码#xff0c;算… 文章目录 写在前面总体思路分析代码实现参考链接 写在前面 具体上次写6824的第一篇文章已经过去了快一个月上次学习了MapReduce论文相关理论后一直没有继续写代码实现自己一边要搞论文没有整块时间实现这两天抽写了相关代码算是对Lab1的一个交代。 我写的大部分代码都是参考别人的已经实现的我写这篇博客的目的也并不是为了传播高深的技术我只是记录一下自己从代码开始时的一头雾水到参考别人代码实现了感叹Lab1的伟大之处。方便后续自己回顾。 看到别人博客提及每个hits都有用自己在没有写代码时并不能完全理解官方页面提到的Hits的具体实现方式。我觉得最重要的自己把这些实现都搞懂了原理随后自己再去回顾这些提示感觉写的很秒。 总体思路分析 MapReduce论文理解 https://blog.csdn.net/weixin_45863010/article/details/142641061 官方Lab1链接 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 上面图片简单概述首先协调者Coordinator启动并且注册RPC调用通道随后Woker使用RPC调用去Coordinator获取任务。 Worker根本不区分是否是Map-Worker还是ReduceWorker。Worker充当Map还是Reduce的角色是根据任务类型来划分的如果是Map任务就执行Map任务如果是Reduce任务就执行Reduce任务。「我在这里没有使用不是Map任务就执行Reduce任务具体原因可以参考下面代码实现」Coordinator启动后首先初始化所有Map任务当且只有所有Map任务全部执行完成才能进行下一阶段Reduce才能够初始化Reduce任务给Worker执行。「Reduce的执行依赖与Worker执行产生的中间文件会将中间文件根据对应的reduceNum分配给一个Worker执行」对于GFS或者本地磁盘存储Map产生的中间文件非常重要MapWorker执行任务后会将任务执行结果存储到上述文件系统中随后Coordinator根据ReduceNum分配需要执行的任务到ReduceWorker。在ReduceWorker获取到所有文件内容后首先会排序。 我根据我写代码的理解画一个流程图如下 代码实现 下面我将给出自己参考别人的代码实现主要涉及到三个文件我直接贴出对应的三个文件代码里面附有详细的注释 rpc.go package mr// // RPC definitions. // // remember to capitalize all names. //import os import strconv// // example to show how to declare the arguments // and reply for an RPC. //// 定义任务结构体 type TaskRequest struct{}type TaskType inttype Task struct {TaskType TaskTypeTaskId intFileSlice []stringReduceNum int }// 定义各种任务类型 // WaitingTask是否是必要的是必要的解决当Map任务没有全部完成成此时调用GetTask返回WaitingTask // Worker在接受到这个任务状态后进行短暂的休眠随后继续调用任务执行后续流程 const (MapTask TaskType iotaReduceTaskWaitingTaskExitTask )type ExampleArgs struct {X int }type ExampleReply struct {Y int }// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. // Cant use the current directory since // Athena AFS doesnt support UNIX-domain sockets. func coordinatorSock() string {s : /var/tmp/5840-mr-s strconv.Itoa(os.Getuid())return s } coordinator.go package mrimport (fmtio/ioutillogstrconvstringssynctime ) import net import os import net/rpc import net/http// Phase 定义当前总体进度分为三个阶段Map阶段、Reduce阶段、Done阶段 type Phase int// State 定义任务状态 type State int// 当前整体进度分为三个阶段 const (MapPhase Phase iotaReducePhaseAllDone )// 任务状态类型 // 任务对应的三种状态如何切换的初始任务时将所有任务状态设置为Waiting // worker调用rpc执行某个任务时任务状态由WaitingWorking // worker执行任务完成后调用rpc将任务状态有WorkingDone至此任务完成上面后两条暂时仅仅针对Map任务 // worker执行Reduce任务时原理仍然同上状态由 WaitingWorkingDone之间切换 const (Working State iota // 此阶段在工作Waiting // 此阶段在等待执行, 当Map任务没有执行完成此时Reduce任务需要等待Done // 此阶段已经做完 )// TaskMetaInfo 保存任务的元数据任务开始时间、任务状态、任务对应的指针以便后续找到任务 // 思考为什么不将任务执行状态以及任务执行开始时间定义到具体任务Task结构体中也可以这样定义但不符合要求 // 这些信息不需要暴露给Worker只要Coordinator维护这些信息并且能够根据这些信息判断出哪些任务需要执行即可 type TaskMetaInfo struct {// 添加任务开始执行时间StartTime time.TimeState State// 传入任务的指针为了任务从通道中取出来之后能够通过地址标记这个任务已经完成TaskAdr *Task }// TaskMetaHolder 任务元信息结构体主要存储任务进行的状态、任务开始时间以及任务对应的地址「以能够随时更改任务状态」 type TaskMetaHolder struct {TaskMeta map[int]*TaskMetaInfo }func (t *TaskMetaHolder) acceptTaskMetaInfo(taskMetaInfo *TaskMetaInfo) bool {taskId : taskMetaInfo.TaskAdr.TaskIdmeta, _ : t.TaskMeta[taskId]if meta ! nil {fmt.Printf([acceptTaskMetaInfo] the taskId %v have contain metaInfo, taskId)return false} else {t.TaskMeta[taskId] taskMetaInfo}return true }// the function is to judge waiting task to working func (t *TaskMetaHolder) judgeTaskState(taskId int) bool {taskInfo, ok : t.TaskMeta[taskId]if !ok || taskInfo.State ! Waiting {return false}taskInfo.StartTime time.Now()taskInfo.State Workingreturn true }// the function is to judge if all tasks have done, server subsequent phase func (t *TaskMetaHolder) allTaskDone() bool {var (mapDoneNum 0mapUnDoneNum 0reduceDoneNum 0reduceUnDoneNum 0)taskMeta : t.TaskMetafor _, taskMetaInfo : range taskMeta {if taskMetaInfo.TaskAdr.TaskType MapTask {if taskMetaInfo.State Done {mapDoneNum} else {mapUnDoneNum}} else if taskMetaInfo.TaskAdr.TaskType ReduceTask {if taskMetaInfo.State Done {reduceDoneNum} else {reduceUnDoneNum}}}if (mapDoneNum 0 mapUnDoneNum 0) (reduceDoneNum 0 reduceUnDoneNum 0) {return true} else {if reduceDoneNum 0 reduceUnDoneNum 0 {return true}}return false }type Coordinator struct {// Your definitions here.Phase PhaseMapChan chan *TaskReduceChan chan *TaskReduceNum intFiles []stringTaskId int // 这个字段主要作用生成递增IDTaskMetaHolder TaskMetaHoldermu sync.Mutex }// Your code here -- RPC handlers for the worker to call.// an example RPC handler. // // the RPC argument and reply types are defined in rpc.go. func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {reply.Y args.X 1return nil }// start a thread that listens for RPCs from worker.go func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e : net.Listen(tcp, :1234)sockname : coordinatorSock()os.Remove(sockname)l, e : net.Listen(unix, sockname)if e ! nil {log.Fatal(listen error:, e)}go http.Serve(l, nil) }// main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. func (c *Coordinator) Done() bool {//The coordinator, as an RPC server, will be concurrent; dont forget to lock shared datac.mu.Lock()defer c.mu.Unlock()ret : false// Your code here.if c.Phase AllDone {fmt.Println(All tasks have Done)ret true} else {//fmt.Println(Not All tasks have Done)}return ret }// create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. func MakeCoordinator(files []string, nReduce int) *Coordinator {c : Coordinator{Phase: MapPhase,MapChan: make(chan *Task, len(files)),ReduceChan: make(chan *Task, nReduce),ReduceNum: nReduce,Files: files,TaskMetaHolder: TaskMetaHolder{TaskMeta: make(map[int]*TaskMetaInfo, nReducelen(files)),},TaskId: 0,}// Your code here.c.MakeMapTask(files)c.server()// 开启一个探测器监测任务执行时间是否过长go c.crashHandler()return c }func (c *Coordinator) MakeMapTask(files []string) {for _, file : range files {taskID : c.genTaskId()task : Task{TaskType: MapTask,TaskId: taskID,FileSlice: []string{file},ReduceNum: c.ReduceNum,}// 保存任务初始状态taskMetaInfo : TaskMetaInfo{State: Waiting, TaskAdr: task}c.TaskMetaHolder.acceptTaskMetaInfo(taskMetaInfo)c.MapChan - task} }func (c *Coordinator) MakeReduceTask() {for reduceNum : 0; reduceNum c.ReduceNum; reduceNum {taskID : c.genTaskId()task : Task{TaskType: ReduceTask,TaskId: taskID,FileSlice: c.selectReduceNum(reduceNum),ReduceNum: c.ReduceNum,}// 保存任务初始状态taskMetaInfo : TaskMetaInfo{State: Waiting, TaskAdr: task}c.TaskMetaHolder.acceptTaskMetaInfo(taskMetaInfo)c.ReduceChan - task} }/* *构建Reduce任务需要将Map任务存储的所有中间文件按照ReduceNum构建任务 一个File也就是一个Map任务执行结果会根据Key的哈希值将一个File分散为ReduceNums个任务如mr-0-0\mr-0-1\mr-0-2 末尾数字为需要分配给某个Reduce执行的文件中间的数字为对于的Map的任务标识也就是TaskId */ func (c *Coordinator) selectReduceNum(reduceNum int) []string {var res []stringpath, _ : os.Getwd()files, err : ioutil.ReadDir(path)if err ! nil {log.Fatal([selectReduceNum] failure, err)return res}for _, file : range files {if strings.HasPrefix(file.Name(), mr-) strings.HasSuffix(file.Name(), strconv.Itoa(reduceNum)) {res append(res, file.Name())}}return res }/* * 为什么需要一个全局唯一ID生成器主要Map的worker个数为len(files), Reduce的worker个数为ReduceNum个 * Coordinator中有一个属性TaskMetaHolder用于保存任务的元数据更内层使用一个map表格存储各个任务的元信息key为任务ID同时任务总数为 * Map对应的workerReduce对应的worker所以需要使用一个全局任务Id生成器生成递增的任务ID*/ func (c *Coordinator) genTaskId() int {res : c.TaskIdc.TaskIdreturn res }func (c *Coordinator) PullTask(taskReq *TaskRequest, taskResp *Task) error {c.mu.Lock()defer c.mu.Unlock()switch c.Phase {case MapPhase:{if len(c.MapChan) 0 {*taskResp *-c.MapChanif !c.TaskMetaHolder.judgeTaskState(taskResp.TaskId) {fmt.Println([PullTask] task state is , c.TaskMetaHolder.TaskMeta[taskResp.TaskId].State)}} else {// Map对应的任务被分发完了但此时任务并没有全部完成此时将任务状态设置为waiting状态taskResp.TaskType WaitingTask// 检查Map任务是否都完成,完成后将流程进入Reduce阶段if c.TaskMetaHolder.allTaskDone() {c.toNextPhase()}return nil}}case ReducePhase:{if len(c.ReduceChan) 0 {*taskResp *-c.ReduceChanif !c.TaskMetaHolder.judgeTaskState(taskResp.TaskId) {fmt.Println([PullTask] task state is , c.TaskMetaHolder.TaskMeta[taskResp.TaskId].State)}} else {taskResp.TaskType WaitingTaskif c.TaskMetaHolder.allTaskDone() {c.toNextPhase()}return nil}}case AllDone:{taskResp.TaskType ExitTask}default:panic([PullTask] invalid Phase)}return nil }func (c *Coordinator) MarkDone(task *Task, taskResp *Task) error {c.mu.Lock()defer c.mu.Unlock()switch task.TaskType {case MapTask:{metaInfo, ok : c.TaskMetaHolder.TaskMeta[task.TaskId]if ok metaInfo.State Working {metaInfo.State Donefmt.Printf([MarkDone] task is done, the taskId is: %v, the taskType is %v\n, task.TaskId, task.TaskType)} else {fmt.Printf([MarkDone] error, the task not to be done, taskId id : %v, the tasktype is %v\n , task.TaskId, task.TaskType)}break}case ReduceTask:{metaInfo, ok : c.TaskMetaHolder.TaskMeta[task.TaskId]if ok metaInfo.State Working {metaInfo.State Donefmt.Printf([MarkDone] task is done, the taskId is: %v, the taskType is %v\n, task.TaskId, task.TaskType)} else {fmt.Printf([MarkDone] error, the task not to be done, taskId id : %v, the tasktype is %v\n , task.TaskId, task.TaskType)}break}default:{panic([MarkDone] invalid TaskType)}}return nil }func (c *Coordinator) toNextPhase() {switch c.Phase {case MapPhase:{c.MakeReduceTask()c.Phase ReducePhase}case ReducePhase:{c.Phase AllDone}default:panic([toNextPhase] invalid phase)} }/* * 为什么要设置这个探测器参考如下回答。自己写的时候考虑到这个仅仅将当前对应的任务状态由Working更改为Waiting并将其加入到对应的chan通道中 但worker中并没有终止相应任务的执行此举是否会造成一个worker执行多次 1.无论worker中一个任务是否执行多次对于Map来说产生的中间文件名称是一样的后续分配给新的worker后输出文件会覆盖前面的worker输出的文件并且是从头填写 The coordinator cant reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the coordinator wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died (of course, it might not have). */ func (c *Coordinator) crashHandler() {for {// 关于这个休眠时间的思考// 如果不设置这个休眠时间可能导致探测器协程不断获取锁释放锁不断循环从而导致分发任务的方法PullTask无法获取锁// 从而无法执行后续任务这里类似时间片算法的使用了。time.Sleep(2 * time.Second)c.mu.Lock()if c.Phase AllDone {c.mu.Unlock()break}for _, metaInfo : range c.TaskMetaHolder.TaskMeta {if metaInfo.State Working time.Since(metaInfo.StartTime) 9*time.Second {if metaInfo.TaskAdr.TaskType MapTask {c.MapChan - metaInfo.TaskAdrmetaInfo.State Waiting} else if metaInfo.TaskAdr.TaskType ReduceTask {c.ReduceChan - metaInfo.TaskAdrmetaInfo.State Waiting}}}c.mu.Unlock()} } worker.go package mrimport (encoding/jsonfmtio/ioutilossortstrconvtime ) import log import net/rpc import hash/fnv// Map functions return a slice of KeyValue. type KeyValue struct {Key stringValue string }// for sorting by key. type BySort []KeyValue// for sorting by key. func (a BySort) Len() int { return len(a) } func (a BySort) Swap(i, j int) { a[i], a[j] a[j], a[i] } func (a BySort) Less(i, j int) bool { return a[i].Key a[j].Key }// use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. func ihash(key string) int {h : fnv.New32a()h.Write([]byte(key))return int(h.Sum32() 0x7fffffff) }// main/mrworker.go calls this function. func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.// uncomment to send the Example RPC to the coordinator.//CallExample()// ----------------------------------------第一次实现----------------------------------// GetTask//for i : 0; i 2; i {// task : GetTask()// DoMapTask(task, mapf)//}// ----------------------------------------第二次调整-----------------------------------// 对任务状态加入枚举flag : truefor flag {task : GetTask()switch task.TaskType {case MapTask:{DoMapTask(task, mapf)TaskDone(task)}case ReduceTask:{DoReduceTask(task, reducef)TaskDone(task)}case WaitingTask:{//执行到此处说明Map任务有部分尚未完成此时不能继续执行后续的Reduce任务只有等待所有Map任务全部完成后才能执行Reduce//所以在此处休眠一会fmt.Printf([Worker] the task is waiting, taskId : %v\n\n, task.TaskId)time.Sleep(2 * time.Second)}case ExitTask:{fmt.Println([Worker] Exit Task)flag false}}} }// GetTask Get a Task func GetTask() Task {taskReq : TaskRequest{}taskResp : Task{}ok : call(Coordinator.PullTask, taskReq, taskResp)if ok {fmt.Printf([GetTask] success get task : %v\n, taskResp)} else {fmt.Printf([GetTask] call failed!\n)}return taskResp }func DoMapTask(task *Task, mapf func(string, string) []KeyValue) {/***Map任务主要流程分为三部分1. 根据RPC调用获取到文件名调用已经写好的Map函数生成中间文件(intermediate)2. 根据RPC返回的任务参数中的ReduceNum的数值对中间文件(intermediate)进行分组,分组依据根据字母结构体的Key取Hash值随后对ReduceNum取余。3. 将分组后的中间文件保存到临时文件中---------------------个人理解-----------------------------------------生成的临时文件名mr-X-Y(X是文件idY是哈希后对应ReduceNum)对于文件Id就是运行coordinator传入第二个参数files(文件集)中文件的顺序也就是说Map函数的主要功能是将传入的某个文件的词频统计出来(准确的说并没有统计词频可以阅读wc.go源代码)仅仅将「单词-1」统计出来for _, w : range words {kv : mr.KeyValue{w, 1}kva append(kva, kv)随后将某个文件的所有词频根据Key也就是单词根据单词的哈希值将单词分组分组个数为ReduceNum*/var intermediate []KeyValuefmt.Printf([DoMapTask] worker is map taskId : %v, fileName : %v\n, task.TaskId, task.FileSlice[0])file, err : os.Open(task.FileSlice[0])if err ! nil {log.Fatalf([DoMapTask] cannot open %v, task.FileSlice[0])}content, err : ioutil.ReadAll(file)if err ! nil {log.Fatalf([DoMapTask] cannot read %v, task.FileSlice[0])}file.Close()intermediate mapf(task.FileSlice[0], string(content))reduceNum : task.ReduceNumhashKV : make([][]KeyValue, reduceNum)for _, value : range intermediate {index : ihash(value.Key) % reduceNumhashKV[index] append(hashKV[index], value)}// 放入中间文件for i : 0; i reduceNum; i {fileName : mr- strconv.Itoa(task.TaskId) - strconv.Itoa(i)tempFile, err : os.Create(fileName)if err ! nil {log.Fatalf([DoMapTask] create temp file: %v failed., fileName)}enc : json.NewEncoder(tempFile)for _, kv : range hashKV[i] {err enc.Encode(kv)if err ! nil {log.Fatalf([DoMapTask] encode error: %v, err)}}tempFile.Close()} }func DoReduceTask(task *Task, reducef func(string, []string) string) {// 关于这个Reduce任务为什么需要先写入临时中间文件随后等中间文件写完后在重命名临时文件为最终文件/**To ensure that nobody observes partially written files in the presence of crashes,the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written.You can use ioutil.TempFile (or os.CreateTemp if you are running Go 1.17 or later) to create a temporary fileand os.Rename to atomically rename it.最上面一句确保发生崩溃时没有人观察到部分写入的文件使用中间临时文件在发生崩溃时临时文件不会保存此外使用中间文件命令和最终输出文件名称不一样能够保证输出的符合要求格式的文件都是完整的不完整的文件即使没有删除由于命名和要求格式不一样也不会考虑*/reduceFileNum : task.TaskIdintermediate : shuffle(task.FileSlice)dir, _ : os.Getwd()tempFile, err : os.CreateTemp(dir, mr-tmp-*)if err ! nil {log.Fatal([DoReduceTask] Failed to create temp file, err)}i : 0// Debug//fmt.Printf(the intermediate length is %v\n, len(intermediate))for i len(intermediate) {j : i 1for j len(intermediate) intermediate[j].Key intermediate[i].Key {j}var values []stringfor k : i; k j; k {values append(values, intermediate[k].Value)}output : reducef(intermediate[i].Key, values)fmt.Fprintf(tempFile, %v %v\n, intermediate[i].Key, output)i j}tempFile.Close()fn : fmt.Sprintf(mr-out-%d, reduceFileNum)os.Rename(tempFile.Name(), fn) }func shuffle(files []string) []KeyValue {var kva []KeyValuefor _, filepath : range files {file, _ : os.Open(filepath)dec : json.NewDecoder(file)for {var kv KeyValueif err : dec.Decode(kv); err ! nil {break}kva append(kva, kv)}file.Close()// 删除临时文件//os.Remove(filepath)}sort.Sort(BySort(kva))return kva }func TaskDone(task *Task) {taskReq : tasktaskResp : Task{}ok : call(Coordinator.MarkDone, taskReq, taskResp)if ok {fmt.Printf([TaskDone] success mark task : %v\n, taskReq)} else {fmt.Printf([TaskDone] call failed!\n)} }// example function to show how to make an RPC call to the coordinator. // // the RPC argument and reply types are defined in rpc.go. func CallExample() {// declare an argument structure.args : ExampleArgs{}// fill in the argument(s).args.X 99// declare a reply structure.reply : ExampleReply{}// send the RPC request, wait for the reply.// the Coordinator.Example tells the// receiving server that wed like to call// the Example() method of struct Coordinator.ok : call(Coordinator.Example, args, reply)if ok {// reply.Y should be 100.fmt.Printf(reply.Y %v\n, reply.Y)} else {fmt.Printf(call failed!\n)} }// send an RPC request to the coordinator, wait for the response. // usually returns true. // returns false if something goes wrong. func call(rpcname string, args interface{}, reply interface{}) bool {// c, err : rpc.DialHTTP(tcp, 127.0.0.1:1234)sockname : coordinatorSock()c, err : rpc.DialHTTP(unix, sockname)if err ! nil {log.Fatal(dialing:, err)}defer c.Close()err c.Call(rpcname, args, reply)if err nil {return true}fmt.Println(err)return false } 参考链接 https://blog.csdn.net/weixin_51322383/article/details/132068745 https://blog.csdn.net/weixin_45938441/article/details/124018485
http://www.dnsts.com.cn/news/103874.html

相关文章:

  • 怎么建个免费英文网站洛阳建网站公司
  • 加快网站平台建设seo是什么服
  • 沙井网站推广长腿蜘蛛wordpress
  • 北京建行网站中国制造网国际站
  • WordPress部分内容登录可见网站运营推广选择乐云seo
  • 上海外贸soho网站建设房屋平面图设计app
  • 深圳手机网站模板Wordpress设置Ip不开放
  • 潍坊做网站公司补脾最商城网站前台模板
  • 黄骅网站建设公司php做网站用html做吗
  • 东营房地产网站建设建立网站的关键是定位
  • 专业建站提供商重庆装修公司平台
  • 手机如何创造网站app排行榜
  • 漯河最新今天的消息网站建设阿华seo
  • 大连网站排名优wordpress4.5.3zhcn
  • 商务网站规划设计要点vs2012 建网站
  • 企业网站和官网的区别如何自己做淘宝客网站
  • 手机上传视频网站开发网上怎么自己注销营业执照
  • 网站头部代码营销策略模板
  • 免费建站网站号如何投放网络广告
  • 夏邑好心情网站建设有限公司扩展名网站
  • 聚美优品网站模版无极电影网首页
  • 外汇平台网站开发需求说明福州商城网站开发公司
  • 新网 网站建设农机局网站建设方案
  • 计算机网络技术电商网站建设与运营方向windows优化大师有哪些功能
  • elementui 做的网站html网页小游戏代码
  • 怎么做网站安全运维泰州哪家做网站建设比较好
  • 2019年建设什么网站好重庆建网站 私单
  • 怎么做qq钓鱼网站吗深圳网站设计公司在哪里
  • 做网站需要多少钱 网络服务视频号直播怎么引流
  • 网站规划建设实训报告网站建站客户需求表单