// PartitionHub example: a single producer whose elements are routed, one at a
// time, to whichever attached consumer currently has the smallest buffer.
val producer = Source(0 until 100)

// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
// Note that this is a moving target since the elements are consumed concurrently.
//
// NOTE(review): `killAll` is not defined in this fragment; presumably it is the
// shared kill switch declared in HubsDemo — confirm it is in scope here.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
  producer
    .via(killAll.flow)
    .toMat(
      PartitionHub.statefulSink(
        // A fresh partitioner function is created per materialization; it picks
        // the consumer id with the fewest buffered elements for every element.
        () => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
        startAfterNrOfConsumers = 2,
        bufferSize = 16))(Keep.right)

// Running the graph materializes the hub and yields the Source that consumers attach to.
val fromProducer: Source[Int, NotUsed] = runnableGraph.run()

// First consumer: prints every element it is handed, with no rate limit.
fromProducer.runForeach(msg => println("fast1: " + msg))

// Second consumer: rate-limited to 10 elements per 100 ms (burst of 10, shaping mode).
fromProducer
  .throttle(10, 100.millis, 10, ThrottleMode.Shaping)
  .runForeach(msg => println("fast2: " + msg))
import akka.NotUsed import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration._ object HubsDemo extends App { implicit val actorSys = ActorSystem("sys") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer( ActorMaterializerSettings(actorSys) .withInputBuffer(16,16) ) val fixedSink = Sink.foreach(println) val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink).async val inGate: Sink[Any,NotUsed] = sinkGraph.run() //common input //now connect any number of source
// Three independent producers are attached to the shared MergeHub sink `inGate`.
// Each one delays its elements and materializes its own UniqueKillSwitch; the
// NotUsed value materialized by `inGate` is not kept.
val killSwitch: UniqueKillSwitch =
  Source(Stream.from(0))
    .delay(1.second, DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right)
    .to(inGate)
    .run()

val killSwitch2: UniqueKillSwitch =
  Source(List("a", "b", "c", "d", "e"))
    .delay(2.seconds, DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right)
    .to(inGate)
    .run()

val killSwitch3: UniqueKillSwitch =
  Source(List("AA", "BB", "CC", "DD", "EE"))
    .delay(3.seconds, DelayOverflowStrategy.backpressure)
    .viaMat(KillSwitches.single)(Keep.right)
    .to(inGate)
    .run()

// One shared switch whose flow is inserted into the broadcast stream below.
val killAll = KillSwitches.shared("terminator")

// Unbounded counter starting at 100, delayed by one second with backpressure.
val fixedSource =
  Source(Stream.from(100)).delay(1.second, DelayOverflowStrategy.backpressure)

// The counter runs through the shared kill switch into a BroadcastHub; only the
// hub's materialized value (the Source consumers attach to) is kept.
val sourceGraph =
  fixedSource
    .via(killAll.flow)
    .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right)
    .async

// Shared source: any number of sinks can now be connected to outPort.
val outPort = sourceGraph.run()
outPort.to(Sink.foreach{c =>println(s"A: $c")}).run() outPort.to(Sink.foreach{c =>println(s"B: $c")}).run() outPort.to(Sink.foreach{c =>println(s"C: $c")}).run() val (sink, source) = MergeHub.source[Int](perProducerBufferSize = 16) .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run() source.runWith(Sink.ignore) val channel: Flow[Int, Int, UniqueKillSwitch] = Flow.fromSinkAndSource(sink, source) .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right) .backpr