t);
Payload grpcResponse;
try {
// 由于request方法是同步的,所以此处阻塞等待响应
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
return (Response) GrpcUtils.parse(grpcResponse);
}
对于另外两个方法:
- requestFuture方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,创建一个RequestFuture返回
- asyncRequest方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,为requestFuture添加监听回调
心跳healthCheck
前文介绍过,在启动RpcClient阶段,会启动健康检查任务,该任务每5秒执行一次,对当前客户端封装的connection做健康检查:
// keepAliveTime默认5000L
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
// 健康检查
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// 准备重连
boolean success = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (success) {
reconnectContext = new ReconnectContext(null, false);
} else {
continue;
}
} else {
lastActiveTimeStamp = System.currentTimeMillis();
continue;
}
} else {
continue;
}
}
if (reconnectContext.serverInfo != null) {
// clear recommend server if server is not in server list.
boolean serverExist = false;
for (String server : getServerListFactory().getServerList()) {
ServerInfo serverInfo = resolveServerInfo(server);
if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
serverExist = true;
reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
break;
}
}
if (!serverExist) {
reconnectContext.serverInfo = null;
}
}
// 重连
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
healthCheck方法:
private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
if (this.currentConnection == null) {
return false;
}
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
// not only check server is ok ,also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
}
return false;
}