: ${r.result}") }
Here is how the Unary-Call is invoked in practice:
object UnaryCallClient extends App {
  implicit val system = ActorSystem("UnaryClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)
  client.addPair(29,33).runForeach(println)

  // keep the client alive until Enter is pressed, then release resources
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}
In Server-Streaming, a single request returns a stream of responses. The IDL description is as follows:
service SumNumbers {
  rpc SumPair(NumPair) returns (SumResult) {}
  rpc GenIncsFrom(Num) returns (stream Num) {}
}
After compilation, the automatically generated service trait looks like this:
trait SumNumbers extends AbstractService {
  override def serviceCompanion = SumNumbers
  def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
  def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
}
The service function genIncsFrom is a Flow[Num, Num, NotUsed]. Its implementation is as follows:
class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
  val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)

  override def genIncsFrom: Flow[Num, Num, NotUsed] = {
    logger.info("*** calling genIncsFrom")
    // expand each request Num(n) into the responses Num(1) .. Num(n)
    Flow[Num].mapConcat { n =>
      (1 to n.num).map { m =>
        logger.info(s"genIncFrom producing num: ${m}")
        Num(m)
      }
    }
  }
}
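Because the response is a stream, we can produce it with mapConcat, which flattens the Seq built for each request into individual stream elements: a request Num(n) becomes the responses Num(1) through Num(n). On the client side, the service function genIncsFrom is called as follows:
def genIncNumbers(len: Int): Source[Int,NotUsed] = {
  logger.info(s"Requesting to produce ${len} inc numbers")
  Source
    .single(Num(len))
    .via(stub.genIncsFrom)
    .map(n => n.num)
}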
As before, we use runForeach to run this Source:
object ServerStreamingClient extends App {
  implicit val system = ActorSystem("ServerStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)
  client.genIncNumbers(5).runForeach(println)

  // keep the client alive until Enter is pressed, then release resources
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}
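To see the mapConcat step from genIncsFrom without the gRPC plumbing, here is a minimal local sketch. It assumes only akka-stream on the classpath (a 2.5-era version, matching the ActorMaterializer usage above). The Num case class and the MapConcatSketch object are hypothetical stand-ins, not the generated code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatSketch {
  // Hypothetical stand-in for the protobuf-generated Num message
  final case class Num(num: Int)

  // Same shape as genIncsFrom: one Num(n) in, Num(1) .. Num(n) out
  val genIncs: Flow[Num, Num, NotUsed] =
    Flow[Num].mapConcat(n => (1 to n.num).map(Num(_)))

  // Runs the flow locally and collects every emitted number
  def expand(n: Int): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("MapConcatSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source.single(Num(n)).via(genIncs).map(_.num).runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(expand(5)) // the numbers 1 through 5, in order
}
```

Collecting with Sink.seq instead of runForeach makes it easy to inspect the whole response stream at once. In the real client the elements arrive over the network one at a time.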
Next, let's look at how Client-Streaming is implemented with reactive streams. The IDL service description is as follows:
service SumNumbers {
  rpc SumPair(NumPair) returns (SumResult) {}
  rpc GenIncsFrom(Num) returns (stream Num) {}
  rpc SumStreamNums(stream Num) returns (SumResult) {}
}
The automatically generated service interface is as follows:
trait SumNumbers extends AbstractService {
  override def serviceCompanion = SumNumbers
  def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
  def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
  def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
}
The sumStreamNums Flow is implemented as follows:
override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
  logger.info("*** calling sumStreamNums")
  // a is the running SumResult, b is the next Num from the request stream
  Flow[Num].fold(SumResult(0)) {
    case (a, b) =>
      logger.info(s"receiving operand ${b.num}")
      SumResult(b.num + a.result)
  }
}
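The fold in sumStreamNums can also be tried locally, without a server. The sketch below assumes only akka-stream on the classpath (a 2.5-era version). Num, SumResult and FoldSketch are hypothetical stand-ins for the generated messages and the service class.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FoldSketch {
  // Hypothetical stand-ins for the protobuf-generated messages
  final case class Num(num: Int)
  final case class SumResult(result: Int)

  // Same shape as sumStreamNums: many Num in, one SumResult out
  val sumNums: Flow[Num, SumResult, NotUsed] =
    Flow[Num].fold(SumResult(0))((acc, n) => SumResult(acc.result + n.num))

  // Streams the numbers through the flow and collects what it emits
  def sum(nums: List[Int]): Seq[SumResult] = {
    implicit val system: ActorSystem = ActorSystem("FoldSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source(nums.map(Num(_))).via(sumNums).runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(sum(List(1, 2, 3))) // a single element: SumResult(6)
}
```

Note that fold only emits once the upstream completes, which is why a stream of requests collapses into exactly one response here.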
Here the request is a stream, so an aggregation (fold in this case) can reduce it to a single response. On the client side, we call stub.sumStreamNums:
def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
  logger.info(s"Requesting to sum up ${nums}")
  Source(nums.map(Num(_)).to[collection.immutable.Iterable])
    .via(stub.sumStreamNums)
    .map(r => s"the result: ${r.