
In-Depth Analysis of the Internals of Spark's PersistenceEngine and Leader Election Agent - Spark in Commercial Practice...
2019-03-14 01:21:53

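This blog series distills cases from real commercial environments and shares them, together with Spark source-code walkthroughs and practical guidance; please keep following the series. Copyright notice: this Spark source-code walkthrough and commercial-practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.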

1 PersistenceEngine: the persistence engine

1.1 Starting the PersistenceEngine

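  • Choose the failure-recovery mode through spark.deploy.recoveryMode. The main options are ZOOKEEPER and FILESYSTEM; CUSTOM is also accepted, and the default, NONE, disables recovery.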

    private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
  • The PersistenceEngine is initialized in the Master's onStart() method, together with the leader election agent:

    val serializer = new JavaSerializer(conf)

    // Pick a (PersistenceEngine, LeaderElectionAgent) pair according to spark.deploy.recoveryMode
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))

      case "CUSTOM" =>
        // Load a user-supplied StandaloneRecoveryModeFactory by class name via reflection
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))

      case _ =>
        // NONE (default): nothing is persisted, and this Master becomes leader immediately
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
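The match above is a factory lookup with a safe default: each mode supplies a matched pair of persistence engine and election agent, and anything unrecognised falls back to no persistence plus a single, self-appointed leader. The sketch below reproduces only that dispatch shape in plain Scala and is an illustration under assumptions, not Spark code: the traits, case classes and the select function are hypothetical stand-ins, the configuration is a plain Map, and the reflective CUSTOM branch is left out.

```scala
// Hypothetical, simplified stand-ins for Spark's PersistenceEngine and LeaderElectionAgent.
trait Engine { def name: String }
trait Agent { def name: String }
final case class SimpleEngine(name: String) extends Engine
final case class SimpleAgent(name: String) extends Agent

// Each recovery mode contributes one factory that builds a matching (engine, agent) pair.
trait RecoveryModeFactory {
  def createPersistenceEngine(): Engine
  def createLeaderElectionAgent(): Agent
}

object ZooKeeperFactory extends RecoveryModeFactory {
  def createPersistenceEngine(): Engine = SimpleEngine("ZooKeeperPersistenceEngine")
  def createLeaderElectionAgent(): Agent = SimpleAgent("ZooKeeperLeaderElectionAgent")
}

object FileSystemFactory extends RecoveryModeFactory {
  def createPersistenceEngine(): Engine = SimpleEngine("FileSystemPersistenceEngine")
  // A directory cannot arbitrate between Masters, so a single-node agent is used.
  def createLeaderElectionAgent(): Agent = SimpleAgent("MonarchyLeaderAgent")
}

// Same shape as the match in Master.onStart(); "NONE" and unknown values use the fallback.
def select(conf: Map[String, String]): (Engine, Agent) = {
  def from(f: RecoveryModeFactory) = (f.createPersistenceEngine(), f.createLeaderElectionAgent())
  conf.getOrElse("spark.deploy.recoveryMode", "NONE") match {
    case "ZOOKEEPER"  => from(ZooKeeperFactory)
    case "FILESYSTEM" => from(FileSystemFactory)
    case _            => (SimpleEngine("BlackHolePersistenceEngine"), SimpleAgent("MonarchyLeaderAgent"))
  }
}
```

Building both objects in one factory keeps the combinations consistent; a mode can never end up with, say, ZooKeeper persistence but a single-node election agent.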

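1.2 What the PersistenceEngine does

  • When the active Master fails, the PersistenceEngine is used to read back the persisted details of Applications, Workers and Drivers.
  • The PersistenceEngine is also responsible for writing (persisting) those Application, Worker and Driver details.

(1) When the PersistenceEngine is called:

  • When a new Application registers, before the Master acknowledges the registration.
  • When a new Worker registers, before the Master acknowledges the registration.
  • When the removeApplication and removeWorker methods are called.

For example: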

   persistenceEngine.removeWorker(worker)
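For registrations, the Master first updates its in-memory tables, then writes the new entry through the persistence engine, and only then sends the acknowledgement, so that a Master elected later can rebuild the same view; a removal deletes the entry so the next leader does not try to recover it. A minimal sketch of that ordering for Workers, assuming an in-memory store and a shared event log (InMemoryStore and WorkerRegistry are hypothetical names, not Spark classes):

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a PersistenceEngine; every call is recorded in `log`.
final class InMemoryStore(log: mutable.ListBuffer[String]) {
  val entries = mutable.Set.empty[String]
  def addWorker(id: String): Unit = { entries += "worker_" + id; log += s"persist worker_$id" }
  def removeWorker(id: String): Unit = { entries -= "worker_" + id; log += s"unpersist worker_$id" }
}

// Hypothetical, simplified Master that only tracks worker ids.
final class WorkerRegistry(store: InMemoryStore, log: mutable.ListBuffer[String]) {
  private val workers = mutable.Set.empty[String]

  def onRegisterWorker(id: String): Unit =
    if (workers.add(id)) {                 // 1. register in memory; duplicates are skipped
      store.addWorker(id)                  // 2. persist, so a future leader can recover it
      log += s"send RegisteredWorker($id)" // 3. only then acknowledge
    }

  def onRemoveWorker(id: String): Unit =
    if (workers.remove(id)) store.removeWorker(id) // a removed worker must not be recovered later
}

val log = mutable.ListBuffer.empty[String]
val store = new InMemoryStore(log)
val registry = new WorkerRegistry(store, log)
registry.onRegisterWorker("w1")
registry.onRegisterWorker("w1") // duplicate: nothing is persisted or sent
registry.onRemoveWorker("w1")
```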

1.3 The PersistenceEngine abstract template, i.e. the methods invoked at those call points

abstract class PersistenceEngine {

  /**
   * Defines how the object is serialized and persisted. Implementation will
   * depend on the store used.
   */
  def persist(name: String, obj: Object): Unit

  /**
   * Defines how the object referred by its name is removed from the store.
   */
  def unpersist(name: String): Unit

  /**
   * Gives all objects, matching a prefix. This defines how objects are
   * read/deserialized back.
   */
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: ApplicationInfo): Unit = {
    persist("app_" + app.id, app)
  }

  final def removeApplication(app: ApplicationInfo): Unit = {
    unpersist("app_" + app.id)
  }

  final def addWorker(worker: WorkerInfo): Unit = {
    persist("worker_" + worker.id, worker)
  }

  final def removeWorker(worker: WorkerInfo): Unit = {
    unpersist("worker_" + worker.id)
  }

  final def addDriver(driver: DriverInfo): Unit = {
    persist("driver_" + driver.id, driver)
  }

  final def removeDriver(driver: DriverInfo): Unit = {
    unpersist("driver_" + driver.id)
  }

  /**
   * Returns the persisted data sorted by their respective ids (which implies that they're
   * sorted by time of creation).
   */
  final def readPersistedData(
      rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    rpcEnv.deserialize { () =>
      (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
    }
  }

  def close() {}
}
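This is a template-method design: subclasses supply only the three storage hooks (persist, unpersist, read), while the final add*/remove* helpers fix the key prefixes, so every backend stores Applications, Workers and Drivers under the same names. A minimal in-memory backend illustrating the split, assuming simplified records (AppInfo, WorkerInfo, Engine and MemoryEngine are hypothetical; Drivers and readPersistedData are omitted for brevity):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical simplified records; Spark's real classes carry much more state.
final case class AppInfo(id: String)
final case class WorkerInfo(id: String)

// Same shape as the abstract class above: three storage hooks plus final, prefix-based helpers.
abstract class Engine {
  def persist(name: String, obj: Object): Unit
  def unpersist(name: String): Unit
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: AppInfo): Unit = persist("app_" + app.id, app)
  final def removeApplication(app: AppInfo): Unit = unpersist("app_" + app.id)
  final def addWorker(worker: WorkerInfo): Unit = persist("worker_" + worker.id, worker)
  final def removeWorker(worker: WorkerInfo): Unit = unpersist("worker_" + worker.id)
}

// Storage hooks backed by a sorted map, so read() returns entries ordered by key.
final class MemoryEngine extends Engine {
  private val store = mutable.TreeMap.empty[String, Object]
  def persist(name: String, obj: Object): Unit = store(name) = obj
  def unpersist(name: String): Unit = store.remove(name)
  def read[T: ClassTag](prefix: String): Seq[T] =
    store.iterator.collect { case (k, v: T) if k.startsWith(prefix) => v }.toList
}

val engine = new MemoryEngine
engine.addWorker(WorkerInfo("w2"))
engine.addWorker(WorkerInfo("w1"))
engine.addApplication(AppInfo("a1"))
engine.removeWorker(WorkerInfo("w2"))
```

The TreeMap is a choice of this sketch: it makes read return entries in key order, which loosely mirrors the "sorted by id" comment on readPersistedData.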

1.4 File-system-based and ZooKeeper-based persistence

  • File-system-based persistence: FileSystemPersistenceEngine

    private def serializeIntoFile(file: File, value: AnyRef) {
      val created = file.createNewFile()
      if (!created) { throw new IllegalStateException("Could not create file: " + file) }
      val fileOut = new FileOutputStream(file)
      var out: SerializationStream = null
      Utils.tryWithSafeFinally {
        out = serializer.newInstance().serializeStream(fileOut)
        out.writeObject(value)
      } {
        fileOut.close()
        if (out != null) {
          out.close()
        }
      }
    }
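  • ZooKeeper-based persistence: ZooKeeperPersistenceEngine

    Curator is a ZooKeeper client open-sourced by Netflix (now Apache Curator). Note that ZooKeeperPersistenceEngine stores ApplicationInfo, WorkerInfo, DriverInfo and similar objects in separate znodes in ZooKeeper.

    Can ZooKeeper really hold up here? This is an open question: ZooKeeper is designed for small coordination data, and by default a single znode holds at most about 1 MB (the jute.maxbuffer limit).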

    private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
    private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

    private def serializeIntoFile(path: String, value: AnyRef) {
      val serialized = serializer.newInstance().serialize(value)
      // Copy the serialized ByteBuffer into the byte array that Curator's forPath expects
      val bytes = new Array[Byte](serialized.remaining())
      serialized.get(bytes)
      zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
    }
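Both engines reduce persist(name, obj) to the same two steps: serialize the object, then store the bytes under a key derived from name (a file in the recovery directory, or a PERSISTENT znode under the master_status path). The sketch below models those steps under stated assumptions: plain Java serialization replaces Spark's Serializer, a temporary directory replaces the real recovery directory, the ByteBuffer is copied into an Array[Byte] exactly as the ZooKeeper engine does, and the no-overwrite guard of serializeIntoFile is kept. The size check against ZooKeeper's default jute.maxbuffer (about 1 MB) is an addition of this sketch, not something Spark performs; all function names are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, File, FileOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import java.nio.file.Files

// Stand-in for serializer.newInstance().serialize(value), which returns a ByteBuffer.
def serialize(value: AnyRef): ByteBuffer = {
  val bytesOut = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytesOut)
  try out.writeObject(value) finally out.close()
  ByteBuffer.wrap(bytesOut.toByteArray)
}

// Same copy as ZooKeeperPersistenceEngine: take exactly the readable bytes of the buffer.
def toBytes(buf: ByteBuffer): Array[Byte] = {
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  bytes
}

// Assumption: ZooKeeper's default jute.maxbuffer of 0xfffff bytes (just under 1 MB).
val MaxZnodeBytes = 0xfffff

// Hypothetical key layouts: one file per name, one znode per name.
def znodePath(zkDir: String, name: String): String = zkDir + "/master_status/" + name

// File-system flavour of persist(): refuse to overwrite, always close the stream.
def persistToFile(dir: File, name: String, value: AnyRef): Unit = {
  val file = new File(dir, name)
  if (!file.createNewFile()) throw new IllegalStateException("Could not create file: " + file)
  val out = new FileOutputStream(file)
  try out.write(toBytes(serialize(value))) finally out.close()
}

// ZooKeeper flavour: only checks whether the payload would fit into a single znode.
def fitsInZnode(value: AnyRef): Boolean = toBytes(serialize(value)).length <= MaxZnodeBytes

val dir = Files.createTempDirectory("recovery").toFile
persistToFile(dir, "worker_w1", "host-a:7078")
```

The per-object layout bears on the open question above: the znode limit applies to each Application, Worker or Driver entry separately, not to the Master's state as a whole.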

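2 Leader election

The election mechanism is essentially a register-and-listen mechanism: each Master registers a listener, and once the current leader Master is detected to be down, the registered callback fires.

The main implementations are:

  • ZooKeeperLeaderElectionAgent: elects one leader among several Masters through ZooKeeper.
  • MonarchyLeaderAgent: for a single Master; it declares that Master leader immediately (used by the NONE and FILESYSTEM modes).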
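Both agents program against the same small contract: the Master implements LeaderElectable (electedLeader/revokedLeadership), and the agent decides when to call it. A self-contained sketch of that contract and of the MonarchyLeaderAgent case, in which leadership is granted in the constructor; the traits are simplified copies and RecordingMaster is a hypothetical test double:

```scala
// Simplified copies of Spark's two election traits.
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop(): Unit = {} // default no-op, so simple agents need not override it
}

// Single-Master agent: nobody to compete with, so leadership is granted at construction time.
final class MonarchyLeaderAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
  masterInstance.electedLeader()
}

// Hypothetical test double that records the callbacks it receives.
final class RecordingMaster extends LeaderElectable {
  var events = Vector.empty[String]
  def electedLeader(): Unit = events :+= "elected"
  def revokedLeadership(): Unit = events :+= "revoked"
}

val master = new RecordingMaster
val agent = new MonarchyLeaderAgent(master)
```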

The rest of this section takes ZooKeeperLeaderElectionAgent as the example:

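2.1 Borrowing a hen to lay eggs: letting ZooKeeper run the election

  • Listening is done on the /leader_election znode (under spark.deploy.zookeeper.dir):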

    val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

    private def start() {
      logInfo("Starting ZooKeeper LeaderElection agent")
      zk = SparkCuratorUtil.newClient(conf)
      // Every Master joins the same LeaderLatch; Curator grants leadership to one of them
      leaderLatch = new LeaderLatch(zk, WORKING_DIR)
      leaderLatch.addListener(this)
      leaderLatch.start()
    }

    private def updateLeadershipStatus(isLeader: Boolean) {
      if (isLeader && status == LeadershipStatus.NOT_LEADER) {
        status = LeadershipStatus.LEADER
        masterInstance.electedLeader()
      } else if (!isLeader && status == LeadershipStatus.LEADER) {
        status = LeadershipStatus.NOT_LEADER
        masterInstance.revokedLeadership()
      }
    }
  • The election itself is driven by the LeaderLatchListener callbacks on the /leader_election path:

    override def isLeader() {
      synchronized {
        // could have lost leadership by now.
        if (!leaderLatch.hasLeadership) {
          return
        }

        logInfo("We have gained leadership")
        updateLeadershipStatus(true)
      }
    }

    override def notLeader() {
      synchronized {
        // could have gained leadership by now.
        if (leaderLatch.hasLeadership) {
          return
        }

        logInfo("We have lost leadership")
        updateLeadershipStatus(false)
      }
    }
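Two details make this agent robust. First, isLeader()/notLeader() re-read leaderLatch.hasLeadership under synchronized, because (as the source comments note) leadership may have changed again by the time the callback runs. Second, updateLeadershipStatus only forwards real transitions, so the Master never receives the same signal twice in a row. A sketch of both, with Curator replaced by a hypothetical FakeLatch whose flag is flipped by hand (SketchAgent and RecordingMaster are hypothetical too):

```scala
object LeadershipStatus extends Enumeration {
  val LEADER, NOT_LEADER = Value
}

// Minimal stand-in for Spark's LeaderElectable.
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

// Hypothetical stand-in for Curator's LeaderLatch: only the flag the callbacks re-check.
final class FakeLatch {
  @volatile var hasLeadership: Boolean = false
}

// Mirrors ZooKeeperLeaderElectionAgent without Curator: callbacks are invoked directly.
final class SketchAgent(latch: FakeLatch, master: LeaderElectable) {
  private var status = LeadershipStatus.NOT_LEADER

  def isLeader(): Unit = synchronized {
    if (latch.hasLeadership) updateLeadershipStatus(true) // stale callback if leadership is gone
  }

  def notLeader(): Unit = synchronized {
    if (!latch.hasLeadership) updateLeadershipStatus(false) // stale callback if leadership is back
  }

  // Only real transitions reach the Master; repeated signals are ignored.
  private def updateLeadershipStatus(isLeader: Boolean): Unit =
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      master.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      master.revokedLeadership()
    }
}

// Hypothetical Master that records which callbacks it received.
final class RecordingMaster extends LeaderElectable {
  var events = Vector.empty[String]
  def electedLeader(): Unit = events :+= "elected"
  def revokedLeadership(): Unit = events :+= "revoked"
}

val latch = new FakeLatch
val master = new RecordingMaster
val agent = new SketchAgent(latch, master)

latch.hasLeadership = true
agent.isLeader()  // elected
agent.isLeader()  // duplicate signal: ignored
latch.hasLeadership = false
agent.isLeader()  // stale callback: the latch no longer holds leadership, ignored
agent.notLeader() // revoked
```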

3 What the Master does in the election

The Master sends a message to itself to start the recovery process:

  • Master extends LeaderElectable and therefore implements the electedLeader method:

    override def electedLeader() {
      self.send(ElectedLeader)
    }
  • The Master's actions: beginRecovery and CompleteRecovery

    override def receive: PartialFunction[Any, Unit] = {
      case ElectedLeader =>
        val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
        state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
          RecoveryState.ALIVE
        } else {
          RecoveryState.RECOVERING
        }
        logInfo("I have been elected leader! New state: " + state)
        if (state == RecoveryState.RECOVERING) {
          beginRecovery(storedApps, storedDrivers, storedWorkers)  // <= stroke of genius

          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(CompleteRecovery)  // <= stroke of genius
            }
          }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
        }

      case CompleteRecovery => completeRecovery()

      // ... other messages omitted in this excerpt
    }
  • The Master's action: beginRecovery

    private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
        storedWorkers: Seq[WorkerInfo]) {
      for (app <- storedApps) {
        logInfo("Trying to recover app: " + app.id)
        try {
          registerApplication(app)
          // Not trusted until the driver answers the MasterChanged message
          app.state = ApplicationState.UNKNOWN
          app.driver.send(MasterChanged(self, masterWebUiUrl))
        } catch {
          case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
        }
      }
      // ... storedDrivers and storedWorkers are recovered next (omitted in this excerpt)
    }
  • The Master's action: completeRecovery

    private def completeRecovery() {
      // Ensure "only-once" recovery semantics using a short synchronization period.
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY

      // Kill off any workers and apps that didn't respond to us.
      workers.filter(_.state == WorkerState.UNKNOWN).foreach(
        removeWorker(_, "Not responding for recovery"))
      apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

      // Update the state of recovered apps to RUNNING
      apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)

      // Reschedule drivers which were not claimed by any workers
      drivers.filter(_.worker.isEmpty).foreach { d =>
        logWarning(s"Driver ${d.id} was not found after master recovery")
        if (d.desc.supervise) {
          logWarning(s"Re-launching ${d.id}")
          relaunchDriver(d)
        } else {
          removeDriver(d.id, DriverState.ERROR, None)
          logWarning(s"Did not re-launch ${d.id} because it was not supervised")
        }
      }

      // ... remainder omitted in this excerpt: the Master switches to RecoveryState.ALIVE and calls schedule()
    }
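Taken together, section 3 describes a small self-messaging protocol: electedLeader() only enqueues ElectedLeader; the handler chooses between ALIVE and RECOVERING; recovery marks every stored application UNKNOWN and arms a timer; an application whose driver answers moves to WAITING (in Spark, via a MasterChangeAcknowledged reply); and when the timer fires, completeRecovery drops whatever is still UNKNOWN, promotes WAITING to RUNNING and the Master becomes ALIVE. A runnable miniature of that flow, assuming string states, applications only (no workers, drivers or persistence), and one scheduled thread standing in for both the Master's message loop and forwardMessageThread; MiniMaster, acknowledge and awaitAliveAndStop are hypothetical names:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Messages the Master sends to itself (names follow the excerpts; the ack is simplified).
sealed trait Msg
case object ElectedLeader extends Msg
final case class MasterChangeAcknowledged(appId: String) extends Msg
case object CompleteRecovery extends Msg

// Hypothetical miniature Master: one thread runs every message and the recovery timer.
final class MiniMaster(storedApps: Seq[String], workerTimeoutMs: Long) {
  private val loop = Executors.newSingleThreadScheduledExecutor()
  private val alive = new CountDownLatch(1)
  @volatile var state = "STANDBY"
  @volatile var apps = Map.empty[String, String] // app id -> WAITING / RUNNING / UNKNOWN

  private def send(msg: Msg): Unit = loop.execute(() => receive(msg))

  // Called by the election agent, possibly on another thread: only enqueue a message.
  def electedLeader(): Unit = send(ElectedLeader)
  def acknowledge(appId: String): Unit = send(MasterChangeAcknowledged(appId))

  private def becomeAlive(): Unit = { state = "ALIVE"; alive.countDown() }

  private def receive(msg: Msg): Unit = msg match {
    case ElectedLeader if storedApps.isEmpty => becomeAlive() // nothing to recover
    case ElectedLeader =>
      state = "RECOVERING"
      apps = storedApps.map(_ -> "UNKNOWN").toMap // beginRecovery: re-register, trust nobody yet
      loop.schedule(new Runnable {
        // Runs on the same loop thread, standing in for self.send(CompleteRecovery).
        override def run(): Unit = receive(CompleteRecovery)
      }, workerTimeoutMs, TimeUnit.MILLISECONDS)
    case MasterChangeAcknowledged(id) =>
      if (apps.get(id).contains("UNKNOWN")) apps = apps.updated(id, "WAITING")
    case CompleteRecovery =>
      if (state == "RECOVERING") { // only-once guard, as in completeRecovery()
        state = "COMPLETING_RECOVERY"
        apps = apps.collect {
          case (id, s) if s != "UNKNOWN" => id -> (if (s == "WAITING") "RUNNING" else s)
        }
        becomeAlive()
      }
  }

  // Waits until the Master is ALIVE, then stops the loop thread.
  def awaitAliveAndStop(): Boolean = {
    val ok = alive.await(5, TimeUnit.SECONDS)
    loop.shutdown()
    ok
  }
}

val master = new MiniMaster(storedApps = Seq("a1", "a2"), workerTimeoutMs = 100)
master.electedLeader()
master.acknowledge("a1") // only a1's driver answers before the timeout
val recovered = master.awaitAliveAndStop()
```

Running every message and the timer on a single thread is what lets the state checks in receive go without locks.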

4 Summary

Qin Kaixin, Shenzhen
