设为首页 加入收藏

TOP

Nacos源码 (5) Grpc服务端和客户端(四)
2023-09-09 10:25:54 】 浏览:75
Tags:Nacos 源码 Grpc
eset请求重置连接
  • 获取连接的最后活跃时间(客户端每次请求都会更新这个时间),如果超过20秒不活跃,则向客户端发送一个探测请求,如果请求失败则断开连接
  • 断开连接

    业务处理流程

    GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:

    public void transportTerminated(Attributes transportAttrs) {
        String connectionId = null;
        try {
            connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
        } catch (Exception e) {
            // Ignore
        }
        if (StringUtils.isNotBlank(connectionId)) {
            // 使用ConnectionManager移除连接
            connectionManager.unregister(connectionId);
        }
    }
    

    ConnectionManager移除连接:

    public synchronized void unregister(String connectionId) {
        // 从Connection集移除连接
        Connection remove = this.connections.remove(connectionId);
        if (remove != null) {
            String clientIp = remove.getMetaInfo().clientIp;
            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
            // IP连接数--
            if (atomicInteger != null) {
                int count = atomicInteger.decrementAndGet();
                if (count <= 0) {
                    connectionForClientIp.remove(clientIp);
                }
            }
            remove.close();
            // 通知ClientManager层移除client对象
            clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
        }
    }
    

    ConnectionBasedClientManager的clientDisconnected方法:

    public boolean clientDisconnected(String clientId) {
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        client.release();
        // 推送一个ClientDisconnectEvent事件
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        return true;
    }
    

    事件处理流程

    ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.

    • ClientServiceIndexesManager - 维护注册和订阅关系
    • DistroClientDataProcessor - 同步客户端数据到所有服务节点
    • NamingMetadataManager - 维护客户端注册的服务和实例元数据信息

    客户端

    建立连接

    ServerListFactory接口

    Server list factory. Use to inner client to connected and switch servers.

    管理Server服务器地址集合,RpcClient使用这个接口选择可用的服务器地址。

    public interface ServerListFactory {
    
        // 选择一个可用的服务器地址 ip:port格式
        String genNextServer();
    
        // 返回当前使用的服务器地址 ip:port格式
        String getCurrentServer();
    
        // 返回服务器集合
        List<String> getServerList();
    }
    

    ServerListManager类

    解析Properties参数封装服务器地址集合。

    创建RpcClient

    RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
    

    createClient方法:

    public static RpcClient createClient(String clientName,
                                         ConnectionType connectionType,
                                         Map<String, String> labels) {
        return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
            if (client == null) {
                if (ConnectionType.GRPC.equals(connectionType)) {
                    // 创建的是GrpcSdkClient对象
                    client = new GrpcSdkClient(clientNameInner);
                }
                if (client == null) {
                    throw new UnsupportedOperationException(
                        "unsupported connection type :" + connectionType.getType());
                }
                client.labels(labels);
            }
            return client;
        });
    }
    

    之后需要为Client进行初始化:

    1. 设置ServerListFactory,用于选择服务器地址

    2. 注册ServerRequestHandler处理器,用于处理服务端发送的请求,比如服务订阅的回调、配置文件变化通知

    3. 注册ConnectionEventListener监听器

      rpcClient.serverListFactory(serverListFactory);
      rpcClient.start();
      rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
      rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
      
    4. 启动Client

      • 启动ConnectionEvent处理线程
      • 启动健康检查(心跳)线程
      • 创建GrpcConnection

    创建GrpcConnection

    1. 创建GRPC的RequestFutureStub和BiRequestStreamStub
    2. 发一个ServerCheckRequest请求验证服务端的可用性
    3. 创建GrpcConnection对象,封装serverIn
    首页 上一页 1 2 3 4 5 下一页 尾页 4/5/5
    】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
    上一篇redis 热点key问题及其解决方案 下一篇到底什么是Java AIO?为什么Netty..

    最新文章

    热门文章

    Hot 文章

    Python

    C 语言

    C++基础

    大数据基础

    linux编程基础

    C/C++面试题目