营销型网站可以吗,android手机开发,陕西省住房和城乡建设厅网站上怎么打印证书,兰州东方商易文化传播有限责任公司文章目录 前言一、项目大纲二、Raft模块1.Raft介绍2.大致内容Leader与选举日志同步、心跳raft日志的两个特点 3.主要流程1. raft类的定义关键函数m_nextIndex 和 m_matchIndex作用 2.启动初始化3.竞选leaderelectionTimeOutTicker:doElectionsendRequestVoteRequestVote 4.日志… 文章目录 前言一、项目大纲二、Raft模块1.Raft介绍2.大致内容Leader与选举日志同步、心跳raft日志的两个特点 3.主要流程1. raft类的定义关键函数m_nextIndex 和 m_matchIndex作用 2.启动初始化3.竞选leaderelectionTimeOutTicker:doElectionsendRequestVoteRequestVote 4.日志复制、心跳leaderHearBeatTickerdoHeartBeatsendAppendEntriesAppendEntries 5.snapshot快照快照是什么何时创建快照快照的传输 三.持久化1.持久化介绍1.raft节点的部分信息2.kvDb的快照 2.为什么要持久化这些内容3.什么时候持久化4.谁来调用持久化5.具体怎么实现持久化/使用哪个函数持久化 四.kvServer1.kvServer介绍2.kvServer怎么和上层kvDB沟通怎么和下层raft节点沟通3.kvServer怎么处理外部请求接收与响应外部请求 待续 前言
构建一种基于Raft一致性算法的分布式键值存储数据库以确保数据的一致性、可用性和分区容错性
一、项目大纲 raft节点raft算法实现的核心层负责与其他机器的raft节点沟通达到 分布式共识 的目的。raftServer负责raft节点与k-v数据库中间的协调服务负责持久化k-v数据库的数据可选。上层状态机k-v数据库负责数据存储。持久层负责相关数据的落盘对于raft节点根据共识算法要求必须对一些关键数据进行落盘处理以保证节点宕机后重启程序可以恢复关键数据对于raftServer可能会有一些k-v数据库的东西需要落盘持久化。RPC通信在 领导者选举、日志复制、数据查询、心跳等多个Raft重要过程中提供多节点快速简单的通信能力。
二、Raft模块
1.Raft介绍
参考 文章: 两万字长文解析raft算法原理 视频: 解析分布式共识算法之Raft算法
本项目用Raft解决的问题
一致性 通过Raft算法确保数据的强一致性使得系统在正常和异常情况下都能够提供一致的数据视图。可用性 通过分布式节点的复制和自动故障转移实现高可用性即使在部分节点故障的情况下系统依然能够提供服务。分区容错 处理网络分区的情况确保系统在分区恢复后能够自动合并数据一致性。
2.大致内容
详细的看上面参考网站
Leader与选举
Raft是一个强Leader 模型可以粗暴理解成Leader负责统领follower如果Leader出现故障那么整个集群都会对外停止服务直到选举出下一个Leader。
节点之间通过网络通信其他节点follower如何知道leader出现故障 leader会定时向集群中剩下的节点follower发送AppendEntry作为心跳hearbeat 以通知自己仍然存活。 可以推知如果follower在一段时间内没有接收leader发送的AppendEntry那么follower就会认为当前的leader 出现故障从而发起选举。 判断心跳超时可以用一个定时器和一个标志位来实现每到定时时间检查这期间有无AppendEntry 即可。AppendEntry作用 心跳携带日志entry及其辅助信息以控制日志的同步和日志向状态机提交通告leader的index和term等关键信息以便follower对比确认follower自己或者leader是否过期 follower知道leader出现故障后如何选举出leader follower认为leader故障后只能通过term增加变成candidate向其他节点发起RequestVoteRPC申请其他follower的选票过一段时间之后会发生如下情况 赢得选举马上成为leader 此时term已经增加了发现有符合要求的leader自己马上变成follower 了这个符合要求包括leader的term≥自己的term一轮选举结束无人变成leader那么循环这个过程 为了防止在同一时间有太多的follower转变为candidate导致一直无法选出leader Raft 采用了随机选举超时randomized election timeouts的机制 每一个candidate 在发起选举后都会随机化一个新的选举超时时间。符合什么条件的节点可以成为leader 也可以称为“选举限制”有限制的目的是为了保证选举出的 leader 一定包含了整个集群中目前已 committed 的所有日志。 当 candidate 发送 RequestVoteRPC 时会带上最后一个 entry 的信息。 所有的节点收到该请求后都会比对自己的日志如果发现自己的日志更新一些则会拒绝投票给该 candidate。 需要比较两个东西最新日志entry的term和对应的index。index为日志entry在整个日志的索引。
if 两个节点最新日志entry的term不同term大的日志更新
else最新日志entry的index大的更新
end这样的限制可以保证成为leader的节点其日志已经是多数节点中最完备的即包含了整个集群的所有 committed entries。
日志同步、心跳
在RPC中 日志同步 和 心跳 是放在一个RPC函数AppendEntryRPC中来实现的原因为
心跳RPC 可以看成是没有携带日志的特殊的日志同步RPC。对于一个follower如果leader认为其日志已经和自己匹配了那么在AppendEntryRPC中不用携带日志再携带日志属于无效信息了但其他信息依然要携带反之如果follower的日志只有部分匹配那么就需要在AppendEntryRPC中携带对应的日志。
为什么不直接让follower拷贝leader的日志 或者 leader发送全部的日志给follower leader发送日志的目的是让follower同步自己的日志当然可以让leader发送自己全部的日志给follower然后follower接收后就覆盖自己原有的日志但是这样就会携带大量的无效的日志因为这些日志follower本身就有。 因此raft的方式是先找到日志不匹配的那个点然后只同步那个点之后的日志。leader如何知道follower的日志是否与自己完全匹配 在AppendEntryRPC中携带上 entry的index和对应的term日志的term可以通过比较最后一个日志的index和term来得出某个follower日志是否匹配。如果发现不匹配那么如何知道哪部分日志是匹配的哪部分日志是不匹配的呢 leader每次发送AppendEntryRPC后follower都会根据其entry的index和对应的term来判断某一个日志是否匹配。 在leader刚当选会从最后一个日志开始判断是否匹配如果匹配那么后续发送AppendEntryRPC就不需要携带日志entry了。 如果不匹配那么下一次就发送 倒数第2个 日志entry的index和其对应的term来判断匹配 如果还不匹配那么依旧重复这个过程直到遇到一个匹配的日志。
raft日志的两个特点
两个节点的日志中有两个 entry 拥有相同的 index 和 term那么它们一定记录了相同的内容/操作即两个日志匹配两个节点的日志中有两个 entry 拥有相同的 index 和 term那么它们前面的日志entry也相同如何保证
保证第一点仅有 leader 可以生成 entry保证一致性保证第二点leader 在通过 AppendEntriesRPC 和 follower 通讯时除了带上自己的term等信息外还会带上entry的index和对应的term等信息follower在接收到后通过对比就可以知道自己与leader的日志是否匹配不匹配则拒绝请求。 leader发现follower拒绝后就知道entry不匹配那么下一次就会尝试匹配前一个entry直到遇到一个entry匹配并将不匹配的entry给删除覆盖。
3.主要流程
1. raft类的定义
class Raft :
{
private:std::mutex m_mtx;std::vectorstd::shared_ptr RaftRpc m_peers; //需要与其他raft节点通信这里保存与其他结点通信的rpc入口std::shared_ptrPersister m_persister; //持久化层负责raft数据的持久化int m_me; //raft是以集群启动这个用来标识自己的的编号int m_currentTerm; //记录当前的termint m_votedFor; //记录当前term给谁投票过std::vectormprrpc:: LogEntry m_logs; 日志条目数组包含了状态机要执行的指令集以及收到领导时的任期号// 这两个状态所有结点都在维护易失int m_commitIndex;int m_lastApplied; // 已经汇报给状态机上层应用的log 的index// 这两个状态是由leader来维护易失 这两个部分在内容补充的部分也会再讲解// 这两个状态的下标1开始因为通常commitIndex和lastApplied从0开始应该是一个无效的index因此下标从1开始std::vectorint m_nextIndex; //领导者使用 m_nextIndex 来确定需要发送给追随者的下一批日志条目。std::vectorint m_matchIndex; //追随者使用 m_matchIndex 来记录已经成功复制的日志条目并向领导者发送确认信息。enum Status{Follower,Candidate,Leader};// 保存当前身份Status m_status;std::shared_ptrLockQueueApplyMsg applyChan; // client从这里取日志client与raft通信的接口// ApplyMsgQueue chan ApplyMsg // raft内部使用的chanapplyChan是用于和服务层交互最后好像没用上// 选举超时std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;// 心跳超时用于leaderstd::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;// 用于传入快照点// 储存了快照中的最后一个日志的Index和Termint m_lastSnapshotIncludeIndex;int m_lastSnapshotIncludeTerm;public:void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 心跳 rpc 重点关注void applierTicker(); //定期向状态机写入日志非重点函数bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot); //快照相关非重点void doElection(); //发起选举void doHeartBeat(); //leader定时发起心跳// 每隔一段时间检查睡眠时间内有没有重置定时器没有则说明超时了
// 如果有则设置合适睡眠时间睡眠到重置时间超时时间void electionTimeOutTicker(); //监控是否该发起选举了std::vectorApplyMsg getApplyLogs();int getNewCommandIndex();void getPrevLogInfo(int server, int *preIndex, int *preTerm);void GetState(int *term, bool *isLeader); //看当前节点是否是leadervoid InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply); void leaderHearBeatTicker(); //检查是否需要发起心跳leadervoid leaderSendSnapShot(int server); void leaderUpdateCommitIndex(); //leader更新commitIndexbool matchLog(int logIndex, int logTerm); //对应Index的日志是否匹配只需要Index和Term就可以知道是否匹配void persist(); //持久化void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply); //变成candidate之后需要让其他结点给自己投票bool UpToDate(int index, int term); //判断当前节点是否含有最新的日志int getLastLogIndex();void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);int getLogTermFromLogIndex(int logIndex);int GetRaftStateSize();int getSlicesIndexFromLogIndex(int logIndex); //设计快照之后logIndex不能与在日志中的数组下标相等了根据logIndex找到其在日志数组中的位置bool sendRequestVote(int server , std::shared_ptrmprrpc::RequestVoteArgs args , std::shared_ptrmprrpc::RequestVoteReply reply, std::shared_ptrint votedNum) ; // 请求其他结点的投票bool sendAppendEntries(int server ,std::shared_ptrmprrpc::AppendEntriesArgs args , std::shared_ptrmprrpc::AppendEntriesReply reply , std::shared_ptrint appendNums ) ; //Leader发送心跳后对心跳的回复进行对应的处理//rf.applyChan - msg //不拿锁执行 可以单独创建一个线程执行但是为了同意使用std:thread 避免使用pthread_create因此专门写一个函数来执行void pushMsgToKvServer(ApplyMsg msg); //给上层的kvserver层发送消息void readPersist(std::string data); std::string persistData();void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ; // 发布发来一个新日志
// 即kv-server主动发起请求raft持久层保存snapshot里面的数据index是用来表示snapshot快照执行到了哪条命令void Snapshot(int index , std::string snapshot );public:void init(std::vectorstd::shared_ptr RaftRpc peers,int me,std::shared_ptrPersister persister,std::shared_ptrLockQueueApplyMsg applyCh); //初始化
关键函数
Raft的主要流程
领导选举sendRequestVote RequestVote 日志同步、心跳sendAppendEntries AppendEntries
定时器的维护
Raft向状态机定时写入applierTicker 心跳维护定时器leaderHearBeatTicker 选举超时定时器electionTimeOutTicker
持久化相关
哪些内容需要持久化什么时候需要持久化persist
m_nextIndex 和 m_matchIndex作用
m_nextIndex 保存leader下一次应该从哪一个日志开始发送给follower m_matchIndex表示follower在哪一个日志是已经匹配了的由于日志安全性某一个日志匹配那么这个日志及其之前的日志都是匹配的
一个比较容易弄错的问题是m_nextIndex 与m_matchIndex 是否有冗余即使用一个m_nextIndex 可以吗
显然是不行的m_nextIndex 的作用是用来寻找m_matchIndex 不能直接取代。我们可以从这两个变量的变化看在当选leader后m_nextIndex 初始化为最新日志indexm_matchIndex 初始化为0如果日志不匹配那么m_nextIndex 就会不断的缩减直到遇到匹配的日志这时候m_nextIndex 应该一直为m_matchIndex1 。
如果一直不发生故障那么后期m_nextIndex就没有太大作用了但是raft考虑需要考虑故障的情况因此需要使用两个变量。
2.启动初始化
void Raft::init(std::vectorstd::shared_ptrRaftRpc peers, int me, std::shared_ptrPersister persister, std::shared_ptrLockQueueApplyMsg applyCh) {m_peers peers; //与其他结点沟通的rpc类m_persister persister; //持久化类m_me me; //标记自己毕竟不能给自己发送rpc吧m_mtx.lock();//applierthis-applyChan applyCh; //与kv-server沟通
// rf.ApplyMsgQueue make(chan ApplyMsg)m_currentTerm 0; //初始化term为0m_status Follower; //初始化身份为followerm_commitIndex 0; m_lastApplied 0;m_logs.clear();for (int i 0;im_peers.size();i){m_matchIndex.push_back(0);m_nextIndex.push_back(0);}m_votedFor -1; //当前term没有给其他人投过票就用-1表示m_lastSnapshotIncludeIndex 0;m_lastSnapshotIncludeTerm 0;m_lastResetElectionTime now();m_lastResetHearBeatTime now();// initialize from state persisted before a crashreadPersist(m_persister-ReadRaftState());if(m_lastSnapshotIncludeIndex 0){m_lastApplied m_lastSnapshotIncludeIndex;//rf.commitIndex rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex}m_mtx.unlock();// start ticker 开始三个定时器std::thread t(Raft::leaderHearBeatTicker, this);t.detach();std::thread t2(Raft::electionTimeOutTicker, this);t2.detach();std::thread t3(Raft::applierTicker, this);t3.detach();
}从上面可以看到一共产生了三个定时器分别维护选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的
3.竞选leader
在Raft算法中每个节点无论是追随者follower还是候选人candidate都有一个选举定时器。如果追随者在一定的时间内没有收到任何来自领导者或候选人的消息它会认为当前没有有效的领导者然后启动选举定时器。一旦定时器到期追随者会转换为候选人状态并开始新一轮的领导者选举。
electionTimeOutTicker负责查看是否该发起选举如果该发起选举就执行doElection发起选举。doElection实际发起选举构造需要发送的rpc并多线程调用sendRequestVote处理rpc及其相应。sendRequestVote负责发送选举中的RPC在发送完rpc后还需要负责接收并处理对端发送回来的响应。RequestVote接收别人发来的选举请求主要检验是否要给对方投票。
electionTimeOutTicker:
选举定时器负责查看是否该发起选举如果该发起选举就执行doElection发起选举。
void Raft::electionTimeOutTicker() {// Check if a Leader election should be started.while (true) {m_mtx.lock();auto nowTime now(); //睡眠前记录时间auto suitableSleepTime getRandomizedElectionTimeout() m_lastResetElectionTime - nowTime;m_mtx.unlock();if (suitableSleepTime.count() 1) {std::this_thread::sleep_for(suitableSleepTime);}if ((m_lastResetElectionTime - nowTime).count() 0) { //说明睡眠的这段时间有重置定时器那么就没有超时再次睡眠continue;}doElection();}
}在死循环中 首先计算距离上次重置选举计时器的时间m_lastResetElectionTime - nowTime加上随机化的选举超时时间getRandomizedElectionTimeout 计算得到距离下一次超时应该睡眠的时间suitableSleepTime然后线程根据这个时间决定是否睡眠。 若超时时间未到线程进入睡眠状态若在此期间选举计时器被重置则继续循环。 若超时时间已到调用doElection() 函数启动领导者选举过程。
随机化的选举超时时间是为了避免多个追随者几乎同时成为候选人导致选举失败
doElection
实际发起选举构造需要发送的rpc并多线程调用sendRequestVote处理rpc及其相应。
void Raft::doElection() {lock_guardmutex g(m_mtx); //c11新特性使用raii避免死锁if (m_status ! Leader) {DPrintf([ ticker-func-rf(%d) ] 选举定时器到期且不是leader开始选举 \n, m_me);//当选举的时候定时器超时就必须重新选举不然没有选票就会一直卡住//重竞选超时term也会增加的m_status Candidate;///开始新一轮的选举m_currentTerm 1; //无论是刚开始竞选还是超时重新竞选term都要增加m_votedFor m_me; //即是自己给自己投票也避免candidate给同辈的candidate投persist(); std::shared_ptrint votedNum std::make_sharedint(1); // 使用 make_shared 函数初始化 // 重新设置定时器m_lastResetElectionTime now();// 发布RequestVote RPCfor (int i 0; i m_peers.size(); i) {if (i m_me) {continue;}int lastLogIndex -1, lastLogTerm -1;getLastLogIndexAndTerm(lastLogIndex, lastLogTerm);//获取最后一个log的term和下标以添加到RPC的发送//初始化发送参数std::shared_ptrmprrpc::RequestVoteArgs requestVoteArgs std::make_sharedmprrpc::RequestVoteArgs();requestVoteArgs-set_term(m_currentTerm);requestVoteArgs-set_candidateid(m_me);requestVoteArgs-set_lastlogindex(lastLogIndex);requestVoteArgs-set_lastlogterm(lastLogTerm);std::shared_ptrmprrpc::RequestVoteReply requestVoteReply std::make_sharedmprrpc::RequestVoteReply();//使用匿名函数执行避免其拿到锁std::thread t(Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply,votedNum); // 创建新线程并执行函数并传递参数t.detach();}}
}sendRequestVote
负责发送选举中的RPC在发送完rpc后还需要根据调用RequestVote得到的reply响应结果,负责接收并处理对端发送回来的响应对发起投票的候选者状态进行更新
bool Raft::sendRequestVote(int server, std::shared_ptrmprrpc::RequestVoteArgs args, std::shared_ptrmprrpc::RequestVoteReply reply,std::shared_ptrint votedNum) {bool ok m_peers[server]-RequestVote(args.get(),reply.get());if (!ok) {return ok;//rpc通信失败就立即返回避免资源消耗}lock_guardmutex lg(m_mtx);if(reply-term() m_currentTerm){//回复的term比自己大说明自己落后了那么就更新自己的状态并且退出m_status Follower; //三变身份term和投票m_currentTerm reply-term();m_votedFor -1; //term更新了那么这个term自己肯定没投过票为-1persist(); //持久化return true;} else if ( reply-term() m_currentTerm ) {//回复的term比自己的term小不应该出现这种情况return true;}if(!reply-votegranted()){ //这个节点因为某些原因没给自己投票没啥好说的结束本函数return true;}//给自己投票了*votedNum *votedNum 1; //voteNum多一个if (*votedNum m_peers.size()/21) {//变成leader*votedNum 0; //重置voteDNum如果不重置那么就会变成leader很多次是没有必要的甚至是错误的// 第一次变成leader初始化状态和nextIndex、matchIndexm_status Leader;int lastLogIndex getLastLogIndex();for (int i 0; i m_nextIndex.size() ; i) {m_nextIndex[i] lastLogIndex 1 ;//有效下标从1开始因此要1m_matchIndex[i] 0; //每换一个领导都是从0开始见论文的fig2}std::thread t(Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leadert.detach();persist(); }return true;
}RequestVote
得到投票请求后节点根据传递来的信息进行判断是否对其投票构造reply返回值
void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {lock_guardmutex lg(m_mtx);Defer ec1([this]() - void { //应该先持久化再撤销lock因此这个写在lock后面this-persist();});//对args的term的三种情况分别进行处理大于小于等于自己的term都是不同的处理//reason: 出现网络分区该竞选者已经OutOfDate(过时if (args-term() m_currentTerm) {reply-set_term(m_currentTerm);reply-set_votestate(Expire);reply-set_votegranted(false);return;}//论文fig2:右下角如果任何时候rpc请求或者响应的term大于自己的term更新term并变成followerif (args-term() m_currentTerm) {m_status Follower;m_currentTerm args-term();m_votedFor -1;// 重置定时器收到leader的ae开始选举透出票//这时候更新了term之后votedFor也要置为-1}// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)// 要检查log的term和index是不是匹配的了int lastLogTerm getLastLogIndex();//只有没投票且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票if (!UpToDate(args-lastlogindex(), args-lastlogterm())) {//日志太旧了reply-set_term(m_currentTerm);reply-set_votestate(Voted);reply-set_votegranted(false);return;}// 当因为网络质量不好导致的请求丢失重发就有可能
// 因此需要避免重复投票if (m_votedFor ! -1 m_votedFor ! args-candidateid()) {reply-set_term(m_currentTerm);reply-set_votestate(Voted);reply-set_votegranted(false);return;} else {//同意投票m_votedFor args-candidateid();m_lastResetElectionTime now();//认为必须要在投出票的时候才重置定时器reply-set_term(m_currentTerm);reply-set_votestate(Normal);reply-set_votegranted(true);return;}
}4.日志复制、心跳 leaderHearBeatTicker负责查看是否该发送心跳了如果该发起就执行doHeartBeat。doHeartBeat实际发送心跳判断到底是构造需要发送的rpc并多线程调用sendRequestVote处理rpc及其响应。sendAppendEntries负责发送日志的RPC在发送完rpc后还需要负责接收并处理对端发送回来的响应。leaderSendSnapShot负责发送快照的RPC在发送完rpc后还需要负责接收并处理对端发送回来的响应。AppendEntries接收leader发来的日志请求主要检验用于检查当前日志是否匹配并同步leader的日志到本机。InstallSnapshot接收leader发来的快照请求同步快照到本机。
leaderHearBeatTicker
心跳定时器负责查看是否该发送心跳了如果该发起就执行doHeartBeat。
void Raft::leaderHearBeatTicker() {while (true) {auto nowTime now();m_mtx.lock();auto suitableSleepTime std::chrono::milliseconds(HeartBeatTimeout) m_lastResetHearBeatTime - nowTime;m_mtx.unlock();if (suitableSleepTime.count() 1) {suitableSleepTime std::chrono::milliseconds(1);}std::this_thread::sleep_for(suitableSleepTime);if ((m_lastResetHearBeatTime - nowTime).count() 0) { //说明睡眠的这段时间有重置定时器那么就没有超时再次睡眠continue;}doHeartBeat();}
}其基本逻辑和选举定时器electionTimeOutTicker一模一样 不一样之处在于设置的休眠时间不同这里是根据HeartBeatTimeout来设置固定时间。 而electionTimeOutTicker中是根据getRandomizedElectionTimeout() 设置随机一个时间。
doHeartBeat
实际发送心跳判断是Leader则构造需要发送的rpc并多线程调用sendRequestVote处理rpc及其响应。
void Raft::doHeartBeat() {std::lock_guardmutex g(m_mtx);if (m_status Leader) {auto appendNums std::make_sharedint(1); //正确返回的节点的数量//对Follower除了自己外的所有节点发送AEfor (int i 0; i m_peers.size(); i) {if(i m_me){ //不对自己发送AEcontinue;}//日志压缩加入后要判断是发送快照还是发送AEif (m_nextIndex[i] m_lastSnapshotIncludeIndex) {//应该发送的日志已经被压缩成快照必须发送快照了std::thread t(Raft::leaderSendSnapShot, this, i); t.detach();continue;}//发送心跳构造发送值int preLogIndex -1;int PrevLogTerm -1;getPrevLogInfo(i, preLogIndex, PrevLogTerm); //获取本次发送的一系列日志的上一条日志的信息以判断是否匹配std::shared_ptrmprrpc::AppendEntriesArgs appendEntriesArgs std::make_sharedmprrpc::AppendEntriesArgs();appendEntriesArgs-set_term(m_currentTerm);appendEntriesArgs-set_leaderid(m_me);appendEntriesArgs-set_prevlogindex(preLogIndex);appendEntriesArgs-set_prevlogterm(PrevLogTerm);appendEntriesArgs-clear_entries();appendEntriesArgs-set_leadercommit(m_commitIndex);// 作用是携带上prelogIndex的下一条日志及其之后的所有日志//leader对每个节点发送的日志长短不一但是都保证从prevIndex发送直到最后if (preLogIndex ! m_lastSnapshotIncludeIndex) {for (int j getSlicesIndexFromLogIndex(preLogIndex) 1; j m_logs.size(); j) {mprrpc::LogEntry *sendEntryPtr appendEntriesArgs-add_entries();*sendEntryPtr m_logs[j]; }} else {for (const auto item: m_logs) {mprrpc::LogEntry *sendEntryPtr appendEntriesArgs-add_entries();*sendEntryPtr item; }}int lastLogIndex getLastLogIndex();//初始化返回值const std::shared_ptrmprrpc::AppendEntriesReply appendEntriesReply std::make_sharedmprrpc::AppendEntriesReply();std::thread t(Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,appendNums); // 创建新线程并执行b函数并传递参数t.detach();}m_lastResetHearBeatTime now(); //leader发送心跳重置心跳时间}
}sendAppendEntries
负责发送日志的RPC在发送完rpc后还需要负责接收并处理对端发送回来的响应。
bool
Raft::sendAppendEntries(int server, std::shared_ptrmprrpc::AppendEntriesArgs args, std::shared_ptrmprrpc::AppendEntriesReply reply,std::shared_ptrint appendNums) {// todo paper中5.3节第一段末尾提到如果append失败应该不断的retries ,直到这个log成功的被storebool ok m_peers[server]-AppendEntries(args.get(), reply.get());if (!ok) {return ok;}lock_guardmutex lg1(m_mtx);//对reply进行处理// 对于rpc通信无论什么时候都要检查termif(reply-term() m_currentTerm){m_status Follower;m_currentTerm reply-term();m_votedFor -1;return ok;} else if (reply-term() m_currentTerm) {//正常不会发生return ok;}if (m_status ! Leader) { //如果不是leader那么就不要对返回的情况进行处理了return ok;}//term相等if (!reply-success()){//日志不匹配正常来说就是index要往前-1既然能到这里第一个日志idnex 1发送后肯定是匹配的因此不用考虑变成负数//因为真正的环境不会知道是服务器宕机还是发生网络分区了if (reply-updatenextindex() ! -100) { //-100只是一个特殊标记而已没有太具体的含义这里表示任期落后了// 优化日志匹配让follower决定到底应该下一次从哪一个开始尝试发送m_nextIndex[server] reply-updatenextindex(); }// 如果感觉rf.nextIndex数组是冗余的看下论文fig2其实不是冗余的} else {*appendNums *appendNums 1; //到这里代表同意接收了本次心跳或者日志m_matchIndex[server] std::max(m_matchIndex[server],args-prevlogindex()args-entries_size() ); //同意了日志就更新对应的m_matchIndex和m_nextIndexm_nextIndex[server] m_matchIndex[server]1;int lastLogIndex getLastLogIndex();if (*appendNums 1 m_peers.size()/2) { //可以commit了//两种方法保证幂等性1.赋值为0 2.上面≥改为*appendNums 0; //置0//日志的安全性保证 leader只有在当前term有日志提交的时候才更新commitIndex因为raft无法保证之前term的Index是否提交//只有当前term有日志提交之前term的log才可以被提交只有这样才能保证“领导人完备性{当选领导人的节点拥有之前被提交的所有log当然也可能有一些没有被提交的}”//说白了就是只有当前term有日志提交才会提交if(args-entries_size() 0 args-entries(args-entries_size()-1).logterm() m_currentTerm){m_commitIndex std::max(m_commitIndex,args-prevlogindex() args-entries_size());}}}return ok;
}m_nextIndex[server] reply-updatenextindex(); 中涉及日志寻找匹配加速的优化 对于leader只有在当前term有日志提交的时候才更新commitIndex这个安全性保证详情看参考公众号文章的7.6是否一项提议只需要被多数派通过就可以提交
AppendEntries
接收leader发来的日志请求主要检验用于检查当前日志是否匹配并同步leader的日志到本机。
void Raft::AppendEntries1(const mprrpc:: AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply) {std::lock_guardstd::mutex locker(m_mtx);// 不同的人收到AppendEntries的反应是不同的要注意无论什么时候收到rpc请求和响应都要检查termif (args-term() m_currentTerm) {reply-set_success(false);reply-set_term(m_currentTerm);reply-set_updatenextindex(-100); // 论文中让领导人可以及时更新自己DPrintf([func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v} rf{%d}.term{%d}\n, m_me, args-leaderid(),args-term() , m_me, m_currentTerm) ;return; // 注意从过期的领导人收到消息不要重设定时器}Defer ec1([this]() - void { this-persist(); });//由于这个局部变量创建在锁之后因此执行persist的时候应该也是拿到锁的. //本质上就是使用raii的思想让persist()函数执行完之后再执行if (args-term() m_currentTerm) {// 三变 ,防止遗漏无论什么时候都是三变m_status Follower;m_currentTerm args-term();m_votedFor -1; // 这里设置成-1有意义如果突然宕机然后上线理论上是可以投票的// 这里可不返回应该改成让改节点尝试接收日志// 如果是领导人和candidate突然转到Follower好像也不用其他操作// 如果本来就是Follower那么其term变化相当于“不言自明”的换了追随的对象因为原来的leader的term更小是不会再接收其消息了}// 如果发生网络分区那么candidate可能会收到同一个term的leader的消息要转变为Follower为了和上面因此直接写m_status Follower; // 这里是有必要的因为如果candidate收到同一个term的leader的AE需要变成follower// term相等m_lastResetElectionTime now(); //重置选举超时定时器// 不能无脑的从prevlogIndex开始阶段日志因为rpc可能会延迟导致发过来的log是很久之前的// 那么就比较日志日志有3种情况if (args-prevlogindex() getLastLogIndex()) { //追随者的日志比领导者的要短。这种情况追随者需要从领导者那里接收缺失的日志条目。reply-set_success(false);reply-set_term(m_currentTerm);reply-set_updatenextindex(getLastLogIndex() 1);return;} else if (args-prevlogindex() m_lastSnapshotIncludeIndex) { // 如果prevlogIndex还没有更上快照//追随者可能已经通过快照机制截断了其日志 或 追随者接受了一个快照丢弃了快照索引之前的日志 或 领导者的日志落后reply-set_success(false);reply-set_term(m_currentTerm);reply-set_updatenextindex(m_lastSnapshotIncludeIndex 1); //不会浪费时间重试发送追随者已经用快照截断的日志条目}// 本机日志有那么长冲突(same index,different term),截断日志// 注意这里目前当args.PrevLogIndex rf.lastSnapshotIncludeIndex与不等的时候要分开考虑可以看看能不能优化这块if (matchLog(args-prevlogindex(), args-prevlogterm())) {//日志匹配那么就复制日志for (int i 0; i args-entries_size(); i) {auto log args-entries(i);if (log.logindex() getLastLogIndex()) { //超过就直接添加日志m_logs.push_back(log);} else { //没超过就比较是否匹配不匹配再更新而不是直接截断 检查当前日志条目是否已经存在于追随者的日志中。//判断追随者日志中相应索引位置的日志条目的任期是否与请求中的日志条目的任期相同。如果任期不同说明日志不匹配。//参考前面公众号文章 4.1 写 case3if (m_logs[getSlicesIndexFromLogIndex(log.logindex())].logterm() ! log.logterm()) { //不匹配就更新m_logs[getSlicesIndexFromLogIndex(log.logindex())] log;}}}if (args-leadercommit() m_commitIndex) {m_commitIndex std::min(args-leadercommit(), getLastLogIndex());// 这个地方不能无脑跟上getLastLogIndex()因为可能存在args-leadercommit()落后于 getLastLogIndex()的情况}// 领导会一次发送完所有的日志reply-set_success(true);reply-set_term(m_currentTerm);return;} else {// 不匹配不匹配不是一个一个往前而是有优化加速// PrevLogIndex 长度合适但是不匹配因此往前寻找 矛盾的term的第一个元素// 为什么该term的日志都是矛盾的呢也不一定都是矛盾的只是这么优化减少rpc而已// 什么时候term会矛盾呢很多情况比如leader接收了日志之后马上就崩溃等等reply-set_updatenextindex(args-prevlogindex());for (int index args-prevlogindex(); index m_lastSnapshotIncludeIndex; --index) {if (getLogTermFromLogIndex(index) ! getLogTermFromLogIndex(args-prevlogindex())) {reply-set_updatenextindex(index 1);break;}}reply-set_success(false);reply-set_term(m_currentTerm);return;}}日志寻找匹配加速 这部分在AppendEntries函数最后部分。
// 不匹配不匹配不是一个一个往前而是有优化加速
// PrevLogIndex 长度合适但是不匹配因此往前寻找 矛盾的term的第一个元素
// 为什么该term的日志都是矛盾的呢也不一定都是矛盾的只是这么优化减少rpc而已
// 什么时候term会矛盾呢很多情况比如leader接收了日志之后马上就崩溃等等
reply-set_updatenextindex(args-prevlogindex());for (int index args-prevlogindex(); index m_lastSnapshotIncludeIndex; --index) {if (getLogTermFromLogIndex(index) ! getLogTermFromLogIndex(args-prevlogindex())) {reply-set_updatenextindex(index 1);break;}
}reply-set_success(false);
reply-set_term(m_currentTerm);return;之前说过如果日志不匹配的话可以一个一个往前的倒退。但是这样的话可能会设计很多个rpc之后才能找到匹配的日志那么就一次多倒退几个数。 倒退几个呢这里认为如果某一个日志不匹配那么这一个日志所在的term的所有日志大概率都不匹配那么就倒退到 最后一个日志所在的term的最后那个命令。
5.snapshot快照
快照是什么
当在Raft协议中的日志变得太大时为了避免无限制地增长系统可能会采取快照snapshot的方式来压缩日志。快照是系统状态的一种紧凑表示形式包含在某个特定时间点的所有必要信息以便在需要时能够还原整个系统状态。
如果你学习过redis那么快照说白了就是rdb而raft的日志可以看成是aof日志。rdb的目的只是为了崩溃恢复的加速如果没有的话也不会影响系统的正确性这也是为什么选择不详细讲解快照的原因因为只是日志的压缩而已。
何时创建快照
快照通常在日志达到一定大小时创建。这有助于限制日志的大小防止无限制的增长。快照也可以在系统空闲时没有新的日志条目被追加创建。
快照的传输
快照的传输主要涉及kv数据库与raft节点之间不同raft节点之间。
kv数据库与raft节点之间因为快照是数据库的压缩表示因此需要由数据库打包快照并交给raft节点。当快照生成之后快照内设计的操作会被raft节点从日志中删除不删除就相当于有两份数据冗余了。
不同raft节点之间当leader已经把某个日志及其之前的内容变成了快照那么当涉及这部的同步时就只能通过快照来发送。
三.持久化
持久化就是把不能丢失的数据保存到磁盘。
1.持久化介绍
持久化的内容为两部分 1.raft节点的部分信息2.kvDb的快照
1.raft节点的部分信息
m_currentTerm 当前节点的Term避免重复到一个Term可能会遇到重复投票等问题。 m_votedFor 当前Term给谁投过票避免故障后重复投票。 m_logs raft节点保存的全部的日志信息。
不妨想一想其他的信息为什么不用持久化比如说身份、commitIndex、applyIndex等等。 applyIndex不持久化是经典raft的实现在一些工业实现上可能会优化从而持久化。 即applyIndex不持久化不会影响“共识”。
2.kvDb的快照
m_lastSnapshotIncludeIndex 快照的信息快照最新包含哪个日志Index m_lastSnapshotIncludeTerm 快照的信息快照最新包含哪个日志Term与m_lastSnapshotIncludeIndex 是对应的。
Snapshot是kvDb的快照也可以看成是日志因此:全部的日志 m_logs snapshot
因为Snapshot是kvDB生成的kvDB肯定不知道raft的存在而什么term、什么日志Index都是raft才有的概念因此snapshot中肯定没有term和index信息。所以需要raft自己来保存这些信息。 故快照与m_logs联合起来理解即可。
2.为什么要持久化这些内容
两部分原因共识安全、优化。 除了snapshot相关的部分其他部分都是为了共识安全。 而snapshot是因为日志一个一个的叠加会导致最后的存储非常大因此使用snapshot来压缩日志。 为什么snashot可以压缩日志 日志是追加写的对于一个变量的重复修改可能会重复保存理论上对一个变量的反复修改会导致日志不断增大。 而snapshot是原地写即只保存一个变量最后的值自然所需要的空间就小了。 3.什么时候持久化
需要持久化的内容发送改变的时候就要注意持久化。 比如term 增加日志增加等等。 *具体查看代码中的void Raft::persist() 相关内容
4.谁来调用持久化
谁来调用都可以只要能保证需要持久化的内容能正确持久化。 代码中选择的是raft类自己来完成持久化。因为raft类最方便感知自己的term之类的信息有没有变化。 注意虽然持久化很耗时但是持久化这些内容的时候不要放开锁以防其他线程改变了这些值导致其它异常。
5.具体怎么实现持久化/使用哪个函数持久化
其实持久化是一个非常难的事情因为持久化需要考虑速度、大小、二进制安全。 因此代码中目前采用的是使用boost库中的持久化实现将需要持久化的数据序列化转成std::string 类型再写入磁盘。 当然其他的序列化方式也少可行的可以看到这一块还是有优化空间的。
四.kvServer
1.kvServer介绍 图中是raftServer这里叫成kvServer是一样的。 kvServer其实是个中间组件负责沟通kvDB和raft节点。 那么外部请求是Server来负责加入后变成了
2.kvServer怎么和上层kvDB沟通怎么和下层raft节点沟通
std::shared_ptrLockQueueApplyMsg applyChan; //kvServer和raft节点的通信管道
std::unordered_mapstd::string, std::string m_kvDB; //kvDB用unordered_map来替代kvDB使用的是unordered_map来代替上层的kvDB因此没啥好说的。 raft节点其中LockQueue 是一个并发安全的队列这种方式其实是模仿的go中的channel机制。 在raft类中这里可以看到raft类中也拥有一个applyChankvSever和raft类都持有同一个applyChan来完成相互的通信。
3.kvServer怎么处理外部请求
从上面的结构图中可以看到kvServer负责与外部clerk通信。 那么一个外部请求的处理可以简单的看成
接收外部请求。本机内部与raft和kvDB协商如何处理该请求。返回外部响应。
接收与响应外部请求
对于1和3请求和返回的操作我们可以通过http、自定义协议等等方式实现但是既然我们已经写出了rpc通信的一个简单的实现源代码可见这里那就使用rpc来实现吧。 而且rpc可以直接完成请求和响应这一步后面就不用考虑外部通信的问题了好好处理好本机的流程即可。 相关函数是
void PutAppend(google::protobuf::RpcController *controller,const ::raftKVRpcProctoc::PutAppendArgs *request,::raftKVRpcProctoc::PutAppendReply *response,::google::protobuf::Closure *done) override;void Get(google::protobuf::RpcController *controller,const ::raftKVRpcProctoc::GetArgs *request,::raftKVRpcProctoc::GetReply *response,::google::protobuf::Closure *done) override;见名知意请求分成两种get和put也就是set。 如果是putAppendclerk中就调用PutAppend 的rpc。 如果是Getclerk中就调用Get 的rpc。 与raft节点沟通 在正式开始之前我们必须要先了解 线性一致性 的相关概念。
待续
代码如下示例