rceShape[String],NotUsed] = new AlphaSource(Seq("a","b","c","d")) val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure) // alphaSource.runWith(Sink.foreach(println))
// Wrap the custom UppercaseSink stage as a Sink and run the delayed letter source into it.
// NOTE(review): UppercaseSink is defined elsewhere; presumably it upper-cases and prints each element — confirm.
val sinkGraph: Graph[SinkShape[String],NotUsed] = new UppercaseSink
val upperSink = Sink.fromGraph(sinkGraph)
alphaSource.runWith(upperSink)

/** An order for `qty` burgers of kind `burger`. */
case class Order(burger: String, qty: Int) extends Row

/** A single delivered burger, described by `msg`. */
case class Burger(msg: String) extends Row

/**
 * Decision function passed to FlowValve.
 *
 * For an `Order` with a positive quantity it returns `Next` carrying one
 * `Burger` per unit, labelled "<name> <n> of <qty>" for n = 1..qty in
 * ascending order. For an `Order` with `qty <= 0` it returns `Stand`.
 *
 * Only `Order` is matched: any other `Row` raises `scala.MatchError`.
 * NOTE(review): presumably `Next` tells the valve to emit the rows and `Stand`
 * tells it to emit nothing — confirm against FlowValve.
 *
 * @return a function from an incoming `Row` to the `Move` the valve should make
 */
def orderDeliver: Row => Move = {
  case Order(name, qty) =>
    if (qty > 0) {
      // One pass over 1..qty; avoids re-concatenating the accumulator on every step.
      val burgers: Iterable[Burger] = (1 to qty).map(n => Burger(s"$name $n of $qty"))
      Next(burgers)
    } else Stand
}

// Lift the FlowValve stage, driven by orderDeliver, into a reusable Flow.
val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)

// Sample orders, including one with qty = 0 for which orderDeliver returns Stand.
val orders = List(
  Order("cheeze",2),
  Order("beef",3),
  Order("pepper",1),
  Order("Rice",0),
  Order("plain",1),
  Order("beef",2)
)

// Push the orders through the delivery flow and print whatever comes out.
Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()
// Source(1 to 10).runWith(Sink.foreach(println))
// Block the main thread until a line is read from stdin; the input itself is discarded.
scala.io.StdIn.readLine()
// NOTE(review): `sys` is declared outside this view — presumably the ActorSystem; confirm.
sys.terminate()
}
|