设为首页 加入收藏

TOP

Akka(13): 分布式运算:Cluster-Sharding-运算的集群分片(一)
2017-10-09 13:50:18 】 浏览:6187
Tags:Akka 分布式 运算 Cluster-Sharding- 集群 分片

    通过上篇关于Cluster-Singleton的介绍,我们了解了Akka为分布式程序提供的编程支持:基于消息驱动的运算模式特别适合分布式程序编程,我们不需要特别的努力,只需要按照普通的Actor编程方式就可以实现集群分布式程序了。Cluster-Singleton可以保证无论集群节点出了任何问题,只要集群中还有节点在线,都可以持续的安全运算。Cluster-Singleton这种模式保证了某种Actor的唯一实例可以安全稳定地在集群环境下运行。还有一种情况就是如果有许多特别占用资源的Actor需要同时运行,而这些Actor同时占用的资源远远超过一台服务器的容量,如此我们必须把这些Actor分布到多台服务器上,或者是一个由多台服务器组成的集群环境,这时就需要Cluster-Sharding模式来帮助解决这样的问题了。

我把通过使用Cluster-Sharding后达到的一些目的和大家分享一下,大家一起来分析分析到底这些达成的目标里是否包括了Actor在集群节点间的分布:

首先我有个Actor,它的名称是一个自编码,由Cluster-Sharding在集群中某个节点上构建。由于在一个集群环境里所以这个Actor到底在哪个节点上,具体地址是什么我都不知道,我只需要用这个自编码就可以和它沟通。如果我有许多自编码的消耗资源的Actor,我可以通过自编码中的分片(shard)编号来指定在其它的分片(shard)里构建这些Actor。Akka-Cluster还可以根据整个集群中节点的增减按当前集群节点情况进行分片在集群节点调动来重新配载(rebalance),包括在某些节点因故脱离集群时把节点上的所有Actor在其它在线节点上重新构建。这样看来,这个Actor的自编码应该是Cluster-Sharding的应用核心元素了。按惯例我们还是用例子来示范Cluster-Sharding的使用。我们需要分片(sharding)的Actor就是前几篇讨论里提到的Calculator:

package clustersharding.entity import akka.actor._ import akka.cluster._ import akka.persistence._ import scala.concurrent.duration._ import akka.cluster.sharding._ object Calculator { sealed trait Command case class Num(d: Double) extends Command case class Add(d: Double) extends Command case class Sub(d: Double) extends Command case class Mul(d: Double) extends Command case class Div(d: Double) extends Command case object ShowResult extends Command sealed trait Event case class SetResult(d: Any) extends Event def getResult(res: Double, cmd: Command) = cmd match { case Num(x) => x case Add(x) => res + x case Sub(x) => res - x case Mul(x) => res * x case Div(x) => { val _ = res.toInt / x.toInt //yield ArithmeticException when /0.00
      res / x } case _ => new ArithmeticException("Invalid Operation!") } case class State(result: Double) { def updateState(evt: Event): State = evt match { case SetResult(n) => copy(result = n.asInstanceOf[Double]) } } case object Disconnect extends Command    //exit cluster
 def props = Props(new Calcultor) } class Calcultor extends PersistentActor with ActorLogging { import Calculator._ val cluster = Cluster(context.system) var state: State = State(0) override def persistenceId: String = self.path.parent.name+"-"+self.path.name override def receiveRecover: Receive = { case evt: Event => state = state.updateState(evt) case SnapshotOffer(_,st: State) => state = state.copy(result = st.result) } override def receiveCommand: Receive = { case Num(n) => persist(SetResult(getResult(state.result,Num(n))))(evt => state = state.updateState(evt)) case Add(n) => persist(SetResult(getResult(state.result,Add(n))))(evt => state = state.updateState(evt)) case Sub(n) => persist(SetResult(getResult(state.result,Sub(n))))(evt => state = state.updateState(evt)) case Mul(n) => persist(SetResult(getResult(state.result,Mul(n))))(evt => state = state.updateState(evt)) case Div(n) => persist(SetResult(getResult(state.result,Div(n))))(evt => state = state.updateState(evt)) case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}") case Disconnect => log.info(s"${cluster.selfAddress} is leaving cluster!!!") cluster.leave (cluster.selfAddress) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Resta
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala 基础一 val常量和var变量的.. 下一篇为什么要创建开放源码的PlayScala..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目