设为首页 加入收藏

TOP

Akka(19): Stream:组合数据流,组合共用-Graph modular composition(一)
2017-10-09 13:35:07 】 浏览:1888
Tags:Akka Stream 组合 数据流 共用 -Graph modular composition

   akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的Graph。因为Graph只是对数据流运算的描述,所以它是可以被重复利用的。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高的功能层面上实现Graph的模块化(modular)。按上回讨论,Graph又可以被描述成一种黑盒子,它的入口和出口就是Shape,而内部的作用即处理步骤Stage则是用GraphStage来形容的。下面是akka-stream预设的一些基础数据流图:

compose_shapes.png

上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。我们可以用以上这些基础Graph来构建更复杂的复合流图,而这些复合流图又可以被重复利用去构建更复杂的复合流图。下面就是一些常见的复合流图:

compose_composites.png

注意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函数构建:

def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = fromSinkAndSourceMat(sink, source)(Keep.none)

这个Flow从流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。我们必须用CoupledTermination对象中的fromSinkAndSource函数构建的Flow来解决这个问题:

/** * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them. * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages. */
object CoupledTerminationFlow { @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2") def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)  

从上面图列里的Composite BidiFlow可以看出:一个复合Graph的内部可以是很复杂的,但从外面看到的只是简单的几个输入输出端口。不过Graph内部构件之间的端口必须按照功能逻辑进行正确的连接,剩下的就变成直接向外公开的界面端口了。这种机制支持了层级式的模块化组合方式,如下面的图示:

compose_nested_flow.png

最后变成:

compose_nested_flow_opaque.png

在DSL里我们可以用name("???")来分割模块:

val nestedFlow =
  Flow[Int].filter(_ != 0) // an atomic processing stage
    .map(_ - 2) // another atomic processing stage
    .named("nestedFlow") // wraps up the Flow, and gives it a name

val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .named("nestedSink") // wrap it up

// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)

在下面这个示范里我们自定义一个某种功能的流图模块:它有2个输入和3个输出。然后我们再使用这个自定义流图模块组建一个完整的闭合流图:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import scala.collection.immutable

object GraphModules {
  def someProcess[I, O]: I => O = i => i.asInstanceOf[O]

  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }
//a functional module with 2 input 3 output
  def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>
    val balancer = builder.add(Balance[I](2))
    val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))

    TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)
  }

  val closedGraph = GraphDSL.create() {implicit builder =>
    import GraphDSL.Implicits._
    val inp1 = builder.add(Source(List(1,2,3))).out
    val inp2 = builder.add(Source(List(10,20,30))).out
    val m
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇R的两均值比较检验(非参数检验) 下一篇Akka(20): Stream:异步运算,..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目