青浦网站建设推广,html5网站建设中模板,ui设计一般要学多久,wordpress 点赞功能本文主要介绍etcd在分布式多节点服务中如何实现选主。
1、基础知识
在开始之前#xff0c;先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。 1、version 作用域为key#xff0c;表示某个key的版本#xff0c;每个key刚创建的version为1#…本文主要介绍etcd在分布式多节点服务中如何实现选主。
1、基础知识
在开始之前先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。 1、version 作用域为key表示某个key的版本每个key刚创建的version为1每次更新key这个值都会自增表示这个key自创建以来更新的次数。 2、revision 作用域为集群单调递增集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。 3、ModRevision 作用域为key表示某个key修改时的版本它等于修改这个key时的revision的值。 4、CreateRevision 作用域为key表示某个key创建时的版本等于创建这个key时revision的值再删除之前会保持不变。 现在在etcd里面放一个key 可以看到Version是1CreateRevision与Modrevision和Revision相等都是7677720。 在修改了key的值以后 version自增变为2CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了因为此时还有其他的程序在修改etcd里面的key。
2、选举
先初始化一个session和选举electionKey : /my-election// Create an election sessionsession, err : concurrency.NewSession(client, concurrency.WithTTL(10))if err ! nil {log.Fatal(err)}defer session.Close()election : concurrency.NewElection(session, electionKey)在启动了三台服务后在etcd里面找到以下三个key现在的leader是第一台 去看下compaign的源码
func (e *Election) Campaign(ctx context.Context, val string) error {s : e.sessionclient : e.session.Client()//用leaseID与前面的key前缀拼成keyk : fmt.Sprintf(%s%x, e.keyPrefix, s.Lease())//判断当前key的createRevision是否是0也就是否创建txn : client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), , 0))txn txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))txn txn.Else(v3.OpGet(k))resp, err : txn.Commit()if err ! nil {return err}//获取key的Revisione.leaderKey, e.leaderRev, e.leaderSession k, resp.Header.Revision, sif !resp.Succeeded {kv : resp.Responses[0].GetResponseRange().Kvs[0]e.leaderRev kv.CreateRevisionif string(kv.Value) ! val {if err e.Proclaim(ctx, val); err ! nil {e.Resign(ctx)return err}}}_, err waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)if err ! nil {// clean up in case of context cancelselect {case -ctx.Done():e.Resign(client.Ctx())default:e.leaderSession nil}return err}e.hdr resp.Headerreturn nil
}用当前key和leaseid在etcd中创建一个key并获取到key此时的Revision。
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {getOpts : append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))for {resp, err : client.Get(ctx, pfx, getOpts...)if err ! nil {return nil, err}if len(resp.Kvs) 0 {return resp.Header, nil}lastKey : string(resp.Kvs[0].Key)if err waitDelete(ctx, client, lastKey, resp.Header.Revision); err ! nil {return nil, err}}
}func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel : context.WithCancel(ctx)defer cancel()var wr v3.WatchResponsewch : client.Watch(cctx, key, v3.WithRev(rev))for wr range wch {for _, ev : range wr.Events {if ev.Type mvccpb.DELETE {return nil}}}if err : wr.Err(); err ! nil {return err}if err : ctx.Err(); err ! nil {return err}return fmt.Errorf(lost watcher waiting for delete)
}这里get的option找到/my-ection前缀下最新创建的key并且createRev的值小于等于当前key创建的createRev-1.
// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev rev } }此时/my-election在这个revision之前没有任何key所以node2启动直接竞选到了主。 紧接着node3启动在这个get它能获取到node2创建的key所以node3去watchnode2的的删除事件。 同理node4启动以后在这里的get它获取的是node3的key它去wachnode3的删除事件。 当node2释放后node3获取到node2的删除事件变成主。node3释放以后node4变成watch到node3 的删除事件变成主。
3、观察者
在上面的例子中如果node4挂了以后node2能继续变回为leader吗 在引入observe以后可以做到。 找到/my-election这个前缀下最开始创建的key也就是node2。 然后把这个key放入返回ch中。
func (e *Election) observe(ctx context.Context, ch chan- v3.GetResponse) {client : e.session.Client()defer close(ch)for {resp, err : client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)if err ! nil {return}var kv *mvccpb.KeyValuevar hdr *pb.ResponseHeaderif len(resp.Kvs) 0 {cctx, cancel : context.WithCancel(ctx)// wait for first key put on prefixopts : []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}wch : client.Watch(cctx, e.keyPrefix, opts...)for kv nil {wr, ok : -wchif !ok || wr.Err() ! nil {cancel()return}// only accept puts; a delete will make observe() spinfor _, ev : range wr.Events {if ev.Type mvccpb.PUT {hdr, kv wr.Header, ev.Kv// may have multiple revs; hdr.rev the last rev// set to kvs rev in case batch has multiple Putshdr.Revision kv.ModRevisionbreak}}}cancel()} else {hdr, kv resp.Header, resp.Kvs[0]}select {case ch - v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:case -ctx.Done():return}cctx, cancel : context.WithCancel(ctx)wch : client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision1))keyDeleted : falsefor !keyDeleted {wr, ok : -wchif !ok {cancel()return}for _, ev : range wr.Events {if ev.Type mvccpb.DELETE {keyDeleted truebreak}resp.Header wr.Headerresp.Kvs []*mvccpb.KeyValue{ev.Kv}select {case ch - *resp:case -cctx.Done():cancel()return}}}cancel()}
}wch : client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision1)) 接下里watch/my-election下第一个创建的key在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件主发生了变化。 同理当node2 down掉它的key被删除node3变成了主observe此时watch的就是node3的key因为此时node3创建的key是/my-election下的第一个key。 完整代码
package mainimport (contextfmtlogostimeclientv3 go.etcd.io/etcd/client/v3go.etcd.io/etcd/client/v3/concurrency
)const (FOLLOWER followerLEADER leader
)var state string FOLLOWER
var preState string FOLLOWERfunc compaign(election *concurrency.Election, val string) {// Campaign for leadershipfmt.Println(start compaign!)if err : election.Campaign(context.Background(), val); err ! nil {log.Fatal(err)}fmt.Println(Became leader!)preState statestate LEADER// Hold leadership until a key pressfmt.Println(Press Enter to release leadership...)fmt.Scanln()// Resign leadershipif err : election.Resign(context.Background()); err ! nil {log.Fatal(err)}//preState state//state FOLLOWERfmt.Println(Released leadership.)
}func observe(election *concurrency.Election, val string) {ch : election.Observe(context.Background())for {select {case rsp, ok : -ch:if !ok {fmt.Println(now I am follower)//election.Campaign(context.Background(), args[1])//重新开始观察go observe(election, val)return} else {fmt.Printf(leader now is:%s\n, string(rsp.Kvs[0].Value))if string(rsp.Kvs[0].Value) val {fmt.Println(still be leader)preState statestate LEADER} else {fmt.Println(now become follower)preState statestate FOLLOWERif preState LEADER {go compaign(election, val)}}}}}
}func main() {// Connect to etcdargs : os.Argsclient, err : clientv3.New(clientv3.Config{Endpoints: []string{ipdizhi}, // Replace with your etcd endpointsDialTimeout: 5 * time.Second,Username: user,Password: password,})if err ! nil {log.Fatal(err)}defer client.Close()// Key for leader electionelectionKey : /my-election// Create an election sessionsession, err : concurrency.NewSession(client, concurrency.WithTTL(10))if err ! nil {log.Fatal(err)}defer session.Close()election : concurrency.NewElection(session, electionKey)go compaign(election, args[1])go observe(election, args[1])for {}
}