设为首页 加入收藏

TOP

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介(三)
2017-10-09 13:43:40 】 浏览:7665
Tags:Akka Stream 数据流 基础 组件 -Source Flow Sink 简介
caladsl.Sink]], * concatenating the processing steps of both.
*/ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ? Mat3): RunnableGraph[Mat3] = { RunnableGraph(traversalBuilder.append(sink.traversalBuilder, sink.shape, combine)) }

可以发现via和to分别是viaMat和toMat的简写,分别固定了Keep.left。意思是选择左边数据流图的运算结果。我们上面提过akka-stream是在actor系统里处理数据流元素的。在这个过程中同时可以用actor内部状态来产生运算结果。via和to连接了左右两个graph,并且选择了左边graph的运算结果。我们可以用viaMat和toMat来选择右边graph运算结果。这是通过combine: (Mat,Mat2)=>Mat3这个函数实现的。akka-stream提供了一个Keep对象来表达这种选择:

/** * Convenience functions for often-encountered purposes like keeping only the * left (first) or only the right (second) of two input values. */
object Keep { private val _left = (l: Any, r: Any) ? l private val _right = (l: Any, r: Any) ? r private val _both = (l: Any, r: Any) ? (l, r) private val _none = (l: Any, r: Any) ? NotUsed def left[L, R]: (L, R) ? L = _left.asInstanceOf[(L, R) ? L] def right[L, R]: (L, R) ? R = _right.asInstanceOf[(L, R) ? R] def both[L, R]: (L, R) ? (L, R) = _both.asInstanceOf[(L, R) ? (L, R)] def none[L, R]: (L, R) ? NotUsed = _none.asInstanceOf[(L, R) ? NotUsed] }

既然提到运算结果的处理方式,我们就来看看Source,Flow,Sink的类型参数:

Source[+Out, +Mat]       //Out代表元素类型,Mat为运算结果类型
Flow[-In, +Out, +Mat]    //In,Out为数据流元素类型,Mat是运算结果类型
Sink[-In, +Mat]          //In是数据元素类型,Mat是运算结果类型

Keep对象提供的是对Mat的选择。上面源代码中to,toMat函数的返回结果都是RunnableGraph[Mat3],也就是说只有连接了Sink的数据流才能进行运算。RunnableGraph提供一个run()函数来运算数据流:

/** * Flow with attached input and output, can be executed. */ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] { override def shape = ClosedShape /** * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. */ def mapMaterializedValue[Mat2](f: Mat ? Mat2): RunnableGraph[Mat2] = copy(traversalBuilder.transformMat(f.asInstanceOf[Any ? Any])) /** * Run this flow and return the materialized instance from the flow. */ def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) ...

上面shape = ClosedShape代表RunnableGraph的形状是闭合的(ClosedShape),意思是说:一个可运行的graph所有输人输出端口都必须是连接的。

下面我们就用一个最简单的线性数据流来做些详细解释:

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka._ import scala.concurrent._ object SourceDemo extends App { implicit val sys=ActorSystem("demo") implicit val mat=ActorMaterializer() implicit val ec=sys.dispatcher val s1: Source[Int,NotUsed] = Source(1 to 10) val sink: Sink[Any,Future[Done]] = Sink.foreach(println) val rg1: RunnableGraph[NotUsed] = s1.to(sink) val rg2: RunnableGraph[Future[Done]] = s1.toMat(sink)(Keep.right) val res1: NotUsed = rg1.run() Thread.sleep(1000) val res2: Future[Done] = rg2.run() res2.andThen { case _ => sys.terminate() } }

我们把焦点放在特别注明的结果类型上面:Source的运算结果Mat类型是NotUsed,Sink的运算结果Mat类型是Future[Done]。从上面这段代码我们看到用toMat选择返回Sink的运算结果Future[Done]才能捕捉到运算终止节点。下面的另一个例子包括了一些组合动作:

  val seq = Seq[Int](1,2,3) def toIterator() = seq.iterator val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2) val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3) val s2 = Source.fromIterator(toIterator) val s3 = s1 ++ s2 val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right) val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right) val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(
首页 上一页 1 2 3 4 5 下一页 尾页 3/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇为什么要创建开放源码的PlayScala.. 下一篇Akka(23): Stream:自定义流构..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目