在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式:
1、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式 2、Server-Streaming:client发出一个request后从server端接收一串多个response 3、Client-Streaming:client向server发送一串多个request后从server接收一个response 4、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。
很明显,gRPC支持双向的streaming。那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。如果我们能用akka-stream编程方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下:
// Unary case
Flow[Request].map(computeResponse) // Server streaming
Flow[Request].flatMapConcat(computeResponses) // Client streaming
Flow[Request].fold(defaultResponse)(computeResponse) // Bidirectional streaming
Flow[Request].flatMapConcat(computeResponses)
当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如:
Flow[Request] .throttle(1, 10.millis, 1, ThrottleMode.Shaping) .map(computeResponse)
在客户端我们可以直接经客户端stub调用Flow,如下:
Source
.single(request)
.via(stub.doSomething)
.runForeach(println)
刚好,beyond-the-lines gRPCAkkaStream开源项目提供这么一种gRPC StreamObserver到aka-stream Flow转换桥梁。下面是gRPCAkkaStream的使用示范。先从Unary-Call开始:下面是.proto文件的IDL服务描述:
syntax = "proto3"; package learn.grpc.akka.stream.services; message NumPair { int32 num1 = 1; int32 num2 = 2; } message Num { int32 num = 1; } message SumResult { int32 result = 1; } service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} }
我们看看编译后自动产生的SumGrpcAkkaStream.scala文件中一些相关类型和函数:
服务界面描述:
trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] }
我们看到服务函数sumPair是一个akka-stream Fow[NumPair,SumResult,NotUsed]。下面是具体实现SumNumbers.sumPair代码:
class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def sumPair: Flow[NumPair, SumResult, NotUsed] = { logger.info(s"*** calling sumPair ... ***") Flow[NumPair].map { case NumPair(a,b) => { logger.info(s"serving ${a} + ${b} = ???") SumResult(a + b) } } }
产生的客户端stub源代码如下:
class SumNumbersStub( channel: Channel, options: CallOptions = CallOptions.DEFAULT ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers { override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] = Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request => Source.fromFuture( Grpc.guavaFuture2ScalaFuture( ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request) ) ) ) def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)
我们可以通过stub来调用sumPair方法,如下:
val channel = ManagedChannelBuilder .forAddress(host,port) .usePlaintext(true) .build() val stub = SumGrpcAkkaStream.stub(channel) def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { logger.info(s"Requesting to add $num1, $num2") Source .single(NumPair(num1,num2)) .via(stub.sumPair) .map(r => s"the result