syntax = "proto3";

package learn.grpc.services;

// A single number sent by the client to be added.
message SumRequest {
  int32 toAdd = 1;
}

// The result reported back to the client.
message SumResponse {
  int32 currentResult = 1;
}

// Server streaming: answers one request with a stream of increment results.
service SumOneToMany {
  rpc AddOneToMany (SumRequest) returns (stream SumResponse);
}

// Client streaming: answers a stream of numbers with a single result.
service SumManyToOne {
  rpc AddManyToOne (stream SumRequest) returns (SumResponse);
}

// Bidirectional streaming: sums up the numbers received from the client and
// returns the current result after each received request.
service SumInter {
  rpc AddInter (stream SumRequest) returns (stream SumResponse);
}
package learn.grpc.server

import io.grpc.{ServerBuilder, ServerServiceDefinition}

/**
 * Mixin that provides a blocking bootstrap for a gRPC server hosting one service.
 */
trait gRPCServer {

  /**
   * Builds and starts a gRPC server exposing `service`, registers a JVM shutdown
   * hook that shuts the server down, and then blocks the calling thread until the
   * server has terminated.
   *
   * @param service the service definition to register with the server
   * @param port    the port to listen on; defaults to 50051
   */
  def runServer(service: ServerServiceDefinition, port: Int = 50051): Unit = {
    val server = ServerBuilder
      .forPort(port)
      .addService(service)
      .build()
      .start()

    // make sure our server is stopped when the JVM is shut down
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = server.shutdown()
    })

    server.awaitTermination()
  }
}
package learn.grpc.sum.one2many.server

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import monix.execution.atomic.{Atomic, AtomicInt}
import learn.grpc.server.gRPCServer

/**
 * Server entry point for the one-to-many (server streaming) sum service.
 */
object One2ManyServer extends gRPCServer {

  /**
   * Answers a single request with a stream of responses carrying the values
   * 1, 2, ..., `request.toAdd`, then completes the stream.
   */
  class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {

    /**
     * Emits one response per increment and completes the stream after a delay.
     *
     * @param request          carries `toAdd`, the number of increments to emit;
     *                         a value below 1 produces no responses
     * @param responseObserver receives each intermediate result and the completion signal
     */
    override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
      // The range index is the running total itself, so no separate counter is
      // needed; foreach avoids building a throwaway collection of Units.
      (1 to request.toAdd).foreach { runningTotal =>
        responseObserver.onNext(SumResponse().withCurrentResult(runningTotal))
      }

      Thread.sleep(1000) // delay and then finish
      responseObserver.onCompleted()
    }
  }

  /** Binds the service on the global execution context and runs the server. */
  def main(args: Array[String]): Unit = {
    val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }
}
package learn.grpc.sum.one2many.client

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

/**
 * Client entry point for the one-to-many (server streaming) sum service.
 */
object One2ManyClient {

  /**
   * Sends one request with `toAdd = 6`, prints every streamed response, and waits
   * for a line on standard input before shutting the channel down.
   */
  def main(args: Array[String]): Unit = {
    // build connection channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("LocalHost", 50051)
      .usePlaintext(true)
      .build()

    try {
      // get async stub
      val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)

      // prepare stream observer
      val streamObserver = new StreamObserver[SumResponse] {
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")

        override def onCompleted(): Unit = println("Done incrementing !!!")

        override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
      }

      // call service with stream observer
      client.addOneToMany(SumRequest().withToAdd(6), streamObserver)

      // wait for async execution
      scala.io.StdIn.readLine()
    } finally {
      // shut the channel down on every exit path instead of leaking it
      channel.shutdown()
    }
  }
}
package learn.grpc.sum.many2one.server import io.grpc.stub.StreamObserver import learn.grpc.services.sum._ import learn.grpc.server.gRPCServer import monix.execution.atomic.{Atomic,AtomicInt} object Many2OneServer extends gRPCServer { class Many2OneService extends Su