设为首页 加入收藏

TOP

ScalaPB(3): gRPC streaming(一)
2019-08-15 00:11:45 】 浏览:70
Tags:ScalaPB gRPC streaming

  接着上期讨论的gRPC unary服务我们跟着介绍gRPC streaming,包括: Server-Streaming, Client-Streaming及Bidirectional-Streaming。我们首先在.proto文件里用IDL描述Server-Streaming服务:

/* * responding stream of increment results */ service SumOneToMany { rpc AddOneToMany(SumRequest) returns (stream SumResponse) {} } message SumRequest { int32 toAdd = 1; } message SumResponse { int32 currentResult = 1; }

SumOneToMany服务中AddOneToMany函数接受一个SumRequest然后返回stream SumResponse,就这么简单。经过编译后产生了SumOneToManyGrpc.scala文件,在这个文件里提供了有关RPC操作的api。我们看看protoc把IDL描述的服务函数变成了什么样的scala函数:

def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit 

调用scala函数addOneToMany需要传入参数SumRequest和StreamObserver[SumResponse],也就是说用户需要准备这两个入参数。在调用addOneToMany函数时用户事先构建这个StreamObserver传给server,由server把结果通过这个结构传回用户。gRPC是通过StreamObserver类型实例来实现数据streaming的。这个类型的构建例子如下:

 

    val responseObserver = new StreamObserver[SumResponse] { def onError(t: Throwable): Unit = println(s"ON_ERROR: $t") def onCompleted(): Unit = println("ON_COMPLETED") def onNext(value: SumResponse): Unit = println(s"ON_NEXT: Current sum: ${value.currentResult}") }

server端通过onNext把结果不断传回给client端,因为这个responseObserver是在client端构建的。下面是SumManyToMany的实现:

 class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany { override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = { val currentSum: AtomicInt = Atomic(0) (1 to request.toAdd).map { _ => responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet())) } Thread.sleep(1000)     //delay and then finish
 responseObserver.onCompleted() } }

这个addOneToMany服务函数把 1-request.toAdd之间的数字逐个通过responseObserver返还调用方。 在客户端如下调用服务:

    // get asyn stub
    val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel) // prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] { override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") override def onCompleted(): Unit = println("Done incrementing !!!") override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}") } // call service with stream observer
    client.addOneToMany(SumRequest().withToAdd(6),streamObserver)

Client-Streaming服务的IDL如下:

/* * responding a result from a request of stream of numbers */ service SumManyToOne { rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {} }

传入stream SumRequest, 返回SumResponse。scalaPB自动产生scala代码中的addManyToOne函数款式如下:

def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

调用方提供StreamObserver[SumResponse]用作返回结果,函数返回客方需要的StreamObserver[SumRequest]用以传递request流。注意:虽然在.proto文件中AddManyToOne的返回结果是单个SumResponse,但产生的scala函数则提供了一个StreamObserver[SumResponse]类型,所以需要谨记只能调用一次onNext。下面是这个服务的实现代码:

  class Many2OneService extends SumManyToOneGrpc.SumManyToOne { val currentSum: AtomicInt = Atomic(0) override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
       new StreamObserver[SumRequest] { val currentSum: AtomicInt = Atomic(0) override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") override def o  
		
ScalaPB(3): gRPC streaming(一) https://www.cppentry.com/bencandy.php?fid=90&id=229039

首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇ScalaPB(5):用akka-stream实现.. 下一篇ScalaPB(2): 在scala中用gRPC..