Akka(22): Stream: Real-Time Control: Dynamic Pipeline Connections - MergeHub, BroadcastHub and PartitionHub (Part 4)
2017-10-09 13:27:34
Tags: Akka, Stream, real-time, control, dynamic, pipeline, connection, MergeHub, BroadcastHub, PartitionHub
essureTimeout(3.seconds)

  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run()
  val killChannel2 = Source.repeat(888)
    .delay(2.second, DelayOverflowStrategy.backpressure)
    .viaMat(channel)(Keep.right).to(fixedSink).run()

  scala.io.StdIn.readLine()
  killSwitch.shutdown()
  killSwitch2.shutdown()
  killSwitch3.shutdown()
  killAll.shutdown()
  killChannel1.shutdown()
  killChannel2.shutdown()

  scala.io.StdIn.readLine()
  actorSys.terminate()
}

Below is the PartitionHub demonstration source code:

import akka.NotUsed
import akka.stream.scaladsl._
import akka.stream._
import akka.actor._
import scala.concurrent.duration._

object PartitionHubDemo extends App {
  implicit val actorSys = ActorSystem("sys")
  implicit val ec = actorSys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSys)
      .withInputBuffer(16, 16)
  )

  // shared kill switch used to interrupt (terminate) every stream below
  val killAll = KillSwitches.shared("terminator")

  // a fixed producer
  val fixedSource = Source.tick(1.second, 1.second, "message")
    .zipWith(Source(1 to 100))((a, b) => s"$a-$b")

  // connect to a PartitionHub, which uses a function to select the sink
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink(
    (size, elem) => math.abs(elem.hashCode) % size,
    startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)

  // materialize the source
  val fromSource = sourceGraph.run()

  // consumers can now attach to fixedSource freely
  fromSource.runForeach(msg => println("subs1: " + msg))
  fromSource.runForeach(msg => println("subs2: " + msg))

  // stateful partitioner function
  def roundRobin(): (PartitionHub.ConsumerInfo, String) => Long = {
    var i = -1L
    (info, elem) => {
      i += 1
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }

  val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink(
    () => roundRobin(), startAfterNrOfConsumers = 2, bufferSize = 256)
  )(Keep.right)
  val roundRobinSource = roundRobinGraph.run()

  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg))
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))

  val producer = Source(0 until 100)

  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
  // Note that this is a moving target since the elements are consumed concurrently.
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
    producer.via(killAll.flow).toMat(PartitionHub.statefulSink(
      () => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
      startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)

  val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

  fromProducer.runForeach(msg => println("fast1: " + msg))
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
    .runForeach(msg => println("fast2: " + msg))

  scala.io.StdIn.readLine()
  killAll.shutdown()
  actorSys.terminate()
}
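The three routing strategies in the demo (hash-based, round-robin, and least-loaded) can be tried out without starting an actor system. The following is only a minimal sketch in plain Scala: `FakeConsumerInfo` is a hypothetical stand-in for `PartitionHub.ConsumerInfo` that exposes just the members the demo uses, and the names `PartitionerSketch`, `byHash`, and `leastLoaded` are made up for illustration. It also shows one edge case worth knowing: `math.abs(Int.MinValue)` is still negative, so `math.abs(elem.hashCode) % size` can yield a negative index for an element whose hash is `Int.MinValue`; the sketch assumes `Math.floorMod` as one possible way to stay within `[0, size)`.

```scala
// Plain-Scala sketch of the demo's routing strategies; no Akka dependency.
object PartitionerSketch {

  // Hypothetical stand-in for PartitionHub.ConsumerInfo (illustration only).
  final case class FakeConsumerInfo(queueSizes: Map[Long, Int]) {
    val consumerIds: Vector[Long] = queueSizes.keys.toVector.sorted
    def size: Int = consumerIds.size
    def consumerIdByIdx(idx: Int): Long = consumerIds(idx)
    def queueSize(id: Long): Int = queueSizes(id)
  }

  // Stateless routing: derive a consumer index from the element's hash.
  // floorMod keeps the result in [0, size) even when hashCode is Int.MinValue.
  def byHash(size: Int, elem: String): Int =
    Math.floorMod(elem.hashCode, size)

  // Stateful routing: every call to roundRobin() creates a function
  // with its own private counter, as in the demo above.
  def roundRobin(): (FakeConsumerInfo, String) => Long = {
    var i = -1L
    (info, _) => {
      i += 1
      info.consumerIdByIdx((i % info.size).toInt)
    }
  }

  // Route to the consumer with the fewest buffered elements.
  def leastLoaded(info: FakeConsumerInfo, elem: Int): Long =
    info.consumerIds.minBy(id => info.queueSize(id))

  def main(args: Array[String]): Unit = {
    // Two pretend consumers: id 10 has 3 buffered elements, id 20 has none.
    val info = FakeConsumerInfo(Map(10L -> 3, 20L -> 0))
    val rr = roundRobin()
    println((1 to 4).map(n => rr(info, s"message-$n"))) // alternates between ids 10 and 20
    println(leastLoaded(info, 42))                      // picks id 20, the emptier queue
    println(byHash(2, "message-1"))                     // always 0 or 1
  }
}
```

In the real hub the partitioner passed to `PartitionHub.sink` receives the current number of consumers and returns an index, while the one passed to `PartitionHub.statefulSink` receives a `ConsumerInfo` and returns a consumer id, which is why the round-robin version goes through `consumerIdByIdx`.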