设为首页 加入收藏

TOP

Akka(18): Stream:组合数据流,组件-Graph components(二)
2017-10-09 13:43:37 】 浏览:9201
Tags:Akka Stream 组合 数据流 组件 -Graph components
uildBlock: GraphDSL.Builder[Mat] ? (g1.Shape) ? S): Graph[S, Mat] = {...} def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) ? Mat)(buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape) ? S): Graph[S, Mat] = {...} ... def create[S <: Shape, Mat, M1, M2, M3, M4, M5](g1: Graph[Shape, M1], g2: Graph[Shape, M2], g3: Graph[Shape, M3], g4: Graph[Shape, M4], g5: Graph[Shape, M5])(combineMat: (M1, M2, M3, M4, M5) ? Mat)(buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape, g3.Shape, g4.Shape, g5.Shape) ? S): Graph[S, Mat] = { ...}

buildBlock函数类型:buildBlock: GraphDSL.Builder[Mat] ? (g1.Shape, g2.Shape,...,g5.Shape) ? S,g?代表合并处理后的开放型流图。下面是几个最基本的Graph构建试例:

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ object SimpleGraphs extends App{ implicit val sys = ActorSystem("streamSys") implicit val ec = sys.dispatcher implicit val mat = ActorMaterializer() val source = Source(1 to 10) val flow = Flow[Int].map(_ * 2) val sink = Sink.foreach(println) val sourceGraph = GraphDSL.create(){implicit builder => import GraphDSL.Implicits._ val src = source.filter(_ % 2 == 0) val pipe = builder.add(Flow[Int]) src ~> pipe.in SourceShape(pipe.out) } Source.fromGraph(sourceGraph).runWith(sink).andThen{case _ => } // sys.terminate()}
 val flowGraph = GraphDSL.create(){implicit builder => import GraphDSL.Implicits._ val pipe = builder.add(Flow[Int]) FlowShape(pipe.in,pipe.out) } val (_,fut) = Flow.fromGraph(flowGraph).runWith(source,sink) fut.andThen{case _ => } //sys.terminate()}
 val sinkGraph = GraphDSL.create(){implicit builder => import GraphDSL.Implicits._ val pipe = builder.add(Flow[Int]) pipe.out.map(_ * 3) ~> Sink.foreach(println) SinkShape(pipe.in) } val fut1 = Sink.fromGraph(sinkGraph).runWith(source) Thread.sleep(1000) sys.terminate()

上面我们示范了Source,Flow,Sink的Graph编写,我们使用了Flow[Int]作为共同基础组件。我们知道:akka-stream的Graph可以用更简单的Partial-Graph来组合,而所有Graph最终都是用基础流图Core-Graph如Source,Flow,Sink组合而成的。上面例子里我们是用builder.add(...)把一个Flow Graph加入到一个空的Graph模版里,builder.add返回Shape pipe用于揭露这个被加入的Graph的输入输出端口。然后我们按目标Graph的功能要求把pipe的端口连接起来就完成了这个数据流图的设计了。测试使用证明这几个Graph的功能符合预想。下面我们还可以试着自定义一种类似的Pipe类型Graph来更细致的了解Graph组合的过程。所有基础组件Core-Graph都必须定义Shape来描述它的输入输出端口,定义GraphStage中的GraphStateLogic来描述对数据流元素具体的读写方式。

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import scala.collection.immutable case class PipeShape[In,Out]( in: Inlet[In], out: Outlet[Out]) extends Shape { override def inlets: immutable.Seq[Inlet[_]] = in :: Nil override def outlets: immutable.Seq[Outlet[_]] = out :: Nil override def deepCopy(): Shape = PipeShape( in = in.carbonCopy(), out = out.carbonCopy() ) }

PipeShape有一个输入端口和一个输出端口。因为继承了Shape类所以必须实现Shape类的抽象函数。假设我们设计一个Graph,能把用户提供的一个函数用来对输入元素进行施用,如:source.via(ApplyPipe(myFunc)).runWith(sink)。当然,我们可以直接使用source.map(r => myFunc).runWith(sink),不过我们需要的是:ApplyPipe里可能涉及到许多预设定的共用功能,然后myFunc是其中的一部分代码。如果用map(...)的话用户就必须提供所有的代码了。ApplyPipe的形状是PipeShape,下面是它的GraphState设计:

  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] { val in = Inlet[In]("Pipe.in") val out = Outlet[Out]("Pipe.out") override def shape = PipeShape(in, out) override def initialAttributes: Attributes = Attributes.none overri
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(24): Stream:从外部系统.. 下一篇R的两均值比较检验(非参数检验)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目