山东省建设局网站,如何提高网站的点击率,企业网站优化的原则,seo工具查询1、etcd分布式锁及事务
1.1 前言
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中#xff0c;常常需要协调他们的动作。如
果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源#xff0c;那么访问这些资源的时候#xff0c;往往需要…1、etcd分布式锁及事务
1.1 前言
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中常常需要协调他们的动作。如
果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源那么访问这些资源的时候往往需要互斥
来防止彼此干扰来保证一致性在这种情况下便需要使用到分布式锁。
1.2 etcd分布式锁设计
1、排他性任意时刻只能有一个机器的一个线程能获取到锁。
通过在etcd中存入key值来实现上锁删除key实现解锁参考下面伪代码
func Lock(key string, cli *clientv3.Client) error {//获取key判断是否存在锁resp, err : cli.Get(context.Background(), key)if err ! nil {return err}//锁存在返回上锁失败if len(resp.Kvs) 0 {return errors.New(lock fail)}_, err cli.Put(context.Background(), key, lock)if err ! nil {return err}return nil
}
//删除key解锁
func UnLock(key string, cli *clientv3.Client) error {_, err : cli.Delete(context.Background(), key)return err
}当发现已上锁时直接返回lock fail。也可以处理成等待解锁解锁后竞争锁。
//等待key删除后再竞争锁
func waitDelete(key string, cli *clientv3.Client) {rch : cli.Watch(context.Background(), key)for wresp : range rch {for _, ev : range wresp.Events {switch ev.Type {case mvccpb.DELETE: //删除return}}}
}2、容错性只要分布式锁服务集群节点大部分存活client就可以进行加锁解锁操作。
etcd基于Raft算法确保集群中数据一致性。
3、避免死锁分布式锁一定能得到释放即使client在释放之前崩溃。
上面分布式锁设计有缺陷假如client获取到锁后程序直接崩了没有解锁那其他线程也无法拿到锁导致死锁
出现。
通过给key设定leases来避免死锁但是leases过期时间设多长呢假如设了30秒而上锁后的操作比30秒
大会导致以下问题 操作没完成锁被别人占用了不安全 操作完成后进行解锁这时候把别人占用的锁解开了
解决方案给key添加过期时间后以Keep leases alive方式延续leases当client正常持有锁时锁不会过
期当client程序崩掉后程序不能执行Keep leases alive从而让锁过期避免死锁。看以下伪代码
//上锁
func Lock(key string, cli *clientv3.Client) error {//获取key判断是否存在锁resp, err : cli.Get(context.Background(), key)if err ! nil {return err}//锁存在等待解锁后再竞争锁if len(resp.Kvs) 0 {waitDelete(key, cli)return Lock(key)}//设置key过期时间resp, err : cli.Grant(context.TODO(), 30)if err ! nil {return err}//设置key并绑定过期时间_, err cli.Put(context.Background(), key, lock, clientv3.WithLease(resp.ID))if err ! nil {return err}//延续key的过期时间_, err cli.KeepAlive(context.TODO(), resp.ID)if err ! nil {return err}return nil
}
//通过让key值过期来解锁
func UnLock(resp *clientv3.LeaseGrantResponse, cli *clientv3.Client) error {_, err : cli.Revoke(context.TODO(), resp.ID)return err
}经过以上步骤我们初步完成了分布式锁设计。其实官方已经实现了分布式锁它大致原理和上述有出入接下来
我们看下如何使用官方的分布式锁。
1.3 etcd分布式锁使用
package mainimport (contextfmtgithub.com/coreos/etcd/clientv3github.com/coreos/etcd/clientv3/concurrencylog
)var endpoints []string{localhost:2379}func ExampleMutex_Lock() {cli, err : clientv3.New(clientv3.Config{Endpoints: endpoints})if err ! nil {log.Fatal(err)}defer cli.Close()// create two separate sessions for lock competitions1, err : concurrency.NewSession(cli)if err ! nil {log.Fatal(err)}defer s1.Close()m1 : concurrency.NewMutex(s1, /my-lock/)s2, err : concurrency.NewSession(cli)if err ! nil {log.Fatal(err)}defer s2.Close()m2 : concurrency.NewMutex(s2, /my-lock/)// acquire lock for s1if err : m1.Lock(context.TODO()); err ! nil {log.Fatal(err)}fmt.Println(acquired lock for s1)m2Locked : make(chan struct{})go func() {defer close(m2Locked)// wait until s1 is locks /my-lock/if err : m2.Lock(context.TODO()); err ! nil {log.Fatal(err)}}()if err : m1.Unlock(context.TODO()); err ! nil {log.Fatal(err)}fmt.Println(released lock for s1)-m2Lockedfmt.Println(acquired lock for s2)
}func main() {ExampleMutex_Lock()
}# 输出
acquired lock for s1
released lock for s1
acquired lock for s2此代码来源于官方文档etcd分布式锁使用起来很方便。
1.4 etcd事务
顺便介绍一下etcd事务先看这段伪代码
Txn(context.TODO()).If(//如果以下判断条件成立Compare(Value(k1), , v1),Compare(Version(k1), , 2)
).Then(//则执行Then代码段OpPut(k2,v2), OpPut(k3,v3)
).Else(//否则执行Else代码段OpPut(k4,v4), OpPut(k5,v5)
).Commit()//最后提交事务package mainimport (contextfmtgithub.com/coreos/etcd/clientv3logtime
)var endpoints []string{localhost:2379}func ExampleKV_txn() {cli, err : clientv3.New(clientv3.Config{Endpoints: endpoints,DialTimeout: 5 * time.Second,})if err ! nil {log.Fatal(err)}defer cli.Close()kvc : clientv3.NewKV(cli)_, err kvc.Put(context.TODO(), key, xyz)if err ! nil {log.Fatal(err)}ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)_, err kvc.Txn(ctx).// txn value comparisons are lexicalIf(clientv3.Compare(clientv3.Value(key), , abc)).// the Then runs, since xyz abcThen(clientv3.OpPut(key, XYZ)).// the Else does not runElse(clientv3.OpPut(key, ABC)).Commit()cancel()if err ! nil {log.Fatal(err)}gresp, err : kvc.Get(context.TODO(), key)cancel()if err ! nil {log.Fatal(err)}for _, ev : range gresp.Kvs {fmt.Printf(%s : %s\n, ev.Key, ev.Value)}
}func main() {ExampleKV_txn()
}# 输出
key : XYZ上面的使用例子代码来自官方文档。
1.5 总结
如果发展到分布式服务阶段且对数据的可靠性要求很高选etcd实现分布式锁不会错。一般的Redis分布式
锁可能出现锁丢失的情况如果你是Java开发者可以使用Redisson客户端实现分布式锁据说不会出现锁丢
失的情况。