设为首页 加入收藏

TOP

Nacos源码 (5) Grpc服务端和客户端(三)
2023-09-09 10:25:54 】 浏览:74
Tags:Nacos 源码 Grpc
t); Payload grpcResponse; try { // 由于request方法是同步的,所以此处阻塞等待响应 grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, e); } return (Response) GrpcUtils.parse(grpcResponse); }

对于另外两个方法:

  • requestFuture方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,创建一个RequestFuture返回
  • asyncRequest方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,为requestFuture添加监听回调

心跳healthCheck

前文介绍过,在启动RpcClient阶段,会启动健康检查任务,该任务每5秒执行一次,对当前客户端封装的connection做健康检查:

// keepAliveTime默认5000L
ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
    // check alive time.
    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
        // 健康检查
        boolean isHealthy = healthCheck();
        if (!isHealthy) {
            if (currentConnection == null) {
                continue;
            }

            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                break;
            }

            // 准备重连
            boolean success = RpcClient.this.rpcClientStatus
                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (success) {
                reconnectContext = new ReconnectContext(null, false);
            } else {
                continue;
            }
        } else {
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
        }
    } else {
        continue;
    }
}

if (reconnectContext.serverInfo != null) {
    // clear recommend server if server is not in server list.
    boolean serverExist = false;
    for (String server : getServerListFactory().getServerList()) {
        ServerInfo serverInfo = resolveServerInfo(server);
        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
        }
    }
    if (!serverExist) {
        reconnectContext.serverInfo = null;
    }
}
// 重连
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);

healthCheck方法:

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok ,also check connection is register.
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}
首页 上一页 1 2 3 4 5 下一页 尾页 3/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇redis 热点key问题及其解决方案 下一篇到底什么是Java AIO?为什么Netty..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目