设为首页 加入收藏

TOP

Akka(24): Stream:从外部系统控制数据流-control live stream from external system(一)
2017-10-09 13:43:38 】 浏览:7175
Tags:Akka Stream 外部 系统 控制 数据流 -control live stream from external system

   在数据流应用的现实场景中常常会遇到与外界系统对接的需求。这些外部系统可能是Actor系统又或者是一些其它类型的系统。与这些外界系统对接的意思是在另一个线程中运行的数据流可以接收外部系统推送的事件及做出行为改变的响应。

如果一个外界系统需要控制一个运行中数据流的功能环节GraphStage,首先必须在这个GraphStage内部构建一个控制函数,这样才能接触并更改GraphStage的内部状态。外部系统可以通过调用这个控制函数来向GraphStage发送信息,控制GraphStage行为。akka-stream是多线程异步模式的程序,所以这个函数只能是一个异步运行的回调callback。akka-stream提供了一个函数getAsyncCallback函数,能够把一个函数放到另一个线程里并返回它的callback:

  /** * Obtain a callback object that can be used asynchronously to re-enter the * current [[GraphStage]] with an asynchronous notification. The [[invoke()]] method of the returned * [[AsyncCallback]] is safe to be called from other threads and it will in the background thread-safely * delegate to the passed callback function. I.e. [[invoke()]] will be called by the external world and * the passed handler will be invoked eventually in a thread-safe way by the execution environment. * * This object can be cached and reused within the same [[GraphStageLogic]]. */ final def getAsyncCallback[T](handler: T ? Unit): AsyncCallback[T] = { new AsyncCallback[T] { override def invoke(event: T): Unit = interpreter.onAsyncInput(GraphStageLogic.this, event, handler.asInstanceOf[Any ? Unit]) } }

getAsyncCallback把一个函数T=>Unit变成了异步运算函数并通过AsyncCallback返回它的回调callback。下面是getAsyncCallback的一个用例: 

//external system
object Injector { var callback: AsyncCallback[String] = null def inject(m: String) = { if (callback != null) callback.invoke(m) } } class InjectControl(injector: Injector.type) extends GraphStage[FlowShape[String,String]] { val inport = Inlet[String]("input") val outport = Outlet[String]("output") val shape = FlowShape.of(inport,outport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) { var extMessage = ""
      override def preStart(): Unit = { val callback = getAsyncCallback[String] { m =>
          if (m.length > 0) m match { case "Stop" => completeStage() case s: String => extMessage = s } } injector.callback = callback } setHandler(inport, new InHandler { override def onPush(): Unit =
          if (extMessage.length > 0) { push(outport,extMessage) extMessage="" } else push(outport, grab(inport)) }) setHandler(outport, new OutHandler { override def onPull(): Unit = pull(inport) }) } }

上面这个例子里的object Injector模拟一个外部系统。我们重写了GraphStage InjectControl.createLogic里的preStart()函数,在这里把一个String=>Unit函数的callback登记在Injector里。这个callback函数能接受传入的String并更新内部状态extMessage,或者当传入String==“Stop"时终止数据流。在onPush()里extMessage最终会被当作流元素插入到数据流中。下面我们就构建这个GraphStage的测试运行程序:

object InteractWithStreams extends App { implicit val sys = ActorSystem("demoSys") implicit val ec = sys.dispatcher implicit val mat = ActorMaterializer( ActorMaterializerSettings(sys) .withInputBuffer(initialSize = 16, maxSize = 16) ) val source = Source(Stream.from(1)).map(_.toString).delay(1.second,DelayOverflowStrategy.backpressure) val graph = new InjectControl(Injector) val flow = Flow.fromGraph(graph) source.via(flow).to(Sink.foreach(println)).run() Thread.sleep(2000) Injector.inject("hello") Thread.sleep(2000) Injector.inject("world!") Thread.sleep(2000) Injector.inject("Stop") scala.io.StdIn.readLine() sys.terminate() }

试运行结果显示:

1
2 hello 4
5
6 world!
8
9
10 Process fini
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(23): Stream:自定义流构.. 下一篇Akka(18): Stream:组合数据流..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目