设为首页 加入收藏

TOP

Spark Streaming 数据准备阶段分析(Receiver方式)
2019-02-13 01:09:27 】 浏览:39
Tags:Spark Streaming 数据 准备 阶段 分析 Receiver 方式

——————————————————————————————
Spark Streaming概述
Spark Streaming 初始化过程
Spark Streaming Receiver启动过程分析
Spark Streaming 数据准备阶段分析(Receiver方式)
Spark Streaming 数据计算阶段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 机制分析

——————————————————————————————

1、Spark Streaming数据准备流程

SparkStreaming的全过程分为两个阶段:数据准备阶段和数据计算阶段。两个阶段在功能上相互独立,仅通过数据联系在一起。本文重点从源码角度分析Spark Streaming数据准备阶段的具体流程。

Spark Streaming数据准备阶段包含对流入数据的接收、分片(按照时间片划分为数据集)以及分片数据的分发工作。其转数据的接收转化过程主要有以下几个关键步骤:

  1. Receiver接收外部数据流,其将接收的数据流交由BlockGenerator存储在ArrayBuffer中,在存储之前会先获取许可(由“spark.streaming.receiver.maxRate”指定,spark 1.5之后由backpressure进行自动计算,代表可以存取的最大速率,每存储一条数据获取一个许可,若未获取到许可接收将阻塞)。
  2. BlockGenerater中定义一Timer,其依据设置的Interval定时将ArrayBuffer中的数据取出,包装成Block,并将Block存放入blocksForPushing中(阻塞队列ArrayBlockingQueue),并将ArrayBuffer清空
  3. BlockGenerater中的blockPushingThread线程从阻塞队列中取出取出block信息,并以onPushBlock的方式将消息通过监听器(listener)发送给ReceiverSupervisor.
  4. ReceiverSupervisor收到消息后,将对消息中携带数据进行处理,其会通过调用BlockManager对数据进行存储,并将存储结果信息向ReceiverTracker汇报
  5. ReceiverTracker收到消息后,将信息存储在未分配Block队列(streamidToUnallocatedBlock)中,等待JobGenerator生成Job时将其指定给RDD

1过程持续进行,2-5 以BlockInterval为周期重复执行.

2、源码分析

以WordCount应用为例,程序见Spark Streaming概述

2.1 数据接收

在Receiver启动之后,其将开始接收外部数据源的数据(WordCount程序中使用的SocketReceiver是以主动接收的方式获取数据),并对数据进行存储。SocketReceiver实现代码如下:

 def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
  ......
  }

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

其中 supervisor的pushSingle()实现如下:

 /** Push a single record of received data into block generator. */
  def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }

其调用defaultBlockGenerator的addData将数据添加进currentBuffer,其中defaultBlockGenerator 即为BlockGenerator,其addData方法如下:

 /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

分析上述代码,其中waitToPush()方法,是用来控制接收速率的,与BackPressure机制相关,"SparkStreaming Backpressure分析"一章会进行详细分析。当获取到许可之后,数据将会存入currentBuffer中,并等待进行后续处理。
Receiver会不断重复上述过程,接收数据,存入currentBuffer.

2.2 数据切片

"Spark Streaming Receiver启动过程分析"提到,在启动Receiver进会创建ReceiverSupervisorImpl, ReceiverSupervisorImpl又会创建并启动BlockGenerator,用于对Receiver接收的数据流进行切片操作。其切片是以定时器的方式进行的。其时间周期由“spark.streaming.blockInterval”进行设置,默认为200ms.

BlockGenerator的start方法实现如下:

 /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

其中

  • blockIntervalTimer为定时器任务,其会周期性的执行计划任务
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  • blockPushingThread为新线程,负载不断的从阻塞队列中取出打包的数据
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

2.2.1 数据流切分

RecurringTimer为定时器,其每隔blockIntervalMs时间,执行一次updateCurrentBuffer方法,将currentBuffer中的数据进行打包,并添加到阻塞队列blocksForPushing中。

 /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {   //如果buffer空,则不生成block.
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }

