设为首页 加入收藏

TOP

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages(三)
2017-10-09 13:43:39 】 浏览:8264
Tags:Akka Stream 定义 构件 功能 -Custom defined stream processing stages
ack)来捕获的。用setHandler(out,outHandler)来注册OutHandler实例。下面是针对输出端口的操作函数:

1、push(out,elem):对端口推出数据,只容许在下游使用pull提出读取数据要求后才能进行,在此之前不容许多次调用

2、complete(out):正常手动关闭端口

3、fail(out,exeption):异常手动关闭端口

输出端口响应事件包括:

1、onPull():下游可以接收数据,此时可以用push(out,elem)来向输出端口发送数据

2、onDownStreamFinish():下游终止读取数据,此后不会再收到任何onPull事件

下面的函数可以获得输出端口的当前状态:

1、isAvailable(out):true代表可以使用push(out,elem)

2、isClosed(out):true代表输出端口已经关闭,无法聆听事件或者推送数据

同样,输入端口状态捕获是通过用setHandler(in,inHandler)登记的inHandler中callback实现的。输入端口操作函数包括:

1、pull(in):向上游提出读取数据要求,只容许在上游已经完成了数据推送后才能使用,在此之前不容许多次调用

2、grab(in):从端口读取当前数据,只有在上游完成了数据推送后才能使用,其中不容许多次调用

3、cancel(in):手动关闭输入端口

输入端口事件:

1、onPush():上游已经发送数据至输入端口,此时可以用grab(in)来读取当前数据,用pull(in)向上游要求下一个数据

2、onUpstreamFinish():上游已经终止数据发送,此后再不会捕获onPush事件,不得使用pull(in)向上游请求数据

3、onUpstreamFalure():上游异常终止

获取输入端口状态方法:

1、isAvailable(in):true代表现在可以使用grab(in)读取当前数据

2、hasBeenPulled(in):true代表已经使用pull(in)进行了数据读取要求,在此状态下不容许再次使用pull(in)

3、isClosed(in):true代表端口已经关闭,此后不可施用pull(in)及无法捕获onPush事件

从上面的pull(in)和push(out,elem)的功能描述可以得出它们是严格相互依赖、相互循环配合的,即:下游pull(in)前上游必须先push(out),而上游push(out,elem)前下游必须先pull(in)。这容易理解,因为akka-stream是Reactive-Stream,是push,pull结合模式上下游相互沟通的。但如此则很不方便某些应用场景,比如数据流动控制。akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。这个API中的函数包括下面这些:

1、emit(out,elem):临时替换OutHandler,向端口发送elem,然后再恢复OutHandler

2、emitMultiple(out,Iterable(e1,e2,e3...)):临时替换OutHandler,向端口发送一串数据,然后再恢复OutHandler

3、read(in)(andThen):临时替换InHandler,从端口读取一个数据元素,然后再恢复InHandler

4、readN(in)(andThen):临时替换InHandler,从端口读取n个数据元素,然后再恢复InHandler

5、abortEmitting():取消输出端口上未完成的数据推送

6、abortReading():取消输入端口上未完成的读取操作

这个API实际上也支持reactive-stream-backpressure,我们从emitMultiple函数源代码中可以得出:

 /** * Emit a sequence of elements through the given outlet and continue with the given thunk * afterwards, suspending execution if necessary. * This action replaces the [[OutHandler]] for the given outlet if suspension * is needed and reinstalls the current handler upon receiving an `onPull()` * signal (before invoking the `andThen` function). */ final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () ? Unit): Unit =
    if (elems.hasNext) { if (isAvailable(out)) { push(out, elems.next()) if (elems.hasNext) setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen)) else andThen() } else { setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen)) } } else andThen()

下面我们就定制一个Flow GraphStage,利用read/emit让用户自定义的函数可以控制数据流元素的流动和筛选。对于Flow,同时需要关注输入端口上游推送数据状态及输出端口上下游读取请求状态:

trait Row trait Move case object Stand extends Move case class Next(rows: Iterable[Row]) extends Move class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] { val inport = Inlet[Row]("input") val outport = Outlet[Row]("output") val shape = FlowShape.of(inport,outport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler { override def onPush(): Unit = { controller(grab(inport)) match { case Next(rows) => emitMultiple(outport,rows) case _ => pull(inport) } } override def onPull(): Unit = pull(inport) setHandlers(inport,outport,this) } }

上面这个Fl

首页 上一页 1 2 3 4 5 6 下一页 尾页 3/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(17): Stream:数据流基础.. 下一篇Akka(24): Stream:从外部系统..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目