设为首页 加入收藏

TOP

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages(一)
2017-10-09 13:43:39 】 浏览:8246
Tags:Akka Stream 定义 构件 功能 -Custom defined stream processing stages

    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。这其中:Source和Sink是stream的两个独立端点,而Flow处于stream Source和Sink中间可能由多个通道式的节点组成,每个节点代表某些数据流元素转化处理功能,它们的链接顺序则可能代表整体作业的流程。一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sink。我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下:

  Source(1 to 10).runWith(Sink.foreach(println))

从另一个角度说明:akka-stream又包括数据流图Graph及运算器Materializer两个部分。Graph代表运算方案,Materializer负责准备环境并把运算方案Graph放置到Actor系统里去实际运算产生效果(effects)及获取运算结果。所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能的子Graph组成。一个可运行数据流必须由一个闭合的数据流图(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能的子图(sub-graph)组成。定制数据流功能就是针对Graph按功能需要进行自定义。

一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。我们先来分析一下GraphShape,它们的基类是Shape:

/** * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the * philosophy that a Graph is a freely reusable blueprint, everything that * matters from the outside are the connections that can be made with it, * otherwise it is just a black box. */
abstract class Shape { /** * Scala API: get a list of all input ports */ def inlets: immutable.Seq[Inlet[_]] /** * Scala API: get a list of all output ports */ def outlets: immutable.Seq[Outlet[_]] /** * Create a copy of this Shape object, returning the same type as the * original; this constraint can unfortunately not be expressed in the * type system. */ def deepCopy(): Shape ...}

Shape的子类必须实现上面这三个抽象函数。akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape: 

/** * A Source [[Shape]] has exactly one output and no inputs, it models a source * of data. */ final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq override val outlets: immutable.Seq[Outlet[_]] = out :: Nil override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy()) } object SourceShape { /** Java API */ def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] = SourceShape(outlet) } /** * A Flow [[Shape]] has exactly one input and one output, it looks from the * outside like a pipe (but it can be a complex topology of streams within of * course). */ final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = in :: Nil override val outlets: immutable.Seq[Outlet[_]] = out :: Nil override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy()) } object FlowShape { /** Java API */ def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] = FlowShape(inlet, outlet) }

还有一个稍微复杂点的双向流形状BidiShape: 

//#bidi-shape /** * A bidirectional flow of elements that consequently has two inputs and two * outputs, arranged like this: * * {{{ * +------+ * In1 ~>| |~> Out1 * | bidi | * Out2 <~| |<~ In2 * +------+ * }}} */ final case class BidiShape[-In1, +Out1, -In2, +Out2]( in1: Inlet[In1 @uncheckedVariance], out1: Outlet[Out1 @uncheckedVariance], in2: Inlet[In2 @uncheckedVariance], out2: Outlet[Out2 @uncheckedVariance]) extends Shape { //#implementation-details-elided
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil /** * Java API for cre
首页 上一页 1 2 3 4 5 6 下一页 尾页 1/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(17): Stream:数据流基础.. 下一篇Akka(24): Stream:从外部系统..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目