// Response observer that is passed to the server when the call is opened.
// It only reports what it receives: each result, any error, and completion.
val respStreamObserver: StreamObserver[SumResponse] = new StreamObserver[SumResponse] {

  // Print the result carried by a response message.
  override def onNext(value: SumResponse): Unit = {
    println(s"Result: ${value.currentResult}")
  }

  // Print the message of the failure that ended the response stream.
  override def onError(t: Throwable): Unit = {
    println(s"error: ${t.getMessage}")
  }

  // Print a marker once the response stream has finished.
  override def onCompleted(): Unit = {
    println("Done responding!")
  }
}
//get async stub
val client = SumManyToOneGrpc.stub(channel)
//get request stream observer from server
val reqStreamObserver = client.addManyToOne(respStreamObserver)

// Send each number as its own request. `foreach` is used instead of `map`
// because the calls are made purely for their side effect; `map` would build
// a List[Unit] that is immediately thrown away.
List(2, 5, 8, 4, 0).foreach { n =>
  reqStreamObserver.onNext(SumRequest(n))
}

// Signal that no more requests will be sent. Without this the request stream
// stays open. NOTE(review): a client-streaming server normally replies only
// after the client completes - confirm against the service implementation.
reqStreamObserver.onCompleted()
// Sums up numbers received from the client and returns the current result
// after each received request.
// Both the request and the response side of AddInter are streams.
service SumInter {
  rpc AddInter(stream SumRequest) returns (stream SumResponse);
}
/**
 * Abstract signature of the `addInter` call.
 *
 * NOTE(review): presumably the ScalaPB-generated Scala counterpart of the
 * `AddInter` rpc - confirm against the generated sources.
 *
 * @param responseObserver observer of `SumResponse` messages
 * @return an observer of `SumRequest` messages
 */
def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
/**
 * Implementation of `SumInterGrpc.SumInter` that keeps a running sum per call
 * and emits the current sum after every received request.
 */
class Many2ManyService extends SumInterGrpc.SumInter {

  /**
   * Opens one summing session.
   *
   * @param responseObserver observer that receives a `SumResponse` with the
   *                         current sum after each request
   * @return the observer through which `SumRequest` messages are delivered
   */
  override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      // A fresh counter is created for every addInter invocation, so sums are
      // not shared between calls.
      val currentSum: AtomicInt = Atomic(0)

      // The request stream failed; only report it.
      override def onError(t: Throwable): Unit =
        println(s"error: ${t.getMessage}")

      // The request stream has ended, so the response stream must be closed
      // too; otherwise the call is never completed from this side.
      override def onCompleted(): Unit = {
        println("Done requesting!")
        responseObserver.onCompleted()
      }

      // Add the received number to the running sum and emit the new total.
      override def onNext(value: SumRequest): Unit =
        responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
    }
}
// Observer for the result stream: logs every current sum, any error, and the
// end of the stream.
val responseObserver: StreamObserver[SumResponse] = new StreamObserver[SumResponse] {

  // Log the current sum carried by a response.
  override def onNext(value: SumResponse): Unit = {
    println(s"ON_NEXT: Current sum: ${value.currentResult}")
  }

  // Log the throwable that terminated the stream.
  override def onError(t: Throwable): Unit = {
    println(s"ON_ERROR: $t")
  }

  // Log that the stream has finished.
  override def onCompleted(): Unit = {
    println("ON_COMPLETED")
  }
}
//get request container
val requestObserver = client.addInter(responseObserver)

// Picks a random number in the range 0 to 10, logs it, and sends it as a request.
def sendRandomNumber(): Unit = {
  val toBeAdded = Random.nextInt(11)
  println(s"Adding number: $toBeAdded")
  requestObserver.onNext(SumRequest(toBeAdded))
}

// Schedule the sender with no initial delay and a one-second fixed delay.
scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
  sendRandomNumber()
}
// Register the sbt-protoc plugin (version 0.99.18) with the build.
// NOTE(review): presumably this snippet belongs in project/plugins.sbt - confirm.
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

// Add the ScalaPB compiler plugin artifact (version 0.7.1) to the library dependencies.
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
import scalapb.compiler.Version.scalapbVersion import scalapb.compiler.Version.grpcJavaVersion name := "learn-gRPC" version := "0.1" scalaVersion := "2.12.6" libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", "io.grpc" % "grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, &quo