设为首页 加入收藏

TOP

GO实现Redis:GO实现Redis集群(5)(五)
2023-07-23 13:31:27 】 浏览:85
Tags:实现 Redis 集群
t(context.Background(), peerClient) } // 转发指令给其他客户端,发送指令之前需要先发一下选择的db func (cluster *clusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply { if peer == cluster.self { return cluster.db.Exec(c, args) } peerClient, err := cluster.getPeerClient(peer) if err != nil { return reply.MakeErrReply(err.Error()) } defer func() { _ = cluster.returnPeerClient(peer, peerClient) }() peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex()))) return peerClient.Send(args) } // 指令广播给所有节点 func (cluster *clusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply { result := make(map[string]resp.Reply) for _, node := range cluster.nodes { relay := cluster.relay(node, c, args) result[node] = relay } return result }

communication:与其他节点通信。执行模式有本地(自己执行),转发(别人执行),群发(所有节点执行)
getPeerClient :从连接池拿一个连接
returnPeerClient :归还连接
relay :转发指令给其他客户端,发送指令之前需要先发一下选择的db
broadcast :指令广播给所有节点

cluster/router.go

func makeRouter() map[string]CmdFunc {
    routerMap := make(map[string]CmdFunc)
    routerMap["ping"] = ping
    routerMap["del"] = Del
    routerMap["exists"] = defaultFunc
    routerMap["type"] = defaultFunc
    routerMap["rename"] = Rename
    routerMap["renamenx"] = Rename
    routerMap["set"] = defaultFunc
    routerMap["setnx"] = defaultFunc
    routerMap["get"] = defaultFunc
    routerMap["getset"] = defaultFunc
    routerMap["flushdb"] = FlushDB
	routerMap["select"] = execSelect
    return routerMap
}

func defaultFunc(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
    key := string(args[1])
    peer := cluster.peerPicker.PickNode(key)
    return cluster.relay(peer, c, args)
}

defaultFunc:转发指令的默认实现

cluster/ping.go

func ping(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {
   return cluster.db.Exec(c, cmdAndArgs)
}

cluster/rename.go

func Rename(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
   if len(args) != 3 {
      return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
   }
   src := string(args[1])
   dest := string(args[2])

   srcPeer := cluster.peerPicker.PickNode(src)
   destPeer := cluster.peerPicker.PickNode(dest)

   if srcPeer != destPeer {
      return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
   }
   return cluster.relay(srcPeer, c, args)
}

Rename:修改key的name,两个key的hash必须在同一个节点中

cluster/keys.go

// keys指令实现
func FlushDB(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
   replies := cluster.broadcast(c, args)
   var errReply reply.ErrorReply
   for _, v := range replies {
      if reply.IsErrorReply(v) {
         errReply = v.(reply.ErrorReply)
         break
      }
   }
   if errReply == nil {
      return &reply.OkReply{}
   }
   return reply.MakeErrReply("error occurs: " + errReply.Error())
}

cluster/del.go

// del指令实现
func Del(cluster *clusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
   replies := cluster.broadcast(c, args)
   var errReply reply.ErrorReply
   var deleted int64 = 0
   for _, v := range replies {
      if reply.IsErrorReply(v) {
         errReply = v.(reply.ErrorReply)
         break
      }
      intReply, ok := v.(*reply.IntReply)
      if !ok {
         errReply = reply.MakeErrReply(&quo
首页 上一页 2 3 4 5 下一页 尾页 5/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇小心golang中的无类型常量 下一篇Go 语言数组和切片的区别

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目