service SumNumbers {
rpc SumPair(NumPair) returns (SumResult) {}
rpc GenIncsFrom(Num) returns (stream Num) {}
rpc SumStreamNums(stream Num) returns (SumResult) {}
rpc KeepAdding(stream Num) returns (stream SumResult) {}
}
override def keepAdding: Flow[Num, SumResult, NotUsed] = { Flow[Num].scan(SumResult(0)) { case (a,b) => logger.info(s"receiving operand ${b.num}") SumResult(b.num + a.result) } }
def ContSum(nums: Seq[Int]): Source[String,NotUsed] = { logger.info(s"Requesting to sum up ${nums}") Source(nums.map(Num(_)).to[collection.immutable.Iterable]) .throttle(1, 500.millis, 1, ThrottleMode.shaping) .map { n => logger.info(s"Sending number: $n") n } .via(stub.keepAdding) .map(r => s"current sum = ${r.result}") }
object BiDiStreamingClient extends App { implicit val system = ActorSystem("BiDiStreamingClient") implicit val mat = ActorMaterializer.create(system) val client = new gRPCAkkaStreamClient("localhost", 50051) client.ContSum(Seq(12,4,8,19)).runForeach(println) scala.io.StdIn.readLine() mat.shutdown() system.terminate() }
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") resolvers += Resolver.bintrayRepo("beyondthelines", "maven") libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1", "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5" )
import scalapb.compiler.Version.scalapbVersion import scalapb.compiler.Version.grpcJavaVersion name := "gRPCAkkaStreamDemo" version := "0.1" scalaVersion := "2.12.6" resolvers += Resolver.bintrayRepo("beyondthelines", "maven") libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", "io.grpc" % "grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix" %% "monix" % "2.3.0", // for GRPC Akkastream
"beyondthelines" %% "grpcakkastreamruntime" % "0.0.5" ) PB.targets in Compile := Seq( scalapb.gen() -> (sourceManaged in Compile).value, // generate the akka stream files
grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value )
syntax = "proto3"; package learn.grpc.akka.stream.services; message NumPair { int32 num1 = 1; int32 num2 = 2; } message Num { int32 num = 1; } message SumResult { int32 result = 1; } servic