/** * Convenience functions for often-encountered purposes like keeping only the * left (first) or only the right (second) of two input values. */
object Keep { private val _left = (l: Any, r: Any) ? l private val _right = (l: Any, r: Any) ? r private val _both = (l: Any, r: Any) ? (l, r) private val _none = (l: Any, r: Any) ? NotUsed def left[L, R]: (L, R) ? L = _left.asInstanceOf[(L, R) ? L] def right[L, R]: (L, R) ? R = _right.asInstanceOf[(L, R) ? R] def both[L, R]: (L, R) ? (L, R) = _both.asInstanceOf[(L, R) ? (L, R)] def none[L, R]: (L, R) ? NotUsed = _none.asInstanceOf[(L, R) ? NotUsed] }
Source[+Out, +Mat] //Out代表元素类型,Mat为运算结果类型
Flow[-In, +Out, +Mat] //In,Out为数据流元素类型,Mat是运算结果类型
Sink[-In, +Mat] //In是数据元素类型,Mat是运算结果类型
/** * Flow with attached input and output, can be executed. */ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] { override def shape = ClosedShape /** * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. */ def mapMaterializedValue[Mat2](f: Mat ? Mat2): RunnableGraph[Mat2] = copy(traversalBuilder.transformMat(f.asInstanceOf[Any ? Any])) /** * Run this flow and return the materialized instance from the flow. */ def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) ...
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka._ import scala.concurrent._ object SourceDemo extends App { implicit val sys=ActorSystem("demo") implicit val mat=ActorMaterializer() implicit val ec=sys.dispatcher val s1: Source[Int,NotUsed] = Source(1 to 10) val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val rg1: RunnableGraph[NotUsed] = s1.to(sink) val rg2: RunnableGraph[Future[Done]] = s1.toMat(sink)(Keep.right) val res1: NotUsed = rg1.run() Thread.sleep(1000) val res2: Future[Done] = rg2.run() res2.andThen { case _ => sys.terminate() } }
val seq = Seq[Int](1,2,3) def toIterator() = seq.iterator val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2) val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3) val s2 = Source.fromIterator(toIterator) val s3 = s1 ++ s2 val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right) val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right) val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(