设为首页 加入收藏

TOP

Akka(12): 分布式运算:Cluster-Singleton-让运算在集群节点中自动转移(五)
2017-10-09 13:50:39 浏览:10150
Tags:Akka 分布式 运算 Cluster-Singleton- 集群 点中 自动 转移
ers = off akka { loglevel = INFO actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://SingletonClusterSystem@127.0.0.1:2551"] log-info = off } persistence { journal.plugin = "akka.persistence.journal.leveldb-shared" journal.leveldb-shared.store { # DO NOT USE 'native = off' IN PRODUCTION !!! native = off dir = "target/shared-journal" } snapshot-store.plugin = "akka.persistence.snapshot-store.local" snapshot-store.local.dir = "target/snapshots" } }

SingletonActor.scala

package clustersingleton.sa

import akka.actor._
import akka.cluster._
import akka.persistence._
import com.typesafe.config.ConfigFactory
import akka.cluster.singleton._
import scala.concurrent.duration._
import akka.persistence.journal.leveldb._
import akka.util.Timeout
import akka.pattern._

/** Message protocol and bootstrap helpers for the cluster-singleton demo actor. */
object SingletonActor {
  /** Commands accepted by the singleton actor. */
  sealed trait Command
  case object Dig extends Command
  case object Plant extends Command
  case object AckDig extends Command      // acknowledge
  case object AckPlant extends Command    // acknowledge

  case object Disconnect extends Command  // force node to leave cluster
  case object CleanUp extends Command     // clean up when actor ends

  /** Events applied to the actor's state. */
  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  case class State(nHoles: Int, nTrees: Int, nMatches: Int)

  /**
   * Starts an `ActorSystem` named "SingletonClusterSystem" on the given remoting port
   * with the cluster role "singleton", wires up the shared LevelDB journal, and creates
   * the `ClusterSingletonManager` (named "singletonManager") that runs [[SingletonActor]].
   *
   * @param port remoting port for this node; the node started on port 2551 also hosts
   *             the shared journal store
   */
  def create(port: Int): Unit = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
      .withFallback(ConfigFactory.load())
    val singletonSystem = ActorSystem("SingletonClusterSystem", config)

    startupSharedJournal(
      singletonSystem,
      startStore = port == 2551,
      path = ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))

    // CleanUp is sent to the singleton as its termination message.
    singletonSystem.actorOf(ClusterSingletonManager.props(
      singletonProps = Props[SingletonActor],
      terminationMessage = CleanUp,
      settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
    ), name = "singletonManager")
  }

  /**
   * Optionally starts the shared LevelDB store on this node, then looks the store up at
   * `path` and registers it as this system's shared journal.
   *
   * If the lookup does not yield an actor reference, or the ask fails (15 second timeout),
   * an error is logged and the actor system is terminated.
   *
   * @param system     actor system to register the shared journal with
   * @param startStore whether this node should start the `SharedLeveldbStore` actor ("store")
   * @param path       actor path at which the shared store is expected
   */
  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    import scala.util.{Failure, Success}

    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore) system.actorOf(Props[SharedLeveldbStore], "store")

    // register the shared journal
    import system.dispatcher
    implicit val timeout: Timeout = Timeout(15.seconds)
    val f = system.actorSelection(path) ? Identify(None)

    // onComplete replaces the deprecated onSuccess/onFailure callback pair.
    f.onComplete {
      case Success(ActorIdentity(_, Some(ref))) =>
        SharedLeveldbJournal.setStore(ref, system)
      case Success(_) =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
      case Failure(_) =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }
}

class SingletonActor extends PersistentActor with ActorLogging {
  import SingletonActor._
  val cluster = Cluster(context.system)
  var freeHoles = 0
  var freeTrees = 0
  var ttlMatches = 0

  override def persistenceId = self.path.parent.name + "-" + self.path.name def updateState(evt: Event): Unit = evt match { case AddHole =>
      if (
首页 上一页 2 3 4 5 6 下一页 尾页 5/6/6
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇Akka(10): 分布式运算:集群-C.. 下一篇scala 基础一 val常量和var变量的..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目