设为首页 加入收藏

TOP

Akka(21): Stream:实时操控:人为中断-KillSwitch(一)
2017-10-09 13:27:34 】 浏览:7142
Tags:Akka Stream 实时 操控 人为 中断 -KillSwitch

  akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。任何时候如果需要终止运行中的数据流就必须采用一种任务柄(handler)方式来控制在其它线程内运行的任务。这个handler可以在提交运算任务时获取。akka-stream提供了KillSwitch trait来支持这项功能:

/** * A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked * to the switch. Depending on whether the [[KillSwitch]] is a [[UniqueKillSwitch]] or a [[SharedKillSwitch]] one or * multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of * this interface. */
//#kill-switch
trait KillSwitch { /** * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally. */ def shutdown(): Unit /** * After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed. */ def abort(ex: Throwable): Unit } //#kill-switch

可以想象:我们必须把这个KillSwitch放在一个流图中间,所以它是一种FlowShape的,这可以从KillSwitch的构建器代码里可以看得到:

object KillSwitches { /** * Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple * streams from the outside simultaneously. * * @see SharedKillSwitch */ def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name) /** * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion * of that unique materialization. Different materializations result in different, independent switches. * * For a Bidi version see [[KillSwitch#singleBidi]] */ def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] = UniqueKillSwitchStage.asInstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]] /** * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion * of that unique materialization. Different materializations result in different, independent switches. * * For a Flow version see [[KillSwitch#single]] */ def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] = UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]] ...}

akka-stream提供了single,shared,singleBidi三种KillSwitch的构建方式,它们的形状都是FlowShape。KillSwitches.single返回结果类型是Graph[FlowShape[T,T],UniqueKillSwitch]。因为我们需要获取这个KillSwitch的控制柄,所以要用viaMat来可运算化(materialize)这个Graph,然后后选择右边的类型UniqueKillSwitch。这个类型可以控制一个可运算化FlowShape的Graph,如下:

  val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure) val sink = Sink.foreach(println) val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run() scala.io.StdIn.readLine() killSwitch.shutdown() println("terminated!") actorSys.terminate()

当然,也可以用异常方式中断运行:

killSwitch.abort(new RuntimeException("boom!"))

source是一个不停顿每秒发出一个数字的数据源。如上所述:必须把KillSwitch放在source和sink中间形成数据流完整链状。运算这个数据流时返回了handle killSwitch,我们可以使用这个killSwitch来shutdown或abort数据流运算。

KillSwitches.shared构建了一个SharedKillSwitch类型。这个类型可以被用来控制多个FlowShape Graph的终止运算。SharedKillSwitch类型里的flow方法可以返回终止运算的控制柄handler:

 /** * Returns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking * [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this * switch will be stopped normally or failed.
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(20): Stream:异步运算,.. 下一篇Akka(22): Stream:实时操控:..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目