class SparkContext(config: SparkConf) extends Logging {

  /**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

  /**
* Submit a job for execution and return a FutureJob holding the result.
*
* @param rdd target RDD to run tasks on
* @param processPartition a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
* @param resultFunc function to be executed when the result is ready
*/
  def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] = {
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    val waiter = dagScheduler.submitJob(
      rdd,
      (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
      partitions,
      callSite,
      resultHandler,
      localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
  }
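To make the call path concrete, here is a small driver-side sketch that exercises the same runJob entry point directly. The object name, master setting, and data are made up for illustration; normally you reach runJob indirectly through actions such as count or collect.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runJob-sketch"))
    val rdd = sc.parallelize(1 to 100, 4)

    // Every action bottoms out in runJob: each task processes one partition and the
    // resultHandler callback receives (partitionIndex, result) on the driver.
    val sizes = new Array[Int](rdd.partitions.length)
    sc.runJob(
      rdd,
      (_: TaskContext, iter: Iterator[Int]) => iter.size,
      0 until rdd.partitions.length,
      (index: Int, size: Int) => sizes(index) = size)

    println(sizes.mkString(", "))   // e.g. 25, 25, 25, 25
    sc.stop()
  }
}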
Both methods above delegate to the DAGScheduler; its runJob and submitJob come next.

  /**
* Run an action job on the given RDD and pass all the results to the resultHandler function as
* they arrive.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @note Throws `Exception` when the job fails
*/
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

  /**
* Submit an action job to the scheduler.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // post() simply puts the job into a LinkedBlockingDeque; a separate thread takes events
    // off that queue and runs the method below.
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

  // The handler that receives the JobSubmitted event posted above
  // (DAGSchedulerEventProcessLoop.doOnReceive):
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
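The post/doOnReceive pair is a classic single-threaded event loop: post only enqueues the event onto a LinkedBlockingDeque, and one dedicated event thread drains the queue and dispatches each event. Below is a stripped-down sketch of that pattern; the class and method names are illustrative, not Spark's actual code.

import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  // Consumer side: one dedicated thread blocks on the queue and dispatches events,
  // which is the role doOnReceive plays above.
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(eventQueue.take())
        }
      } catch {
        case _: InterruptedException =>   // stop() interrupts a blocked take(); exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }

  // Producer side: the caller's thread (e.g. the one running submitJob) just enqueues and returns.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

Spark's real implementation lives in org.apache.spark.util.EventLoop, which DAGSchedulerEventProcessLoop extends.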
From handleJobSubmitted the DAGScheduler builds the stage graph and eventually hands each stage's tasks to the task scheduler via TaskSchedulerImpl.submitTasks:

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
RDD
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
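map is a transformation: it only wraps this RDD in a MapPartitionsRDD with the cleaned closure and submits nothing; a job is triggered later by an action. A quick illustration (the data is made up):

val doubled = sc.parallelize(1 to 4, 2).map(_ * 2)   // builds a MapPartitionsRDD; no job runs yet
println(doubled.collect().mkString(", "))             // collect() is the action that calls sc.runJob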
  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
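Because reduce calls sc.runJob itself, it is an action and runs immediately; the getOrElse branch above is what surfaces the familiar error on an empty RDD. For example (data made up):

val sum = sc.parallelize(1 to 10, 2).reduce(_ + _)    // runs a job right away; sum == 55
// sc.parallelize(Seq.empty[Int], 2).reduce(_ + _)    // would throw UnsupportedOperationException("empty collection")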
The RDD checkpoint mechanism
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }
checkpoint is used as follows:
val data = sc.textFile("/tmp/spark/1.data").cache()   // note: cache it, otherwise checkpointing recomputes the RDD
sc.setCheckpointDir("/tmp/spark/checkpoint")
data.checkpoint
data.count
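checkpoint only marks the RDD; the data is actually written when rdd.doCheckpoint() runs at the end of the first job (see runJob above), after which the lineage is truncated. A quick way to observe this (the exact toDebugString output varies by Spark version):

println(data.isCheckpointed)    // true after the count above
println(data.toDebugString)     // the lineage is now rooted at a checkpointed RDD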