RequestHandler处理器
RequestHandler抽象类是Nacos在业务层处理GRPC请求的抽象类:
public abstract class RequestHandler<T extends Request, S extends Response> {
@Autowired
private RequestFilters requestFilters;
/**
* Handler request.
*/
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
for (AbstractRequestFilter filter : requestFilters.filters) {
try {
Response filterResult = filter.filter(request, meta, this.getClass());
if (filterResult != null && !filterResult.isSuccess()) {
return filterResult;
}
} catch (Throwable throwable) {
Loggers.REMOTE.error("filter error", throwable);
}
}
return handle(request, meta);
}
/**
* Handler request.
*/
public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
实现类:
Nacos使用RequestHandlerRegistry管理所有的RequestHandler,是一个Map结构:
// key是Request类型的简单名
// value是RequestHandler实现类对象
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
RequestHandlerRegistry会扫描Spring容器里面所有的RequestHandler对象,解析RequestHandler实现类处理的Request类型的简单名,将其注册到registryHandlers中。
GrpcRequestAcceptor类获取适配的RequestHandler处理器使用的就是RequestHandlerRegistry类的getByRequestType方法:
public RequestHandler getByRequestType(String requestType) {
return registryHandlers.get(requestType);
}
建立连接
在Server初始化的时候,Nacos注册了ServerInterceptor和ServerTransportFilter组件,这些组件会在连接建立时将conn_id、remote_ip、remote_port、local_port、ctx_channel等绑定到Context上。
创建GrpcConnection
客户端在连接建立之后会发送一个ConnectionSetupRequest请求,服务器使用GrpcBiStreamRequestAcceptor处理该请求:
- 获取到conn_id、remote_ip、remote_port、local_port等
- 解析请求获取clienIp
- 封装GrpcConnection对象,包括:conn_id、remote_ip、remote_port、local_port、clientIp、客户端版本等基础信息,以及StreamObserver和Channel
- 将GrpcConnection注册到ConnectionManager上
创建Client
ConnectionManager的注册操作会触发ConnectionBasedClientManager的clientConnected方法来创建Client对象:
public void clientConnected(Connection connect) {
// grpc类型
String type = connect.getMetaInfo().getConnectType();
// 此处获取到的是ConnectionBasedClientFactory对象
ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
// 此处创建的是ConnectionBasedClient对象
clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
}
public boolean clientConnected(Client client) {
if (!clients.containsKey(client.getClientId())) {
// 注册到client集
// 使用Map维护clientId->client对象关系
clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
}
return true;
}
健康检查
ConnectionManager连接管理器
这个类管理客户端连接,提供注册连接、移除连接等功能:
// 管理IP -> 连接数,用于实现ConnectionLimitRule
private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);
// 管理connectionId -> Connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
Connection抽象类实现了Requester接口,能够向客户端发送请求、管理连接状态。
GrpcConnection实现了Connection抽象类。
在连接建立后,客户端会发送一个ConnectionSetupRequest请求,服务端收到该请求后,会解析出connectionId、客户端IP、客户端端口、客户端版本、Channel等封装成GrpcConnection对象,然后注册到ConnectionManager中。
健康检查周期任务
ConnectionManager在启动阶段会启动一个周期任务来检查IP连接数和连接的活跃状态,每3秒执行一次:
- 遍历连接集,使用connectionLimitRule查找需要重置的连接,向这些客户端发r