设为首页 加入收藏

TOP

Nacos源码 (3) 注册中心(九)
2023-08-26 21:11:08 】 浏览:150
Tags:Nacos 源码
ger.getClient(clientId); if (null == client) { return Optional.empty(); } // 查找该client指定service注册的实例信息 // AbstractClient使用Map<Service, InstancePublishInfo>结构保存 // 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息 return Optional.ofNullable(client.getInstancePublishInfo(service)); }

前文介绍过ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。

服务订阅

SubscribeServiceRequestHandler处理器

SubscribeServiceRequestHandler类负责客户端的服务订阅请求:

public class SubscribeServiceRequestHandler extends 
             RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {

    private final ServiceStorage serviceStorage;

    private final NamingMetadataManager metadataManager;

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
            NamingMetadataManager metadataManager,
            EphemeralClientOperationServiceImpl clientOperationService) {
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
    public SubscribeServiceResponse handle(
           SubscribeServiceRequest request, RequestMeta meta) throws NacosException {

        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        Service service = Service.newService(namespaceId, groupName, serviceName, true);

        // 封装Subscriber对象:客户端IP、版本、命名空间等
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
                meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());

        ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null),
                subscriber);

        if (request.isSubscribe()) {
            // 服务订阅
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            // 取消订阅
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }

    private ServiceInfo handleClusterData(
            ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
        return StringUtils.isBlank(subscriber.getCluster()) ? data
                : ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
    }
}

服务订阅核心流程

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    // 为该client绑定service -> subscriber关系
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 推送一个ClientSubscribeServiceEvent事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

事件处理流程

ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServi

首页 上一页 6 7 8 9 下一页 尾页 9/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇quarkus数据库篇之一:比官方demo.. 下一篇JDK 17 营销初体验 —— 亚毫秒停..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目