Akka(22): Stream: Real-time control: dynamic pipeline connections with MergeHub, BroadcastHub and PartitionHub (3)
2017-10-09 13:27:34
  = 256) )(Keep.right)
  val roundRobinSource = roundRobinGraph.run()

  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg))
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))

In the example above, the roundRobin function decides which subscriber each element from the source is routed to.
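To make the idea of a stateful round-robin partitioner concrete, here is a minimal sketch in plain Scala that runs without Akka. The `Consumers` class is a hypothetical stand-in for the consumer information a PartitionHub hands to its partitioner, and this `roundRobin` is an illustrative assumption about the shape of such a function, not necessarily the exact definition used earlier in this article.

```scala
object RoundRobinSketch {
  // Hypothetical stand-in for the consumer information passed to a partitioner.
  // It is NOT an Akka type; it only models "a list of consumer ids".
  final case class Consumers(ids: Vector[Long]) {
    def size: Int = ids.size
    def consumerIdByIdx(idx: Int): Long = ids(idx)
  }

  // Factory: every call returns a fresh partitioner with its own private counter,
  // so the state is not shared between independent uses.
  def roundRobin(): (Consumers, String) => Long = {
    var i = -1L
    (info, _) => {
      i += 1
      // Cycle through the consumers in index order.
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }

  def main(args: Array[String]): Unit = {
    val route = roundRobin()
    val consumers = Consumers(Vector(10L, 20L))
    val targets = List("a", "b", "c", "d").map(elem => route(consumers, elem))
    println(targets) // List(10, 20, 10, 20)
  }
}
```

The element itself is ignored here; only the counter decides the destination, which is why the targets alternate regardless of content.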

In the next example, by contrast, each element is routed to the fastest subscriber:

  val producer = Source(0 until 100)

  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
  // Note that this is a moving target since the elements are consumed concurrently.
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
    producer.via(killAll.flow).toMat(PartitionHub.statefulSink(
      () => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
      startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)

  val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

  fromProducer.runForeach(msg => println("fast1: " + msg))
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
    .runForeach(msg => println("fast2: " + msg))

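In this example the partitioner function uses the number of elements buffered for each downstream consumer (queueSize) to decide which subscriber receives the next element. A larger queueSize means that consumer is slower, so the element goes to the consumer with the smallest queueSize.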
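The selection rule can be tried in isolation with a small plain-Scala sketch. `ConsumerView` below is a hypothetical stand-in that only borrows the `consumerIds` and `queueSize` method names from the example; it is not the Akka `ConsumerInfo` type, and the queue sizes are made-up sample values.

```scala
object FastestConsumerSketch {
  // Hypothetical stand-in: maps each consumer id to its current buffered element count.
  final case class ConsumerView(queueSizes: Map[Long, Int]) {
    def consumerIds: Seq[Long] = queueSizes.keys.toSeq
    def queueSize(id: Long): Int = queueSizes(id)
  }

  // Same rule as the partitioner in the example: pick the consumer with the
  // smallest backlog, on the assumption that it is draining fastest.
  def leastLoaded(info: ConsumerView): Long =
    info.consumerIds.minBy(id => info.queueSize(id))

  def main(args: Array[String]): Unit = {
    val info = ConsumerView(Map(1L -> 12, 2L -> 3))
    println(leastLoaded(info)) // 2, because consumer 2 has the smaller backlog
  }
}
```

Because the queue sizes change while elements are being consumed, a real partitioner sees only an approximate snapshot; the rule is a heuristic rather than an exact measure of speed.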

Below is the source code for the MergeHub and BroadcastHub demonstrations shown above:

  import akka.NotUsed
  import akka.stream.scaladsl._
  import akka.stream._
  import akka.actor._
  import scala.concurrent.duration._

  object HubsDemo extends App {
    implicit val actorSys = ActorSystem("sys")
    implicit val ec = actorSys.dispatcher
    implicit val mat = ActorMaterializer(
      ActorMaterializerSettings(actorSys)
        .withInputBuffer(16,16)
    )

    val fixedSink = Sink.foreach(println)
    val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] =
      MergeHub.source(perProducerBufferSize = 16).to(fixedSink).async
    val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input

    //now connect any number of source
    val (killSwitch,_) =
      (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure)
        .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

    val (killSwitch2,_) =
      (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure)
        .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

    val (killSwitch3,_) =
      (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure)
        .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run()

    val killAll = KillSwitches.shared("terminator")
    val fixedSource = Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure)
    val sourceGraph =
      fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async
    val outPort = sourceGraph.run()  //shared source

    //now connect any number of sink to outPort
    outPort.to(Sink.foreach{c => println(s"A: $c")}).run()
    outPort.to(Sink.foreach{c => println(s"B: $c")}).run()
    outPort.to(Sink.foreach{c => println(s"C: $c")}).run()

    val (sink, source) =
      MergeHub.source[Int](perProducerBufferSize = 16)
        .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()

    source.runWith(Sink.ignore)

    val channel: Flow[Int, Int, UniqueKillSwitch] =
      Flow.fromSinkAndSource(sink, source)
        .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right)
        .backpr