设为首页 加入收藏

TOP

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages(五)
2017-10-09 13:43:39 】 浏览:8265
Tags:Akka Stream 定义 构件 功能 -Custom defined stream processing stages
要负责通过InHandler和OutHandler响应输出输入端口的事件,对元素的转变和在端口上的流动方式进行控制:

/** * Collection of callbacks for an input port of a [[GraphStage]] */ trait InHandler { /** * Called when the input port has a new element available. The actual element can be retrieved via the * [[GraphStageLogic.grab()]] method. */ @throws(classOf[Exception]) def onPush(): Unit /** * Called when the input port is finished. After this callback no other callbacks will be called for this port. */ @throws(classOf[Exception]) def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage() /** * Called when the input port has failed. After this callback no other callbacks will be called for this port. */ @throws(classOf[Exception]) def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex) } /** * Collection of callbacks for an output port of a [[GraphStage]] */ trait OutHandler { /** * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] * is now allowed to be called on this port. */ @throws(classOf[Exception]) def onPull(): Unit /** * Called when the output port will no longer accept any new elements. After this callback no other callbacks will * be called for this port. */ @throws(classOf[Exception]) def onDownstreamFinish(): Unit = { GraphInterpreter .currentInterpreter .activeStage .completeStage() } }

可以看到:我们需要实现InHandler.onPush()和OutHandler.onPull。akka-stream在数据流的各环节都实现了Reactive-Stream-Specification,所以对于输入端口InHandler来讲需要响应上游推送信号onPush,输出端口OutHandler要响应下游的读取信号onPull。就构件自身来说需要:从输入端口pull(in),对输出端口push(out)。

下面我们就示范设计一个循环产生一串指定字符的Source。Source只有一个输出端口,我们只需要观察输出端口下游的读取信号。所以在这种情况下我们只需要重写函数OutHandler即可:

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] { val outport = Outlet[String]("output") val shape = SourceShape(outport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) { var pos: Int = 0 setHandler(outport,new OutHandler { override def onPull(): Unit = { push(outport,chars(pos)) pos += 1
          if (pos == chars.length) pos = 0 } }) } }

GraphStage类是Graph子类:

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...} abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}

所以我们可以把AlphaSource当作Graph然后用Source.fromGraph来构建Source构件:

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("A","B","C","D")) val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure) alphaSource.runWith(Sink.foreach(println))

同样对于Sink:我们只需要观察上游推送信号然后读取数据:

class UppercaseSink extends GraphStage[SinkShape[String]] { val inport = Inlet[String]("input") val shape = SinkShape(inport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = pull(inport) override def onPush(): Unit = { println(grab(inport).toUpperCase) pull(inport) } setHandler(inport,this) } }

从上面的AlphaSource,UppercaseSink我们略为尝试了一把数据流元素流动控制,主要是对输出输入端口状态变化采取一种被动的响应:通过push,pull来对端口进行操作。下面列出了一些常用的端口状态事件及操作方法:

输出端口状态变化事件是通过OutHandler中的回调函数(callb

首页 上一页 2 3 4 5 6 下一页 尾页 5/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(17): Stream:数据流基础.. 下一篇Akka(24): Stream:从外部系统..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目