Akka(23): Stream: Custom Stream Components - Custom defined stream processing stages (6)
2017-10-09 13:43:39 Views: 8268
Tags: Akka, Stream, custom, component, functionality, Custom defined stream processing stages
rceShape[String],NotUsed]
    = new AlphaSource(Seq("a","b","c","d"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  // alphaSource.runWith(Sink.foreach(println))
  val sinkGraph: Graph[SinkShape[String],NotUsed] = new UppercaseSink
  val upperSink = Sink.fromGraph(sinkGraph)
  alphaSource.runWith(upperSink)

  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row
  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>
        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
    }
  }

  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
    ,Order("plain",1),Order("beef",2))
  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()
  // Source(1 to 10).runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}
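The effect of orderDeliver on the order list can be checked without running a stream. The sketch below is a plain-collections model, not Akka code. Row, Move, Next, and Stand are re-declared here as assumptions about definitions that appear in earlier parts of this article. BurgerModel and deliverAll are hypothetical names introduced only for illustration. deliverAll imitates what a valve stage would presumably do: pass on the rows of each Next and emit nothing for Stand.

```scala
object BurgerModel {
  // Assumed stand-ins for types defined earlier in the article.
  sealed trait Row
  sealed trait Move
  case object Stand extends Move                           // emit nothing for this input
  final case class Next(rows: Iterable[Row]) extends Move  // emit these rows downstream

  final case class Order(burger: String, qty: Int) extends Row
  final case class Burger(msg: String) extends Row

  // Same decision as orderDeliver, written with map instead of foldLeft.
  // Assumption: any row that is not a positive-quantity Order maps to Stand.
  val orderDeliver: Row => Move = {
    case Order(name, qty) if qty > 0 =>
      val burgers: Seq[Burger] = (1 to qty).map(i => Burger(s"$name $i of $qty"))
      Next(burgers)
    case _ => Stand
  }

  // Hypothetical helper: models the valve over a plain List.
  def deliverAll(rows: List[Row]): List[Row] =
    rows.flatMap { row =>
      orderDeliver(row) match {
        case Next(out) => out.toList
        case Stand     => Nil
      }
    }

  val orders: List[Row] = List(
    Order("cheeze", 2), Order("beef", 3), Order("pepper", 1),
    Order("Rice", 0), Order("plain", 1), Order("beef", 2))

  val delivered: List[Row] = deliverAll(orders)

  def main(args: Array[String]): Unit =
    delivered.foreach(println) // first line printed: Burger(cheeze 1 of 2)
}
```

Under these assumptions the six orders expand to nine Burger rows, and the zero-quantity "Rice" order contributes none, which is the element sequence one would expect Sink.foreach(println) to print for the stream version.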