从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。这其中:Source和Sink是stream的两个独立端点,而Flow处于stream Source和Sink中间可能由多个通道式的节点组成,每个节点代表某些数据流元素转化处理功能,它们的链接顺序则可能代表整体作业的流程。一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sink。我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下:
Source(1 to 10).runWith(Sink.foreach(println))
从另一个角度说明:akka-stream又包括数据流图Graph及运算器Materializer两个部分。Graph代表运算方案,Materializer负责准备环境并把运算方案Graph放置到Actor系统里去实际运算产生效果(effects)及获取运算结果。所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能的子Graph组成。一个可运行数据流必须由一个闭合的数据流图(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能的子图(sub-graph)组成。定制数据流功能就是针对Graph按功能需要进行自定义。
一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。我们先来分析一下GraphShape,它们的基类是Shape:
/** * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the * philosophy that a Graph is a freely reusable blueprint, everything that * matters from the outside are the connections that can be made with it, * otherwise it is just a black box. */
abstract class Shape { /** * Scala API: get a list of all input ports */ def inlets: immutable.Seq[Inlet[_]] /** * Scala API: get a list of all output ports */ def outlets: immutable.Seq[Outlet[_]] /** * Create a copy of this Shape object, returning the same type as the * original; this constraint can unfortunately not be expressed in the * type system. */ def deepCopy(): Shape ...}
Shape的子类必须实现上面这三个抽象函数。akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape:
/** * A Source [[Shape]] has exactly one output and no inputs, it models a source * of data. */ final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq override val outlets: immutable.Seq[Outlet[_]] = out :: Nil override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy()) } object SourceShape { /** Java API */ def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] = SourceShape(outlet) } /** * A Flow [[Shape]] has exactly one input and one output, it looks from the * outside like a pipe (but it can be a complex topology of streams within of * course). */ final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = in :: Nil override val outlets: immutable.Seq[Outlet[_]] = out :: Nil override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy()) } object FlowShape { /** Java API */ def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] = FlowShape(inlet, outlet) }
还有一个稍微复杂点的双向流形状BidiShape:
//#bidi-shape /** * A bidirectional flow of elements that consequently has two inputs and two * outputs, arranged like this: * * {{{ * +------+ * In1 ~>| |~> Out1 * | bidi | * Out2 <~| |<~ In2 * +------+ * }}} */ final case class BidiShape[-In1, +Out1, -In2, +Out2]( in1: Inlet[In1 @uncheckedVariance], out1: Outlet[Out1 @uncheckedVariance], in2: Inlet[In2 @uncheckedVariance], out2: Outlet[Out2 @uncheckedVariance]) extends Shape { //#implementation-details-elided
override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil /** * Java API for cre