设为首页 加入收藏

TOP

实现etcd服务注册与发现(四)
2023-07-23 13:28:58 】 浏览:97
Tags:实现 etcd
.ServiceInfo.Name + "/" + s.ServiceInfo.Ip }

3、增加服务发现

服务发现流程

  1. 实现 gRPC 中 resolver.Builder 接口的 Build 方法
  2. 通过 etcd client 获取已注册的 gRPC 服务,并监听其变化(是否有新增或者删除)
  3. 将最新的地址列表更新到 resolver.State;State 包含与 ClientConn 相关的当前 Resolver 状态,其中包括 gRPC 服务地址 resolver.Address
package server

import (
	"context"
	"encoding/json"
	"fmt"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
)

// Discovery resolves gRPC service addresses from etcd. The same type is used
// both as the resolver.Builder returned by NewDiscovery and as the
// resolver.Resolver returned by Build.
type Discovery struct {
	// endpoints are the etcd endpoints used to create the etcd client.
	endpoints  []string
	// service is the etcd key prefix that is read and watched for instances.
	service    string
	// client is the etcd client used to query and watch service keys.
	client     *clientv3.Client
	// clientConn is the gRPC connection that receives the resolved addresses.
	clientConn resolver.ClientConn
}

// NewDiscovery returns a resolver.Builder that looks up the instances of
// service in the etcd cluster reachable through endpoints. No connection is
// made here; the etcd client is created when Build is called.
func NewDiscovery(endpoints []string, service string) resolver.Builder {
	builder := new(Discovery)
	builder.endpoints = endpoints
	builder.service = service
	return builder
}

// ResolveNow is intentionally a no-op: it performs no lookup of its own, and
// address changes are pushed to the ClientConn by the etcd watch loop instead.
func (d *Discovery) ResolveNow(rn resolver.ResolveNowOptions) {

}

// Close releases the etcd client created by Build.
//
// Closing the client also closes its watchers, which closes the channel the
// watch loop ranges over and lets that goroutine exit instead of leaking.
// Close is safe to call on a Discovery whose Build was never run or failed,
// because a nil client is skipped.
func (d *Discovery) Close() {
	if d.client == nil {
		return
	}
	if err := d.client.Close(); err != nil {
		fmt.Println("关闭etcd客户端出错:", err)
	}
}

// Build implements resolver.Builder. It connects to etcd using the configured
// endpoints, starts a background watch on the service prefix, and returns a
// resolver bound to cc.
//
// Every call returns its own *Discovery, so a builder shared by several
// ClientConns does not overwrite the etcd client or ClientConn that a
// resolver created by an earlier call is still using from its watch goroutine.
//
// target and opts are not used; the service name given to NewDiscovery
// decides which keys are watched. An error is returned only when the etcd
// client cannot be created.
func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints: d.endpoints,
	})
	if err != nil {
		return nil, err
	}

	// Keep per-connection state on a fresh value rather than on the builder.
	r := &Discovery{
		endpoints:  d.endpoints,
		service:    d.service,
		client:     client,
		clientConn: cc,
	}

	go r.watch(r.service)

	return r, nil
}

// Scheme reports "etcd" as the target scheme handled by this builder.
func (d *Discovery) Scheme() string {
	return "etcd"
}

// watch loads the current instances of service from etcd, pushes them to the
// gRPC ClientConn, and then keeps the address list in sync with etcd until
// the watch channel is closed.
//
// service is used as a key prefix; every key under it is expected to hold a
// JSON-encoded ServiceInfo. Values that cannot be decoded are logged and
// skipped so that no empty address is ever handed to gRPC.
func (d *Discovery) watch(service string) {
	// addrs is keyed by etcd key so that a DELETE event can drop the entry
	// that the matching PUT (or the initial load) created.
	addrs := make(map[string]resolver.Address)

	// update pushes a snapshot of addrs to the ClientConn.
	update := func() {
		list := make([]resolver.Address, 0, len(addrs))
		for _, addr := range addrs {
			list = append(list, addr)
		}
		if err := d.clientConn.UpdateState(resolver.State{Addresses: list}); err != nil {
			fmt.Println("更新地址出错:", err)
		}
	}

	watchOpts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithPrevKV()}

	resp, err := d.client.Get(context.Background(), service, clientv3.WithPrefix())
	if err != nil {
		fmt.Println("获取地址出错:", err)
	} else {
		for _, kv := range resp.Kvs {
			addr, err := addressFromValue(kv.Value)
			if err != nil {
				fmt.Println("解析value失败:", err)
				continue
			}
			addrs[string(kv.Key)] = addr
		}
		// Start the watch right after the snapshot's revision so that changes
		// made between the Get and the Watch are not missed.
		watchOpts = append(watchOpts, clientv3.WithRev(resp.Header.GetRevision()+1))
	}

	update()

	for wresp := range d.client.Watch(context.Background(), service, watchOpts...) {
		if err := wresp.Err(); err != nil {
			// A failed response carries no usable events; the channel is
			// closed by etcd afterwards, which ends this loop.
			fmt.Println("监听出错:", err)
			continue
		}
		for _, ev := range wresp.Events {
			key := string(ev.Kv.Key)
			switch ev.Type {
			case mvccpb.PUT:
				addr, err := addressFromValue(ev.Kv.Value)
				if err != nil {
					fmt.Println("监听时解析value报错:", err)
				} else {
					addrs[key] = addr
				}
				fmt.Println(key)
			case mvccpb.DELETE:
				delete(addrs, key)
				fmt.Println(key)
			}
		}
		update()
	}
}

// addressFromValue decodes a JSON-encoded ServiceInfo into the
// resolver.Address advertised to gRPC. The initial load and the watch path
// both use it so that the same instance always maps to the same address.
func addressFromValue(value []byte) (resolver.Address, error) {
	info := &ServiceInfo{}
	if err := json.Unmarshal(value, info); err != nil {
		return resolver.Address{}, err
	}
	return resolver.Address{
		Addr:       info.Ip,
		ServerName: info.Name,
	}, nil
}

4、grpc课件服务

common参数

package common

// CoursewareRpc is the identifier string for the courseware RPC service.
// NOTE(review): presumably the service name / etcd key prefix used for
// registration and discovery — confirm against callers.
const CoursewareRpc = "rpc.courseware"

// Endpoints lists three local addresses on ports 12379, 22379 and 32379.
// NOTE(review): presumably the etcd cluster endpoints — confirm against callers.
var Endpoints = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}

生成课件服务grpc

syntax = "proto3";

package rpc;
option go_package = "./courseware";

// GetRequest is the request message of Courseware.Get.
message GetRequest {
  uint64 Id = 1;
}
// GetResponse is the response message of Courseware.Get.
message GetResponse {
  uint64 Id = 1;
  string Code = 2;
  string Name = 3;
  uint64 Type = 4;
}


// Courseware exposes a single unary RPC, Get.
service Courseware {
  rpc Get(GetRequest) returns(GetResponse);
}
protoc --go_out=./ --go-grpc_out=./ coursewar
首页 上一页 1 2 3 4 5 下一页 尾页 4/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇垃圾回收 下一篇go-zero docker-compose搭建课件..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目