设为首页 加入收藏

TOP

Akka(4): Routers - 智能任务分配(二)
2017-10-09 14:10:01 】 浏览:7671
Tags:Akka Routers 智能 任务 分配
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(15)=610 [INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c's answer: Fibonacci(17)=1597

我们看到,router按配置自动构建了3个FibonacciRoutee。Routee的构建过程是无法人工干预的。向router发送的计算指令被分配给b,a,c,c去运算了。从显示顺序可以证明每个参与的Actor占用运算时间不同,产生了无序的运算结果。

下面我们在Routee里加一个延迟效应。这样运算结果显示会更自然些:

object FibonacciRoutee { case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int) class RouteeException extends Exception def props = Props[FibonacciRoutee] } class FibonacciRoutee extends Actor with ActorLogging { import FibonacciRoutee._ import context.dispatcher override def receive: Receive = { case FibonacciNumber(nbr,ms) => context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr)) case GetAnswer(nbr) =>
      if (Random.nextBoolean()) throw new RouteeException else { val answer = fibonacci(nbr) log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer") } } private def fibonacci(n: Int): Int = { @tailrec def fib(n: Int, b: Int, a: Int): Int = n match { case 0 => a case _ => fib(n - 1, a + b, b) } fib(n, 1, 0) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting ${self.path.name} on ${reason.getMessage}") message foreach {m => self ! m} super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Restarted ${self.path.name} on ${reason.getMessage}") super.postRestart(reason) } override def postStop(): Unit = { log.info(s"Stopped ${self.path.name}!") super.postStop() } }

因为在Actor内部不能使用Thread.sleep,所以我们用了个scheduleOnce在延迟时间后向自己发送一个唤醒消息。注意,scheduleOnce是无阻塞non-blocking代码,调用后程序不会停留等待计划动作。在上面修改后的代码里增加了监管策略SupervisorStrategy的使用测试。Router的默认监管策略是Esculate,即把某个Routee发生的异常提交给Router的直属父级处理。如果Router直属父级对Routee异常的处理方式是重启的话,那么首先重启Router,然后是作为直属子级的所有Routees都会被重启,结果并不是我们想要的。所以必须人为的设定Router监管策略。由于Router的SupervisorStrategy无法在设置文件中定义,所以这次我们只有用代码方式来设置routing模式了:

object RouterDemo extends App { import FibonacciRoutee._ import scala.concurrent.ExecutionContext.Implicits.global val routingSystem = ActorSystem("routingSystem") /* cannot set SupervisorStrategy in config file val router = routingSystem.actorOf( FromConfig.props(FibonacciRoutee.props) ,"balance-pool-router") */ val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: RouteeException => SupervisorStrategy.Restart } val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)( routingDecider.orElse(SupervisorStrategy.defaultDecider) ) val router = routingSystem.actorOf( BalancingPool(nrOfInstances = 3 ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
      ).withDispatcher("akka.pool-dispatcher") .props(FibonacciRoutee.props) ,"balance-pool-router" ) router ! FibonacciNumber(10,5) router ! FibonacciNumber(13,2) router ! FibonacciNumber(15,3) router ! FibonacciNumber(17,1) scala.io.StdIn.readLine() routingSystem.terminate() }

注意:我们在Fibon

首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(5): ConsistentHashing R.. 下一篇Akka(3): Actor监管 - 细述Bac..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目