设为首页 加入收藏

TOP

Spark Streaming Executor DynamicAllocation 机制分析
2019-02-13 01:09:29 】 浏览:17
Tags:Spark Streaming Executor DynamicAllocation 机制 分析

——————————————————————————————
Spark Streaming概述
Spark Streaming 初始化过程
Spark Streaming Receiver启动过程分析
Spark Streaming 数据准备阶段分析(Receiver方式)
Spark Streaming 数据计算阶段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 机制分析

——————————————————————————————

1、 引入Streaming Executor DynamicAllocation 机制的原因

在Spark Streaming,作业的执行是以批处理的方式进行的,批处理间隔内(batch interval)要完成对批量作业的执行,这就要求作业的执行时间(process time)不大于设定的批处理间隔。在计算资源一定的情况下,执行时间与待处理的数据规模成正比。在大数据流式计算环境中,数据的产生完全由数据源决定,由于不同的数据源在不同时空范围内的状态不统一且发生动态变化,导致数据流的速率呈现出了突发性的特征。前一时刻数据速率和后一时刻数据速率可能会有巨大的差异, 因此不同批次接收的数据总量存在差异,不同批次的执行时间也就会存在差异。这种差异是由于计算资源与数据规模不匹配造成的。在Spark 2.0以前, Streaming的资源分配都采取资源预先分配的策略,资源管理器依据应用的申请量予以提前分配,在应用执行期间不能依据应用实际的计算资源需求量进行调整,执行作业时会存在资源过剩或资源不足的情况。

2、Spark Streaming 的计算特征

Spark Streaming应用的执行过程可以分成数据准备和数据计算两个阶段,两个阶段分别由不同的作业进行处理,分别是数据接收作业(提交Receiver时的Job)和数据处理作业(Batch Job)。在大数据流式计算中,数据是实时产生、动态增加的,只要数据源处于活动状态,数据就会一直产生,因此要求数据接收作业不间断的接收数据,并将接收的数据分割成批,供数据处理作业消费。数据处理作业以批处理方式运行。一般情况下,批处理的执行时间不大于其批处理间隔,一个批次可以分成作业实际处理阶段和等待下一个批次阶段。综上所述,Spark Streaming的计算特征如下图所示(实际处理时间<批处理间隔):


4093076-37a7253f4a01a01a.png
SparkStreaming 计算特征

另外,特殊情况下,会存在实际处理阶段所用的时间 > 批处理时间。这种情况下,一般是由于作业处理能力弱引起的。

3、Streaming Executor DynamicAllocation机制的评价指标

经过上述分析,作业的实际处理时间与设定的批处理间隔之间存在的关系如下:

  • 作业的实际处理时间 远小于 批处理间隔
    此时,计算资源大部分时间处于空闲,造成资源浪费
  • 作业的实际处理时间 约等于 批处理间隔
    此时,能满足处理,但数据具有波动性,可能下一个时间不能满足,造成延迟。计算资源以初显不足。
  • 介于以上两者之间
    系统资源能正常满足计算,又不至于造成过多浪费,此时系统稳定性良好。
  • 作业的实际处理时间 大于 批处理间隔
    批处理间隔内不能完成批作业,说明计算资源不足。

因此,采用 有效处理时间占比【有效处理时间占比 = 实际处理时间 / 批处理间隔】来评价当前计算资源的过剩或不足。

4 、Streaming Executor DynamicAllocation机制的工作流程

  • 通过监控组件,获取已完成批作业的实际执行时间
  • 计算有效处理时间占比,然后与设置的阈值进行比较,如果小于下限down,则结束一个Executor; 如果大于上限up, 则增加Executor(个数由比率决定).

该功能启用与否由参数spark.streaming.dynamicAllocation.enabled参数决定,默认为false不开启。

5、源码分析

与Spark Streaming Backpressure机制相同,Spark Streaming Executor动态伸缩机制也是以事件驱动的形式工作的。其负责动态伸缩的类为ExecutorAllocationManager,其继承自StreamingListener。 类定义结构如下:

private[streaming] class ExecutorAllocationManager(
    client: ExecutorAllocationClient,
    receiverTracker: ReceiverTracker,
    conf: SparkConf,
    batchDurationMs: Long,
    clock: Clock) extends StreamingListener with Logging {
    ......
    ......
}

ExecutorAllocationManager 负责管理分配给StreamingContext(Streaming应用)的Executor.其通过分析Streaming作业的监控信息,动态的伸请或释放executor.

5.1 ExecutorAllocationManager 注册与启动

JobScheduler启动时会创建ExecutorAllocationManager 并向ListenerBus注册并开启监听。


  def start(): Unit = synchronized {
    ......

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }
    ......
    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
    executorAllocationManager.foreach(ssc.addStreamingListener)
   ......
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

