&Client{
addr: addr,
conn: conn,
pendingReqs: make(chan *request, chanSize),
waitingReqs: make(chan *request, chanSize),
working: &sync.WaitGroup{},
}, nil
}
// Start creates the heartbeat ticker (10-second period) and launches the
// client's background goroutines: the write loop, the read loop and the
// heartbeat loop. An error returned by the read loop is logged.
func (client *Client) Start() {
	// The ticker has to exist before heartbeat starts receiving from it.
	client.ticker = time.NewTicker(10 * time.Second)
	go client.handleWrite()
	go func() {
		if readErr := client.handleRead(); readErr != nil {
			logger.Error(readErr)
		}
	}()
	go client.heartbeat()
}
// Close shuts the client down: it stops the heartbeat ticker, closes the
// pending-request queue, waits for in-flight calls to finish, and finally
// closes the connection and the waiting-request queue.
//
// NOTE(review): ticker.Stop does not close ticker.C, so the heartbeat
// goroutine presumably stays blocked on the channel after Close — confirm
// whether that is acceptable.
// NOTE(review): a Send or heartbeat that enqueues after pendingReqs is closed
// would panic (send on closed channel); callers presumably must not use the
// client once Close has been called — verify.
func (client *Client) Close() {
client.ticker.Stop()
// stop accepting new requests; handleWrite's range loop ends once the queue is drained
close(client.pendingReqs)
// wait for in-flight requests (working is incremented by Send and doHeartbeat)
client.working.Wait()
// release resources; the error from closing the connection is deliberately ignored
_ = client.conn.Close()
close(client.waitingReqs)
}
// handleConnectionError replaces the client's connection after an I/O
// failure. It closes the current connection, dials client.addr again over
// TCP, stores the new connection and starts a fresh read loop on it.
//
// A close failure is returned to the caller unless it is a *net.OpError
// whose inner message is "use of closed network connection", which is
// tolerated. A dial failure is logged and returned. The err argument is not
// inspected.
func (client *Client) handleConnectionError(err error) error {
	if closeErr := client.conn.Close(); closeErr != nil {
		opErr, isOpErr := closeErr.(*net.OpError)
		// Only "already closed" is acceptable here; anything else aborts the reconnect.
		if !isOpErr || opErr.Err.Error() != "use of closed network connection" {
			return closeErr
		}
	}

	fresh, dialErr := net.Dial("tcp", client.addr)
	if dialErr != nil {
		logger.Error(dialErr)
		return dialErr
	}
	client.conn = fresh

	// Replies arriving on the new connection need their own reader.
	go func() {
		_ = client.handleRead()
	}()
	return nil
}
// heartbeat performs one heartbeat per tick received from client.ticker and
// returns only if the ticker channel is closed.
func (client *Client) heartbeat() {
	for {
		if _, open := <-client.ticker.C; !open {
			return
		}
		client.doHeartbeat()
	}
}
// handleWrite takes requests off the pending queue one at a time and hands
// each to doRequest. It returns once pendingReqs is closed and drained.
func (client *Client) handleWrite() {
	for {
		next, open := <-client.pendingReqs
		if !open {
			return
		}
		client.doRequest(next)
	}
}
// Send sends a request to redis server and blocks until it completes or
// maxWait elapses.
//
// It returns the error reply "server time out" when the wait times out, the
// error reply "request failed" when the request finished with an error
// recorded on it, and otherwise the reply stored on the request.
func (client *Client) Send(args [][]byte) resp.Reply {
	req := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	req.waiting.Add(1)

	// Register as in-flight so that Close waits for this call to return.
	client.working.Add(1)
	defer client.working.Done()

	client.pendingReqs <- req

	timedOut := req.waiting.WaitWithTimeout(maxWait)
	switch {
	case timedOut:
		return reply.MakeErrReply("server time out")
	case req.err != nil:
		return reply.MakeErrReply("request failed")
	default:
		return req.reply
	}
}
// doHeartbeat enqueues a PING request flagged as a heartbeat and waits up to
// maxWait for it to complete. The outcome of the wait is discarded.
func (client *Client) doHeartbeat() {
	ping := &request{
		args:      [][]byte{[]byte("PING")},
		heartbeat: true,
		waiting:   &wait.Wait{},
	}
	ping.waiting.Add(1)

	// Counted as in-flight work, the same way Send does.
	client.working.Add(1)
	defer client.working.Done()

	client.pendingReqs <- ping
	ping.waiting.WaitWithTimeout(maxWait)
}
// doRequest serialises req as a multi-bulk reply and writes it to the
// connection. Nil requests and requests without arguments are ignored.
//
// When a write fails, it calls handleConnectionError and, if that succeeds,
// writes again; this is attempted at most three times. On success the request
// is queued on waitingReqs so the read loop can match a reply to it. On
// failure the error is stored on the request and its waiter is released.
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}

	multiBulk := reply.MakeMultiBulkReply(req.args)
	payload := multiBulk.ToBytes()

	_, err := client.conn.Write(payload)
	for attempt := 0; err != nil && attempt < 3; attempt++ {
		// Re-establish the connection first; retry the write only if that worked.
		if err = client.handleConnectionError(err); err == nil {
			_, err = client.conn.Write(payload)
		}
	}

	if err != nil {
		req.err = err
		req.waiting.Done()
		return
	}
	client.waitingReqs <- req
}
// finishRequest takes the oldest request from waitingReqs, stores reply on
// it and releases its waiter. A nil request (for example, received from a
// closed queue) is ignored. Any panic raised here is recovered, its stack
// printed and the value logged.
func (client *Client) finishRequest(reply resp.Reply) {
	defer func() {
		if panicked := recover(); panicked != nil {
			debug.PrintStack()
			logger.Error(panicked)
		}
	}()

	pending := <-client.waitingReqs
	if pending == nil {
		return
	}
	pending.reply = reply
	if pending.waiting == nil {
		return
	}
	pending.waiting.Done()
}
// handleRead consumes payloads parsed from the connection that client.conn
// refers to when the call starts, and completes one waiting request per
// payload. A payload carrying an error is delivered as an error reply built
// from that error's message. It returns nil once the parser's channel is
// closed.
func (client *Client) handleRead() error {
	for payload := range parser.ParseStream(client.conn) {
		if payload.Err == nil {
			client.finishRequest(payload.Data)
		} else {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
		}
	}
	return nil
}
go.mod
require github.com/jolestar/go-commons-pool/v2 v2.1.2
转发 key 时,当前节点需要持有到其他节点的连接,节点之间互为客户端;这里使用连接池来管理这些到其他节点的连接。
cluster/client_pool.go
type connectionFactory struct {
Pe
|