orLogging {
  // NOTE(review): the enclosing class header is cut off above this chunk; this is
  // the body of an Actor mixing in ActorLogging (the PassivatingActor under test).
  // Fix: the original '⇒' arrows were mis-encoded as '?'; restored as '=>'.

  override def preStart(): Unit = {
    log.info("Starting")
  }

  override def postStop(): Unit = {
    log.info("Stopping")
  }

  override def receive: Receive = {
    case "passivate" =>
      log.info("Passivating")
      context.parent ! Passivate(StopMessage)
      // simulate another message causing a stop before the region sends the stop message
      // e.g. a persistent actor having a persist failure while processing the next message
      context.stop(self)
    case "hello" =>
      sender() ! Response(self)
    case StopMessage =>
      log.info("Received stop from region")
      context.parent ! PoisonPill
  }
}
}

class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender {
  import SupervisionSpec._

  "Supervision for a sharded actor" must {
    "allow passivation" in {
      // Wrap the entity in an on-stop backoff supervisor; withFinalStopMessage
      // lets the region's stop message terminate the supervisor itself so the
      // entity is not stuck passivating while the supervisor is backing off.
      val supervisedProps = BackoffSupervisor.props(
        Backoff.onStop(
          Props(new PassivatingActor()),
          childName = "child",
          minBackoff = 1.seconds,
          maxBackoff = 30.seconds,
          randomFactor = 0.2,
          maxNrOfRetries = -1
        ).withFinalStopMessage(_ == StopMessage))

      // Single-node cluster: join self, then start the shard region.
      Cluster(system).join(Cluster(system).selfAddress)
      val region = ClusterSharding(system).start(
        "passy",
        supervisedProps,
        ClusterShardingSettings(system),
        idExtractor,
        shardResolver)

      region ! Msg(10, "hello")
      val response = expectMsgType[Response](5.seconds)
      watch(response.self)

      region ! Msg(10, "passivate")
      expectTerminated(response.self)

      // This would fail before as sharded actor would be stuck passivating
      region ! Msg(10, "hello")
      expectMsgType[Response](20.seconds)
    }
  }
}
8、异常处理、重试策略 BackoffSupervisor 实现,如下:
// Supervise the EventWriter entity with an on-stop backoff supervisor
// (maxNrOfRetries = -1 means unlimited restarts).
// Fix: in the original snippet the inline comment was spliced into the middle of
// the expression, commenting out the `.withFinalStopMessage(...)` call and leaving
// a stray closing parenthesis; restored to the compilable form.
val supervisedProps = BackoffSupervisor.props(
  Backoff.onStop(
    Props(new EventWriter()),
    childName = "child",
    minBackoff = 1.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2,
    maxNrOfRetries = -1
  ).withFinalStopMessage(_ == StopMessage)) // set so automatic passivation can stop the supervisor
9、分片sharding部署
一般来说可以通过ClusterSharding(system).start(...)在每个节点上部署分片,如:
// Deploy the shard region for this entity type on the current node.
// All parameters are passed by name for readability.
ClusterSharding(system).start(
  typeName = shardName,
  entityProps = POSProps,
  settings = mySettings,
  extractEntityId = getPOSId,
  extractShardId = getShopId,
  allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(mySettings),
  handOffStopMessage = PassivatePOS
)
但如果分片的调用客户端所在节点因某种原因不能部署分片时可以用ClusterSharding(system).startProxy(...)部署一个分片代理:
// Deploy only a shard-region proxy on this node (no entities hosted locally);
// messages are forwarded to the real regions, per the surrounding text on nodes
// whose role matches `role`.
ClusterSharding(system).startProxy(
  typeName = shardName,
  role = Some(role),
  extractEntityId = getPOSId,
  extractShardId = getShopId
)
实际上当所在节点的role不等于startProxy参数role时才能启动这个分片代理。下面是一个成功部署分片代理的例子:
// NOTE(review): this definition is truncated below — the trailing block comment is
// cut mid-identifier ("maxSimultane |") and the method body never closes in this
// chunk; review against the full file before editing.
// Creates an ActorSystem bound to the given port; every port except 2554 also
// joins with the "shard" cluster role (2554 gets no role, so it can only run the
// shard proxy per the surrounding text). Sharding settings are restricted to that role.
def create(port: Int): ActorSystem = { var config: Config = ConfigFactory.load() if (port != 2554) config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port") .withFallback(ConfigFactory.parseString("akka.cluster.roles = [shard]")) .withFallback(ConfigFactory.load()) else config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port") .withFallback(ConfigFactory.load()) val system = ActorSystem("posSystem",config) val role = "shard" val mySettings = ClusterShardingSettings(system) //.withPassivateIdleAfter(10 seconds)
.withRole(role) /* val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultane |