设为首页 加入收藏

TOP

Akka(21): Stream:实时操控:人为中断-KillSwitch(二)
2017-10-09 13:27:34 】 浏览:7148
Tags:Akka Stream 实时 操控 人为 中断 -KillSwitch
* * @tparam T Type of the elements the Flow will forward * @return A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself.
*/ def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]]

用flow构建的SharedKillSwitch实例就像immutable对象,我们可以在多个数据流中插入SharedKillSwitch,然后用这一个共享的handler去终止使用了这个SharedKillSwitch的数据流运算。下面是SharedKillSwitch的使用示范:

  val sharedKillSwitch = KillSwitches.shared("multi-ks") val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure) source2.via(sharedKillSwitch.flow).to(sink).run() source.via(sharedKillSwitch.flow).to(sink).run() scala.io.StdIn.readLine() killSwitch.shutdown() sharedKillSwitch.shutdown()

注意:我们先构建了一个SharedKillSwitch实例,然后在source2,source数据通道中间加入了这个实例。因为我们已经获取了sharedKillSwitch控制柄,所以不必理会返回结果,直接用via和to来连接上下游节点(默认为Keep.left)。

还有一个KillSwitches.singleBidi类型,这种KillSwitch是用来终止双流向数据流运算的。我们将在下篇讨论里介绍。

下面是本次示范的源代码:

import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration._ object KillSwitchDemo extends App { implicit val actorSys = ActorSystem("sys") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer( ActorMaterializerSettings(actorSys) .withInputBuffer(16,16) ) val source = Source(Stream.from(1,2)).delay(1.second,DelayOverflowStrategy.backpressure) val sink = Sink.foreach(println) val killSwitch = source.viaMat(KillSwitches.single)(Keep.right).to(sink).run() val sharedKillSwitch = KillSwitches.shared("multi-ks") val source2 = Source(Stream.from(1)).delay(2.second,DelayOverflowStrategy.backpressure) source2.via(sharedKillSwitch.flow).to(sink).run() source.via(sharedKillSwitch.flow).to(sink).run() scala.io.StdIn.readLine() killSwitch.shutdown() sharedKillSwitch.shutdown() println("terminated!") actorSys.terminate() }

 

 

 

 

 

 

 

 

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(20): Stream:异步运算,.. 下一篇Akka(22): Stream:实时操控:..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目