设为首页 加入收藏

TOP

ScalaPB(5):用akka-stream实现reactive-gRPC(二)
2019-08-15 00:11:46 】 浏览:281
Tags:ScalaPB akka-stream 实现 reactive-gRPC
: ${r.result}
") }

下面是Unary-Call的具体调用方式:

object UnaryCallClient extends App { implicit val system = ActorSystem("UnaryClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.addPair(29,33).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }

在Server-Streaming中一个request返回的是stream of responses。IDL的描述如下:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
}

编译后自动产生的service trait如下:

 trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] }

这个服务函数genIncsFrom是Flow[Num,Num,NotUsed],它的具体实现如下:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def genIncsFrom: Flow[Num, Num, NotUsed] = { logger.info("*** calling genIncsFrom") Flow[Num].mapConcat { n => (1 to n.num).map {m => logger.info(s"genIncFrom producing num: ${m}") Num(m) } } } }

因为输出response是一个stream,可以用mapConcat展平Seq来产生一个。在客户方调用服务函数genIncsFrom的方式如下:

  def genIncNumbers(len: Int): Source[Int,NotUsed] = { logger.info(s"Requesting to produce ${len} inc numbers") Source .single(Num(len)) .via(stub.genIncsFrom) .map(n => n.num) }

我们还是用runForeach来运算这个Source:

object ServerStreamingClient extends App { implicit val system = ActorSystem("ServerStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.genIncNumbers(5).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }

再来看看Client-Streaming是如何通过reactive-stream实现的。IDL服务描述如下:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
   rpc SumStreamNums(stream Num) returns (SumResult) {}
}

自动产生的service接口如下:

 trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed] def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] }

sumStreamNums Flow实现如下:

  override def sumStreamNums: Flow[Num, SumResult, NotUsed] = { logger.info("*** calling sumStreamNums") Flow[Num].fold(SumResult(0)) { case (a, b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } }

request是一个stream,可以用aggregation来汇总成一个response。在客户端调用stub.sumStreamNums:

  def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .via(stub.sumStreamNums) .map(r => s"the result: ${r.
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇重要的博客收集 下一篇ScalaPB(3): gRPC streaming

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目