Nacos 2.x在服务端与客户端直接增加了GRPC通信方式,本文通过2.0.2版本源码,简单分析GRPC通信方式:
- 服务器启动
- 客户端连接
- 客户端心跳
- 服务器监控检查
服务器
proto文件
api/src/main/proto/nacos_grpc_service.proto文件:
syntax = "proto3";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";
message Metadata {
string type = 3; // 请求/响应的真实类型
string clientIp = 8;
map<string, string> headers = 7;
}
// GRPC通信层请求/响应体
message Payload {
Metadata metadata = 2;
// 业务层的请求/响应体,需要使用type做反序列化
google.protobuf.Any body = 3;
}
service RequestStream {
// build a streamRequest
rpc requestStream (Payload) returns (stream Payload) {
}
}
service Request {
// Sends a commonRequest
rpc request (Payload) returns (Payload) {
}
}
service BiRequestStream {
// Sends a commonRequest
rpc requestBiStream (stream Payload) returns (stream Payload) {
}
}
文件定义了通信层的service和message结构,业务层请求响应的序列化和反序列化是Nacos在RequestAcceptor/Connection中使用工具类实现的,业务层请求处理是在RequestAcceptor中进行的转发。
服务器启动
Server类继承关系
BaseRpcServer
|-- BaseGrpcServer
|-- GrpcSdkServer
|-- GrpcClusterServer
此处介绍一下GrpcSdkServer实现。
GrpcSdkServer类
@Service
public class GrpcSdkServer extends BaseGrpcServer {
// 所以SDK服务器的监听端口是9848
private static final int PORT_OFFSET = 1000;
@Override
public int rpcPortOffset() {
return PORT_OFFSET;
}
@Override
public ThreadPoolExecutor getRpcExecutor() {
return GlobalExecutor.sdkRpcExecutor;
}
}
大部分的启动逻辑在BaseGrpcServer中。
BaseGrpcServer类
GRPC服务器的启动逻辑大部分都在这个类的startServer方法。
- 将处理请求的RequestAcceptor注册到HandlerRegistry
- GrpcRequestAcceptor用于处理普通业务请求
- GrpcBiStreamRequestAcceptor用于处理连接建立请求,获取Channel创建GrpcConnection并注册到ConnectionManager中,后续向客户端发送消息都是使用GrpcConnection做的
- 创建GRPC的Server对象
- 设置port和executor
- 设置HandlerRegistry
- 添加ServerTransportFilter在连接建立和断开时做一些业务操作
- 启动Server
GrpcRequestAcceptor类
这个类对GRPC做了扩展,重写了request方法:
- 解析Payload获取请求体的数据类型
- 从RequestHandlerRegistry获取适配的RequestHandler处理器
- 将请求体反序列化成请求体类型对象
- 调用handleRequest方法处理请求返回响应
处理请求代码:
Request request = (Request) parseObj;
try {
// 获取Connection
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新活跃时间,后续的健康检查会使用到这个时间戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// 使用RequestHandler处理请求
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
(e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}