ction grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// 调用ServerRequestHandler处理请求
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
sendResponse(response);
}
// ...
NamingPushRequestHandler的处理逻辑:
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
serviceInfoHolder.processServiceInfo方法:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
// 推送一个InstancesChangeEvent事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
推送一个InstancesChangeEvent事件:
-
NotifyCenter维护着一个EventPublisher集,当有事件时,会选择一个目标EventPublisher
-
通过publish方法将事件保存到一个Event队列
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
// 当队列操作失败时,直接使用当前线程处理事件
receiveEvent(event);
return true;
}
return true;
}
-
EventPublisher是一个线程,在NotifyCenter初始化时启动。run方法会从Event队列取事件,使用receiveEvent(event)进行处理
-
receiveEvent方法查找所有的Subscriber,其中就有最初创建的InstancesChangeNotifier,调用订阅者onEvent方法
服务端
服务注册
InstanceRequestHandler处理器
注册中心的rpc处理器在com.alibaba.nacos.naming.remote.rpc.handler包,处理服务注册和下线的处理器是InstanceRequestHandler类:
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
private final EphemeralClientOperationServiceImpl clientOperationService;
public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
// 服务注册
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// 服务下线
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return dereg