2.2.2 数据传输

blockPushingThread 线程启动会,将执行keepPushingBlocks()方法,从阻塞队列中取出切片后的数据,并通过defaultBlockGeneratorListener转发,并等待下一步存储、分发操作。(defaultBlockGeneratorListener在ReceiverSupervisorImpl中定义)。

/** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }

其中pushBlock方法实现如下:

 private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }

2.3 Block 存储与汇报

BlockGeneratorListener 监控到onPushBlock事件后,会对传输的数据分片进行存储操作,并向ReceiverTracker汇报。

2.3.1 Block存储

BlockGeneratorListener 监控到onPushBlock事件后,经过一系列调整,最后将调用 pushAndReportBlock对数据分片进行存储,pushAndReportBlock的实现如下:

/** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }

其中,数据通过receivedBlockHandler存储为Block, ReceivedBlockHandler有两种实现

  • WriteAheadLogBasedBlockHandler , 开启WAL时会使用此实现
  • BlockManagerBasedBlockHandler,默认情况下会使用此实现 。

BlockManagerBasedBlockHandler通过BlockManager的接口对数据在Receiver所在节点进行保存,并依据StorageLevel 设置的副本数,在其它Executor中保存副本。保存副本的方法如下所示:

  /**
   * Replicate block to another node. Note that this is a blocking call that returns after
   * the block has been replicated.
   */
  private def replicate(
      blockId: BlockId,
      data: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {
    ......
    var peersForReplication = blockReplicationPolicy.prioritize(
      blockManagerId,
      getPeers(false),
      mutable.HashSet.empty,
      blockId,
      numPeersToReplicateTo)
   ......
}

其中副本策略采用,随机取样的方式进行,


  /**
   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
   * that just makes sure we put blocks on different hosts, if possible
   *
   * @param blockManagerId Id of the current BlockManager for self identification
   * @param peers A list of peers of a BlockManager
   * @param peersReplicatedTo Set of peers already replicated to
   * @param blockId BlockId of the block being replicated. This can be used as a source of
   *                randomness if needed.
   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
   */
  override def prioritize(
      blockManagerId: BlockManagerId,
      peers: Seq[BlockManagerId],
      peersReplicatedTo: mutable.HashSet[BlockManagerId],
      blockId: BlockId,
      numReplicas: Int): List[BlockManagerId] = {
    val random = new Random(blockId.hashCode)
    logDebug(s"Input peers : ${peers.mkString(", ")}")
    val prioritizedPeers = if (peers.size > numReplicas) {
      getSampleIds(peers.size, numReplicas, random).map(peers(_))
    } else {
      if (peers.size < numReplicas) {
        logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
      }
      random.shuffle(peers).toList
    }
    logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
    prioritizedPeers
  }

2.3.2 Block 汇报

当Block保存完成,并且副本制作完成后,将通过trackerEndpoint向ReceiverTrack进行汇报。

  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

ReceiverTrackEndpoint收到“AddBlock”信息后,将receivedBlockTracker将block信息保存入队列streamIdToUnallocatedBlockQueues中,以用于生成Job。

case AddBlock(receivedBlockInfo) =>
        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
          walBatchingThreadPool.execute(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              if (active) {
                context.reply(addBlock(receivedBlockInfo))
              } else {
                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
              }
            }
          })
        } else {
          context.reply(addBlock(receivedBlockInfo))
        }


  /** Add new blocks for the given stream */
  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

其中receivedBlockTracker的addBlock实现如下:


  /** Add received block. This event will get written to the write ahead log (if enabled). */
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }

  /** Get the queue of received blocks belonging to a particular stream */
  private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

至此数据准备阶段完成,保存在streamIdToUnallocatedBlockQueues中的数据信息,在下一个批次生成Job时会被取出用于封装成RDD,且注册数据信息会转移至timeToAllocatedBlocks中。


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spark Streaming Executor Dynami.. 下一篇Spark Streaming 数据计算阶段分析

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }