Nacos Source Code (6): gRPC Overview and Nacos Integration (Part 7)
2023-09-23 15:44:14
Tags: Nacos, source code, gRPC, integration
                    getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                // Save the channel in the context
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // Add the request-forwarding services
    addServices(handlerRegistry, serverInterceptor);

    // Create the Server
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // When the connection is established, read the ip, port, etc. and generate the connectionId
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    return attrWrapper;
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        // When the connection drops, remove it from connectionManager
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    server.start();
}
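
The bookkeeping done by the transport filter above can be sketched without any gRPC dependency. This is only an illustrative sketch, not Nacos's actual code: `ConnectionIdSketch` and its methods are hypothetical names, and a plain `ConcurrentHashMap` stands in for `connectionManager`. It mirrors two details from the listing: the connection id format `<timestamp>_<remoteIp>_<remotePort>`, and the removal of the connection when the transport terminates.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the connection bookkeeping shown in the listing. */
final class ConnectionIdSketch {

    // Assumption: a simple map replaces connectionManager (connectionId -> remote address).
    private final Map<String, InetSocketAddress> connections = new ConcurrentHashMap<>();

    /** Builds an id in the same format as the listing: <timestamp>_<remoteIp>_<remotePort>. */
    static String buildConnectionId(long timestampMillis, InetSocketAddress remote) {
        String remoteIp = remote.getAddress().getHostAddress();
        return timestampMillis + "_" + remoteIp + "_" + remote.getPort();
    }

    /** Registers a connection; returns false if the id is already present. */
    boolean register(String connectionId, InetSocketAddress remote) {
        return connections.putIfAbsent(connectionId, remote) == null;
    }

    /** Mirrors transportTerminated: blank ids are ignored, otherwise the entry is removed. */
    void onTransportTerminated(String connectionId) {
        if (connectionId != null && !connectionId.trim().isEmpty()) {
            connections.remove(connectionId);
        }
    }

    /** Number of connections currently tracked. */
    int size() {
        return connections.size();
    }
}
```

Because the id embeds the timestamp taken when the transport becomes ready, a client that reconnects from the same ip and port will normally receive a new id rather than reuse the old one.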

GrpcBiStreamRequestAcceptor

gRPC bidirectional stream request handler.

Its main job is to build and wrap the server-side Connection object:

if (parseObj instanceof ConnectionSetupRequest) {
    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
    Map<String, String> labels = setUpRequest.getLabels();
    String appName = "-";
    if (labels != null && labels.containsKey(Constants.APPNAME)) {
        appName = labels.get(Constants.APPNAME);
    }

    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
    metaInfo.setTenant(setUpRequest.getTenant());
    // Wrap the basic connection info together with the responseObserver and channel
    // The responseObserver is used to push messages to the client
    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
    connection.setAbilities(setUpRequest.getAbilities());
    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
    // Register with connectionManager
    if (rejectSdkOnStarting || !connectionManager.register(connection