设为首页 加入收藏

TOP

Akka(24): Stream:从外部系统控制数据流-control live stream from external system(三)
2017-10-09 13:43:38 】 浏览:7186
Tags:Akka Stream 外部 系统 控制 数据流 -control live stream from external system
g.info(s
"forwarding message:$s") } } object GetStageActorDemo extends App { implicit val sys = ActorSystem("demoSys") implicit val ec = sys.dispatcher implicit val mat = ActorMaterializer( ActorMaterializerSettings(sys) .withInputBuffer(initialSize = 16, maxSize = 16) ) val stageActorMessenger = sys.actorOf(Props[Messenger],"forwarder") val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure) val graph = new StageAsActor(stageActorMessenger) val flow = Flow.fromGraph(graph) source.via(flow).to(Sink.foreach(println)).run() Thread.sleep(2000) stageActorMessenger ! "Hello" Thread.sleep(1000) stageActorMessenger ! "World!" Thread.sleep(2000) stageActorMessenger ! "Stop" scala.io.StdIn.readLine() sys.terminate() }

Messenger就是一个纯粹的中介，把控制消息通过StageActor转发给运行中的数据流。

下面是本次示范的源代码:

GetAsyncCallBack.scala

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import scala.concurrent.duration._ //external system
/** Stand-in for an external system that sends control messages into a running stream.
  *
  * A stage stores its `AsyncCallback` in [[callback]] from `preStart`; external code then
  * calls [[inject]] to hand a message to that stage.
  */
object Injector {
  // Assigned by the stage in preStart and read by whichever thread calls inject,
  // so the field is @volatile to make the assignment visible across threads.
  @volatile var callback: AsyncCallback[String] = null

  /** Forwards `m` to the registered callback; does nothing while none is registered.
    *
    * @param m the control message to deliver to the stage
    */
  def inject(m: String): Unit =
    // Read the field exactly once so the null check and the invoke see the same value.
    Option(callback).foreach(_.invoke(m))
}

/** A `String => String` flow stage that can be steered from outside the stream.
  *
  * On start the stage registers an async callback with `injector`. A message of
  * `"Stop"` completes the stage; any other non-empty message is remembered and
  * pushed downstream on the next `onPush`.
  *
  * @param injector the external-system object that receives this stage's callback
  */
class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] {
  val inport = Inlet[String]("input")
  val outport = Outlet[String]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Pending message from the external system; the empty string means "nothing pending".
      var extMessage = ""

      override def preStart(): Unit = {
        val callback = getAsyncCallback[String] { m =>
          // Empty messages are ignored.
          if (m.length > 0) m match {
            case "Stop"    => completeStage()
            case s: String => extMessage = s
          }
        }
        injector.callback = callback
      }

      setHandler(inport, new InHandler {
        override def onPush(): Unit =
          if (extMessage.length > 0) {
            // The pending message is pushed and the upstream element is not grabbed.
            // NOTE(review): the un-grabbed element appears to be discarded, i.e. the
            // injected message replaces one upstream element - confirm this is intended.
            push(outport,extMessage)
            extMessage = ""
          }
          else push(outport, grab(inport))
      })

      setHandler(outport, new OutHandler {
        // Downstream demand is passed straight through to upstream.
        override def onPull(): Unit = pull(inport)
      })
    }
}

/** Demo: runs a delayed stream of numbers through [[InjectControl]] and injects
  * "hello", "world!" and finally "Stop" from the main thread.
  */
object GetAsyncCallbackDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure)
  val graph = new InjectControl(Injector)
  val flow = Flow.fromGraph(graph)

  source.via(flow).to(Sink.foreach(println)).run()

  // The sleeps only space the injections out so their effect is visible in the output.
  Thread.sleep(2000)
  Injector.inject("hello")
  Thread.sleep(2000)
  Injector.inject("world!")
  Thread.sleep(2000)
  Injector.inject("Stop")

  // Keep the JVM alive until the user presses Enter, then shut the actor system down.
  scala.io.StdIn.readLine()
  sys.terminate()
}

GetStageActorDemo.scala

import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import scala.concurrent.duration._ class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] { val inport = Inlet[String]("input") val outport = Outlet[String]("output") val shape = FlowShape.of(inport,outport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) { var extMessage = ""
      override def preStart(): Unit = {
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(23): Stream:自定义流构.. 下一篇Akka(18): Stream:组合数据流..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目