回顾
上一篇,我们分析了了任务在executor端的运行流程,任务运行结束后,在Executor.launchTask方法最后,通过调用execBackend.statusUpdate方法将任务结果以及任务状态发送给driver。回到driver端,我们在driver的rpc服务端DriverEndPoint的receive方法中寻找对StatusUpdate消息的处理逻辑。
DriverEndpoint.receive
case StatusUpdate(executorId, taskId, state, data) =>
// 通知TaskScheduler任务已完成
scheduler.statusUpdate(taskId, state, data.value)
// 如果任务已经运行结束了,包括FINISHED, FAILED, KILLED, LOST这几种状态
// 那么说明任务占用的资源已经释放了,此时就可以回收这部分资源并重新分配任务
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
所以重点是scheduler.statusUpdate调用
TaskSchedulerImpl.statusUpdate
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
// 这个状态不明,没看什么地方会产生这个状态
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
// 任务运行结束,包括这几种状态FINISHED, FAILED, KILLED, LOST
if (TaskState.isFinished(state)) {
// 清除关于这个task的一些簿记量
cleanupTaskState(tid)
// 将这个task从正在运行的task集合中移除
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
// 启动一个线程,用来异步地处理任务成功的情况
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
这里,启动了一个异步任务,用来处理任务成功的情况,所以我们分析一下异步任务的处理逻辑。
TaskResultGetter.enqueueSuccessfulTask
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
serializedData: ByteBuffer): Unit = {
// 启动一个异步任务
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
// 对传回的结果进行反序列化
val (result, size) = serializer.get().deserialize[TaskResult[_]](seri