中山网站建设华联在线,宣传片拍摄的意义,网站开发指南,wordpress多语言包采用一致性hash算法将key分散到不同的节点#xff0c;客户端可以连接到集群中任意一个节点https://github.com/csgopher/go-redis本文涉及以下文件#xff1a; consistenthash#xff1a;实现添加和选择节点方法 standalone_database#xff1a;单机database client#x…
采用一致性hash算法将key分散到不同的节点客户端可以连接到集群中任意一个节点https://github.com/csgopher/go-redis本文涉及以下文件 consistenthash实现添加和选择节点方法 standalone_database单机database client客户端 client_pool实现连接池 cluster_database对key进行路由 com与其他节点通信 routerpingkeysdelselect各类命令的转发具体逻辑
一致性哈希 为什么需要一致性 hash 在采用分片方式建立分布式缓存时我们面临的第一个问题是如何决定存储数据的节点。最自然的方式是参考 hash 表的做法假设集群中存在 n 个节点我们用 node hashCode(key) % n 来决定所属的节点。 普通 hash 算法解决了如何选择节点的问题但在分布式系统中经常出现增加节点或某个节点宕机的情况。若节点数 n 发生变化, 大多数 key 根据 node hashCode(key) % n 计算出的节点都会改变。这意味着若要在 n 变化后维持系统正常运转需要将大多数数据在节点间进行重新分布。这个操作会消耗大量的时间和带宽等资源这在生产环境下是不可接受的。 算法原理 一致性 hash 算法的目的是在节点数量 n 变化时, 使尽可能少的 key 需要进行节点间重新分布。一致性 hash 算法将数据 key 和服务器地址 addr 散列到 2^32 的空间中。 我们将 2^32 个整数首尾相连形成一个环首先计算服务器地址 addr 的 hash 值放置在环上。然后计算 key 的 hash 值放置在环上顺时针查找将数据放在找到的的第一个节点上。 在增加或删除节点时只有该节点附近的数据需要重新分布从而解决了上述问题。如果服务器节点较少则比较容易出现数据分布不均匀的问题一般来说环上的节点越多数据分布越均匀。我们不需要真的增加一台服务器只需要将实际的服务器节点映射为几个虚拟节点放在环上即可。 参考https://www.cnblogs.com/Finley/p/14038398.html lib/consistenthash/consistenthash.go
type HashFunc func(data []byte) uint32type NodeMap struct {hashFunc HashFuncnodeHashs []int nodehashMap map[int]string
}func NewNodeMap(fn HashFunc) *NodeMap {m : NodeMap{hashFunc: fn,nodehashMap: make(map[int]string),}if m.hashFunc nil {m.hashFunc crc32.ChecksumIEEE}return m
}func (m *NodeMap) IsEmpty() bool {return len(m.nodeHashs) 0
}func (m *NodeMap) AddNode(keys ...string) {for _, key : range keys {if key {continue}hash : int(m.hashFunc([]byte(key)))m.nodeHashs append(m.nodeHashs, hash)m.nodehashMap[hash] key}sort.Ints(m.nodeHashs)
}func (m *NodeMap) PickNode(key string) string {if m.IsEmpty() {return }hash : int(m.hashFunc([]byte(key)))idx : sort.Search(len(m.nodeHashs), func(i int) bool {return m.nodeHashs[i] hash})if idx len(m.nodeHashs) {idx 0}return m.nodehashMap[m.nodeHashs[idx]]
}HashFunchash函数定义Go的hash函数就是这样定义的NodeMap存储所有节点和节点的hash
nodeHashs各个节点的hash值顺序的nodehashMaphash, 节点
AddNode添加节点到一致性哈希中PickNode选择节点。使用二分查找如果hash比nodeHashs中最大的hash还要大idx0
database/standalone_database.go
type StandaloneDatabase struct {dbSet []*DBaofHandler *aof.AofHandler
}func NewStandaloneDatabase() *StandaloneDatabase {......
}把database/database改名为database/standalone_database再增加一个cluster_database用于对key的路由
resp/client/client.go
// Client is a pipeline mode redis client
type Client struct {conn net.ConnpendingReqs chan *request // wait to sendwaitingReqs chan *request // waiting responseticker *time.Tickeraddr stringworking *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
}// request is a message sends to redis server
type request struct {id uint64args [][]bytereply resp.Replyheartbeat boolwaiting *wait.Waiterr error
}const (chanSize 256maxWait 3 * time.Second
)// MakeClient creates a new client
func MakeClient(addr string) (*Client, error) {conn, err : net.Dial(tcp, addr)if err ! nil {return nil, err}return Client{addr: addr,conn: conn,pendingReqs: make(chan *request, chanSize),waitingReqs: make(chan *request, chanSize),working: sync.WaitGroup{},}, nil
}// Start starts asynchronous goroutines
func (client *Client) Start() {client.ticker time.NewTicker(10 * time.Second)go client.handleWrite()go func() {err : client.handleRead()if err ! nil {logger.Error(err)}}()go client.heartbeat()
}// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {client.ticker.Stop()// stop new requestclose(client.pendingReqs)// wait stop processclient.working.Wait()// clean_ client.conn.Close()close(client.waitingReqs)
}func (client *Client) handleConnectionError(err error) error {err1 : client.conn.Close()if err1 ! nil {if opErr, ok : err1.(*net.OpError); ok {if opErr.Err.Error() ! use of closed network connection {return err1}} else {return err1}}conn, err1 : net.Dial(tcp, client.addr)if err1 ! nil {logger.Error(err1)return err1}client.conn conngo func() {_ client.handleRead()}()return nil
}func (client *Client) heartbeat() {for range client.ticker.C {client.doHeartbeat()}
}func (client *Client) handleWrite() {for req : range client.pendingReqs {client.doRequest(req)}
}// Send sends a request to redis server
func (client *Client) Send(args [][]byte) resp.Reply {request : request{args: args,heartbeat: false,waiting: wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs - requesttimeout : request.waiting.WaitWithTimeout(maxWait)if timeout {return reply.MakeErrReply(server time out)}if request.err ! nil {return reply.MakeErrReply(request failed)}return request.reply
}func (client *Client) doHeartbeat() {request : request{args: [][]byte{[]byte(PING)},heartbeat: true,waiting: wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs - requestrequest.waiting.WaitWithTimeout(maxWait)
}func (client *Client) doRequest(req *request) {if req nil || len(req.args) 0 {return}re : reply.MakeMultiBulkReply(req.args)bytes : re.ToBytes()_, err : client.conn.Write(bytes)i : 0for err ! nil i 3 {err client.handleConnectionError(err)if err nil {_, err client.conn.Write(bytes)}i}if err nil {client.waitingReqs - req} else {req.err errreq.waiting.Done()}
}func (client *Client) finishRequest(reply resp.Reply) {defer func() {if err : recover(); err ! nil {debug.PrintStack()logger.Error(err)}}()request : -client.waitingReqsif request nil {return}request.reply replyif request.waiting ! nil {request.waiting.Done()}
}func (client *Client) handleRead() error {ch : parser.ParseStream(client.conn)for payload : range ch {if payload.Err ! nil {client.finishRequest(reply.MakeErrReply(payload.Err.Error()))continue}client.finishRequest(payload.Data)}return nil
}clientRedis客户端具体看https://www.cnblogs.com/Finley/p/14028402.html
go.mod
require github.com/jolestar/go-commons-pool/v2 v2.1.2key的转发需要当前节点存储其他节点的连接互相作为客户端使用连接池将其他连接池化
cluster/client_pool.go
type connectionFactory struct {Peer string // 连接地址
}func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {c, err : client.MakeClient(f.Peer)if err ! nil {return nil, err}c.Start()return pool.NewPooledObject(c), nil
}func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {c, ok : object.Object.(*client.Client)if !ok {return errors.New(type mismatch)}c.Close()return nil
}func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {// do validatereturn true
}func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {// do activatereturn nil
}func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {// do passivatereturn nil
}client_pool使用连接池的NewObjectPoolWithDefaultConfig创建连接需要实现PooledObjectFactory接口
redis.conf
self 127.0.0.1:6379
peers 127.0.0.1:6380配置中写自己和其他节点的地址
cluster/cluster_database.go
type clusterDatabase struct {self stringnodes []stringpeerPicker *consistenthash.NodeMappeerConnection map[string]*pool.ObjectPooldb databaseface.Database
}func MakeClusterDatabase() *clusterDatabase {cluster : clusterDatabase{self: config.Properties.Self,db: database.NewStandaloneDatabase(),peerPicker: consistenthash.NewNodeMap(nil),peerConnection: make(map[string]*pool.ObjectPool),}nodes : make([]string, 0, len(config.Properties.Peers)1)for _, peer : range config.Properties.Peers {nodes append(nodes, peer)}nodes append(nodes, config.Properties.Self)cluster.peerPicker.AddNode(nodes...)ctx : context.Background()for _, peer : range config.Properties.Peers {cluster.peerConnection[peer] pool.NewObjectPoolWithDefaultConfig(ctx, connectionFactory{Peer: peer,})}cluster.nodes nodesreturn cluster
}func (cluster *clusterDatabase) Close() {cluster.db.Close()
}func (cluster *ClusterDatabase) AfterClientClose(c resp.Connection) {cluster.db.AfterClientClose(c)
}type CmdFunc func(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Replycluster_database用于对key的路由clusterDatabasenodes所有节点peerPicker 节点的添加和选择peerConnectionMapnode, 连接池db单机databaseCmdFunc表示Redis的指令类型
cluster/com.go
func (cluster *clusterDatabase) getPeerClient(peer string) (*client.Client, error) {factory, ok : cluster.peerConnection[peer]if !ok {return nil, errors.New(connection factory not found)}raw, err : factory.BorrowObject(context.Background())if err ! nil {return nil, err}conn, ok : raw.(*client.Client)if !ok {return nil, errors.New(connection factory make wrong type)}return conn, nil
}func (cluster *clusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {connectionFactory, ok : cluster.peerConnection[peer]if !ok {return errors.New(connection factory not found)}return connectionFactory.ReturnObject(context.Background(), peerClient)
}func (cluster *clusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {if peer cluster.self {return cluster.db.Exec(c, args)}peerClient, err : cluster.getPeerClient(peer)if err ! nil {return reply.MakeErrReply(err.Error())}defer func() {_ cluster.returnPeerClient(peer, peerClient)}()peerClient.Send(utils.ToCmdLine(SELECT, strconv.Itoa(c.GetDBIndex())))return peerClient.Send(args)
}func (cluster *clusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {result : make(map[string]resp.Reply)for _, node : range cluster.nodes {relay : cluster.relay(node, c, args)result[node] relay}return result
}communication与其他节点通信。执行模式有本地自己执行转发别人执行群发所有节点执行getPeerClient 从连接池拿一个连接returnPeerClient 归还连接relay 转发指令给其他客户端发送指令之前需要先发一下选择的dbbroadcast 指令广播给所有节点
cluster/router.go
func makeRouter() map[string]CmdFunc {routerMap : make(map[string]CmdFunc)routerMap[ping] pingrouterMap[del] DelrouterMap[exists] defaultFuncrouterMap[type] defaultFuncrouterMap[rename] RenamerouterMap[renamenx] RenamerouterMap[set] defaultFuncrouterMap[setnx] defaultFuncrouterMap[get] defaultFuncrouterMap[getset] defaultFuncrouterMap[flushdb] FlushDBrouterMap[select] execSelectreturn routerMap
}func defaultFunc(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {key : string(args[1])peer : cluster.peerPicker.PickNode(key)return cluster.relay(peer, c, args)
}defaultFunc转发指令的默认实现
cluster/ping.go
func ping(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {return cluster.db.Exec(c, cmdAndArgs)
}cluster/rename.go
func Rename(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {if len(args) ! 3 {return reply.MakeErrReply(ERR wrong number of arguments for rename command)}src : string(args[1])dest : string(args[2])srcPeer : cluster.peerPicker.PickNode(src)destPeer : cluster.peerPicker.PickNode(dest)if srcPeer ! destPeer {return reply.MakeErrReply(ERR rename must within one slot in cluster mode)}return cluster.relay(srcPeer, c, args)
}Rename修改key的name两个key的hash必须在同一个节点中
cluster/keys.go
func FlushDB(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {replies : cluster.broadcast(c, args)var errReply reply.ErrorReplyfor _, v : range replies {if reply.IsErrorReply(v) {errReply v.(reply.ErrorReply)break}}if errReply nil {return reply.OkReply{}}return reply.MakeErrReply(error occurs: errReply.Error())
}cluster/del.go
func Del(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {replies : cluster.broadcast(c, args)var errReply reply.ErrorReplyvar deleted int64 0for _, v : range replies {if reply.IsErrorReply(v) {errReply v.(reply.ErrorReply)break}intReply, ok : v.(*reply.IntReply)if !ok {errReply reply.MakeErrReply(error)}deleted intReply.Code}if errReply nil {return reply.MakeIntReply(deleted)}return reply.MakeErrReply(error occurs: errReply.Error())
}cluster/select.go
func execSelect(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {return cluster.db.Exec(c, cmdAndArgs)
}cluster/cluster_database.go
var router makeRouter()func (cluster *clusterDatabase) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {defer func() {if err : recover(); err ! nil {logger.Warn(fmt.Sprintf(error occurs: %v\n%s, err, string(debug.Stack())))result reply.UnknownErrReply{}}}()cmdName : strings.ToLower(string(cmdLine[0]))cmdFunc, ok : router[cmdName]if !ok {return reply.MakeErrReply(ERR unknown command cmdName , or not supported in cluster mode)}result cmdFunc(cluster, c, cmdLine)return
}resp/handler/handler.go
func MakeHandler() *RespHandler {var db databaseface.Databaseif config.Properties.Self ! len(config.Properties.Peers) 0 {db cluster.MakeClusterDatabase()} else {db database.NewStandaloneDatabase()}return RespHandler{db: db,}
}MakeHandler判断是单机还是集群
测试
先go build打开项目文件夹找到exe文件把exe文件和redis.conf放到一个文件夹里redis.conf改成如下然后启动exe文件。再回到GoLand启动第二个节点6379。
bind 0.0.0.0
port 6380appendonly yes
appendfilename appendonly.aofself 127.0.0.1:6380
peers 127.0.0.1:6379