import akka.NotUsed import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration._ object PartitionHubDemo extends App { implicit val actorSys = ActorSystem("sys") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer( ActorMaterializerSettings(actorSys) .withInputBuffer(16,16) ) //interupted temination
val killAll = KillSwitches.shared("terminator") //fix a producer
val fixedSource = Source.tick(1.second, 1.second, "message") .zipWith(Source(1 to 100))((a, b) => s"$a-$b") //connect to PartitionHub which uses function to select sink
val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink( (size, elem) => math.abs(elem.hashCode) % size, startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right) //materialize the source
val fromSource = sourceGraph.run() //connect to fixedSource freely
fromSource.runForeach(msg => println("subs1: " + msg)) fromSource.runForeach(msg => println("subs2: " + msg)) //partitioner function
def roundRobin(): (PartitionHub.ConsumerInfo, String) ? Long = { var i = -1L (info, elem) => { i += 1 info.consumerIdByIdx((i % info.size).toInt) } } val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink( () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256) )(Keep.right) val roundRobinSource = roundRobinGraph.run() roundRobinSource.runForeach(msg => println("roundRobin1: " + msg)) roundRobinSource.runForeach(msg => println("roundRobin2: " + msg)) val producer = Source(0 until 100) // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer. // Note that this is a moving target since the elements are consumed concurrently.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] = producer.via(killAll.flow).toMat(PartitionHub.statefulSink( () => (info, elem) ? info.consumerIds.minBy(id ? info.queueSize(id)), startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right) val fromProducer: Source[Int, NotUsed] = runnableGraph.run() fromProducer.runForeach(msg => println("fast1: " + msg)) fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping) .runForeach(msg => println("fast2: " + msg)) scala.io.StdIn.readLine() killAll.shutdown() actorSys.terminate() }