eset请求重置连接
获取连接的最后活跃时间(客户端每次请求都会更新这个时间),如果超过20秒不活跃,则向客户端发送一个探测请求,如果请求失败则断开连接
断开连接
业务处理流程
GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
// 使用ConnectionManager移除连接
connectionManager.unregister(connectionId);
}
}
ConnectionManager移除连接:
public synchronized void unregister(String connectionId) {
// 从Connection集移除连接
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
// IP连接数--
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
// 通知ClientManager层移除client对象
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
ConnectionBasedClientManager的clientDisconnected方法:
public boolean clientDisconnected(String clientId) {
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
// 推送一个ClientDisconnectEvent事件
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
事件处理流程
ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.
- ClientServiceIndexesManager - 维护注册和订阅关系
- DistroClientDataProcessor - 同步客户端数据到所有服务节点
- NamingMetadataManager - 维护客户端注册的服务和实例元数据信息
客户端
建立连接
ServerListFactory接口
Server list factory. Use to inner client to connected and switch servers.
管理Server服务器地址集合,RpcClient使用这个接口选择可用的服务器地址。
public interface ServerListFactory {
// 选择一个可用的服务器地址 ip:port格式
String genNextServer();
// 返回当前使用的服务器地址 ip:port格式
String getCurrentServer();
// 返回服务器集合
List<String> getServerList();
}
ServerListManager类
解析Properties参数封装服务器地址集合。
创建RpcClient
RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
createClient方法:
public static RpcClient createClient(String clientName,
ConnectionType connectionType,
Map<String, String> labels) {
return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
if (client == null) {
if (ConnectionType.GRPC.equals(connectionType)) {
// 创建的是GrpcSdkClient对象
client = new GrpcSdkClient(clientNameInner);
}
if (client == null) {
throw new UnsupportedOperationException(
"unsupported connection type :" + connectionType.getType());
}
client.labels(labels);
}
return client;
});
}
之后需要为Client进行初始化:
-
设置ServerListFactory,用于选择服务器地址
-
注册ServerRequestHandler处理器,用于处理服务端发送的请求,比如服务订阅的回调、配置文件变化通知
-
注册ConnectionEventListener监听器
rpcClient.serverListFactory(serverListFactory);
rpcClient.start();
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
-
启动Client
- 启动ConnectionEvent处理线程
- 启动健康检查(心跳)线程
- 创建GrpcConnection
创建GrpcConnection
- 创建GRPC的RequestFutureStub和BiRequestStreamStub
- 发一个ServerCheckRequest请求验证服务端的可用性
- 创建GrpcConnection对象,封装serverIn