设为首页 加入收藏

TOP

Nacos源码 (6) Grpc概述与Nacos集成(九)
2023-09-23 15:44:14 浏览:197
Tags:Nacos 源码 Grpc 集成
equestStreamStub = BiRequestStreamGrpc
                        .newStub(newChannelStubTemp.getChannel());
                // NOTE(review): the statement above and the beginning of this method are cut off at
                // the top of this excerpt; the fragment is reproduced unchanged, only laid out on
                // separate lines.

                // Create the connection.
                GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
                grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

                // create stream request and bind connection event to this connection.
                // Used to send requests to the server.
                StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

                // stream observer to send response to server
                grpcConn.setPayloadStreamObserver(payloadStreamObserver);
                grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
                grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());

                // Send a ConnectionSetupRequest so that the server creates its Connection.
                ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
                conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
                conSetupRequest.setLabels(super.getLabels());
                conSetupRequest.setAbilities(super.clientAbilities);
                conSetupRequest.setTenant(super.getTenant());
                grpcConn.sendRequest(conSetupRequest);
                return grpcConn;
            }
            return null;
        } catch (Exception e) {
        }
        return null;
    }

    /**
     * Opens the bi-directional request stream on {@code streamStub} and registers the observer that
     * receives payloads pushed by the server.
     *
     * <p>Each incoming payload is parsed into a {@code Request} and passed to
     * {@code handleServerRequest}; a non-null result is sent back tagged with the request id. If
     * handling or sending throws, a failure response for that request id is sent instead. When the
     * stream errors or completes while the client is running and {@code grpcConn} is not abandoned,
     * the client status is moved from RUNNING to UNHEALTHY and an asynchronous server switch is
     * triggered.
     *
     * @param streamStub stub on which the bi-directional stream is opened
     * @param grpcConn   connection whose abandon flag is consulted when the stream ends
     * @return the observer returned by {@code requestBiStream}
     */
    private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
            final GrpcConnection grpcConn) {
        final StreamObserver<Payload> serverPushObserver = new StreamObserver<Payload>() {

            @Override
            public void onNext(Payload payload) {
                try {
                    final Request serverRequest = (Request) GrpcUtils.parse(payload);
                    if (serverRequest == null) {
                        return;
                    }
                    try {
                        // Handle the data pushed by the server with the client-side ServerRequestHandler.
                        final Response reply = handleServerRequest(serverRequest);
                        if (reply == null) {
                            return;
                        }
                        reply.setRequestId(serverRequest.getRequestId());
                        // Reply to the server.
                        sendResponse(reply);
                    } catch (Exception e) {
                        sendResponse(serverRequest.getRequestId(), false);
                    }
                } catch (Exception e) {
                    // ...
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // The excerpt elides ("// ...") whatever runs when the client is not running or the
                // connection is abandoned; nothing is done for that case here either.
                markUnhealthyIfActive();
            }

            @Override
            public void onCompleted() {
                markUnhealthyIfActive();
            }

            /** Shared end-of-stream handling for {@code onError} and {@code onCompleted}. */
            private void markUnhealthyIfActive() {
                // Both flags are read up front, exactly as the original callbacks did.
                final boolean clientRunning = isRunning();
                final boolean connectionAbandoned = grpcConn.isAbandon();
                if (!clientRunning || connectionAbandoned) {
                    return;
                }
                // Only the caller that wins the RUNNING -> UNHEALTHY transition starts the switch.
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            }
        };
        return streamStub.requestBiStream(serverPushObserver);
    }
首页 上一页 6 7 8 9 下一页 尾页 9/9/9
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇2023年了,复习了一下spring boot.. 下一篇 10 年程序员的告诫:千万不要重..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目