package clustersharding.shard

import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.pattern._
import clustersharding.entity.CalculatorShard

/** Bootstraps actor systems that take part in the `CalculatorShard` cluster sharding setup. */
object CalcShards {

  /** Name given to every actor system created by [[create]]; also part of the journal path. */
  private val SystemName = "ShardingSystem"

  /** Remoting port of the node that hosts the shared LevelDB store. */
  private val SharedJournalPort = 2551

  /** Actor path under which the shared LevelDB store is looked up. */
  private val SharedJournalPath =
    s"akka.tcp://$SystemName@127.0.0.1:$SharedJournalPort/user/store"

  /**
   * Creates an actor system listening on `port` and starts sharding on it.
   *
   * The configuration is the `sharding` config resource with
   * `akka.remote.netty.tcp.port` overridden by `port`.
   *
   * @param port remoting port for the new actor system
   * @return the `ActorRef` returned by `ClusterSharding(system).start`
   */
  def create(port: Int): ActorRef = {
    val config = ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem(SystemName, config)
    startupSharding(port, system)
  }

  /**
   * Optionally starts the shared LevelDB store on this node, then asynchronously
   * looks up the store at `path` and registers it as the journal store for `system`.
   *
   * If the lookup does not yield an actor, or the lookup future fails (for example
   * because the 15 second ask timeout expires), an error is logged and `system` is
   * terminated.
   *
   * @param system     actor system to register the shared journal with
   * @param startStore whether this node should start the `SharedLeveldbStore` actor
   * @param path       actor path at which the shared store is expected
   */
  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF).
    // This will not be needed with a distributed journal.
    if (startStore) system.actorOf(Props[SharedLeveldbStore], "store")

    // Register the shared journal.
    import system.dispatcher
    implicit val timeout: Timeout = Timeout(15.seconds)

    // A single onComplete replaces the deprecated onSuccess/onFailure pair.
    (system.actorSelection(path) ? Identify(None)).onComplete {
      case Success(ActorIdentity(_, Some(ref))) =>
        SharedLeveldbJournal.setStore(ref, system)
      case Success(_) =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
      case Failure(cause) =>
        // Pass the cause along so a non-timeout failure is not hidden by the message.
        system.log.error(cause, "Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  /**
   * Registers the shared journal for `system` and starts the `CalculatorShard`
   * sharding region on it.
   *
   * The shared store itself is started only when `port` equals the shared
   * journal port (2551).
   *
   * @param port   remoting port of this node; decides whether the store is started here
   * @param system actor system on which sharding is started
   * @return the `ActorRef` returned by `ClusterSharding(system).start`
   */
  def startupSharding(port: Int, system: ActorSystem): ActorRef = {
    startupSharedJournal(
      system,
      startStore = port == SharedJournalPort,
      path = ActorPath.fromString(SharedJournalPath)
    )

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )
  }
}
package clustersharding.demo import akka.actor.ActorSystem import akka.cluster.sharding._ import clustersharding.entity.CalculatorShard.CalcCommands import clustersharding.entity._ import clustersharding.shard.CalcShards import com.typesafe.config.ConfigFactory object ClusterShardingDemo extends App { CalcShards.create(2551) CalcShards.create(0) CalcShards.create(0) CalcShards.create(0) Thread.sleep(1000) val shardingSystem = ActorSystem("ShardingSystem",ConfigFactory.load("sharding")) CalcShards.startupSharding(0,shardingSystem) Threa