设为首页 加入收藏

TOP

Akka(10): 分布式运算:集群-Cluster(三)
2017-10-09 13:50:40 】 浏览:10356
Tags:Akka 分布式 运算 集群 -Cluster
n shutdown the thunk will run on the caller thread immediately. * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
*/ def registerOnMemberRemoved[T](code: ? T): Unit = registerOnMemberRemoved(new Runnable { override def run(): Unit = code }) /** * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`. * If the cluster has already been shutdown the thunk will run on the caller thread immediately. * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`. */ def registerOnMemberRemoved(callback: Runnable): Unit = { if (_isTerminated.get()) callback.run() else clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback) }

下面我们就用个例子来示范Akka-Cluster的运作过程:

首先需要Akka-Cluster的dependency:build.sbt

name := "cluster-states-demo" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= { val akkaVersion = "2.5.3" Seq( "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion, "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion ) }

然后是基本的配置:cluster.conf

akka { actor { provider = "cluster" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://clusterSystem@127.0.0.1:2551"] } }

下面是一个集群状态转换事件的监听Actor:

import akka.actor._ import akka.cluster.ClusterEvent._ import akka.cluster._ import com.typesafe.config.ConfigFactory class EventLisener extends Actor with ActorLogging { val cluster = Cluster(context.system) override def preStart(): Unit = { cluster.subscribe(self,initialStateMode = InitialStateAsEvents ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
 super.preStart() } override def postStop(): Unit = { cluster.unsubscribe(self) //取消订阅
 super.postStop() } override def receive: Receive = { case MemberJoined(member) => log.info("Member is Joining: {}", member.address) case MemberUp(member) => log.info("Member is Up: {}", member.address) case MemberLeft(member) => log.info("Member is Leaving: {}", member.address) case MemberExited(member) => log.info("Member is Exiting: {}", member.address) case MemberRemoved(member, previousStatus) => log.info( "Member is Removed: {} after {}", member.address, previousStatus) case UnreachableMember(member) => log.info("Member detected as unreachable: {}", member) cluster.down(member.address) //手工驱除,不用auto-down
    case _: MemberEvent => // ignore
 } } 

下面是EventListener使用测试代码,增加了Node加人集群后可能进行的前期设置及退出集群后的事后清理:

object ClusterEventsDemo { def main(args: Array[String]): Unit = { //重设port,seed-node-address
    val port =
      if (args.isEmpty) "0"
      else args(0) val addr =
      if (args.length < 2) "2551"
      else args(1) val seednodeSetting = "akka.cluster.seed-nodes = ["+
    "\"akka.tcp://clusterSystem@127.0.0.1:"+ s"${addr}"+"\"]" val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}") .withFallback(ConfigFactory.parseString(seednodeSetting)) .withFallback(ConfigFactory.load("cluster.conf")) val clusterSystem = ActorSystem(na
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 3/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(9): 分布式运算:Remotin.. 下一篇Akka(12): 分布式运算:Cluste..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目