fo和executor、connectionId、channel等
为BiRequestStreamStub绑定请求处理逻辑:使用ServerRequestHandler处理器处理服务端发送过来的请求
发送ConnectionSetupRequest请求,让服务端创建并注册GrpcConnection
if (grpcExecutor == null) {
int threadNumber = ThreadUtils.getSuitableThreadCount(8);
grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
}
// 8848+1000
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
// 发一个ServerCheckRequest请求验证服务端的可用性
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
return null;
}
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
// 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
// create stream request and bind connection event to this connection
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// send a setup request
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
发送请求
Requester接口
这个接口定义了发送请求的方法:
public interface Requester {
/**
* send request.
*
* @param request request.
* @param timeoutMills mills of timeouts.
* @return response response returned.
* @throws NacosException exception throw.
*/
Response request(Request request, long timeoutMills) throws NacosException;
/**
* send request.
*
* @param request request.
* @return request future.
* @throws NacosException exception throw.
*/
RequestFuture requestFuture(Request request) throws NacosException;
/**
* send async request.
*
* @param request request.
* @param requestCallBack callback of request.
* @throws NacosException exception throw.
*/
void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;
/**
* close connection.
*/
void close();
}
GrpcConnection实现
GrpcConnection类实现了Requester接口的三个request方法,使用的是GRPC的Stub发送请求,以request方法为例:
public Response request(Request request, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request);
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcReques