TOP

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub(一)
2017-10-09 13:27:34 】 浏览:9469
Tags:Akka Stream 实时 操控 动态 管道 连接 -MergeHub BroadcastHub and PartitionHub

  在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某个固定的数据流终端Sink推送。这就涉及到动态连接合并型Merge或扩散型Broadcast的数据流连接点junction。从akka-stream的技术文档得知:一对多,多对一或多对多类型的复杂数据流组件必须用GraphDSL来设计,产生Graph类型结果。前面我们提到过:Graph就是一种运算预案,要求所有的运算环节都必须是预先明确指定的,如此应该是无法实现动态的管道连接的。但akka-stream提供了MergeHub,BroadcastHub和PartitionHub来支持这样的功能需求。

1、MergeHub:多对一合并类型。支持动态的多个上游publisher连接

2、BroadcastHub:一对多扩散类型。支持动态的多个下游subscriber连接

3、PartitionHub:实际上是一对多扩散类型。通过一个函数来选择数据派送目的地

MergeHub对象中有个source函数:

 /** * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. * * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own * [[Sink]] for feeding that materialization. * * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed * and any new producers using the [[Sink]] will be cancelled. * * @param perProducerBufferSize Buffer space used per producer. Default value is 16. */ def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = Source.fromGraph(new MergeHub[T](perProducerBufferSize))

MergeHub.source函数的返回结果类型是Source[T,Sink[T,NotUsed]],本质上MergeHub就是一个共用的Sink,如下所示:

  val fixedSink = Sink.foreach(println) val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink) val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input //now connect any number of source
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() scala.io.StdIn.readLine() killSwitch.shutdown() killSwitch2.shutdown() killSwitch3.shutdown() actorSys.terminate()

同样,BroadcastHub就是一种共用的Source,可以连接任何数量的下游subscriber。下面是BroadcastHub.sink的定义:

  /** * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the * broadcast elements from the original [[Sink]]. * * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own * [[Source]] for consuming the [[Sink]] of that materialization. * * If the original [[Sink]] is fai  
		
Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub(一) https://www.cppentry.com/bencandy.php?fid=90&id=124388

首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(21): Stream:实时操控:.. 下一篇scala基本语法和单词统计