equestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
// 创建connection
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
// create stream request and bind connection event to this connection.
// 用于向服务端发送请求
StreamObserver<Payload> payloadStreamObserver =
bindRequestStream(biRequestStreamStub, grpcConn);
// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// 发送一个ConnectionSetupRequest让服务端创建Connection
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
return grpcConn;
}
return null;
} catch (Exception e) {}
return null;
}
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// 使用客户端侧的ServerRequestHandler处理服务端发送过来的数据
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
// 响应
sendResponse(response);
}
} catch (Exception e) {
sendResponse(request.getRequestId(), false);
}
}
} catch (Exception e) {
// ...
}
}
@Override
public void onError(Throwable throwable) {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
// ...
}
}
@Override
public void onCompleted() {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
});
}
|