设为首页 加入收藏

TOP

Akka(24): Stream:从外部系统控制数据流-control live stream from external system(二)
2017-10-09 13:43:38 】 浏览:7187
Tags:Akka Stream 外部 系统 控制 数据流 -control live stream from external system
shed with exit code
0

正确地把"hello world!"插入了一个正在运行中的数据流中并在最后终止了这个数据流。

另外,一个GraphStage也可以被外界当作一种Actor来进行交流。我们可以在GraphStage内部构建一个(ActorRef,Any)=>Unit款式的函数,然后用getStageActor(func).ref以一种ActorRef形式返回这个函数:

 /** * Initialize a [[StageActorRef]] which can be used to interact with from the outside world "as-if" an [[Actor]]. * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify * internal state of this stage. * * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running, * for example from the [[preStart]] callback the graph stage logic provides. * * Created [[StageActorRef]] to get messages and watch other actors in synchronous way. * * The [[StageActorRef]]'s lifecycle is bound to the Stage, in other words when the Stage is finished, * the Actor will be terminated as well. The entity backing the [[StageActorRef]] is not a real Actor, * but the [[GraphStageLogic]] itself, therefore it does not react to [[PoisonPill]]. * * @param receive callback that will be called upon receiving of a message by this special Actor * @return minimal actor with watch method */
  // FIXME: I don't like the Pair allocation :(
 @ApiMayChange final protected def getStageActor(receive: ((ActorRef, Any)) ? Unit): StageActor = { _stageActor match { case null ? val actorMaterializer = ActorMaterializerHelper.downcast(interpreter.materializer) _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive) _stageActor case existing ? existing.become(receive) existing } } ... /** * The ActorRef by which this StageActor can be contacted from the outside. * This is a full-fledged ActorRef that supports watching and being watched * as well as location transparent (remote) communication. */ def ref: ActorRef = functionRef

下面是receive:((ActorRef,Any))=>Unit这个函数的实现例子:

      def behavior(m:(ActorRef,Any)): Unit = { val (sender, msg) = m msg.asInstanceOf[String] match { case "Stop" => completeStage() case s@ _ => extMessage = s } }

这个函数的输入参数(sender,msg)代表发送消息的Actor和发送的消息。与上个例子一样,作为一个GraphStage的内部函数,它可以使用、更新GraphStage内部状态。GraphStage的实现如下:

class StageAsActor(extActor: ActorRef) extends GraphStage[FlowShape[String,String]] { val inport = Inlet[String]("input") val outport = Outlet[String]("output") val shape = FlowShape.of(inport,outport) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) { var extMessage = ""
      override def preStart(): Unit = { extActor ! getStageActor(behavior).ref } def behavior(m:(ActorRef,Any)): Unit = { val (sender, msg) = m msg.asInstanceOf[String] match { case "Stop" => completeStage() case s@ _ => extMessage = s } } setHandler(inport, new InHandler { override def onPush(): Unit =
          if (extMessage.length > 0) { push(outport,extMessage) extMessage="" } else push(outport, grab(inport)) }) setHandler(outport, new OutHandler { override def onPull(): Unit = pull(inport) }) } }

参数extActor就是外部的控制Actor。在creatLogic.preStart()里我们先把StageActor传给extActor。外部系统就可以通过extActor来控制数据流行为:

class Messenger extends Actor with ActorLogging { var stageActor: ActorRef = _ override def receive: Receive = { case r: ActorRef => stageActor = r log.info("received stage actorRef") case s: String => stageActor forward s lo
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(23): Stream:自定义流构.. 下一篇Akka(18): Stream:组合数据流..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目