ScalaPB(5): Implementing reactive-gRPC with akka-stream (Part 3)
2019-08-15 00:11:46
Tags: ScalaPB, akka-stream, implementation, reactive-gRPC
result}")
}

object ClientStreamingClient extends App {
  implicit val system = ActorSystem("ClientStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)
  client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

Finally, let's demonstrate bidirectional streaming. First, use the IDL to define a service function, keepAdding, whose input and output are both streams:

service SumNumbers {
   rpc SumPair(NumPair) returns (SumResult) {}
   rpc GenIncsFrom(Num) returns (stream Num) {}
   rpc SumStreamNums(stream Num) returns (SumResult) {}
   rpc KeepAdding(stream Num) returns (stream SumResult) {}
}
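The four RPCs above cover the four gRPC call shapes. As a rough mental model only, they can be sketched with plain Scala iterators standing in for streams. This is not the code produced by ScalaPB or the akka-stream generator: the object name CallShapes, the type aliases, and the case classes are illustrative stand-ins, and the body of sumStreamNums is assumed from the RPC's name.

```scala
// Simplified stand-ins for two of the messages in sum.proto (not generated code).
final case class Num(num: Int)
final case class SumResult(result: Int)

// Hypothetical model: Iterator plays the role of a gRPC stream.
object CallShapes {
  type Unary[A, B]           = A => B                     // SumPair
  type ServerStreaming[A, B] = A => Iterator[B]           // GenIncsFrom
  type ClientStreaming[A, B] = Iterator[A] => B           // SumStreamNums
  type BiDiStreaming[A, B]   = Iterator[A] => Iterator[B] // KeepAdding

  // Assumed behaviour: consume the whole input stream, then answer once.
  val sumStreamNums: ClientStreaming[Num, SumResult] =
    nums => SumResult(nums.map(_.num).sum)
}
```

The point of the model is the last alias: a bidirectional call can answer while input is still arriving, whereas a client-streaming call such as sumStreamNums can only answer after the input ends.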

The implementation of this function:

  override def keepAdding: Flow[Num, SumResult, NotUsed] = {
    Flow[Num].scan(SumResult(0)) {
      case (a,b) =>
        logger.info(s"receiving operand ${b.num}")
        SumResult(b.num + a.result)
    }
  }

This service function adds the incoming numbers one at a time and emits the running total after each one. The scan operator does exactly this. Below is sample client code that calls the service:

  def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
    logger.info(s"Requesting to sum up ${nums}")
    Source(nums.map(Num(_)).to[collection.immutable.Iterable])
      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .map { n =>
        logger.info(s"Sending number: $n")
        n
      }
      .via(stub.keepAdding)
      .map(r => s"current sum = ${r.result}")
  }

Run it with the following code:

object BiDiStreamingClient extends App {
  implicit val system = ActorSystem("BiDiStreamingClient")
  implicit val mat = ActorMaterializer.create(system)

  val client = new gRPCAkkaStreamClient("localhost", 50051)
  client.ContSum(Seq(12,4,8,19)).runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}
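To get a feel for what this client should print, the running totals can be reproduced with plain Scala collections. The sketch below assumes the server-side scan behaves like the standard library's scanLeft, which emits the seed value first. RunningSumSketch and its methods are illustrative names and are not part of the project.

```scala
object RunningSumSketch {
  // Same accumulation as scan(SumResult(0)) on the server, on bare Ints.
  // Like scan, scanLeft yields the seed (0) before any input is consumed.
  def runningSums(nums: Seq[Int]): Seq[Int] = nums.scanLeft(0)(_ + _)

  // Mirrors the client's final map step that formats each result.
  def render(nums: Seq[Int]): Seq[String] =
    runningSums(nums).map(r => s"current sum = $r")

  def main(args: Array[String]): Unit =
    render(Seq(12, 4, 8, 19)).foreach(println)
}
```

For the input 12, 4, 8, 19 this yields the totals 0, 12, 16, 24, 43, so under that assumption the first line the real client prints would be "current sum = 0", before any operand has been added.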

Here is all the source code involved in this discussion:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1",
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5"
)

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "gRPCAkkaStreamDemo"

version := "0.1"

scalaVersion := "2.12.6"

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  // for GRPC Akkastream
  "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  // generate the akka stream files
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
)

src/main/protobuf/sum.proto

syntax = "proto3";

package learn.grpc.akka.stream.services;

message NumPair {
   int32 num1 = 1;
   int32 num2 = 2;
}

message Num {
   int32 num = 1;
}

message SumResult {
   int32 result = 1;
}

servic