设为首页 加入收藏

TOP

Akka(19): Stream:组合数据流,组合共用-Graph modular composition(二)
2017-10-09 13:35:07 】 浏览:1907
Tags:Akka Stream 组合 数据流 共用 -Graph modular composition
erge
= builder.add(Merge[Int](2)) val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int]) inp1 ~> mod23.in1 inp2 ~> mod23.in2 mod23.out1 ~> merge.in(0) mod23.out2 ~> merge.in(1) mod23.out3 ~> Sink.foreach(println) merge ~> Sink.foreach(println) ClosedShape } } object TailorGraph extends App { import GraphModules._ implicit val sys = ActorSystem("streamSys") implicit val ec = sys.dispatcher implicit val mat = ActorMaterializer() RunnableGraph.fromGraph(closedGraph).run() scala.io.StdIn.readLine() sys.terminate() }

这个自定义的TwoThreeGraph是一个复合的流图模块,是可以重复使用的。注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如:

      def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), b.add(to).in) def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), to.in) ...

所以对于我们自定义的TwoThreeShape就只能使用直接的端口连接了:

   def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), to)

以上的过程显示:通过akka的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。我们再来看个较复杂复合流图的构建过程,下面是这个流图的图示:

compose_graph.png

可以说这是一个相对复杂的数据处理方案,里面甚至包括了数据流回路(feedback)。无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。但用akka GraphDSL可以很形象的组合这个数据流图;

 import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => val A: Outlet[Int] = builder.add(Source.single(0)).out val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2)) val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1)) val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2)) val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val G: Inlet[Any] = builder.add(Sink.foreach(println)).in C <~ F A ~>  B  ~>  C     ~> F B ~>  D  ~>  E  ~> F E ~> G ClosedShape })

另一个端口连接方式的版本如下:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => val B = builder.add(Broadcast[Int](2)) val C = builder.add(Merge[Int](2)) val E = builder.add(Balance[Int](2)) val F = builder.add(Merge[Int](2)) Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0) C.in(0) <~ F.out B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1) E.out(1) ~> Sink.foreach(println) ClosedShape })

如果把上面这个复杂的Graph切分成模块的话,其中一部分是这样的:

compose_graph_partial.png

这个开放数据流复合图可以用GraphDSL这样构建:
val partial = GraphDSL.create() { implicit builder => val B = builder.add(Broadcast[Int](2)) val C = builder.add(Merge[Int](2)) val E = builder.add(Balance[Int](2)) val F = builder.add(Merge[Int](2)) C <~ F B ~>                            C  ~> F B ~>  Flow[Int].map(_ + 1)  ~>  E  ~> F FlowShape(B.in, E.out(1)) }.named("partial")
模块化的完整Graph图示如下:
compose_graph_flow.png
这部分可以用下面的代码来实现:
// Convert the partial graph of FlowShape to a Flow to get // access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial) // Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇R的两均值比较检验(非参数检验) 下一篇Akka(20): Stream:异步运算,..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目