ger.getClient(clientId);
if (null == client) {
return Optional.empty();
}
// 查找该client指定service注册的实例信息
// AbstractClient使用Map<Service, InstancePublishInfo>结构保存
// 前文介绍过在服务注册时会使用client.addServiceInstance方法添加注册信息
return Optional.ofNullable(client.getInstancePublishInfo(service));
}
前文介绍过ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。
服务订阅
SubscribeServiceRequestHandler处理器
SubscribeServiceRequestHandler类负责客户端的服务订阅请求:
public class SubscribeServiceRequestHandler extends
RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
private final EphemeralClientOperationServiceImpl clientOperationService;
public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
NamingMetadataManager metadataManager,
EphemeralClientOperationServiceImpl clientOperationService) {
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
public SubscribeServiceResponse handle(
SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
// 封装Subscriber对象:客户端IP、版本、命名空间等
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null),
subscriber);
if (request.isSubscribe()) {
// 服务订阅
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
// 取消订阅
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
private ServiceInfo handleClusterData(
ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
}
}
服务订阅核心流程
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
// 为该client绑定service -> subscriber关系
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
// 推送一个ClientSubscribeServiceEvent事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
事件处理流程
ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServi