设为首页 加入收藏

TOP

sparkstreaming-JobScheduler, Job, JobSet 详解
2018-11-23 01:27:58 】 浏览:69
Tags:sparkstreaming-JobScheduler Job JobSet 详解

JobScheduler, Job, JobSet 详解

[酷玩 Spark] Spark Streaming 源码解析系列,返回目录请猛戳这里

「腾讯·广点通」技术团队荣誉出品

本系列内容适用范围:

* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0)
* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.2)
* 2016.11.07 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1, 1.6.2, 1.6.3)


阅读本文前,请一定先阅读 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对模块 2:Job 动态生成细节的解释。

引言

前面在 [Spark Streaming 实现思路与模块概述](0.1 Spark Streaming 实现思路与模块概述.md) 和 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里我们分析了DStreamGraphDStream具有能够实例化RDDRDDDAG 的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。

在 Spark Streaming 程序的入口,我们都会定义一个batchDuration,就是需要每隔多长时间就比照静态的DStreamGraph来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是JobScheduler,在 Spark Streaming 程序在ssc.start()开始运行时,将JobScheduler的实例给 start() 运行起来。

// 来自 StreamingContext
def start(): Unit = synchronized {
  ...
  ThreadUtils.runInNewThread("streaming-start") {
    sparkContext.setCallSite(startSite.get)
    sparkContext.clearJobGroup()
    sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    scheduler.start()  // 【这里调用了 JobScheduler().start()】
  }
  state = StreamingContextState.ACTIVE
  ...
}

Spark Streaming 的 Job 总调度者 JobScheduler

JobScheduler是 Spark Streaming 的 Job 总调度者

JobScheduler有两个非常重要的成员:JobGeneratorReceiverTrackerJobScheduler将每个 batch 的 RDD DAG 具体生成工作委托给JobGenerator,而将源头输入数据的记录工作委托给ReceiverTracker

image

JobScheduler    的全限定名是:org.apache.spark.streaming.scheduler.JobScheduler
JobGenerator    的全限定名是:org.apache.spark.streaming.scheduler.JobGenerator
ReceiverTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceiverTracker

JobGenerator维护了一个定时器,周期就是我们刚刚提到的batchDuration定时为每个 batch 生成 RDD DAG 的实例。 具体的,根据我们在 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 中的解析,DStreamGraph.generateJobs(time)将返回一个Seq[Job],其中的每个Job是一个ForEachDStream实例的generateJob(time)返回的结果。

image

此时,JobGenerator拿到了Seq[Job]后(如上图(2)),就将其包装成一个 JobSet(如上图(3)),然后就调用JobScheduler.submitJobSet(jobSet)来交付回 JobScheduler(如上图 (4) )。

那么JobScheduler收到jobSet后是具体如何处理的呢?我们看其实现:

// 来自 JobScheduler.submitJobSet(jobSet: JobSet)
if (jobSet.jobs.isEmpty) {
  logInfo("No jobs added for time " + jobSet.time)
} else {
  listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
  jobSets.put(jobSet.time, jobSet)
  // 【下面这行是最主要的处理逻辑:将每个 job 都在 jobExecutor 线程池中、用 new JobHandler 来处理】
  jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  logInfo("Added jobs for time " + jobSet.time)
}

这里最重要的处理逻辑是job => jobExecutor.execute(new JobHandler(job)),也就是将每个 job 都在 jobExecutor 线程池中、用 new JobHandler 来处理。

JobHandler

先来看 JobHandler 针对 Job 的主要处理逻辑:

// 来自 JobHandler
def run()
{
  ...
  // 【发布 JobStarted 消息】
  _eventLoop.post(JobStarted(job))
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
    // 【主要逻辑,直接调用了 job.run()】
    job.run()
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
  // 【发布 JobCompleted 消息】
    _eventLoop.post(JobCompleted(job))
  }
  ...
}

也就是说,JobHandler除了做一些状态记录外,最主要的就是调用job.run()!这里就与我们在 [DStream 生成 RDD 实例详解](1.2 DStream 生成 RDD 实例详解.md) 里分析的对应起来了: 在ForEachDStream.generateJob(time)时,是定义了Job的运行逻辑,即定义了Job.func。而在JobHandler这里,是真正调用了Job.run()、将触发Job.func的真正执行!

Job 运行的线程池 jobExecutor

上面JobHandler是解决了做什么的问题,本节jobExecutor是解决Job在哪里做。

具体的,jobExecutorJobScheduler的成员:

// 来自 JobScheduler
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
  ...
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
      ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  ...
}

也就是,ThreadUtils.newDaemonFixedThreadPool()调用将产生一个名为"streaming-job-executor"的线程池,所以,Job将在这个线程池的线程里,被实际执行func

spark.streaming.concurrentJobs 参数

这里jobExecutor的线程池大小,是由spark.streaming.concurrentJobs参数来控制的,当没有显式设置时,其取值为1

进一步说,这里jobExecutor的线程池大小,就是能够并行执行的Job数。而回想前文讲解的DStreamGraph.generateJobs(time)过程,一次 batch 产生一个Seq[Job},里面可能包含多个Job—— 所以,确切的,**有几个output操作,就调用几次ForEachDStream.generatorJob(time),就产生出几个Job**。

为了验证这个结果,我们做一个简单的小测试:先设置spark.streaming.concurrentJobs = 10,然后在每个 batch 里做2foreachRDD()这样的output操作:

// 完整代码可见本文最后的附录
val BLOCK_INTERVAL = 1 // in seconds
val BATCH_INTERVAL = 5 // in seconds
val CURRENT_JOBS = 10
...

// DStream DAG 定义开始
val inputStream = ssc.receiverStream(...)
inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 1
inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 2
// DStream DAG 定义结束
...

在上面的设定下,我们很容易知道,能够同时在处理的 batch 有10 / 2 = 5个,其余的 batch 的Job只能处于等待处理状态。

下面的就是刚才测试代码的运行结果,验证了我们前面的分析和计算:

image

Spark Streaming 的 JobSet, Job,与 Spark Core 的 Job, Stage, TaskSet, Task

最后,我们专门拿出一个小节,辨别一下这 Spark Streaming 的 JobSet, Job,与 Spark Core 的 Job, Stage, TaskSet, Task 这几个概念。

[Spark Streaming]
JobSet  的全限定名是:org.apache.spark.streaming.scheduler.JobSet
Job     的全限定名是:org.apache.spark.streaming.scheduler.Job

[Spark Core]
Job     没有一个对应的实体类,主要是通过 jobId:Int 来表示一个具体的 job
Stage   的全限定名是:org.apache.spark.scheduler.Stage
TaskSet 的全限定名是:org.apache.spark.scheduler.TaskSet
Task    的全限定名是:org.apache.spark.scheduler.Task

Spark Core 的 Job, Stage, Task 就是我们“日常”谈论 Spark 任务时所说的那些含义,而且在 Spark 的 WebUI 上有非常好的体现,比如下图就是 1 个Job包含 3 个Stage;3 个Stage各包含 8, 2, 4 个Task。而TaskSet则是 Spark Core 的内部代码里用的类,是Task的集合,和Stage是同义的。

image

而 Spark Streaming 里也有一个Job,但此Job非彼Job。Spark Streaming 里的Job更像是个Java里的Runnable,可以run()一个自定义的func函数。而这个func, 可以:

  • 直接调用RDDaction,从而产生 1 个或多个 Spark Core 的Job
  • 先打印一行表头;然后调用firstTen = RDD.collect(),再打印firstTen的内容;最后再打印一行表尾 —— 这正是DStream.print()Job实现
  • 也可以是任何用户定义的 code,甚至整个 Spark Streaming 执行过程都不产生任何 Spark Core 的Job—— 如上一小节所展示的测试代码,其Jobfunc实现就是:Thread.sleep(Int.MaxValue),仅仅是为了让这个Job一直跑在jobExecutor线程池里,从而测试jobExecutor的并行度 :)

最后,Spark Streaming 的JobSet就是多个Job的集合了。

如果对上面 5 个概念做一个层次划分的话(上一层与下一层多是一对多的关系,但不完全准确),就应该是下表的样子:

Spark Core Spark Streaming
lv 5 RDD DAGs DStreamGraph
lv 4 RDD DAG JobSet
lv 3 Job Job
lv 2 Stage
lv 1 Task

附录

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object ConcurrentJobsDemo {

  def main(args: Array[String]) {

    // 完整代码可见本文最后的附录
    val BLOCK_INTERVAL = 1 // in seconds
    val BATCH_INTERVAL = 5 // in seconds
    val CURRENT_JOBS = 10

    val conf = new SparkConf()
    conf.setAppName(this.getClass.getSimpleName)
    conf.setMaster("local[2]")
    conf.set("spark.streaming.blockInterval", s"${BLOCK_INTERVAL}s")
    conf.set("spark.streaming.concurrentJobs", s"${CURRENT_JOBS}")
    val ssc = new StreamingContext(conf, Seconds(BATCH_INTERVAL))

    // DStream DAG 定义开始
    val inputStream = ssc.receiverStream(new MyReceiver)
    inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 1
    inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 2
    // DStream DAG 定义结束

    ssc.start()
    ssc.awaitTermination()
  }

  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    override def onStart() {
      // invoke store("str") every 100ms
      Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable {
        override def run(): Unit = store("str")
      }, 0, 100, TimeUnit.MILLISECONDS)
    }

    override def onStop() {}
  }

}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark定制班第7课:Spark Streami.. 下一篇Spark pdf 电子书大全 百度云

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目