设为首页 加入收藏

TOP

Akka-CQRS(3)- 再想多点,全面点(二)
2019-08-15 00:11:03 】 浏览:178
Tags:Akka-CQRS 面点
orLogging {
override def preStart(): Unit = { log.info("Starting") } override def postStop(): Unit = { log.info("Stopping") } override def receive: Receive = { case "passivate" ? log.info("Passivating") context.parent ! Passivate(StopMessage) // simulate another message causing a stop before the region sends the stop message // e.g. a persistent actor having a persist failure while processing the next message context.stop(self) case "hello" ? sender() ! Response(self) case StopMessage ? log.info("Received stop from region") context.parent ! PoisonPill } } } class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender { import SupervisionSpec._ "Supervision for a sharded actor" must { "allow passivation" in { val supervisedProps = BackoffSupervisor.props(Backoff.onStop( Props(new PassivatingActor()), childName = "child", minBackoff = 1.seconds, maxBackoff = 30.seconds, randomFactor = 0.2, maxNrOfRetries = -1 ).withFinalStopMessage(_ == StopMessage)) Cluster(system).join(Cluster(system).selfAddress) val region = ClusterSharding(system).start( "passy", supervisedProps, ClusterShardingSettings(system), idExtractor, shardResolver ) region ! Msg(10, "hello") val response = expectMsgType[Response](5.seconds) watch(response.self) region ! Msg(10, "passivate") expectTerminated(response.self) // This would fail before as sharded actor would be stuck passivating region ! Msg(10, "hello") expectMsgType[Response](20.seconds) } } }

8、异常处理、重试策略 backoffsupervisor 实现,如下:

      val supervisedProps = BackoffSupervisor.props(Backoff.onStop( Props(new EventWriter()), childName = "child", minBackoff = 1.seconds, maxBackoff = 30.seconds, randomFactor = 0.2, maxNrOfRetries = -1 )) //自动passivate时设定 .withFinalStopMessage(_ == StopMessage))

9、分片sharding部署

一般来说可以通过ClusterSharding(system).start(...)在每个节点上部署分片,如:

 

 ClusterSharding(system).start( typeName = shardName, entityProps = POSProps, settings = mySettings, extractEntityId = getPOSId, extractShardId = getShopId, allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(mySettings), handOffStopMessage = PassivatePOS )

 

但如果分片的调用客户端所在节点因某种原因不能部署分片时可以用ClusterSharding(system).startProxy(...)部署一个分片代理:

 ClusterSharding(system).startProxy( typeName = shardName, role = Some(role), extractEntityId = getPOSId, extractShardId = getShopId )

实际上当所在节点的role不等于startProxy参数role时才能启动这个分片代理。下面是一个成功部署分片代理的例子:

  def create(port: Int): ActorSystem = { var config: Config = ConfigFactory.load() if (port != 2554) config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port") .withFallback(ConfigFactory.parseString("akka.cluster.roles = [shard]")) .withFallback(ConfigFactory.load()) else config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port") .withFallback(ConfigFactory.load()) val system = ActorSystem("posSystem",config) val role = "shard" val mySettings = ClusterShardingSettings(system)   //.withPassivateIdleAfter(10 seconds)
 .withRole(role) /* val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultane
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇2018-11-13 中文代码示例之Progra.. 下一篇2018-12-09 疑似bug_中文代码示例..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目