网站备案备的是域名还是空间,网站建设与维护工作内容,合肥seo排名优化公司,wordpress底部漂浮笔记1#xff1a;读操作包括两种#xff0c;readIndex和serilizable#xff0c;readIndex指一致性读#xff0c;一旦a读到了数据x#xff0c;那么a及a以后的数据都能读到x#xff0c;readIndex读会先确认本leader是不是有效地leader#xff0c;如果有效则记录此刻的comm…笔记1读操作包括两种readIndex和serilizablereadIndex指一致性读一旦a读到了数据x那么a及a以后的数据都能读到xreadIndex读会先确认本leader是不是有效地leader如果有效则记录此刻的commiteIndex作为confirmIndex等到applyIndexconfirmIndex时就可以进行serilizable读了而serilizableRead就是副本读直接读leader的数据。
笔记2a、b、c三个readIndex读请求先后到达etcd他们对应的confirmIndex分别为xa,xb,xc且xaxbxcapplyIndexxc时会通过close(ch)来一次性唤醒三个a、b、c这三个请求但是使用close(chan)的方式唤醒时这三个请求被唤醒时的顺序是随机的close(ch)只是一个唤醒操作唤醒以后读请求开始执行最终会来到rangerange函数开始执行的时候会先把请求包装到一个读事务中这个读事务会保存读事务创建时etcdserver的currentRev即etcdserver此刻的最大版本号也就是说readIndex是读最新的数据虽然请求对应的confirmIndex有大小有先后但是这只是一个要求一旦满足要求就读最新的数据是最新的数据读事务采用的是concurrent模式也就是加读锁-复制-解读锁-执行读取也就是采用读写间隔模式假设c先唤醒然后c加锁然后读到的版本号为k然后解读锁然后正好此时另一个goroute加写锁提交解写锁因为版本号是递增的所以此时etcd的最新版本号肯定大于k的假设为kz然后a请求开始执行加读锁-读取版本号为kz然后解读锁假设c读取keyk1这个key然后a也读取keyk1这个key版本号kz必定大于k可以肯定的是如果c读到了版本号为k的数据那么在他后面的a也必定能读到版本k的数据如果有更新的数据那么就是读更新的数据即一个数据已经读到那么他后面的所有请求都能读到这个数据后者这个数据的更新的值
serilizableRead最终也是通过事务来实现的也就是说etcd不管是读还是写最终都是通过事务来执行
etcd raft rangeetcd不管是读还是写最终都是通过事务来执行如果没有事务就会包装一个事务都分为两步第一步是raft流程第二步是执行事务raft流程中如果是写操作则需要写同步日志而如果是readIndex读则不是同步日志而是同步状态第二步执行事务时如果是读就封装读事务如果是写就是写事务 v3_server.EtcdServer.Range #就两步1先阻塞直到其他线程通知他可以读2去数据库读取最新数据if !serilizable{ #serilizable表示直接读leader,!serilizable表示ReadIndex即线性一致性读#ReadIndex读就是先等待然后直到满足readIndex读条件之后再进行serilizable读#具体步骤就是1就是leader首先确认自己此刻是不是leader,#因为有可能网络分区等原因导致leader实际不是leader#2如果是那么当前leader此刻的commitedIndex就是此读请求对应的ReadIndex#当本节点的 applyIndexreadIndex时这个readIndex读请求就可以执行serilizable读了v3_server.EtcdServer.linearizableReadNotify #执行等待直到appliedIndexReadIndex后才去数据库读数据#这个函数的逻辑很简单#写readWaitc这个chan来通知另一个goroute有人要readIndex读#通知完后本线程就等待readNotifier这个chan即等待其他goroute通知#当另一个goroute准备好后就会写readNotifier这个chan来唤醒当前请求#goroute是很轻量的所以直接阻塞就行不停的来请求不停的创新的goroute就行nc : s.readNotifier #获取通知chan#当readIndex准备好的时候其他goroute就会写这个notifyChan来通知本gorouteselect:case s.readwaitc - struct{}{} #发消息到readwatic chan 来通知linearizableReadLoop函数有人需要ReadIndex#readWaitC的容量大小为1deafult: #如果readWaitc满了导致写失败那么就直接跳过此步继续往下走#也就是说写readWaitC是一个非阻塞操作不管有没有写成功都会继续往下走select case -nc.c: #通知完readWaitc后当前goroute就阻塞在这个readNotifier直到被通知----------------------------------------another thread 1: #当过半节点都承认leader节点有效地时候#thread1 会通过写chan来通知上层程序leader当前是有效的即leader的commitedIndex是有效地#后续上层程序只需要等待appliedIndexconfirmIndex即可#confirmIndex即请求到来时leader的commitedIndexetcdserver.EtcdServer.linearizableReadLoop #一个死循环,获取此刻最新的commitedIndex通过chan接受上层发来的ReadIndex请求#然后也通过chan把处理结果返回给发请求的线程for: #就是一个死循环监听某个chan如果收到上层发来请求就唤醒#然后创建一个chan发给其他线程然后等待其他线程发回结果idutil.Generator.Next #为本轮即将到来的请求先提前生成一个唯一reqid后续用来检索select:case -leaderChangedNotifier: #如果等待期间leader变了 continue #则放弃本轮循环直接开始新的一轮循环case -s.readwaitc: #从readWaitc收到其他线程发来的ReadIndex的请求。一次for循环处理一个读请求 case -s.stopping: #如果etcdserver停止了那么退出循环结束returnnextnr : newNotifier() #创建一个新的notifys.readMu.Lock() nr : s.readNotifier #获取旧的notifys.readNotifier nextnr #用新的notify替换掉旧的notify#本轮我们会处理旧notify并且新请求会挂在新notify下面s.readMu.Unlock() #这里的逻辑是这样的#我们用a代表linearizableReadNotifyb代表linearizableReadLoop#用rc代表readWaitC用nc代表notifier#所以流程就简化为a收到客户端发来的请求req然后写rc通知b然后在nc上阻塞#然后b收到通知后就处理请求处理完后唤醒nc上等待的所有a(一个req对应一个a)#因为b是串行处理即从rc取一个请求然后处理处理完后才会从rc读下一个请求#而且a可能同时收到很多请求假设有req1,req2,...,reqK,...,reqN#再加上rc的容量是1所以会导致这样一种情况#a收到reqK然后发给bb从rc取出reqK然后a写reqK1到rc会成功#b然后处理reqK此时a仍然不停的收到新的请求#这样就会导致reqK1之后的请求都会写rc失败但是a即使写rc失败#也会阻塞在nc上当a收到请求reqN时此时b刚好处理完开始下一次循环#b会从rc读取reqK1然后rc空了a就会写reqN成功#当b处理完reqK1的时候下一个处理的请求就是reqN了#也就是说当b处理完reqN的时候不能仅仅通知reqN对应的请求#还必须唤醒所有在reqK2到reqN之间的所有阻塞在nc上的请求#所以解决思路就是用n1、n2两个nc并且交替处理。#就是说处理n1的时候所有后续到来的请求都阻塞在n2上#然后n1处理完后处理n2然后在处理n2的时候就让后续到来的请求都阻塞在n1上#这样我们处理完n1的时候直接n1.notifyAll就能唤醒所有之前阻塞在n1上的请求#同理处理完n2的时候直接n2.notifyAll就能唤醒所有之前阻塞在n2上的请求 #这个解决思路底层依赖于这样一个原理#一旦readIndex读请求reqN处理完了即达到可进行readIndex读的时候#reqN之前的所有readIndex读请求一定都满足readIndex读的条件#具体点说就是后到来的请求y的readIndex必定大于在他之前的请求x的readIndex#即当applyIndexreadIndex_y的时候必定有applyIndexreadIndex_xv3_server.EtcdServer.requestCurrentIndex #获取此刻commitedIndex的值并保存到一个叫confirmIndex的变量中#注意一个notifer下面会挂一大串请求但是他这里只需要请求一次就行#因为请求等待的readIndex不会大于此刻的commitedIndex#所以当appliedcommitedIndex时表示所有readIndexcommitedIndex的#读请求的一致性要求都可以满足#从而他会一次性唤醒所有readIndex此刻commmitedIndex的读请求etcdserver.EtcdServer.sendReadIndex #获取最新commitedIndex,会同步等待直到ok或者出错raft.node.ReadIndex #通过向raft状态机发送一个MsgReadIndex消息来获取#丢完消息后就返回让他异步去处理raft.node.step(pb.MsgReadIndex,reqid) #构造一个MsgReadIndex消息#前面生成的reqid作为数据部分放在消息的data字段中然后处理raft.node.stepWithWaitOptioncase n.recvc - m #把前面构造的的MsgReadIndex消息发到recvc然后此处就返回了#让其他goroute异步走一遍stepLeader或者stepFollower(根据节点角色决定)#这个recvc就是专门收发其他节点发来的消息当然也可以自己发给自己#读取实践中有三种方式1Log每次读也写一条日志#2readIndex就录一个commitedIndex#直到appliedIndex记录的commitedIndex#3直接从本地读不经过leader#log方式太慢了readIndex还是需要一轮广播直接本地读不安全for: #这是一个死循环其他线程处理完MsgReadIndex消息后#会通过填充readStateC chan来解除死循环#这个for循环是上面那个requestCurrentIndex函数里的#requestCurrentIndex函数会阻塞#直到node.run中把readState中的chan发给他来唤醒它select case rs : -s.r.readStateC #阻塞在readStateC上其他线程处理完MsgReadIndex消息后#其他线程会把处理结果写到这个readStateC中来跳出循环#linearizableReadNotify这个死循环有三种结束等待的方式#1超时或者error结束等待2readStateC3notifier#一个notifier对象可能对应1批ReadIndex请求#只要这一批有一个请求完成了#那么他完成时会通知本批次所有请求都结束等待return rs.ReadIndex #到达这里说明该请求已经被批准了此处返回结果 case -firstCommitInTermNotifier #收到了当前任期第一次提交发来的通知。#即当客户端发来ReadIndex的时候本leader才刚获得leader资格#在他的这个任期内集群还没有发生过commited事件#所以必须等待假设旧leader提交到了x3然后崩溃#然后新leader当选因为此时集群变了比如旧leader崩溃了#导致没有过半节点到达x3,#那么新leader就不能从x3开始提交#需要重新确定commited这是一个不断尝试的过程#也就是说这是一个不断变化的过程#所以在新leader确定commited之前不能读取#所以新leader第一次提交之前到来的请求#都需要在新leader第一次提交之后重新尝试#即重新丢一个MsgReadIndex到raft重做一遍etcdserver.EtcdServer.sendReadIndex time.Timer.Reset #重置定时器case -retryTimer.C:etcdserver.EtcdServer.sendReadIndex time.Timer.Reset case -leaderChangedNotifier #如果leader变了则放弃所有读请求并返回错误returncase -errorTimer.C #超时返回错误return ....后面的流程此处略后面再补充.... ----------------------------------------another thread 2: #node.recvc收到etcdserver发来的ReadIndex请求即发来的MsgReadIdnex消息raft.node.runfor:case m : -n.recvc #etcdserver发来的MsgReadIndex消息raft.raft.Step//if roleleader: #如果当前节点角色是leader则走stepLeaderraft.stepLeadercase pb.MsgReadIndex #处理思想就是走一遍heartbeat流程。#如果heartbeat流程中有过半节点拥护当前节点那么当前节点就是有效地leader#那么此leader当前的commitedIndex就是此请求对应的ReadIndex#即后面说的变量confirmIndex#对MsgReadIndex消息的处理流程如下#1用一个map acks保存所有节点对该ReadIndex的投票情况map的key是节点id#2发送MsgHeartbeat消息给所有节点#3收到一个MsgHeartbeatResp时不但要标记该节点x是活跃的#还要同时令acks[x]true即认为该节点是赞同当前leader和ReadIndex的if !raft.raft.committedEntryInCurrentTerm #如果当前leader在任期内还没有提交过日志#那么就直接挂起这个ReadIndex然后直接返回#因为在处理完一个ReadIndex时会同时唤醒所有index#在他之前的所有ReadIndex请求#所以这里可以安心挂起因为后续的ReadIndex会唤醒它#本文后面会解释append(r.pendingReadIndexMessages) #挂起即把请求放到一个pending数组然后直接返回不管这个请求了return raft.sendMsgReadIndexResponse #发送heartbeat消息给所有peer节点#两步1leader自己给自己投一票2发消息给followercase ReadOnlySafe: #ReadOnlySafe表示ReadIndex读#safe的含义是走一遍quorum投票只有过半节点赞成才会执行raft.readOnly.addRequest(r.raftLog.committed, m) #保存此刻的commitedIndex以及本次请求#此刻的commitedIndex就是本次请求对应的readIndex#MsgReadIndex消息的数据字段包含了本次ReadIndex的reqid#当ReadIndex处理完毕后那么保存的这个commitedIndex值#就是confirmIndexro.pendingReadIndex[s] readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}#这里就是把req以及对应的投票信息保存到一个map中#keyreqid,value投票信息ro.readIndexQueue append(ro.readIndexQueue, s) #readIndexQueue保存了所有readindex读请求#并且请求是按顺序保存的所以当x满足readIndex条件时#队列中所有在x之前的请求必定满足readIndex条件raft.readOnly.recvAck(r.id, m.Entries[0].Data) #消息的Data字段实际就是ReadIndex对应的reqid,#r就代表本节点这里就是当前节点默认是投自己一票#只有leader才会走到这里follower会直接丢给leaderro:pendingReadIndex[reqid] #获取reqid对应的投票信息ro.acks[id]true #对于reqid对应的这个ReadIndexleader肯定是表示支持的#只有活跃节点才会放到这个map acks中#如果后续检测到这个map中有过半节点数#那么就认为reqid对应的ReadIndex被批准了#就可以通过chan来通知上层可以去数据库读数据了r.bcastHeartbeatWithCtx #广播heartbeat消息给所有peer节点 #follower节点对heartbeat消息的处理很简单#就简单返回本身的commitedIndex给leadercase ReadOnlyLeaseBased #ReadOnlyLeaseBased表示LeaseRead即副本读即采用了租约#当一个领导者被选举出来时它会获得一个租约#这个租约保证了在一定的时间窗口内#集群可以基于这个领导者提供一致性视图来处理只读请求#但是leader可能并不是真的leader所以租约读是不安全的//if rolefollower: #如果当前节点是follower则直接把请求丢给leaderstepFollowerswitch:case pb.MsgReadIndex:m.To r.leadr.send(m) ----------------------------------------another thread 3: #thread 3处理MsgHeartbetaResp消息,即follower发回来的响应#如果有过半节点承认reqid对应的ReadIndex就通知上层当前leader是有效的#reqid对应的ReadIndex请求可以结束等待了raft.node.runfor:case m : -n.recvc #peer节点发来的MsgHeartBeatRespraft.raft.Stepraft.stepLeadercase pb.MsgHeartbeatRes #对MsgHeartBeatResp的处理主要包括三步#1标记该节点x是近期活跃的#2标记reqid对应的acks[x]true#3如果过半则写chan来通知上层可以结束对该reqid对应的ReadIndex的等待了progress.RecentActivetrue #1标记该节点是近期活跃if pr.Match r.raftLog.lastIndex #follower节点会把自己的commitedIndex告知leader节点#此处发现follower节点落后了所以发送MsgApp通知他追赶 raft.raft.sendAppendraft.readOnly.recvAck(perrId,reqid) #2标记reqid.acks[perrId]true即该peer节点支持leaderquorum.JointConfig.VoteResult #3计算投票结果即看reqid对应的acks map中是否有过半数节点#前面说过acks map保存的是所有活跃的且承认当前节点是leader的节点#如果有则通知上层linearizableReadLoop reqid对应的读请求可以结束等待了#就是一个count看是否过半rssraft.readOnly.advance(m.Index)#从等待队列移除所有index在m.Index之前的所有pendingRequest,#会把满足要求的pendingRequest放到一个叫做rssd的数组中#我们前面把reqid和一个commitedIndex假设值为x绑定在一起#当x可以结束等待时那些commitedIndex小于x的ReadIndex请求肯定可以结束等待了raft.raft.responseToReadIndexReq #根据rss中的请求构造MsgReadIndexResp消息#这个MsgReadIndexResp消息中包含了reqid对应的ReadIndex值#即当时的commitedIndex值#如果消息来源是follower则把MsgReadIndexResp消息发给follower#follower再把该req放到readState中(readState用来保存当前已经批准的的读请求)#然后会把readState中的元素发到指定的r.readStateC#linearizableReadLoop 每次循环就是在等待这个readStateC#我们通过前面的步骤已经确定了该req对应的读请求所等待的commitedIndex值#因为客户端如果请求的是follower节点follower节点会把请求转发给follower#leader会把批准的ReadIndex值放到这个MsgReadIndexResp中(假设用变量confirmIndex表示)#这样后续当follower节点发现本机appliedIndexconfirmIndex时#就可以遍历readState中的所有读请求#凡是req.confirmIndexappliedIndex的读请求都可以解除阻塞#如果消息来源是leader自己一样的把他加到leader节点自己的readState数组#在node.run在下一次循环中会检测到readState不为空然后就触发case rd-r.Ready()if req.From None || req.From r.id { #如果是自己发给自己的r.readStates append(r.readStates, ReadState{ #直接把ReadState丢到leader自己的readStates中#后续会把readStates里的数据丢到readStatesCIndex: readIndex,RequestCtx: req.Entries[0].Data,})return pb.Message{}}return pb.Message{ #如果是follower发来的ReadIndex读请求Type: pb.MsgReadIndexResp, #那么就返回MsgReadIndexResp消息给followerTo: req.From,Index: readIndex, #readIndex是req创建时的leader的commitedIndexEntries: req.Entries,}if req.to!None:raft.raft.send(pb.MsgReadIndexResp) #把MsgReadIndexResp响应消息返回给follower----------------------------------------another thread 4: #上面已经把批准的ReadIndex请求放到readState了#然后readState不为空会被node Ready()检测到#然后raftnode.run会把readState最后一个元素发到指定的r.readStateC以激活下一步#在他之前的必定满足要求所以发送最后一个就行了raft.raftNode.runfor:selectcase rd : -r.Ready() if len(rd.ReadStates) ! 0 { case r.readStateC - rd.ReadStates[len(rd.ReadStates)-1] #发送最新的readState到指定chan激活相关线程#因为他是不断循环的只要readState不为空那么就会继续ready#继续处理直到为空----------------------------------------//这里又回到another thread 1 中的etcdserver.EtcdServer.linearizableReadLoop函数etcdserver.EtcdServer.linearizableReadLoopetcdserver.EtcdServer.linearizableReadLoop ...... #当requestCurrentIndex返回后就可以获取此刻得appliedIndexetcdserver.EtcdServer.getAppliedIndex #获取当前的appliedIndexif appliedIndexconfirmIndex #如果还没有apply到confirmIndex#即读请求到来时的有效commitedIndex值就继续等待case - wait.timelist.Wait(confirmIndex) #继续阻塞直到etcdserver.EtcdServer.applyAll线程#在完成一此apply操作后主动唤醒所有appliedIndex之前的读请求#这个Wait会创建一个chan#applyAll唤醒它时调用close(ch)来填充这个chan来结束阻塞etcdserver.notifier.notify #当属于同一个notifier的一批请求中的某个被批准的时候#会唤醒所有在等待这个oldnotifier的读请求#即会唤醒linearizableReadNotify goroute#一个请求对应一个linearizableReadNotify goroute#所以这里一次就会唤醒很多歌gorouteclose(oldnotifier chan) #close(chan)会唤醒所有等待这个chan的线程----------------------------------------//这里就回到linearizableReadNotify v3_server.EtcdServer.Range etcdserver.EtcdServer.doSerialize #serializeRead指直接从从bbolt数据库读取数据#而ReadIndex读则相当于在serializeRead之前增加了一个wait操作#直到appliedIndexcommitedIndex#线性读要求读最新数据这里就直接去数据库读了#doSerialize就是LeaseRead#当ReadIndex读请求被放行以后就执行LeaseRead#这个serialize就是调用txn.Range来读取#即serialize是一个读事务apply.applierV3backend.Rangeif txn nil: #如果事务为空txn a.s.kv.Read(mvcc.ConcurrentReadTxMode) #则创建对应的conncurentReadTx事务对象......backend.readTx.buf.unsafeCopy #conncurrentReadTx会复制一份readTx的readBuf#因为applierV3backend.Range会在txn中调用即op Range#也可能直接调用所以txn可能为空也可能不为空...revs.currentRev #本次读事务看到的版本号就是此刻etcd最新的版本号defer txn.End()metrics_txn.metricsTxnWrite.Range #doSerialize就是LeaseRead当ReadIndex读请求被放行以后就执行LeaseRead#这个Lease Read就是调用txn.Range来读取kvstore_txn.storeTxnRead.Rangekvstore_txn.storeTxnRead.rangeKeys(tr.Rev())#读取revision版本不超过tr.Rev的key即只能读到事务开始前就完成的key#笔记apply之后只是把writeTxn请求丢给底层的batchTxBufferd就返回了#然后readbuf此时是没有这些新数据的也就是此时读还是只能看到旧版本的数据#当batchTxBuffered提交以后就会更新readBuf并更新s.currentRev#此后的事务就能看到最新的版本号以及从readBuf读到最新的数据了revPairsindex.treeIndex.Revisions(key,end,atRev) #从treeIndex获取对应的revision#key表示要查找的key范围的起点,end表示key范围的终点,#atRev表示版本号即不会读取atRev之后的版本的数据#举个例子#key为a的数据有修改了三次对应三个版本号:rev13 rev26 rev310#假设事务开始时的rev8即atRev8#那么就只会读取该key的rev小于8的最新的数据即rev6if endnil: #endnil表示本次只读取一个keyindex.treeIndex.Get(key,atRev) keyi : keyIndex{key: key}index.treeIndex.keyIndex(keyi) #从treeIndex中获取key对应的keyIndex结构treeIndex是一棵b树#etcd所有key的keyIndex都会放在内存这也限制了etcd支持的数据集大小key_index.keyIndex.Get #从key对应的keyIndex中获取数据对应的revkey_index.keyIndex.findGeneration#先从keyIndex中获取generationkey_index.keyIndex.walk #一个generation中可能多次修改key所以generation可能含有多个版本号#这里就是选一个不超过且最靠近atRev的版本号包括rev包括(main,sub)#获取了(main,sub)后就唯一确定了bbolt中的一个数据for revPairs: #遍历获取的所有rev对即(main,sub)对revision.revToBytes(revpair, revBytes) #把(main,sub)转换成key即bbolt也是kv只不过keymain_subbinary.BigEndian.PutUint64(bytes, uint64(rev.main))bytes[8] _binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))readTx.baseReadTx.UnsafeRange #读取操作很简单1先尝试从readBUf读tx_buffer.txReadBuffer.Range #这里就是baseReadTx.buf.Range即从readBuf读取if int64(len(keys)) limit: #如果从readBuf读到了指定数量的数据那就直接返回否则就要去读bboltreturn keys, valsbatch_tx.unsafeRange #readBuf中没有再去bbolt读取#笔记读过的数据不会丢到readBuf中readBuf中保存的是最新commit的数据
stepFollower对leader发来的heartbeat消息的处理:
stepFollower:case pb.MsgHeartbeat: #leader发来msgHeartbeat消息mr.lead m.From #m.From表示leaderraft.raft.handleHeartbeat log.raftLog.commitTo(m.Commit) #follower尝试把自己的commitIndex提升到m.Commit#m.Commit表示leader此刻的commitIndexif l.committed tocommit { #如果follower的commitIndex a leader的commitIndex bif l.lastIndex() tocommit { #并且如果follower的最大日志索引还没有到达bl.logger.Panicf( #则说明follower出了问题可能是落后了 tocommit(%d) is out of range [lastIndex(%d)]. #这里打印日志然后直接杀掉followerWas the raft log corrupted, truncated, or lost?, #估计会有recover重启follower吧tocommit, l.lastIndex())}l.committed tocommit #否则把follower.commitIndex设置为leader的commitIndex b}raft.raft.send(pb.Message{ #发回对leaderHeartbeat的响应To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})case pb.MsgReadIndex: #如果当前节点是follower并且收到ReadIndex读请求 r.send(m) #那么就直接把请求转发给leadercase pb.MsgReadIndexResp: #如果收到leader发来的对readIndex读请求的响应#注意leader会等待readIndex读请求条件满足时#才会发送响应MsgReadIndexResp给followerr.readStates append(r.readStates, #follower收到后就把对应的数据丢到readStates中ReadState{ #然后就会激活follower上阻塞的readIndex读请求Index: m.Index, #后面就是serilizable读了RequestCtx: m.Entries[0].Data})