                        Name());
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onCompleted() {
                    System.out.println("getRealNameByUsernames completed");
                }
            });
            requestStreamObserver.onNext(request);
            requestStreamObserver.onNext(request);
            requestStreamObserver.onNext(request);
            requestStreamObserver.onNext(request);
            requestStreamObserver.onCompleted();
        } catch (StatusRuntimeException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", GRPC_SERVER_PORT).usePlaintext().build();
        try {
            StudentGrpcClient client = new StudentGrpcClient(channel);
            int count = 1;
            for (int i = 0; i < count; i++) {
                client.getRealNameByUsername("admin2018");
                Thread.sleep(20);
                System.out.println("---");
                client.getRealNameByUsernameLike("admin2019");
                Thread.sleep(20);
                System.out.println("---");
                client.getRealNameByUsernames("admin2020");
                Thread.sleep(20);
                System.out.println("---");
                client.getRealNamesByUsernames("admin2021");
            }
            Thread.sleep(10000);
        } finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
Client code (FutureStub)
The FutureStub only supports simple unary RPC calls (one request, one response):
try {
    ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
            StudentUsernameRequest.newBuilder().setUsername(username).build());
    // Alternative: block until the response arrives
    // StudentResponse studentResponse = future.get();
    CountDownLatch latch = new CountDownLatch(1);
    Futures.addCallback(future, new FutureCallback<StudentResponse>() {
        @Override
        public void onSuccess(StudentResponse response) {
            System.out.printf("Real name=%s\n", response.getRealName());
            latch.countDown();
        }
        @Override
        public void onFailure(Throwable t) {
            System.err.println(t.getMessage());
            latch.countDown();
        }
    }, Executors.newSingleThreadExecutor());
    // Wait until one of the callbacks has run
    latch.await();
} catch (StatusRuntimeException | InterruptedException e) {
    System.err.println(e.getMessage());
}
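Using gRPC in Nacos
In Nacos, the proto file does not define every interface. It defines only a basic forwarding interface and a generic request/response Payload structure.
The concrete request and response structures are written in business code, and business interfaces are routed through the forwarding interface, much as DispatcherServlet in Spring MVC forwards requests to Controllers.
This subsection briefly describes how Nacos integrates gRPC.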
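The routing idea can be illustrated with a minimal sketch that uses no gRPC at all. It assumes a generic envelope carrying a type name and a serialized body, plus a registry that maps type names to handlers. All names here (SketchPayload, SketchDispatcher, the "Echo" type) are hypothetical and are not taken from the Nacos code base, where the Payload is a protobuf message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical generic envelope: a routing key plus an opaque body.
final class SketchPayload {
    final String type; // routing key, e.g. the request class name
    final String body; // serialized request or response
    SketchPayload(String type, String body) {
        this.type = type;
        this.body = body;
    }
}

// Hypothetical dispatcher: the single entry point that forwards each
// payload to the handler registered for its type.
final class SketchDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String type, Function<String, String> handler) {
        handlers.put(type, handler);
    }

    SketchPayload dispatch(SketchPayload request) {
        Function<String, String> handler = handlers.get(request.type);
        if (handler == null) {
            // No business handler is registered for this type.
            return new SketchPayload("ErrorResponse", "no handler for " + request.type);
        }
        return new SketchPayload(request.type + "Response", handler.apply(request.body));
    }
}

final class SketchDispatcherDemo {
    public static void main(String[] args) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        dispatcher.register("Echo", body -> "echo:" + body);
        SketchPayload response = dispatcher.dispatch(new SketchPayload("Echo", "hi"));
        System.out.println(response.type + " " + response.body);
    }
}
```

With this shape, adding a business interface means registering one more handler. The wire-level definition, which corresponds to the proto file, stays unchanged.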
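Server side
BaseGrpcServer
The abstract class BaseRpcServer defines the skeleton logic of the RPC server. Its abstract method startServer() must be implemented by subclasses and holds the core logic for starting the RPC server.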
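This is the template method pattern, which the following minimal sketch illustrates. The class names SketchRpcServer and SketchGrpcServer, the start() method, and the started flag are assumptions made for illustration. Only the abstract startServer() hook comes from the description above.

```java
// Hypothetical base class: start() fixes the overall flow, while
// startServer() is the hook that each transport must implement.
abstract class SketchRpcServer {
    private boolean started;

    public final void start() throws Exception {
        if (started) {
            return; // already running, nothing to do
        }
        startServer(); // transport-specific startup supplied by the subclass
        started = true;
    }

    public boolean isStarted() {
        return started;
    }

    protected abstract void startServer() throws Exception;
}

// Hypothetical subclass standing in for a gRPC-based server.
final class SketchGrpcServer extends SketchRpcServer {
    int startCalls; // counts how often the hook actually ran

    @Override
    protected void startServer() {
        // A real implementation would build and start the gRPC Server here.
        startCalls++;
    }
}
```

Keeping start() final means every subclass goes through the same lifecycle, and only the transport-specific part varies.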
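The abstract class BaseGrpcServer extends BaseRpcServer and wraps the gRPC components:
- Server - the gRPC server object
- GrpcRequestAcceptor - receives and forwards business requests
- GrpcBiStreamRequestAcceptor - receives and handles connection requests; used to obtain the StreamObserver for sending on the bidirectional stream
- ConnectionManager - the connection manager
The startServer() method encapsulates the logic for starting the gRPC server: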
public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            // Store the connectionId, IP, port, etc. in the Context
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.