设为首页 加入收藏

TOP

Spark Streaming 初始化过程分析
2019-02-13 01:07:40 】 浏览:23
Tags:Spark Streaming 初始 过程 分析

——————————————————————————————
Spark Streaming概述
Spark Streaming 初始化过程
Spark Streaming Receiver启动过程分析
Spark Streaming 数据准备阶段分析(Receiver方式)
Spark Streaming 数据计算阶段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 机制分析

——————————————————————————————

Spark Streaming是一种构建在Spark上的实时计算框架。Spark Streaming应用以Spark应用的方式提交到Spark平台,其组件以长期批处理任务的形式在Spark平台运行。这些任务主要负责接收实时数据流及定期产生批作业并提交至Spark集群,本文要说明的是以下几个功能模块运行前的准备工作。

  • 数据接收
  • Job 生成
  • 流量控制
  • 动态资源伸缩

下面我们以WordCount程序为例分析Spark Streaming运行环境的初始化过程。

val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]") 
val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(10)) 
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY) 
val words = lines.flatMap(_.split(" ")).map(w => (w,1)) 
val wordCount = words.reduceByKey(_+_) 
wordCount.print 
ssc.start()
ssc.awaitTermination()

以下流程,皆以上述WordCount源码为例。

1、StreamingContext的初始化过程

StreamingContext是Spark Streaming应用的执行环境,其定义很多Streaming功能的入口,如:它提供从多种数据源创建DStream的方法等。
在创建Streaming应用时,首先应创建StreamingContext(WordCount应用可知),伴随StreamingContext的创建将会创建以下主要组件:

1.1 DStreamGraph

DStreamGraph的主要功能是记录InputDStream及OutputStream及从InputDStream中抽取出ReceiverInputStreams。因为DStream之间的依赖关系类似于RDD,并在任务执行时转换成RDD,因此,可以认为DStream Graph与RDD Graph存在对应关系. 即:DStreamGraph以批处理间隔为周期转换成RDDGraph.

  • ReceiverInputStreams: 包含用于接收数据的Receiver信息,并在启动Receiver时提供相关信息
  • OutputStream:每个OutputStream会在批作业生成时,生成一个Job.

1.2 JobScheduler

JobScheduler是Spark Streaming中最核心的组件,其负载Streaming各功作组件的启动。

  • 数据接收
  • Job 生成
  • 流量控制
  • 动态资源伸缩
    以及负责生成的批Job的调度及状态管理工作。

2、 DStream的创建与转换

StreamingContext初始化完毕后,通过调用其提供的创建InputDStream的方法创建SocketInputDStream.

SocketInputDStream的继承关系为:
SocketInputDStream->ReceiverInputDStream->InputDStream->DStream.
在InputDStream中 提供如下功

 ssc.graph.addInputStream(this)

JAVA中初始化子类时,会先初始化其父类。所以在创建SocketInputDStream时,会先初始化InputDStream,在InputDStream中实现将自身加入DStreamGraph中,以标识其为输入数据源。
DStream中算子的转换,类似于RDD中的转换,都是延迟计算,仅形成pipeline链。当上述应用遇到print(Output算子)时,会将DStream转换为ForEachDStream,并调register方法作为OutputStream注册到DStreamGraph的outputStreams列表,以待生成Job。
print算子实现方法如下:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
 def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * @param foreachFunc foreachRDD function
   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
   *                           only the scopes and callsites of `foreachRDD` will override those
   *                           of the RDDs on the display.
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

ForEachDStream 不同于其它DStream的地方为其重写了generateJob方法,以使DStream Graph操作转换成RDD Graph操作,并生成Job.

3、SparkContext启动

/**
   * Start the execution of the streams.
   *
   * @throws IllegalStateException if the StreamingContext is already stopped.
   */
  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        ......
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

在此方法中,最核心的代码是以线程的方式启动JobScheduler,从而开启各功能组件。

3.1 JobScheduler的启动

JobScheduler主要负责以下几种任务:

  • 数据接收相关组件的初始化及启动
    ReceiverTracker的初始化及启动。ReceiverTracker负责管理Receiver,包括Receiver的启停,状态维护 等。
  • Job生成相关组件的启动
    JobGenerator的启动。JobGenerator负责以BatchInterval为周期生成Job.
  • Streaming监听的注册与启动
  • 作业监听
  • 反压机制
    BackPressure机制,通过RateController控制数据摄取速率。
  • Executor DynamicAllocation 的启动
    Executor 动态伸缩管理, 动态增加或减少Executor,来达到使用系统稳定运行 或减少资源开销的目的。
  • Job的调度及状态维护。

JobScheduler的start方法的代码如下所示:

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

代码中存在的 eventLoop: EventLoop[JobSchedulerEvent]对象,用以接收和处理事件。调用者通过调用其post方法向事件队列注册事件。EventLoop开始执行时,会开启一deamon线程用于处理队列中的事件。EventLoop是一个抽象类,JobScheduler中初始化EventLoop时实现了其OnReceive方法。该方法中指定接收的事件由processEvent(event)方法处理。

小结

JobScheduler是Spark Streaming中核心的组件,在其开始执行时,会开启数据接收相关组件及Job生成相关组件,从而使数据准备和数据计算两个流程开始工作。
另外,其还负责BackPressure, Executor DynamicAllocation 等优化机制的启动工作。
下面的章节,将对数据准备和数据计算阶段的流程进行分析,以及BackPressure, Executor DynamicAllocation 机制进行分析。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇SparkStreaming Backpressure分析 下一篇Spark Streaming概述

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }