设为首页 加入收藏

TOP

Nacos源码 (3) 注册中心(二)
2023-08-26 21:11:08 】 浏览:147
Tags:Nacos 源码
rService(serviceName, groupName, instance); }

查询实例

示例代码:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);

System.out.printf(">> instance count=%d\n", instances.size());

for (Instance instance : instances) {
    System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
            instance.getServiceName(), instance.getInstanceId(),
            instance.getClusterName(), instance.getIp(), instance.getPort());
}

提供了几个重载的getAllInstances方法,最重要的参数就是subscribe,当为true时,会向服务端发送订阅请求,之后一直从ServiceInfoHolder中获取服务实例信息,而不再向服务端发送查询请求。

public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo) {
            // 订阅请求
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 查询请求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

服务订阅

示例代码:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
namingService.subscribe(ORDER_SERVICE, new EventListener() {
    @Override
    public void onEvent(Event event) {
        NamingEvent e = (NamingEvent) event;
        System.out.println("serviceName=" + e.getServiceName());
        List<Instance> instances = e.getInstances();
        System.out.printf(">> instance count=%d\n", instances.size());

        for (Instance instance : instances) {
            System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
                    instance.getServiceName(), instance.getInstanceId(),
                    instance.getClusterName(), instance.getIp(), instance.getPort());
        }
    }
});

TimeUnit.SECONDS.sleep(1200);

subscribe方法:

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    String clusterString = StringUtils.join(clusters, ",");
    // 将listener保存到listenerMap中
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    // 发送订阅请求
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

实例变化的方法调用栈:

当收到服务端的实例变化事件时,会触发grpc层的观察者监听:

public void onMessage(RespT message) {
  if (firstResponseReceived && !streamingResponse) {
    throw Status.INTERNAL
        .withDescription("More than one responses received for unary or client-streaming call")
        .asRuntimeException();
  }
  firstResponseReceived = true;
  // 调用观察者
  observer.onNext(message);

  if (streamingResponse && adapter.autoFlowControlEnabled) {
    // Request delivery of the next inbound message.
    adapter.request(1);
  }
}

此处的observer是在创建rpc连接的时候注册的:

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConne
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 2/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇quarkus数据库篇之一:比官方demo.. 下一篇JDK 17 营销初体验 —— 亚毫秒停..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目