设为首页 加入收藏

TOP

Akka(4): Routers - 智能任务分配(三)
2017-10-09 14:10:01 】 浏览:7670
Tags:Akka Routers 智能 任务 分配
acciRoutee的preRestart接口中增加了向自己补发产生异常消息的过程。运算结果显示:虽然出现了多次异常,router重启了f发生异常的Routee,所有消息都得到了处理。

Akka中有些routing模式支持Router-Pool Routee的自动增减。由于BalancingPool不支持此项功能,下面我们就用RoundRobinPool来做个示范。由于需要定义监管策略,只有在代码中设置Resizer了:

 val resizer = DefaultResizer( lowerBound = 2, upperBound = 5, pressureThreshold = 1 ,rampupRate = 1, backoffRate = 0.25 ,backoffThreshold = 0.25, messagesPerResize = 1 ) val router = routingSystem.actorOf( RoundRobinPool(nrOfInstances = 2 ,resizer = Some(resizer) ,supervisorStrategy = routerSupervisorStrategy) .props(FibonacciRoutee.props) ,"roundrobin-pool-router" )

以上resizer设置为:Routee最少2个,可以自动增加到5个。运行后routingSystem自动增加了两个Routee: c,d。

下面是本次示范的完整源代码:

 

import akka.actor._ import akka.routing._ import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.Random object FibonacciRoutee { case class FibonacciNumber(nbr: Int, msDelay: Int)  //增加延迟参数
  case class GetAnswer(nbr: Int) class RouteeException extends Exception def props = Props[FibonacciRoutee] } class FibonacciRoutee extends Actor with ActorLogging { import FibonacciRoutee._ import context.dispatcher override def receive: Receive = { case FibonacciNumber(nbr,ms) => context.system.scheduler.scheduleOnce(ms second,self,GetAnswer(nbr)) case GetAnswer(nbr) =>
      if (Random.nextBoolean()) throw new RouteeException else { val answer = fibonacci(nbr) log.info(s"${self.path.name}'s answer: Fibonacci($nbr)=$answer") } } private def fibonacci(n: Int): Int = { @tailrec def fib(n: Int, b: Int, a: Int): Int = n match { case 0 => a case _ => fib(n - 1, a + b, b) } fib(n, 1, 0) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting ${self.path.name} on ${reason.getMessage}") message foreach {m => self ! m} super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Restarted ${self.path.name} on ${reason.getMessage}") super.postRestart(reason) } override def postStop(): Unit = { log.info(s"Stopped ${self.path.name}!") super.postStop() } } object RouterDemo extends App { import FibonacciRoutee._ import scala.concurrent.ExecutionContext.Implicits.global val routingSystem = ActorSystem("routingSystem") /* cannot set SupervisorStrategy in config file val router = routingSystem.actorOf( FromConfig.props(FibonacciRoutee.props) ,"balance-pool-router") */ val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: RouteeException => SupervisorStrategy.Restart } val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)( routingDecider.orElse(SupervisorStrategy.defaultDecider) ) /* does not support resizing routees val router = routingSystem.actorOf( BalancingPool(nrOfInstances = 3 ,supervisorStrategy=routerSupervisorStrategy //set SupervisorStrategy here ).withDispatcher("akka.pool-dispatcher") .props(FibonacciRoutee.props) ,"balance-pool-router" ) */ val resizer = DefaultResizer( lowerBound = 2, upperBound = 5, pressureThreshold = 1 ,rampupRate = 1, backoffRate = 0.25 ,backoffThreshold = 0.25, messagesPerResize = 1 ) val router = routingSystem.actorOf( RoundRobinPool(nrOfInstances = 2 ,resizer = Some(resizer) ,supervis
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(5): ConsistentHashing R.. 下一篇Akka(3): Actor监管 - 细述Bac..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目