er string // 连接地址
}
// 使用连接池的NewObjectPoolWithDefaultConfig创建连接,需要实现PooledObjectFactory接口
func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
c, err := client.MakeClient(f.Peer)
if err != nil {
return nil, err
}
c.Start()
return pool.NewPooledObject(c), nil
}
func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
c, ok := object.Object.(*client.Client)
if !ok {
return errors.New("type mismatch")
}
c.Close()
return nil
}
func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
// do validate
return true
}
func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
// do activate
return nil
}
func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
// do passivate
return nil
}
client_pool:使用连接池的NewObjectPoolWithDefaultConfig创建连接,需要实现PooledObjectFactory接口
redis.conf
self 127.0.0.1:6379
peers 127.0.0.1:6380
配置中写自己和其他节点的地址
cluster/cluster_database.go
// cluster_database用于对key的路由
type clusterDatabase struct {
self string
nodes []string // 所有节点
peerPicker *consistenthash.NodeMap // 节点的添加和选择
peerConnection map[string]*pool.ObjectPool // Map<node, 连接池>
db databaseface.Database // 单机database
}
func MakeClusterDatabase() *clusterDatabase {
cluster := &clusterDatabase{
self: config.Properties.Self,
db: database.NewStandaloneDatabase(),
peerPicker: consistenthash.NewNodeMap(nil),
peerConnection: make(map[string]*pool.ObjectPool),
}
nodes := make([]string, 0, len(config.Properties.Peers)+1)
for _, peer := range config.Properties.Peers {
nodes = append(nodes, peer)
}
nodes = append(nodes, config.Properties.Self)
cluster.peerPicker.AddNode(nodes...)
ctx := context.Background()
for _, peer := range config.Properties.Peers {
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
Peer: peer,
})
}
cluster.nodes = nodes
return cluster
}
func (cluster *clusterDatabase) Close() {
cluster.db.Close()
}
func (cluster *ClusterDatabase) AfterClientClose(c resp.Connection) {
cluster.db.AfterClientClose(c)
}
// 表示Redis的指令类型
type CmdFunc func(cluster *clusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply
cluster_database用于对key的路由
clusterDatabase:
nodes:所有节点
peerPicker :节点的添加和选择
peerConnection:Map<node, 连接池>
db:单机database
CmdFunc:表示Redis的指令类型
cluster/com.go
com:与其他节点通信。执行模式有本地(自己执行),转发(别人执行),群发(所有节点执行)
// 从连接池拿一个连接
func (cluster *clusterDatabase) getPeerClient(peer string) (*client.Client, error) {
factory, ok := cluster.peerConnection[peer]
if !ok {
return nil, errors.New("connection factory not found")
}
raw, err := factory.BorrowObject(context.Background())
if err != nil {
return nil, err
}
conn, ok := raw.(*client.Client)
if !ok {
return nil, errors.New("connection factory make wrong type")
}
return conn, nil
}
// 归还连接
func (cluster *clusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {
connectionFactory, ok := cluster.peerConnection[peer]
if !ok {
return errors.New("connection factory not found")
}
return connectionFactory.ReturnObjec