ScalaPB(3): gRPC streaming (Part 2)
2019-08-15 00:11:45
Tags: ScalaPB, gRPC, streaming
        nCompleted(): Unit = println("Done summing!")
        override def onNext(value: SumRequest): Unit = {
          //only allow one response
          if (value.toAdd > 0)
            currentSum.add(value.toAdd)
          else
            responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
        }
      }
  }

The client-side call looks like this:

    //pass to server for result
    val respStreamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done responding!")
      override def onNext(value: SumResponse): Unit = println(s"Result: ${value.currentResult}")
    }
    //get async stub
    val client = SumManyToOneGrpc.stub(channel)
    //get request stream observer from server
    val reqStreamObserver = client.addManyToOne(respStreamObserver)
    List(2,5,8,4,0).foreach { n => reqStreamObserver.onNext(SumRequest(n)) }
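
To make the message flow of this client-streaming call easier to follow, here is a minimal in-process sketch with no gRPC dependency. The `Observer` trait is a hypothetical local stand-in that only copies the three callbacks of `io.grpc.stub.StreamObserver`, plain `Int` values replace `SumRequest` and `SumResponse`, and `java.util.concurrent.atomic.AtomicInteger` replaces the `AtomicInt` used in the service. It is an illustration of the accumulate-then-reply logic under those assumptions, not the generated ScalaPB API.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

object ClientStreamingSketch {
  // Hypothetical stand-in: same three callbacks as io.grpc.stub.StreamObserver.
  trait Observer[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Mirrors the server side: add every number to the running sum,
  // and reply only when a non-positive number arrives.
  def requestObserver(responses: Observer[Int]): Observer[Int] = new Observer[Int] {
    private val currentSum = new AtomicInteger(0)
    def onNext(toAdd: Int): Unit = {
      val sum = currentSum.addAndGet(toAdd)
      if (toAdd <= 0) responses.onNext(sum)
    }
    def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
    def onCompleted(): Unit = println("Done summing!")
  }

  // Plays the client role: pushes the numbers and collects every response.
  def run(numbers: List[Int]): List[Int] = {
    val received = ListBuffer.empty[Int]
    val responses = new Observer[Int] {
      def onNext(value: Int): Unit = { received += value }
      def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = println("Done responding!")
    }
    val requests = requestObserver(responses)
    numbers.foreach(requests.onNext)
    requests.onCompleted()
    received.toList
  }

  def main(args: Array[String]): Unit =
    println(run(List(2, 5, 8, 4, 0)))
}
```

With the input `List(2, 5, 8, 4, 0)` the sketch yields a single response, 19: the four positive numbers are only accumulated, and the trailing 0 triggers the reply.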

The IDL for bidirectional streaming is:

    /*
     * Sums up numbers received from the client and returns the current result after each received request.
     */
    service SumInter {
      rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
    }

The service SumInter takes a stream of SumRequest messages and returns a stream of SumResponse messages. The generated Scala function is:

def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

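This function has the same signature as the client-streaming service function. The difference is that here we can send multiple SumResponse messages through responseObserver. The service is implemented like this: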

  class Many2ManyService extends SumInterGrpc.SumInter {
    override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
      new StreamObserver[SumRequest] {
        val currentSum: AtomicInt = Atomic(0)
        override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
        override def onCompleted(): Unit = println("Done requesting!")
        override def onNext(value: SumRequest): Unit = {
          responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
        }
      }
  }
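
The one-response-per-request behaviour of this service can be tried out without a channel or a stub. The sketch below is only an in-process approximation: the `Observer` trait is a hypothetical local stand-in with the same three callbacks as `io.grpc.stub.StreamObserver`, plain `Int` values replace `SumRequest` and `SumResponse`, and `java.util.concurrent.atomic.AtomicInteger` is used instead of `AtomicInt`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

object BidiStreamingSketch {
  // Hypothetical stand-in: same three callbacks as io.grpc.stub.StreamObserver.
  trait Observer[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Mirrors the service: every incoming number immediately produces
  // one response carrying the updated running sum.
  def requestObserver(responses: Observer[Int]): Observer[Int] = new Observer[Int] {
    private val currentSum = new AtomicInteger(0)
    def onNext(toAdd: Int): Unit = responses.onNext(currentSum.addAndGet(toAdd))
    def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
    def onCompleted(): Unit = println("Done requesting!")
  }

  // Plays the client role: pushes the numbers and collects every response.
  def run(numbers: List[Int]): List[Int] = {
    val received = ListBuffer.empty[Int]
    val responses = new Observer[Int] {
      def onNext(value: Int): Unit = { received += value }
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit = println("ON_COMPLETED")
    }
    val requests = requestObserver(responses)
    numbers.foreach(requests.onNext)
    requests.onCompleted()
    received.toList
  }

  def main(args: Array[String]): Unit =
    println(run(List(2, 5, 8, 4, 0)))
}
```

For the input `List(2, 5, 8, 4, 0)` the collected responses are 2, 7, 15, 19, 19: one running total per request, in contrast to the single reply of the client-streaming variant.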

We can call responseObserver.onNext as many times as needed. The client source code is:

    //create stream observer for result stream
    val responseObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit = println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }
    //get request container
    val requestObserver = client.addInter(responseObserver)
    scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
      val toBeAdded = Random.nextInt(11)
      println(s"Adding number: $toBeAdded")
      requestObserver.onNext(SumRequest(toBeAdded))
    }

Below is the source code for this demonstration:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "learn-gRPC"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "