设为首页 加入收藏

TOP

ScalaPB(3): gRPC streaming(五)
2019-08-15 00:11:45 】 浏览:289
Tags:ScalaPB gRPC streaming
mManyToOneGrpc.SumManyToOne { val currentSum: AtomicInt
= Atomic(0) override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] = new StreamObserver[SumRequest] { val currentSum: AtomicInt = Atomic(0) override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") override def onCompleted(): Unit = println("Done summing!") override def onNext(value: SumRequest): Unit = { //only allow one response if (value.toAdd > 0) currentSum.add(value.toAdd) else responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd))) } } } def main(args: Array[String]): Unit = { val svc = SumManyToOneGrpc.bindService(new Many2OneService,scala.concurrent.ExecutionContext.global) runServer(svc) } }

ManyToOneClient.scala

package learn.grpc.sum.many2one.client

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._

/** Command-line client for the many-to-one (client-streaming) sum call.
  *
  * Opens a plaintext channel to port 50051, streams the values 2, 5, 8, 4, 0
  * through `addManyToOne`, prints every `SumResponse` it receives, and then
  * blocks on standard input until the user presses Enter.
  */
object Many2OneClient {

  /** Program entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    //build channel
    val channel = io.grpc.ManagedChannelBuilder
      .forAddress("LocalHost",50051)
      .usePlaintext(true)
      .build()

    try {
      //pass to server for result
      val respStreamObserver = new StreamObserver[SumResponse] {
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
        override def onCompleted(): Unit = println("Done responding!")
        override def onNext(value: SumResponse): Unit = println(s"Result: ${value.currentResult}")
      }

      //get async stub
      val client = SumManyToOneGrpc.stub(channel)

      //get request stream observer from server
      val reqStreamObserver = client.addManyToOne(respStreamObserver)

      // foreach, not map: the sends are pure side effects, so there is no
      // reason to build and discard a List[Unit].
      List(2,5,8,4,0).foreach { n => reqStreamObserver.onNext(SumRequest(n)) }

      // Tell the server the request stream is finished; StreamObserver expects
      // a terminal onCompleted/onError after the last onNext.
      reqStreamObserver.onCompleted()

      // Keep the process alive so asynchronous responses can be printed.
      scala.io.StdIn.readLine()
    } finally {
      // Release the channel's resources even if the call setup above throws.
      channel.shutdown()
    }
  }
}

ManyToManyServer.scala 

package learn.grpc.sum.many2many.server

import io.grpc.stub.StreamObserver
import learn.grpc.services.sum._
import learn.grpc.server.gRPCServer
import monix.execution.atomic.{Atomic,AtomicInt}

/** Server for the bidirectional-streaming sum call (`addInter`).
  *
  * Binds [[Many2ManyServer.Many2ManyService]] on the global execution context
  * and hands the resulting service definition to `runServer`, which comes
  * from `gRPCServer`.
  */
object Many2ManyServer extends gRPCServer {

  /** Implementation of `SumInter`: answers every request with the running total. */
  class Many2ManyService extends SumInterGrpc.SumInter {

    /** Opens one bidirectional call.
      *
      * @param responseObserver sink for the running totals sent back to the client
      * @return observer that receives the client's `SumRequest` stream
      */
    override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        // One accumulator per call: each invocation of addInter creates its own.
        val currentSum: AtomicInt = Atomic(0)

        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")

        override def onCompleted(): Unit = {
          println("Done requesting!")
          // Close the response stream as well; without this the call is never
          // finished from the server side and the client never sees completion.
          responseObserver.onCompleted()
        }

        // Add the incoming value and immediately reply with the updated total.
        override def onNext(value: SumRequest): Unit =
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
      }
  }

  /** Program entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
    runServer(svc)
  }
}

ManyToManyClient.scala

package learn.grpc.sum.many2many.client import monix.execution.Scheduler.{global => scheduler} import learn.grpc.services.sum._ import scala.concurrent.duration._ import scala.util.Random import io.grpc._ import io.grpc.stub.StreamObserver object Many2ManyClient { def main(args: Array[String]): Unit = { val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build val client = SumInterGrpc.stub(channel) //create stream observer for result stream
    val responseObserver = new StreamObserver[SumResponse] { def onError(t: Throwable): Unit =
首页 上一页 2 3 4 5 下一页 尾页 5/5/5
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇ScalaPB(5):用akka-stream实现.. 下一篇ScalaPB(2): 在scala中用gRPC..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目