设为首页 加入收藏

TOP

Nacos源码 (3) 注册中心(四)
2023-08-26 21:11:08 】 浏览:148
Tags:Nacos 源码
ction grpcConn) { return streamStub.requestBiStream(new StreamObserver<Payload>() { @Override public void onNext(Payload payload) { try { Object parseBody = GrpcUtils.parse(payload); final Request request = (Request) parseBody; if (request != null) { try { // 调用ServerRequestHandler处理请求 Response response = handleServerRequest(request); if (response != null) { response.setRequestId(request.getRequestId()); sendResponse(response); } // ...

NamingPushRequestHandler的处理逻辑:

public Response requestReply(Request request) {
    if (request instanceof NotifySubscriberRequest) {
        NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
        serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
        return new NotifySubscriberResponse();
    }
    return null;
}

serviceInfoHolder.processServiceInfo方法:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        // 推送一个InstancesChangeEvent事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

推送一个InstancesChangeEvent事件:

  1. NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher

  2. 通过publish方法将事件保存到一个Event队列

    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            // 当队列操作失败时,直接使用当前线程处理事件
            receiveEvent(event);
            return true;
        }
        return true;
    }
    
  3. EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理

  4. receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法

服务端

服务注册

InstanceRequestHandler处理器

注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:

public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            // 服务注册
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            // 服务下线
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return dereg
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 4/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇quarkus数据库篇之一:比官方demo.. 下一篇JDK 17 营销初体验 —— 亚毫秒停..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目