设为首页 加入收藏

TOP

Akka(3): Actor监管 - 细述BackoffSupervisor(一)
2017-10-09 14:06:34 】 浏览:6395
Tags:Akka Actor 监管 细述 BackoffSupervisor

    在上一篇讨论中我们谈到了监管:在Akka中就是一种直属父子监管树结构,父级Actor负责处理直属子级Actor产生的异常。当时我们把BackoffSupervisor作为父子监管方式的其中一种。实际上BackoffSupervisor与定义了supervisorStrategy的Actor有所不同。我们应该把BackoffSupervisor看作是一个一体化的Actor。当然,它的实现方式还是由一对父子Actor组成。监管策略(SupervisorStrategy)是在BackoffSupervisor的内部实现的。从外表上BackoffSupervisor就像是一个Actor,运算逻辑是在子级Actor中定义的,所谓的父级Actor除监管之外没有任何其它功能,我们甚至没有地方定义父级Actor的功能,它的唯一功能是转发收到的信息给子级,是嵌入BackoffSupervisor里的。所以我们虽然发送消息给BackoffSupervisor,但实际上是在与它的子级交流。我们看看下面这个例子:

package backoffSupervisorDemo import akka.actor._ import akka.pattern._ import backoffSupervisorDemo.InnerChild.TestMessage import scala.concurrent.duration._ object InnerChild { case class TestMessage(msg: String) class ChildException extends Exception def props = Props[InnerChild] } class InnerChild extends Actor with ActorLogging { import InnerChild._ override def receive: Receive = { case TestMessage(msg) => //模拟子级功能
      log.info(s"Child received message: ${msg}") } } object Supervisor { def props: Props = { //在这里定义了监管策略和child Actor构建
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { case _: InnerChild.ChildException => SupervisorStrategy.Restart } val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0) .withManualReset .withSupervisorStrategy( OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)( decider.orElse(SupervisorStrategy.defaultDecider) ) ) BackoffSupervisor.props(options) } } //注意:下面是Supervisor的父级,不是InnerChild的父级
object ParentalActor { case class SendToSupervisor(msg: InnerChild.TestMessage) case class SendToInnerChild(msg: InnerChild.TestMessage) case class SendToChildSelection(msg: InnerChild.TestMessage) def props = Props[ParentalActor] } class ParentalActor extends Actor with ActorLogging { import ParentalActor._ //在这里构建子级Actor supervisor
  val supervisor = context.actorOf(Supervisor.props,"supervisor") supervisor ! BackoffSupervisor.getCurrentChild //要求supervisor返回当前子级Actor
  var innerChild: Option[ActorRef] = None   //返回的当前子级ActorRef
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild") override def receive: Receive = { case BackoffSupervisor.CurrentChild(ref) =>   //收到子级Actor信息
      innerChild = ref
    case SendToSupervisor(msg) => supervisor ! msg case SendToChildSelection(msg) => selectedChild ! msg case SendToInnerChild(msg) => innerChild foreach(child => child ! msg) } } object BackoffSupervisorDemo extends App { import ParentalActor._ val testSystem = ActorSystem("testSystem") val parent = testSystem.actorOf(ParentalActor.props,"parent") Thread.sleep(1000)   //wait for BackoffSupervisor.CurrentChild(ref) received
 parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor")) parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild")) parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild")) scala.io.StdIn.readLine() testSystem.terminate() }

在上面的例子里我们分别向supervisor,innerChild,selectedChild发送消息。但所有消息都是由InnerChild响应的,如下:

[INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child re
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(4): Routers - 智能任务.. 下一篇Akka(6): become/unbecome:运..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目