设为首页 加入收藏

TOP

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages(二)
2017-10-09 13:43:39 】 浏览:8266
Tags:Akka Stream 定义 构件 功能 -Custom defined stream processing stages
ating from a pair of unidirectional flows.
*/ def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out) override def deepCopy(): BidiShape[In1, Out1, In2, Out2] = BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy()) //#implementation-details-elided } //#bidi-shape object BidiShape { def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = BidiShape(top.in, top.out, bottom.in, bottom.out) /** Java API */ def of[In1, Out1, In2, Out2]( in1: Inlet[In1 @uncheckedVariance], out1: Outlet[Out1 @uncheckedVariance], in2: Inlet[In2 @uncheckedVariance], out2: Outlet[Out2 @uncheckedVariance]): BidiShape[In1, Out1, In2, Out2] = BidiShape(in1, out1, in2, out2) }

还有一对多的UniformFanOutShape和多对一的UniformFanInShape。下面是我们自定义的一个多对多的Shape:

  case class TwoThreeShape[I, I2, O, O2, O3]( in1: Inlet[I], in2: Inlet[I2], out1: Outlet[O], out2: Outlet[O2], out3: Outlet[O3]) extends Shape { override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil override def deepCopy(): Shape = TwoThreeShape( in1.carbonCopy(), in2.carbonCopy(), out1.carbonCopy(), out2.carbonCopy(), out3.carbonCopy() ) }

这是一个二进三出的形状。我们只需要实现inlets,outlets和deepCopy这三个函数。

GraphStage描述了数据流构件的行为,通过数据流元素在构件中进出流动方式和在流动过程中的转变来定义流构件的具体功能。下面是GraphStage的类型定义:

/** * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing * logic that ties the ports together. */
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) = (createLogic(inheritedAttributes), NotUsed) @throws(classOf[Exception]) def createLogic(inheritedAttributes: Attributes): GraphStageLogic }

每个构件都需要根据需求通过实现createLogic来设计GraphStageLogic功能。GraphStageLogic定义如下:

/** * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a * collection of the following parts: * * A set of [[InHandler]] and [[OutHandler]] instances and their assignments to the [[Inlet]]s and [[Outlet]]s * of the enclosing [[GraphStage]] * * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere * else (as such access would not be thread-safe) * * The lifecycle hooks [[preStart()]] and [[postStop()]] * * Methods for performing stream processing actions, like pulling or pushing elements * * The stage logic is completed once all its input and output ports have been closed. This can be changed by * setting `setKeepGoing` to true. * * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource * cleanup should always be done in `postStop`. */
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {...}

GraphStageLogic主

首页 上一页 1 2 3 4 5 6 下一页 尾页 2/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(17): Stream:数据流基础.. 下一篇Akka(24): Stream:从外部系统..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目