设为首页 加入收藏

TOP

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub(二)
2017-10-09 13:27:34 】 浏览:9575
Tags:Akka Stream 实时 操控 动态 管道 连接 -MergeHub BroadcastHub and PartitionHub
led, then the failure is immediately propagated to all of its materialized * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are * cancelled are simply removed from the dynamic set of consumers. * * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two * concurrent consumers can be in terms of element. If this buffer is full, the producer * is backpressured. Must be a power of two and less than 4096.
*/ def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))

BroadcastHub.sink返回结果类型:Sink[T,Source[T,NotUsed]],就是个可连接任何数量下游的共用Source: 

  val killAll = KillSwitches.shared("terminator") val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure) val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async val outPort = sourceGraph.run()  //shared source //now connect any number of sink to outPort
  outPort.to(Sink.foreach{c =>println(s"A: $c")}).run() outPort.to(Sink.foreach{c =>println(s"B: $c")}).run() outPort.to(Sink.foreach{c =>println(s"C: $c")}).run()

还有一种做法是把MergeHub和BroadcastHub背对背连接起来形成一种多对多的形状。理论上应该能作为一种集散中心容许连接任何数量的上游publisher和下游subscriber。我们先把它们连接起来获得一个Sink和一个Source:

val (sink, source)  = MergeHub.source[Int](perProducerBufferSize = 16) .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()

理论上我们现在可以对sink和source进行任意连接了。但有个特殊情况是:当下游没有任何subscriber时上游所有producer都无法发送任何数据。这是由于backpressure造成的:作为一个合成的节点,下游速率跟不上则通过backpressure制约上游数据发布。我们可以安装一个泄洪机制来保证上游publisher数据推送的正常进行:

  source.runWith(Sink.ignore)

这样在没有任何下游subscriber的情况下,上游producer还是能够正常运作。

现在我们可以用Flow.fromSinkAndSource(sink, source)来构建一个Flow[I,O,?]:

  def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = fromSinkAndSourceMat(sink, source)(Keep.none)

我们还可以把上篇提到的KillSwitches.singleBidi用上:

 val channel: Flow[Int, Int, UniqueKillSwitch] = Flow.fromSinkAndSource(sink, source) .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right) .backpressureTimeout(3.seconds)

上面backpressureTimeout保证了任何下游subscriber阻塞超时的话都会被强力终止。如下:

  /** * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, * the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures * * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. * * '''Cancels when''' downstream cancels */ def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout))

好了,下面我们可以把channel当作Flow来使用了:

  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run() val killChan
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(21): Stream:实时操控:.. 下一篇scala基本语法和单词统计

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目