TOP

Akka(18): Stream:组合数据流,组件-Graph components(一)
2017-10-09 13:43:37 】 浏览:9238
Tags:Akka Stream 组合 数据流 组件 -Graph components

   akka-stream的数据流可以由一些组件组合而成。这些组件统称数据流图Graph,它描述了数据流向和处理环节。Source,Flow,Sink是最基础的Graph。用基础Graph又可以组合更复杂的复合Graph。如果一个Graph的所有端口(输入、输出)都是连接的话就是一个闭合流图RunnableGraph,否则就属于·开放流图PartialGraph。一个完整的(可运算的)数据流就是一个RunnableGraph。Graph的输出出入端口可以用Shape来描述:

/** * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the * philosophy that a Graph is a freely reusable blueprint, everything that * matters from the outside are the connections that can be made with it, * otherwise it is just a black box. */
abstract class Shape { /** * Scala API: get a list of all input ports */ def inlets: immutable.Seq[Inlet[_]] /** * Scala API: get a list of all output ports */ def outlets: immutable.Seq[Outlet[_]] ...

Shape类型的抽象函数inlets,outlets分别代表Graph形状的输入、输出端口。下面列出了aka-stream提供的几个现有形状Shape:

final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {...} final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {...} final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape {...} sealed abstract class ClosedShape extends Shape /** * A bidirectional flow of elements that consequently has two inputs and two * outputs, arranged like this: * * {{{ * +------+ * In1 ~>| |~> Out1 * | bidi | * Out2 <~| |<~ In2 * +------+ * }}} */ final case class BidiShape[-In1, +Out1, -In2, +Out2]( in1: Inlet[In1 @uncheckedVariance], out1: Outlet[Out1 @uncheckedVariance], in2: Inlet[In2 @uncheckedVariance], out2: Outlet[Out2 @uncheckedVariance]) extends Shape {...} object UniformFanInShape { def apply[I, O](outlet: Outlet[O], inlets: Inlet[I]*): UniformFanInShape[I, O] =
    new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList)) } object UniformFanOutShape { def apply[I, O](inlet: Inlet[I], outlets: Outlet[O]*): UniformFanOutShape[I, O] =
    new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) }

Shape是Graph类型的一个参数:

trait Graph[+S <: Shape, +M] { /** * Type-level accessor for the shape parameter of this graph. */ type Shape = S @uncheckedVariance /** * The shape of a graph is all that is externally visible: its inlets and outlets. */ def shape: S ...

RunnableGraph类型的Shape是ClosedShape:

/** * Flow with attached input and output, can be executed. */ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] { override def shape = ClosedShape /** * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. */ def mapMaterializedValue[Mat2](f: Mat ? Mat2): RunnableGraph[Mat2] = copy(traversalBuilder.transformMat(f.asInstanceOf[Any ? Any])) /** * Run this flow and return the materialized instance from the flow. */ def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) ...

我们可以用akka-stream提供的GraphDSL来构建Graph。GraphDSL继承了GraphApply的create方法,GraphDSL.create(...)就是构建Graph的方法:

object GraphDSL extends GraphApply {...} trait GraphApply { /** * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function. */ def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ? S): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val s = buildBlock(builder) createGraph(s, builder) } ... def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(b  
		
Akka(18): Stream:组合数据流,组件-Graph components(一) https://www.cppentry.com/bencandy.php?fid=90&id=124439

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(24): Stream:从外部系统.. 下一篇R的两均值比较检验(非参数检验)