设为首页 加入收藏

TOP

Nacos源码 (5) Grpc服务端和客户端(一)
2023-09-09 10:25:54 】 浏览:57
Tags:Nacos 源码 Grpc

Nacos 2.x在服务端与客户端直接增加了GRPC通信方式,本文通过2.0.2版本源码,简单分析GRPC通信方式:

  • 服务器启动
  • 客户端连接
  • 客户端心跳
  • 服务器监控检查

服务器

proto文件

api/src/main/proto/nacos_grpc_service.proto文件:

syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";

message Metadata {
  string type = 3; // 请求/响应的真实类型
  string clientIp = 8;
  map<string, string> headers = 7;
}

// GRPC通信层请求/响应体
message Payload {
  Metadata metadata = 2;
  // 业务层的请求/响应体,需要使用type做反序列化
  google.protobuf.Any body = 3;
}

service RequestStream {
  // build a streamRequest
  rpc requestStream (Payload) returns (stream Payload) {
  }
}

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}

service BiRequestStream {
  // Sends a commonRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}

文件定义了通信层的service和message结构,业务层请求响应的序列化和反序列化是Nacos在RequestAcceptor/Connection中使用工具类实现的,业务层请求处理是在RequestAcceptor中进行的转发。

服务器启动

Server类继承关系

BaseRpcServer
  |-- BaseGrpcServer
     |-- GrpcSdkServer
     |-- GrpcClusterServer

此处介绍一下GrpcSdkServer实现。

GrpcSdkServer类

@Service
public class GrpcSdkServer extends BaseGrpcServer {

    // 所以SDK服务器的监听端口是9848
    private static final int PORT_OFFSET = 1000;

    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }

    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}

大部分的启动逻辑在BaseGrpcServer中。

BaseGrpcServer类

GRPC服务器的启动逻辑大部分都在这个类的startServer方法。

  1. 将处理请求的RequestAcceptor注册到HandlerRegistry
    • GrpcRequestAcceptor用于处理普通业务请求
    • GrpcBiStreamRequestAcceptor用于处理连接建立请求,获取Channel创建GrpcConnection并注册到ConnectionManager中,后续向客户端发送消息都是使用GrpcConnection做的
  2. 创建GRPC的Server对象
    • 设置port和executor
    • 设置HandlerRegistry
    • 添加ServerTransportFilter在连接建立和断开时做一些业务操作
  3. 启动Server

GrpcRequestAcceptor类

这个类对GRPC做了扩展,重写了request方法:

  1. 解析Payload获取请求体的数据类型
  2. 从RequestHandlerRegistry获取适配的RequestHandler处理器
  3. 将请求体反序列化成请求体类型对象
  4. 调用handleRequest方法处理请求返回响应

处理请求代码:

Request request = (Request) parseObj;
try {
    // 获取Connection
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // 刷新活跃时间,后续的健康检查会使用到这个时间戳
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // 使用RequestHandler处理请求
    Response response = requestHandler.handleRequest(request, requestMeta);
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
            (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
            e.getMessage()));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇redis 热点key问题及其解决方案 下一篇到底什么是Java AIO?为什么Netty..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目