设为首页 加入收藏

TOP

Nacos源码 (3) 注册中心(七)
2023-08-26 21:11:08 】 浏览:149
Tags:Nacos 源码
AtomicInteger integer = expelForIp.get(clientIp); if (integer != null && integer.intValue() > 0) { integer.decrementAndGet(); expelClient.add(client.getMetaInfo().getConnectionId()); expelCount--; } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { outDatedConnections.add(client.getMetaInfo().getConnectionId()); } } // ... // 重置超过最大连接数的连接 for (String expelledClientId : expelClient) { try { Connection connection = getConnection(expelledClientId); if (connection != null) { ConnectResetRequest connectResetRequest = new ConnectResetRequest(); connectResetRequest.setServerIp(serverIp); connectResetRequest.setServerPort(serverPort); connection.asyncRequest(connectResetRequest, null); } } catch (ConnectionAlreadyClosedException e) { unregister(expelledClientId); } catch (Exception e) { } } // ... if (CollectionUtils.isNotEmpty(outDatedConnections)) { Set<String> successConnections = new HashSet<>(); final CountDownLatch latch = new CountDownLatch(outDatedConnections.size()); for (String outDateConnectionId : outDatedConnections) { try { Connection connection = getConnection(outDateConnectionId); if (connection != null) { // 给客户端发检测请求 ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest(); connection.asyncRequest(clientDetectionRequest, new RequestCallBack() { @Override public Executor getExecutor() { return null; } @Override public long getTimeout() { return 1000L; } @Override public void onResponse(Response response) { latch.countDown(); if (response != null && response.isSuccess()) { connection.freshActiveTime(); successConnections.add(outDateConnectionId); } } @Override public void onException(Throwable e) { latch.countDown(); } }); } else { latch.countDown(); } } catch (ConnectionAlreadyClosedException e) { latch.countDown(); } catch (Exception e) { latch.countDown(); } } latch.await(3000L, TimeUnit.MILLISECONDS); // 移除失败的已断开连接 for (String outDateConnectionId : outDatedConnections) { if (!successConnections.contains(outDateConnectionId)) { unregister(outDateConnectionId); } } }

客户端断开连接

业务处理流程

GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:

public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // 使用ConnectionManager移除连接
        connectionManager.unregister(connectionId);
    }
}

ConnectionManager移除连接:

public synchronized void unregister(String connectionId) {
    // 从Connection集移除连接
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // IP连接数--
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // 通知ClientManager层断开连接
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

ConnectionBasedClientManager的clientDisconnected方法:

public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient cli
首页 上一页 4 5 6 7 8 9 下一页 尾页 7/9/9
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇quarkus数据库篇之一:比官方demo.. 下一篇JDK 17 营销初体验 —— 亚毫秒停..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目