设为首页 加入收藏

TOP

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub(五)
2017-10-09 13:27:34 】 浏览:9564
Tags:Akka Stream 实时 操控 动态 管道 连接 -MergeHub BroadcastHub and PartitionHub
nel2
= Source.repeat(888) .delay(2.second,DelayOverflowStrategy.backpressure) .viaMat(channel)(Keep.right).to(fixedSink).run()

上面我们提到:PartitionHub就是一种特殊的BroadcastHub。功能是扩散型的。不过PartitionHub用了一个函数来选择下游的subscriber。从PartitionHub.sink函数款式可以看出:

 def sink[T](partitioner: (Int, T) ? Int, startAfterNrOfConsumers: Int, bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] = statefulSink(() ? (info, elem) ? info.consumerIdByIdx(partitioner(info.size, elem)), startAfterNrOfConsumers, bufferSize)

可以看出:partitioner函数就是一种典型的状态转换函数款式,实际上sink调用了statefulSink方法并固定了partitioner函数:

   * This `statefulSink` should be used when there is a need to keep mutable state in the partition function, * e.g. for implemening round-robin or sticky session kind of routing. If state is not needed the [[#sink]] can * be more convenient to use. *
   * @param partitioner Function that decides where to route an element. It is a factory of a function to *   to be able to hold stateful variables that are unique for each materialization. The function *   takes two parameters; the first is information about active consumers, including an array of consumer *   identifiers and the second is the stream element. The function should return the selected consumer *   identifier for the given element. The function will never be called when there are no active consumers, *   i.e. there is always at least one element in the array of identifiers. * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected. *   This is only used initially when the stage is starting up, i.e. it is not honored when consumers have * been removed (canceled). * @param bufferSize Total number of elements that can be buffered. If this buffer is full, the producer *   is backpressured. */ @ApiMayChange def statefulSink[T](partitioner: () ? (ConsumerInfo, T) ? Long, startAfterNrOfConsumers: Int, bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new PartitionHub[T](partitioner, startAfterNrOfConsumers, bufferSize))

与BroadcastHub相同,我们首先构建一个共用的数据源producer,然后连接PartitionHub形成一个通往下游终端的通道让任何下游subscriber可以连接这个通道:

 //interupted temination
  val killAll = KillSwitches.shared("terminator") //fix a producer
  val fixedSource = Source.tick(1.second, 1.second, "message") .zipWith(Source(1 to 100))((a, b) => s"$a-$b") //connect to PartitionHub which uses function to select sink
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink( (size, elem) => math.abs(elem.hashCode) % size, startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right) //materialize the source
  val fromSource = sourceGraph.run() //connect to fixedSource freely
  fromSource.runForeach(msg => println("subs1: " + msg)) fromSource.runForeach(msg => println("subs2: " + msg)) scala.io.StdIn.readLine() killAll.shutdown() actorSys.terminate()

可以看到:上游数据流向多个下游中哪个subscriber是通过partitioner函数选定的。从这项功能来讲:PartitionHub又是某种路由Router。下面的例子实现了仿Router的RoundRobin推送策略: 

  //partitioner function
  def roundRobin(): (PartitionHub.ConsumerInfo, String) ? Long = { var i = -1L (info, elem) => { i += 1 info.consumerIdByIdx((i % info.size).toInt) } } val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink( () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize
首页 上一页 2 3 4 5 下一页 尾页 5/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(21): Stream:实时操控:.. 下一篇scala基本语法和单词统计

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目