Driver-side handling after a Spark task finishes (Part 1)
2019-09-17 15:32:51
Tags: spark, task, execution, completion, driver, handling, logic

Recap

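In the previous post, we walked through how a task runs on the executor. When the task finishes, the executor calls execBackend.statusUpdate at the end of TaskRunner.run (the Runnable that Executor.launchTask submits to the executor's thread pool) to send the task's state and result to the driver. Back on the driver side, the handler for this StatusUpdate message lives in the receive method of DriverEndpoint, the driver's RPC endpoint.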
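To make this round trip concrete, here is a minimal Scala sketch of the message flow just described. It is not Spark code: `TaskState` is a local stand-in, `StatusUpdate` carries a plain `ByteBuffer` instead of Spark's `SerializableBuffer`, and a `LinkedBlockingQueue` (the hypothetical `driverInbox`) replaces the real RPC layer. `sendStatusUpdate` and `receive` are illustrative names only.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.LinkedBlockingQueue

// Local stand-in for Spark's TaskState enumeration.
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
}

// Simplified StatusUpdate message. Spark wraps `data` in a SerializableBuffer;
// a plain ByteBuffer is used here instead (assumption).
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState.Value, data: ByteBuffer)

// A blocking queue stands in for the RPC channel to the driver (assumption).
val driverInbox = new LinkedBlockingQueue[StatusUpdate]()

// Executor side: roughly the role of execBackend.statusUpdate, packaging the
// task state and serialized result into a message for the driver.
def sendStatusUpdate(execId: String, taskId: Long, state: TaskState.Value, result: ByteBuffer): Unit =
  driverInbox.put(StatusUpdate(execId, taskId, state, result))

// Driver side: a receive-style handler that destructures the message.
def receive(msg: StatusUpdate): (String, Long, TaskState.Value, Int) = msg match {
  case StatusUpdate(executorId, taskId, state, data) =>
    (executorId, taskId, state, data.remaining())
}

sendStatusUpdate("exec-1", 42L, TaskState.FINISHED,
  ByteBuffer.wrap("result".getBytes(StandardCharsets.UTF_8)))
val handled = receive(driverInbox.take())
```

The pattern match in `receive` has the same shape as the `case StatusUpdate(executorId, taskId, state, data) =>` clause in DriverEndpoint.receive.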

DriverEndpoint.receive

case StatusUpdate(executorId, taskId, state, data) =>
    // Forward the status update to the TaskScheduler
    scheduler.statusUpdate(taskId, state, data.value)
    // If the task has finished (FINISHED, FAILED, KILLED or LOST), the resources
    // it occupied have been released, so reclaim them and offer them to new tasks
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }   
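The resource bookkeeping in this handler can be modelled in isolation. The sketch below assumes a trimmed-down, hypothetical `ExecutorInfo` with only a `freeCores` counter, a hypothetical `makeOffers` that merely reports how many tasks the free cores could host (the real one builds resource offers and launches tasks), and `CpusPerTask = 1`, the default of `spark.task.cpus`. `TaskState.isFinished` mirrors the four terminal states named in the comment.

```scala
import scala.collection.mutable

// Local stand-in for Spark's TaskState and its isFinished check.
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  private val FinishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(state: Value): Boolean = FinishedStates.contains(state)
}

// Hypothetical, trimmed-down executor record: only the free-core counter.
final class ExecutorInfo(var freeCores: Int)

val CpusPerTask = 1 // assumed value of spark.task.cpus, whose default is 1
val executorDataMap = mutable.HashMap("exec-1" -> new ExecutorInfo(freeCores = 0))

// Hypothetical makeOffers: returns how many tasks the free cores could host.
def makeOffers(executorId: String): Int =
  executorDataMap(executorId).freeCores / CpusPerTask

// Mirrors the handler: only a finished task gives its cores back.
def onStatusUpdate(executorId: String, state: TaskState.Value): Option[Int] =
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(info) =>
        info.freeCores += CpusPerTask // the task's cores are free again
        Some(makeOffers(executorId))  // offer them to waiting tasks
      case None =>
        None // unknown executor: the update is ignored
    }
  } else None

val afterRunning = onStatusUpdate("exec-1", TaskState.RUNNING)   // None, nothing freed
val afterFinished = onStatusUpdate("exec-1", TaskState.FINISHED) // Some(1)
val fromUnknown = onStatusUpdate("exec-9", TaskState.FAILED)     // None
```

A RUNNING update leaves the core count untouched, which is why the real handler guards the bookkeeping with `TaskState.isFinished(state)`.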

So the part that matters is the call to scheduler.statusUpdate.

TaskSchedulerImpl.statusUpdate

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          // Where LOST comes from: per the Spark comment below, only the deprecated Mesos fine-grained mode produces it
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // The task has finished: FINISHED, FAILED, KILLED or LOST
          if (TaskState.isFinished(state)) {
            // Clear the bookkeeping kept for this task
            cleanupTaskState(tid)
            // Remove the task from the TaskSetManager's set of running tasks
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              // Hand the result to TaskResultGetter, which processes it asynchronously on its own thread pool
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
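Two ideas in statusUpdate are worth isolating: terminal states are routed either to the success path or to the failure path, and the DAGScheduler is notified only after the `synchronized` block exits. The sketch below models both with a hypothetical `MiniScheduler`. Its `successQueue` and `failureQueue` stand in for `taskResultGetter.enqueueSuccessfulTask` and `enqueueFailedTask`, the `onExecutorLost` callback stands in for `dagScheduler.executorLost`, and the LOST handling is simplified (no `executorIdToRunningTaskIds` check, no `removeExecutor`).

```scala
import scala.collection.mutable

// Local stand-in for Spark's TaskState, with the same terminal-state check.
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  private val FinishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(state: Value): Boolean = FinishedStates.contains(state)
}

// Hypothetical mini scheduler shaped like TaskSchedulerImpl.statusUpdate.
class MiniScheduler(onExecutorLost: String => Unit) {
  private val taskIdToExecutorId = mutable.HashMap[Long, String]()
  // Stand-ins for taskResultGetter.enqueueSuccessfulTask / enqueueFailedTask.
  val successQueue = mutable.ArrayBuffer[Long]()
  val failureQueue = mutable.ArrayBuffer[(Long, TaskState.Value)]()

  def launch(tid: Long, execId: String): Unit = synchronized {
    taskIdToExecutorId(tid) = execId
  }

  def statusUpdate(tid: Long, state: TaskState.Value): Unit = {
    var failedExecutor: Option[String] = None
    synchronized {
      taskIdToExecutorId.get(tid) match {
        case Some(execId) =>
          // Simplified: a LOST task marks its executor as failed.
          if (state == TaskState.LOST) failedExecutor = Some(execId)
          if (TaskState.isFinished(state)) {
            taskIdToExecutorId.remove(tid) // plays the role of cleanupTaskState
            if (state == TaskState.FINISHED) successQueue += tid
            else failureQueue += ((tid, state))
          }
        case None =>
          () // unknown or already-finished task: ignore the update
      }
    }
    // Notify the "DAGScheduler" only after the lock above has been released.
    failedExecutor.foreach(onExecutorLost)
  }
}

val lostEvents = mutable.ArrayBuffer[(String, Boolean)]()
// The callback records whether it ran while holding the scheduler's lock.
lazy val scheduler: MiniScheduler =
  new MiniScheduler(execId => lostEvents += ((execId, Thread.holdsLock(scheduler))))

scheduler.launch(1L, "exec-1")
scheduler.launch(2L, "exec-2")
scheduler.statusUpdate(1L, TaskState.FINISHED)
scheduler.statusUpdate(2L, TaskState.LOST)
scheduler.statusUpdate(1L, TaskState.FINISHED) // duplicate update: ignored
```

`Thread.holdsLock(scheduler)` is `false` inside the callback because `failedExecutor` is only acted on after the lock is released, which is the pattern behind the "without holding a lock" comment in the Spark code. The duplicate FINISHED update for task 1 lands in the `None` branch, just as it would hit the `logError` case above.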

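For a FINISHED task, statusUpdate hands the serialized result to TaskResultGetter, which processes it asynchronously on its own thread pool. Let's look at that asynchronous logic next.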
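The hand-off itself is ordinary executor-service usage: the caller only enqueues work, and decoding happens on a separate pool. Here is a minimal sketch, assuming a hypothetical `MiniResultGetter` that "deserializes" by decoding UTF-8 bytes rather than using Spark's serializer and `TaskResult` classes. As far as I know, Spark sizes its real pool with `spark.resultGetter.threads` (default 4); the `threads` parameter here plays that role.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}

// Hypothetical stand-in for TaskResultGetter: a fixed thread pool that decodes
// results off the caller's thread and then reports them through a callback.
class MiniResultGetter(threads: Int, onResult: (Long, String) => Unit) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(threads)

  def enqueueSuccessfulTask(tid: Long, serializedData: ByteBuffer): Unit =
    pool.execute(new Runnable {
      override def run(): Unit = {
        // Stand-in for deserialization: decode the buffer as UTF-8 text.
        val bytes = new Array[Byte](serializedData.remaining())
        serializedData.get(bytes)
        onResult(tid, new String(bytes, StandardCharsets.UTF_8))
      }
    })

  def stop(): Unit = {
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

val results = new ConcurrentLinkedQueue[(Long, String)]()
val done = new CountDownLatch(2) // lets the caller wait for both callbacks
val getter = new MiniResultGetter(4, (tid, value) => {
  results.add((tid, value))
  done.countDown()
})

// Enqueueing returns immediately; the work happens on the pool's threads.
getter.enqueueSuccessfulTask(1L, ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8)))
getter.enqueueSuccessfulTask(2L, ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8)))
val allArrived = done.await(10, TimeUnit.SECONDS)
getter.stop()
```

The `CountDownLatch` exists only so the example can wait for both asynchronous callbacks before shutting the pool down; the order in which the two results arrive is not guaranteed.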

TaskResultGetter.enqueueSuccessfulTask

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  // Run the result handling asynchronously on the task-result thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // Deserialize the result sent back from the executor
        val (result, size) = serializer.get().deserialize[TaskResult[_]](seri