设为首页 加入收藏

TOP

Nacos源码 (3) 注册中心(六)
2023-08-26 21:11:08 】 浏览:153
Tags:Nacos 源码
本与服务注册流程相同。

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 会由DistroClientDataProcessor进行处理,同步客户端数据到所有服务节点

ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager进行处理,ClientServiceIndexesManager类维护clientId与service的注册关系和订阅关系。另外该处理器会推送一个ServiceChangedEvent事件。

InstanceMetadataEvent事件:实例元数据事件。由NamingMetadataManager进行处理,NamingMetadataManager管理客户端注册的服务和实例元数据信息。InstanceMetadataEvent事件会触发该处理器的实例过期判断

ServiceChangedEvent事件:Service data changed event. 有两个处理器:

  • NamingSubscriberServiceV2Impl - 触发回调服务订阅者任务
  • DoubleWriteEventListener - 触发将服务信息同步到其他nacos节点任务

服务实例心跳

  1. 客户端会周期性的发送healthCheck请求
  2. 服务端每次收到客户端请求时都会更新对应connection的活跃时间戳
  3. 服务端也会周期性的检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接

客户端healthCheck请求

客户端会周期性发送healthCheck请求,默认每5秒执行一次,在RpcClient中:

clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    //check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
// ...

healthCheck健康检查:

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        //ignore
    }
    return false;
}

如果检查失败,将重新建立连接。

服务端记录connection活跃时间戳

服务端每次收到客户端请求时都会更新对应connection的活跃时间戳。

服务端使用GrpcRequestAcceptor作为业务层请求Acceptor入口,这个类会将GRPC的请求转为业务层请求,并转发到对应的RequestHandler处理器。

在其request方法中,会刷新对应connection的活跃时间戳:

Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新connection的活跃时间戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();

服务端connection活跃检查

服务端周期性检查客户端connection的活跃时间戳和客户端IP连接数,当超过一定的时间不活跃,服务端会发一个检测请求给客户端,当连接数超过阈值时将重置多余的连接。

服务端使用ConnectionManager管理连接:

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

在启动时,会创建周期性任务检查connections的活跃状态,默认每3秒执行一次,以下为代码片段:

// 检查长时间不活跃的连接和超过最大连接数的连接
for (Map.Entry<String, Connection> entry : entries) {
    Connection client = entry.getValue();
    String clientIp = client.getMetaInfo().getClientIp();
首页 上一页 3 4 5 6 7 8 9 下一页 尾页 6/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇quarkus数据库篇之一:比官方demo.. 下一篇JDK 17 营销初体验 —— 亚毫秒停..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目