Spark Streaming in Continuous-Run Mode: A Source-Level Deep Dive into the Streaming Data Cleanup Mechanism (Spark in Commercial Production)

This series of posts distills case studies from real commercial environments and shares Spark source-code analysis together with guidance for production practice. Please keep following the series. Copyright notice: the Spark source-code analysis and production practice in this series belong to the author (Qin Kaixin); reproduction is prohibited, learning from it is welcome.

1 If You Don't Sweep One Room, How Can You Sweep the World?

  • A Spark Streaming application runs continuously. If data only flows in and is never released, then no matter how generous the memory is, the application will eventually fall over. Cleaning up in-memory data and on-disk data in a timely fashion is therefore critically important.
  • So how are the objects, data, and metadata of a Spark Streaming application reclaimed? Keep that question in mind.

2 When Does the Sweeping Happen?

2.1 JobScheduler: from job submission to batch completion

  • JobGenerator triggers generateJobs
  • JobGenerator -> jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  • submitJobSet -> jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  • jobScheduler -> _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  • jobScheduler -> handleJobCompletion(job, completedTime)
  • jobScheduler -> jobGenerator.onBatchCompletion(jobSet.time)
  • jobGenerator -> eventLoop.post(ClearMetadata(time)) (a minimal sketch of this event-loop pattern follows below)
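The whole chain is driven by a simple producer/consumer event loop: batch completion eventually posts a ClearMetadata(time) event, which triggers the cleanup traced in the next section. The sketch below illustrates only the pattern; SimpleEventLoop and CleanupFlowDemo are made-up names, and only the GenerateJobs/ClearMetadata event names mirror Spark's own.

    // Minimal sketch of the event-loop pattern used by JobGenerator/JobScheduler.
    // NOT Spark's actual classes; for illustration only.
    import java.util.concurrent.LinkedBlockingQueue

    sealed trait GeneratorEvent
    case class GenerateJobs(time: Long) extends GeneratorEvent
    case class ClearMetadata(time: Long) extends GeneratorEvent

    class SimpleEventLoop(handler: GeneratorEvent => Unit) {
      private val queue = new LinkedBlockingQueue[GeneratorEvent]()
      private val worker = new Thread(new Runnable {
        override def run(): Unit = while (true) handler(queue.take())
      })
      def start(): Unit = { worker.setDaemon(true); worker.start() }
      def post(event: GeneratorEvent): Unit = queue.put(event)
    }

    object CleanupFlowDemo extends App {
      val loop = new SimpleEventLoop({
        case GenerateJobs(t)  => println(s"generate jobs for batch $t")
        case ClearMetadata(t) => println(s"clear metadata of batches up to $t")
      })
      loop.start()
      loop.post(GenerateJobs(1000L))    // JobGenerator side
      loop.post(ClearMetadata(1000L))   // posted after handleJobCompletion / onBatchCompletion
      Thread.sleep(200)                 // let the daemon thread drain the queue
    }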

2.2 clearMetadata: the elusive cleanup entry point

  • The main cached metadata structures:

    // kept by InputInfoTracker: per-batch input info for every input stream
    private val batchTimeToInputInfos =
      new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

    // kept by ReceivedBlockTracker: the blocks allocated to each batch
    private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  • ssc.graph.clearMetadata: walks the outputStreams and clears their old generated RDDs, removing the underlying blocks via the BlockManager.

  • jobScheduler.receiverTracker.cleanupOldBlocksAndBatches: clears the cached timeToAllocatedBlocks in ReceivedBlockTracker (and the corresponding write-ahead-log segments).

  • jobScheduler.inputInfoTracker.cleanup: clears the batchTimeToInputInfos metadata in InputInfoTracker.

    private def clearMetadata(time: Time) {
      ssc.graph.clearMetadata(time)                 // <- key step: drop old RDDs and their blocks

      // If checkpointing is enabled, then checkpoint,
      // else mark batch to be fully processed
      if (shouldCheckpoint) {
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))  // <- key step
      } else {
        // If checkpointing is not enabled, then delete metadata information about
        // received blocks (block data not saved in any case). Otherwise, wait for
        // checkpointing of this batch to complete.
        val maxRememberDuration = graph.getMaxInputStreamRememberDuration()

        jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)  // <- key step
        jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)                    // <- key step
        markBatchFullyProcessed(time)
      }
    }
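Which branch runs depends on shouldCheckpoint, i.e. on whether a checkpoint directory has been configured on the StreamingContext. A minimal spark-shell style sketch of that switch, assuming a local master and a purely illustrative checkpoint path:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("cleanup-branch-demo")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // With a checkpoint directory set, clearMetadata posts
    // DoCheckpoint(time, clearCheckpointDataLater = true), and block/metadata
    // cleanup is deferred until the checkpoint of that batch has completed.
    // Without it, the else-branch above cleans up immediately.
    ssc.checkpoint("/tmp/streaming-checkpoint")   // illustrative path, not from the article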

2.3 ssc.graph.clearMetadata: goodbye, old RDDs

  /**
   * Clear metadata that are older than `rememberDuration` of this DStream.
   * This is an internal method that should not be called directly. This default
   * implementation clears the old generated RDDs. Subclasses of DStream may override
   * this to clear their own metadata along with the generated RDDs.
   */
   
  private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")

    generatedRDDs --= oldRDDs.keys        // <- key step: drop the references

    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
      oldRDDs.values.foreach { rdd =>

        rdd.unpersist(false)              // <- key step: release the cached data

        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $b of time $time")
            b.removeBlocks()
          case _ =>
        }
      }
    }
  }
Details of the RDD removal (SparkContext.unpersistRDD):
 private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }
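Two settings govern how aggressively this path releases old RDDs: spark.streaming.unpersist (read at the top of clearMetadata above) and the DStream rememberDuration, which can be extended with StreamingContext.remember. A minimal spark-shell style sketch with purely illustrative values:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("unpersist-demo")
      // the default is already true; setting it to false keeps old RDDs cached
      // and leaves eviction to the BlockManager's LRU / GC instead
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Keep RDDs/blocks of the last 2 minutes alive even after their batch finishes,
    // e.g. when ad-hoc queries need to look back at recent micro-batches.
    ssc.remember(Minutes(2))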

2.4 cleanupOldBlocksAndBatches: goodbye, old batch blocks (ReceivedBlockTracker.cleanupOldBatches)

  /**
   * Clean up block information of old batches. If waitForCompletion is true, this method
   * returns only after the files are cleaned up.
   */
  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
    
    val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
    logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
    
    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      timeToAllocatedBlocks --= timesToCleanup
      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
    } else {
      logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
    }
  }
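The writeAheadLogOption above is only defined when the receiver write-ahead log is enabled; otherwise there are no log segments to clean. Enabling it also requires a checkpoint directory, since the WAL files live under it. A minimal spark-shell style sketch (the path is a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("wal-demo")
      // turn on the receiver write-ahead log so received blocks survive driver failure;
      // cleanupOldBatches will then also prune WAL segments older than the threshold
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/streaming-checkpoint")   // placeholder; WAL files are written under this directory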

2.5 inputInfoTracker.cleanup: goodbye, old batch input metadata

  def cleanup(batchThreshTime: Time): Unit = synchronized {
    val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
    logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
    batchTimeToInputInfos --= timesToCleanup
  }

3 Summary

If you don't sweep one room, how can you sweep the world? That wraps it up.

Qin Kaixin, Shenzhen, 01:13, 2018

