设为首页 加入收藏

TOP

Akka(13): 分布式运算:Cluster-Sharding-运算的集群分片(六)
2017-10-09 13:50:18 】 浏览:6200
Tags:Akka 分布式 运算 Cluster-Sharding- 集群 分片
Throwable,SupervisorStrategy.Directive]
= { case _: ArithmeticException => SupervisorStrategy.Resume } override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ decider.orElse(SupervisorStrategy.defaultDecider) } val calcActor = context.actorOf(Calculator.props,"calculator") override def receive: Receive = { case msg@ _ => calcActor.forward(msg) } } object CalculatorShard { import Calculator._ case class CalcCommands(eid: String, msg: Command) //user should use it to talk to shardregion val shardName = "calcShard" val getEntityId: ShardRegion.ExtractEntityId = { case CalcCommands(id,msg) => (id,msg) } val getShardId: ShardRegion.ExtractShardId = { case CalcCommands(id,_) => id.head.toString } def entityProps = Props(new CalcSupervisor) }

CalcShard.scala

package clustersharding.shard import akka.persistence.journal.leveldb._ import akka.actor._ import akka.cluster.sharding._ import com.typesafe.config.ConfigFactory import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern._ import clustersharding.entity.CalculatorShard object CalcShards { def create(port: Int) = { val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}") .withFallback(ConfigFactory.load("sharding")) // Create an Akka system
    val system = ActorSystem("ShardingSystem", config) startupSharding(port,system) } def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = { // Start the shared journal one one node (don't crash this SPOF) // This will not be needed with a distributed journal
    if (startStore) system.actorOf(Props[SharedLeveldbStore], "store") // register the shared journal
 import system.dispatcher implicit val timeout = Timeout(15.seconds) val f = (system.actorSelection(path) ? Identify(None)) f.onSuccess { case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system) case _ => system.log.error("Shared journal not started at {}", path) system.terminate() } f.onFailure { case _ => system.log.error("Lookup of shared journal at {} timed out", path) system.terminate() } } def startupSharding(port: Int, system: ActorSystem) = { startupSharedJournal(system, startStore = (port == 2551), path = ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store")) ClusterSharding(system).start( typeName = CalculatorShard.shardName, entityProps = CalculatorShard.entityProps, settings = ClusterShardingSettings(system), extractEntityId = CalculatorShard.getEntityId, extractShardId = CalculatorShard.getShardId ) } }

ClusterShardingDemo.scala

package clustersharding.demo import akka.actor.ActorSystem import akka.cluster.sharding._ import clustersharding.entity.CalculatorShard.CalcCommands import clustersharding.entity._ import clustersharding.shard.CalcShards import com.typesafe.config.ConfigFactory object ClusterShardingDemo extends App { CalcShards.create(2551) CalcShards.create(0) CalcShards.create(0) CalcShards.create(0) Thread.sleep(1000) val shardingSystem = ActorSystem("ShardingSystem",ConfigFactory.load("sharding")) CalcShards.startupSharding(0,shardingSystem) Threa
首页 上一页 3 4 5 6 7 下一页 尾页 6/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala 基础一 val常量和var变量的.. 下一篇为什么要创建开放源码的PlayScala..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目