设为首页 加入收藏

TOP

Akka(8): 分布式运算:Remoting-远程查找式(二)
2017-10-09 14:06:30 】 浏览:9462
Tags:Akka 分布式 运算 Remoting- 远程 查找
result: Double = 0.0 //internal state override def receive: Receive = { case Num(d) => result = d case Add(d) => result += d case Sub(d) => result -= d case Mul(d) => result *= d case Div(d) => val _ = result.toInt / d.toInt //yield ArithmeticException result /= d case Clear => result = 0.0 case GetResult => sender() ! s"Result of calculation is: $result" } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting calculator: ${reason.getMessage}") super.preRestart(reason, message) } }

由于ArithmeticException默认的处理策略SupervisorStrategy是Restart,一旦输入Div(0.0)时会重启将result清零。我们可以在remote上加一个Supervisor来把异常处理策略改为Resume。

下面我们先在remote项目本地对Calculator的功能进行测试:remote/CalculatorRunner.scala

package remoteLookup.remote import akka.actor._ import akka.pattern._ import remoteLookup.messages.Messages._ import scala.concurrent.duration._ class SupervisorActor extends Actor { def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: ArithmeticException => SupervisorStrategy.Resume } override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ decider.orElse(SupervisorStrategy.defaultDecider) } val calcActor = context.actorOf(CalcProps.props,"calculator") override def receive: Receive = { case msg@ _ => calcActor.forward(msg) } } object CalculatorRunner extends App { val remoteSystem = ActorSystem("remoteSystem") val calcActor = remoteSystem.actorOf(Props[SupervisorActor],"supervisorActor") import remoteSystem.dispatcher calcActor ! Clear calcActor ! Num(13.0) calcActor ! Mul(1.5) implicit val timeout = akka.util.Timeout(1 second) ((calcActor ? GetResult).mapTo[String]) foreach println scala.io.StdIn.readLine() calcActor ! Div(0.0) calcActor ! Div(1.5) calcActor ! Add(100.0) ((calcActor ? GetResult).mapTo[String]) foreach println scala.io.StdIn.readLine() remoteSystem.terminate() }

测试运算得出以下结果:

Result of calculation is: 19.5

Result of calculation is: 113.0
[WARN] [06/20/2017 19:28:10.720] [remoteSystem-akka.actor.default-dispatcher-4] [akka://remoteSystem/user/parentActor/calculator] / by zero

supervisorActor实现了它应有的功能。

下面进行远程查找示范:首先,remote需要把Calculator向外发布。这可以通过配置文件设置实现:remote/src/main/resources/application.conf

akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } log-sent-messages = on log-received-messages = on } }

上面这段的意思是:所有向外公开Actor的地址前缀为:akka.tcp://remoteSystem@127.0.0.1:2552/user/???

那么Calculator的完整地址path应该就是:akka.tcp://remoteSystem@127.0.0.1:2552/user/supervisorActor/calculator

Akka-Remoting提供了两种远程查找方式:actorSelection.resolveOne方法和Identify消息确认。无论如何,local都需要进行Remoting配置: local/src/main/resources/application.conf

akka { actor { provider = remote } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } } }

port=0的意思是由系统自动选择任何可用的端口。现在我们完成了Remoting设置,也得到了在远程机上Calculator的具体地址,应该足够进行远程Actor沟通了。我们先用actorSelection.resolveOne示范。resolveOne源代码如下:

  /** * Resolve the [[ActorRef]] matching this selection. * The result is returned as a Future that is completed with the [[Actor
首页 上一页 1 2 3 4 5 6 下一页 尾页 2/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark获取时间 下一篇sbt 学习笔记(1)sbt安装和交互..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目