设为首页 加入收藏

TOP

GO实现Redis:GO实现Redis集群(5)(二)
2023-07-23 13:31:27 】 浏览:87
Tags:实现 Redis 集群
&Client{ addr: addr, conn: conn, pendingReqs: make(chan *request, chanSize), waitingReqs: make(chan *request, chanSize), working: &sync.WaitGroup{}, }, nil }

// Start creates the 10-second heartbeat ticker and launches the three
// background goroutines of the client: the writer (handleWrite), the reader
// (handleRead, whose error is logged) and the heartbeat loop.
func (client *Client) Start() {
	client.ticker = time.NewTicker(10 * time.Second)
	go client.handleWrite()
	go func() {
		if err := client.handleRead(); err != nil {
			logger.Error(err)
		}
	}()
	go client.heartbeat()
}

// Close stops the heartbeat ticker, closes the pending queue so no further
// requests are accepted, waits for callers that are still inside Send or
// doHeartbeat, and finally closes the connection and the waiting queue.
//
// NOTE(review): Ticker.Stop does not close ticker.C, so the goroutine ranging
// over it in heartbeat is never released. Likewise a Send or doHeartbeat that
// runs after pendingReqs has been closed panics on the channel send. Fixing
// either needs a shutdown signal stored on Client, whose definition is not
// part of this excerpt.
func (client *Client) Close() {
	client.ticker.Stop()
	// Stop accepting new requests; handleWrite returns once the queue drains.
	close(client.pendingReqs)

	// Wait for callers currently blocked in Send / doHeartbeat.
	client.working.Wait()

	// Release resources. The close error is deliberately ignored: nothing
	// useful can be done with it during shutdown.
	_ = client.conn.Close()
	close(client.waitingReqs)
}

// handleConnectionError tears down the current connection, dials client.addr
// again and starts a new reader goroutine on the fresh connection.
//
// The err argument (the failure that triggered the reconnect) is not
// inspected. It returns nil when the reconnect succeeded, otherwise the error
// from closing the old connection or from dialing the new one.
func (client *Client) handleConnectionError(err error) error {
	if closeErr := client.conn.Close(); closeErr != nil {
		// An already-closed connection is fine; anything else aborts the
		// reconnect attempt.
		// NOTE(review): errors.Is(closeErr, net.ErrClosed) would be sturdier
		// than matching the message text, but needs the errors package, and
		// the import block is not visible from here.
		opErr, ok := closeErr.(*net.OpError)
		if !ok || opErr.Err.Error() != "use of closed network connection" {
			return closeErr
		}
	}

	conn, dialErr := net.Dial("tcp", client.addr)
	if dialErr != nil {
		logger.Error(dialErr)
		return dialErr
	}
	client.conn = conn

	// Replies on the new connection need their own reader.
	go func() {
		_ = client.handleRead()
	}()
	return nil
}

// heartbeat issues one heartbeat request per ticker tick.
func (client *Client) heartbeat() {
	for range client.ticker.C {
		client.doHeartbeat()
	}
}

// handleWrite sends every queued request to the server, in order, until
// pendingReqs is closed and drained.
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// Send sends a request to the redis server and blocks until its reply
// arrives, the request fails, or maxWait elapses.
//
// It returns the server reply on success, the error reply "server time out"
// when no answer arrived in time, or an error reply starting with
// "request failed" (followed by the underlying cause) when the request could
// not be written to the connection.
func (client *Client) Send(args [][]byte) resp.Reply {
	req := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	req.waiting.Add(1)

	// Register as in-flight so Close waits for this call to return.
	client.working.Add(1)
	defer client.working.Done()

	client.pendingReqs <- req
	if timedOut := req.waiting.WaitWithTimeout(maxWait); timedOut {
		return reply.MakeErrReply("server time out")
	}
	if req.err != nil {
		// Surface the cause instead of discarding it; the original prefix is
		// kept so callers matching on it keep working.
		return reply.MakeErrReply("request failed: " + req.err.Error())
	}
	return req.reply
}

// doHeartbeat queues a PING request and waits up to maxWait for it to be
// answered. The outcome is ignored.
func (client *Client) doHeartbeat() {
	req := &request{
		args:      [][]byte{[]byte("PING")},
		heartbeat: true,
		waiting:   &wait.Wait{},
	}
	req.waiting.Add(1)

	client.working.Add(1)
	defer client.working.Done()

	client.pendingReqs <- req
	req.waiting.WaitWithTimeout(maxWait)
}

// doRequest serializes req as a multi-bulk reply and writes it to the
// connection. A failed write triggers up to three reconnect-and-retry
// attempts. On success the request is moved to waitingReqs to be matched with
// its reply; on failure the error is stored on the request and its waiter is
// released.
//
// A nil request or one without arguments is dropped without releasing its
// waiter.
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}

	payload := reply.MakeMultiBulkReply(req.args).ToBytes()
	_, err := client.conn.Write(payload)
	for attempt := 0; err != nil && attempt < 3; attempt++ {
		err = client.handleConnectionError(err)
		if err == nil {
			// Reconnected: retry the write on the new connection.
			_, err = client.conn.Write(payload)
		}
	}

	if err != nil {
		req.err = err
		req.waiting.Done()
		return
	}
	client.waitingReqs <- req
}

// finishRequest takes the oldest request from waitingReqs, attaches reply to
// it and releases its waiter. A panic raised while doing so is recovered,
// its stack printed and the value logged.
func (client *Client) finishRequest(reply resp.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()

	req := <-client.waitingReqs
	if req == nil {
		// Nothing to complete (a closed waitingReqs also yields nil).
		return
	}
	req.reply = reply
	if req.waiting != nil {
		req.waiting.Done()
	}
}

// handleRead consumes the parsed reply stream of the current connection and
// completes one waiting request per payload. A payload carrying an error is
// converted into an error reply. It returns nil once the stream ends.
func (client *Client) handleRead() error {
	for payload := range parser.ParseStream(client.conn) {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}

go.mod

require github.com/jolestar/go-commons-pool/v2 v2.1.2

转发 key 时,当前节点需要持有到其他节点的连接,节点之间互为客户端;这里使用连接池来管理这些到其他节点的连接

cluster/client_pool.go

type connectionFactory struct {
   Pe
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇:小心 golang 中的无类型常量 下一篇:Go 语言数组和切片的区别

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目