其中executorAllocationManager的start方法定义如下:

  def start(): Unit = {
    timer.start()
    logInfo(s"ExecutorAllocationManager started with " +
      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
  }

  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    _ => manageAllocation(), "streaming-executor-allocation-manager")

其将开启定时器,周期性的执行manageAllocation方法,其时间周期由参数 “spark.streaming.dynamicAllocation.scalingInterval”决定,默认为60s.
即:默认情况下每隔60s 执行一次manageAllocation. managerAllocation会使用历史作业执行信息计算出有效处理时间占比ratio, 并依据占比与预设阈值的关系决定增减资源。预设的阈值信息及增减策略为:

  • scalingUpRatio
    阈值上限,由参数“park.streaming.dynamicAllocation.scalingUpRatio”控制,默认值为0.9。当计算出的ratio大于scalingUpRatio 时,将按如下算式计算出的值,增加若干Executor。
 math.max(math.round(ratio).toInt, 1)
  • scalingDownRatio
    阈值下限, 由参数“spark.streaming.dynamicAllocation.scalingDownRatio”,默认值为0.3。 当计算出的ratio小于scalingDownRatio时,将减少一个Executor.

manageAllocation的实现如下:

/**
   * Manage executor allocation by requesting or killing executors based on the collected
   * batch statistics.
   */
  private def manageAllocation(): Unit = synchronized {
    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    if (batchProcTimeCount > 0) {
      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
      val ratio = averageBatchProcTime.toDouble / batchDurationMs
      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
      if (ratio >= scalingUpRatio) {
        logDebug("Requesting executors")
        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
        requestExecutors(numNewExecutors)
      } else if (ratio <= scalingDownRatio) {
        logDebug("Killing executors")
        killExecutor()
      }
    }
    batchProcTimeSum = 0
    batchProcTimeCount = 0
  }

经过分析代码可知,初始情况下,因if条件batchProcTimeCount > 0不满足,上述机制并不会执行。只有onBatchCompleted事件触发之后,即有作业执行完成,可以做为判断依据之后,manageAllocation()才会正式生效。

5.2 事件触发

Executor dynamicAllocation机制是依据完成的批Job的执行信息进行决策,其在批Job执行完成时会收集作业。
其事件触发过程,同“SparkStreaming Backpressure分析”中"3.3.1 BatchCompleted触发过程"一节过程相同,不再赘述。

5.3 事件处理

onBatchCompleted的事件处理定义如下:

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    logDebug("onBatchCompleted called: " + batchCompleted)
    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    }
  }

当onBatchCompleted事件处发时,将执行addBatchProcTime, 为batchProcTimeSum及batchProcTimeCount 增值。

 private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    batchProcTimeSum += timeMs
    batchProcTimeCount += 1
    logDebug(
      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
  }

5.4 生效时机

当Timer 再次触发时,manageAllocation 的if条件将满足,其会依据决策信息对Executor进行增减操作。 同时重置batchProcTimeSum及batchProcTimeCount的值,开始下一轮次的准备工作。

6、与Spark Core中Executor DynamicAllocation 机制的区别

Spark Core 中的DynamicAllocation机制同Spark Streaming中一样,也是周期性调度评估机制确定是否要进行Executor增减。不同的是,Spark Core中应用是批处理应用,执行完成之后,即可以结束,因此其使用idle策略来评估Executor的增减,具体策略为:

  • 减少Executor
    应用拥有的Executor数量在处理当前负载时绰绰有余,通过缩减Executor仍然能够一次性执行所有任务(running + pending task)时,会通过一定的策略结束掉部分Executor,达到节省资源的目的。Spark Core中的策略为如果监测到一个Executor空闲了K 秒,这意味着其在未来不再执行任务,则可以将其移除。其中参数K由“spark.dynamicAllocation.executorIdleTimeout ”配制,默认值为60s.
  • 增加Executor
    应用拥有的Executor数量不足以及时处理当前负载,存在任务长时间堆集,则要增加Executor。Spark Core中的策略为如果监测到任务调度队列中的任务N秒内没有被调度,则会增加新的Executor,如果再过M秒还未进行调度,则以指数方式继续增加,直到上限。

由于Spark Streaming应用是长期存在的、微批处理应用。其每隔一个小的时间间隔(batchInterval)就会提交微批处理应用。其会使Spark Core中的Executor DynamicAllocation 机制的idle 策略受阻,因此Spark Streaming 采用有效处理时间占比radio来进行决策。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark基础解析 下一篇Spark Streaming 数据准备阶段分..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }