Nacos Source Code (5): gRPC Server and Client (Part 5)
2023-09-09 10:25:54
Tags: Nacos, source code, gRPC
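  • Create a GrpcConnection object that wraps serverInfo, executor, connectionId, channel, and so on
  • Bind request-handling logic to the BiRequestStreamStub: a ServerRequestHandler processes the requests pushed by the server
  • Send a ConnectionSetupRequest so that the server creates and registers its GrpcConnection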

    if (grpcExecutor == null) {
        int threadNumber = ThreadUtils.getSuitableThreadCount(8);
        grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                        .build());
        grpcExecutor.allowCoreThreadTimeOut(true);
    }
    // gRPC port = server port + offset, e.g. 8848 + 1000
    int port = serverInfo.getServerPort() + rpcPortOffset();
    RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
    // Send a ServerCheckRequest to verify that the server is available
    Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
    if (response == null || !(response instanceof ServerCheckResponse)) {
        shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
        return null;
    }
    
    BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
            .newStub(newChannelStubTemp.getChannel());
    // Create the GrpcConnection object, which wraps serverInfo, executor, connectionId, channel, etc.
    GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
    grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
    
    // create stream request and bind connection event to this connection
    StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
    
    // stream observer to send response to server
    grpcConn.setPayloadStreamObserver(payloadStreamObserver);
    grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
    grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
    // send a setup request
    ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
    conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
    conSetupRequest.setLabels(super.getLabels());
    conSetupRequest.setAbilities(super.clientAbilities);
    conSetupRequest.setTenant(super.getTenant());
    grpcConn.sendRequest(conSetupRequest);
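
The executor setup and the port calculation at the start of the code above can be tried in isolation. The following is a minimal sketch, not the Nacos implementation: it uses only the JDK (the original relies on Guava's ThreadFactoryBuilder), and the names GrpcExecutorSketch, newExecutor and grpcPort are hypothetical helpers introduced for illustration. It shows a pool whose core and maximum sizes are equal, with a bounded queue, daemon threads, and core threads that are allowed to time out when idle.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GrpcExecutorSketch {

    /** Hypothetical helper: builds a pool shaped like the client executor, JDK only. */
    public static ThreadPoolExecutor newExecutor(int threads, String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        // Daemon threads named "<prefix>-<n>", similar in spirit to the name format above.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000), factory);
        // Without this call, core threads would stay alive forever even when idle.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /** Hypothetical helper: the gRPC port is the server port plus an offset. */
    public static int grpcPort(int serverPort, int offset) {
        return serverPort + offset;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = newExecutor(4, "nacos-grpc-client-executor");
        Callable<String> task = () -> Thread.currentThread().getName();
        System.out.println(executor.submit(task).get());
        System.out.println(executor.allowsCoreThreadTimeOut());
        System.out.println(grpcPort(8848, 1000));
        executor.shutdown();
    }
}
```

Because core threads may time out, an idle client holds no executor threads after the keep-alive period, and the daemon flag keeps the pool from blocking JVM shutdown.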
    

    Sending requests

    The Requester interface

    This interface defines the methods for sending requests:

    public interface Requester {
    
        /**
         * send request.
         *
         * @param request      request.
         * @param timeoutMills mills of timeouts.
         * @return response  response returned.
         * @throws NacosException exception throw.
         */
        Response request(Request request, long timeoutMills) throws NacosException;
    
        /**
         * send request.
         *
         * @param request request.
         * @return request future.
         * @throws NacosException exception throw.
         */
        RequestFuture requestFuture(Request request) throws NacosException;
    
        /**
         * send async request.
         *
         * @param request         request.
         * @param requestCallBack callback of request.
         * @throws NacosException exception throw.
         */
        void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;
    
        /**
         * close connection.
         */
        void close();
    }
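
To make the difference between the three call styles concrete, here is a minimal, self-contained sketch. It is not Nacos code: SimpleRequester and EchoRequester are hypothetical stand-ins, String replaces the Request and Response types, CompletableFuture replaces RequestFuture, and a Consumer replaces RequestCallBack. The point is only that a blocking call with a timeout and a callback-style call can both be layered on top of a future-returning call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RequesterSketch {

    /** Simplified stand-in for Requester (assumption: String replaces Request/Response). */
    public interface SimpleRequester {
        String request(String request, long timeoutMills) throws Exception;

        CompletableFuture<String> requestFuture(String request);

        void asyncRequest(String request, Consumer<String> callback);
    }

    /** Hypothetical in-memory implementation that echoes the request back. */
    public static class EchoRequester implements SimpleRequester {

        @Override
        public CompletableFuture<String> requestFuture(String request) {
            // Future style: return immediately, the result arrives later.
            return CompletableFuture.supplyAsync(() -> "echo:" + request);
        }

        @Override
        public String request(String request, long timeoutMills) throws Exception {
            // Synchronous style: block on the future, bounded by the timeout.
            return requestFuture(request).get(timeoutMills, TimeUnit.MILLISECONDS);
        }

        @Override
        public void asyncRequest(String request, Consumer<String> callback) {
            // Callback style: the caller never blocks, the callback runs on completion.
            requestFuture(request).thenAccept(callback);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleRequester requester = new EchoRequester();
        System.out.println(requester.request("ping", 3000L));
        System.out.println(requester.requestFuture("ping").get());
        CompletableFuture<String> seen = new CompletableFuture<>();
        requester.asyncRequest("ping", seen::complete);
        System.out.println(seen.get(3, TimeUnit.SECONDS));
    }
}
```

In this sketch all three calls print `echo:ping`; they differ only in where the caller waits for the result.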
    

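    The GrpcConnection implementation

    The GrpcConnection class implements the three request methods of the Requester interface and sends requests through the gRPC stub. Take the request method as an example: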

    public Response request(Request request, long timeouts) throws NacosException {
        Payload grpcRequest = GrpcUtils.convert(request);
        ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcReques