=> val merge = builder.add(Merge[Int](2)) Source.single(0) ~> merge Source(List(2, 3, 4)) ~> merge // Exposing exactly one output port
SourceShape(merge.out) }) // Building a Sink with a nested Flow, using the fluid DSL
val sink = { val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow") nestedFlow.to(Sink.head) } // Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
和scalaz-stream不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。这个运算结果在复合流图中传播的过程是可控的,如下图示:
返回运算结果是通过viaMat, toMat来实现的。简写的via,to默认选择流图左边运算产生的结果。
|