本与服务注册流程相同。
ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点。
ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。
InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断。
ServiceChangedEvent事件:Service data changed event. 有两个处理器:
- NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
- DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务
服务实例心跳
- 客户端会周期性的发送healthCheck请求
- 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
- 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接
客户端healthCheck请求
客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
//check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// ...
healthCheck健康检查:
private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
return response != null && response.isSuccess();
} catch (NacosException e) {
//ignore
}
return false;
}
如果检查失败,将重新建立连接。
服务端记录connection活跃时间戳
服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。
服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。
在其request方法中,会刷新对应connection的活跃时间戳:
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新connection的活跃时间戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
服务端connection活跃检查
服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。
服务端使用ConnectionManager管理连接:
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:
// 检查长时间不活跃的连接和超过最大连接数的连接
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String clientIp = client.getMetaInfo().getClientIp();