etcd_etcd1_1 ... done
Creating etcd_etcd2_1 ... done
Creating etcd_etcd3_1 ... done
测试集群可用性
# 登录其中一个节点
docker exec -it 5f97bf0b446f6e6514576fc1eb46c2f60d2c2b3e3f3ee3b1ad6219414fa915c8 /bin/sh
# 写入一个键值
etcdctl put name "liuyuede"
OK
# 查看
etcdctl get name
name
liuyuede
# 登录另外两个节点
docker exec -it a6ccc9b6e5cc81ee7c779e2b9e7235cd6d814e92fbc66b7e4846798acff8ee2a /bin/sh
etcdctl get name
name
liuyuede
docker exec -it 6817fa89e3e9e422628e0049910b672df389c62d41bf2349a0f77e22c99e5270 /bin/sh
etcdctl get name
name
liuyuede
etcd集群采用的是Raft协议,一般至少需要三个节点,同一时刻只有一个leader;当存活节点数不超过半数时(例如三节点集群只剩一个节点),剩余节点也不能提供服务
查看集群情况
etcdctl --endpoints=http://0.0.0.0:12379,0.0.0.0:22379,0.0.0.0:32379 endpoint status --write-out=table
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| http://0.0.0.0:12379 | ade526d28b1f92f7 | 3.5.4 | 20 kB | true | false | 3 | 13 | 13 | |
| 0.0.0.0:22379 | d282ac2ce600c1ce | 3.5.4 | 20 kB | false | false | 3 | 13 | 13 | |
| 0.0.0.0:32379 | bd388e7810915853 | 3.5.4 | 20 kB | false | false | 3 | 13 | 13 | |
+----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
2、增加服务注册功能
服务注册的流程
- 向etcd新增一个包含rpc服务信息的键值对,并设置租约(比如5秒过期)
- 利用保活函数KeepAlive不断续约
package server
import (
"context"
"encoding/json"
"errors"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
// ServiceInfo is the payload describing one registered service instance.
// KeepAlive JSON-encodes it and stores it as the value of the service's key.
type ServiceInfo struct {
	// Name and Ip are both plain strings; they are declared together but keep
	// their original order, so positional literals continue to work.
	Name, Ip string
}
// Service ties a ServiceInfo to the etcd client and lease used to register it.
type Service struct {
// ServiceInfo is the data KeepAlive serializes and writes to etcd.
ServiceInfo ServiceInfo
// stop is the channel Start waits on for a shutdown signal; the error
// received from it becomes Start's return value.
stop chan error
// leaseId is the ID of the lease granted in KeepAlive and later passed to
// Revoke by revoke.
leaseId clientv3.LeaseID
// client is the etcd v3 client created in NewService.
client *clientv3.Client
}
// NewService connects to the etcd cluster at endpoints and returns a Service
// ready to register serviceInfo.
//
// The client is created with a 10 second dial timeout. On failure the error
// from clientv3.New is returned unchanged and the returned Service is nil.
func NewService(serviceInfo ServiceInfo, endpoints []string) (service *Service, err error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: time.Second * 10,
	})
	if err != nil {
		return nil, err
	}
	// The stop channel must be created here: a nil channel can never be
	// received from, so Start's stop case would be dead, and any send on it
	// would block forever (a close would panic).
	return &Service{
		ServiceInfo: serviceInfo,
		stop:        make(chan error),
		client:      client,
	}, nil
}
// Start registers the service through KeepAlive and then blocks until one of
// the following happens:
//
//   - a value arrives on s.stop: that value is returned;
//   - the etcd client's own context is done: a "server closed" error is
//     returned;
//   - the keep-alive channel is closed: the lease is revoked and the result of
//     the revocation is returned.
//
// An error from the initial KeepAlive call is returned immediately.
func (s *Service) Start(ctx context.Context) (err error) {
	alive, err := s.KeepAlive(ctx)
	if err != nil {
		return err
	}
	for {
		select {
		case err = <-s.stop: // shutdown requested by the service side
			// NOTE(review): the lease is not revoked on this path; confirm
			// that whoever signals s.stop takes care of deregistration.
			return err
		case <-s.client.Ctx().Done(): // the etcd client itself was closed
			return errors.New("server closed")
		case _, ok := <-alive:
			if ok {
				// A successful renewal; keep waiting.
				continue
			}
			// The keep-alive channel is closed. The usual cause is that ctx
			// was cancelled, in which case a Revoke issued with ctx would
			// fail at once and the key would linger until the TTL expires.
			// Use a detached, time-bounded context so the revocation still
			// gets a chance to reach etcd.
			revokeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			return s.revoke(revokeCtx)
		}
	}
}
// KeepAlive writes the service's info to etcd under a leased key and starts
// renewing that lease.
//
// It JSON-encodes s.ServiceInfo, grants a lease with a TTL of 5 seconds, puts
// the encoded value at s.getKey() attached to that lease, records the lease ID
// in s.leaseId, and finally returns the channel produced by the client's
// KeepAlive call for that lease. Any error from encoding, Grant, Put or
// KeepAlive is returned unchanged.
func (s *Service) KeepAlive(ctx context.Context) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
	key := s.getKey()
	// Do not discard the encoding error: registering a corrupt or empty value
	// would be worse than failing the registration.
	val, err := json.Marshal(s.ServiceInfo)
	if err != nil {
		return nil, err
	}
	// Create the lease the key will be bound to.
	leaseResp, err := s.client.Grant(ctx, 5)
	if err != nil {
		return nil, err
	}
	// Write the key; it disappears automatically once the lease expires.
	if _, err = s.client.Put(ctx, key, string(val), clientv3.WithLease(leaseResp.ID)); err != nil {
		return nil, err
	}
	s.leaseId = leaseResp.ID
	return s.client.KeepAlive(ctx, leaseResp.ID)
}
// revoke asks etcd to revoke the lease stored in s.leaseId and reports any
// error returned by that request.
func (s *Service) revoke(ctx context.Context) error {
	if _, err := s.client.Revoke(ctx, s.leaseId); err != nil {
		return err
	}
	return nil
}
func (s *Service) getKey() string {
return s