,或连接不成功,获取zookeeper服务器的连接
if !electionManager.isConnected() {
conn, connChan, err := zk.Connect(electionManager.ZKConfig.Servers, time.Second)
if err != nil {
return err
}
// 等待连接成功
for {
isConnected := false
select {
case connEvent := <-connChan:
if connEvent.State == zk.StateConnected {
isConnected = true
fmt.Println("connect to zookeeper server success!")
}
case _ = <-time.After(time.Second * 3): // 3秒仍未连接成功则返回连接超时
return errors.New("connect to zookeeper server timeout!")
}
if isConnected {
break
}
}
electionManager.ZKClientConn = conn
}
return nil
}
// 选举master
func (electionManager *ElectionManager) electMaster() error {
err := electionManager.initConnection()
if err != nil {
return err
}
// 判断zookeeper中是否存在root目录,不存在则创建该目录
isExist, _, err := electionManager.ZKClientConn.Exists(electionManager.ZKConfig.RootPath)
if err != nil {
return err
}
if !isExist {
path, err := electionManager.ZKClientConn.Create(electionManager.ZKConfig.RootPath, nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
if electionManager.ZKConfig.RootPath != path {
return errors.New("Create returned different path " + electionManager.ZKConfig.RootPath + " != " + path)
}
}
// 创建用于选举master的ZNode,该节点为Ephemeral类型,表示客户端连接断开后,其创建的节点也会被销毁
masterPath := electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath
path, err := electionManager.ZKClientConn.Create(masterPath, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err == nil { // 创建成功表示选举master成功
if path == masterPath {
fmt.Println("elect master success!")
electionManager.IsMasterQ <- true
} else {
return errors.New("Create returned different path " + masterPath + " != " + path)
}
} else { // 创建失败表示选举master失败
fmt.Printf("elect master failure, ", err)
electionManager.IsMasterQ <- false
}
return nil
}
// 监听zookeeper中master znode,若被删除,表示master故障或网络迟缓,重新选举
func (electionManager *ElectionManager) watchMaster() {
for {
// watch zk根znode下面的子znode,当有连接断开时,对应znode被删除,触发事件后重新选举
children, state, childCh, err := electionManager.ZKClientConn.ChildrenW(electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath)
if err != nil {
fmt.Println("watch children error, ", err)
}
fmt.Println("watch children result, ", children, state)
select {
case childEvent := <-childCh:
if childEvent.Type == zk.EventNodeDeleted {
fmt.Println("receive znode delete event, ", childEvent)
// 重新选举
fmt.Println("start elect new master ...")
err = electionManager.electMaster()
if err != nil {
fmt.Println("elect new master error, ", err)
}
}
}
}
}
func main() {
// zookeeper配置
zkConfig := &ZookeeperConfig{
Servers: []string{"192.168.40.129"},
RootPath: "/ElectMasterDemo",
MasterPath: "/master",
}
// main goroutine 和 选举goroutine之间通信的channel,同于返回选角结果
isMasterChan := make(chan bool)
var isMaster bool
// 选举
electionManager := NewElectionManager(zkConfig, isMasterChan)
go electionManager.Run()
|