In the last course, we learned "How the Spark Streaming Job is generated dynamically".
From that course, we know there are 3 key classes for a Spark Streaming Job:
JobScheduler: schedules Spark Streaming Jobs
JobGenerator: generates Spark Streaming Job
ReceiverTracker: manages the execution of the receivers of ReceiverInputDStreams
In this course, we will learn more about the internals of the Spark Streaming JobScheduler.
Always remember that the JobScheduler class is the center of Spark Streaming Job scheduling; you can regard it as the counterpart of Spark Core's DAGScheduler.
Let's check some important member variables, inner classes, and member functions of the JobScheduler class.
Here is the whole picture of the JobScheduler class:
The JobScheduler class has the following important member variables:
jobGenerator
At each batchDuration interval, the started jobGenerator calls DStreamGraph to generate the RDD graph and finally generate a Job
receiverTracker
Starting receiverTracker launches the Receivers on the Executors of the Spark cluster and makes them ready to accept the input streams
inputInfoTracker
A tracker that tracks all input stream information (i.e. metadata) as well as the number of processed records
jobSets
A map from batch time to JobSet; a JobSet holds the Jobs belonging to the same batch, storing information such as the time of the batch, the jobs of the batch, and a map from input stream id to its input info
jobExecutor
A thread pool with a fixed number of threads that executes all the jobs in a jobSet
The number of threads can be configured by the spark.streaming.concurrentJobs configuration parameter; the default is 1
eventLoop
An event loop that receives events from callers and processes all of them in the event thread
All events are instances of the JobSchedulerEvent trait
listenerBus
An instance of StreamingListenerBus class
It asynchronously passes StreamingListenerEvents to the registered StreamingListeners
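The eventLoop member described above can be sketched with a minimal, self-contained analogue. The class and parameter names below are illustrative, not Spark's real API; the real EventLoop also handles errors and clean shutdown of a partially drained queue:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical, much-simplified analogue of Spark's EventLoop: a dedicated
// thread drains a queue and hands each event to an onReceive callback, so
// callers can post events without blocking.
class SimpleEventLoop[E <: AnyRef](name: String)(onReceive: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit =
      while (!stopped) {
        // Poll with a timeout so the thread can notice the stop flag.
        val event = queue.poll(50, TimeUnit.MILLISECONDS)
        if (event != null) onReceive(event)
      }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped = true; thread.join() }
}
```

The key property this models: all events, whichever thread posts them, are handled one at a time on the single event thread, so the handlers need no extra locking.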
The JobScheduler class has the following important inner classes:
JobSchedulerEvent
A private sealed trait, which can only be extended in the same source file (here, by the other inner classes of JobScheduler)
Here it is used as an alternative to an enum of event types for JobScheduler
The parameter of JobScheduler eventLoop's onReceive function is declared as the JobSchedulerEvent type
JobStarted
A private class which extends JobSchedulerEvent
It has 2 parameters in its constructor: the job and the start time
This event type will call the handleJobStart function
JobCompleted
A private class which extends JobSchedulerEvent
It has 2 parameters in its constructor: the job and the completed time
This event type will call the handleJobCompletion function
ErrorReported
A private class which extends JobSchedulerEvent
It has 2 parameters in its constructor: the error message and the throwable exception
This event type will call the handleError function
JobHandler
A private class which extends Runnable interface
It implements the function run()
It has 1 parameter in its constructor: the job
In its run() function, it sets the job description, posts a JobStarted event to eventLoop, calls the job's run() function (which invokes the func passed into the job's constructor), and posts a JobCompleted event to eventLoop
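The inner classes above can be mirrored in a small, self-contained sketch. The type names below are shortened stand-ins for the real ones, and the handler is reduced to just the lifecycle-event bookkeeping:

```scala
// A sealed trait acting as the "enum" of event types, as JobSchedulerEvent does.
sealed trait SchedulerEvent
case class Started(jobId: Int, time: Long) extends SchedulerEvent
case class Completed(jobId: Int, time: Long) extends SchedulerEvent
case class ErrorReported(msg: String, cause: Throwable) extends SchedulerEvent

// A Runnable in the spirit of JobHandler: the worker thread posts a Started
// event, runs the job body, and posts a Completed event afterwards.
class MiniJobHandler(jobId: Int, body: () => Unit, post: SchedulerEvent => Unit)
    extends Runnable {
  override def run(): Unit = {
    post(Started(jobId, System.currentTimeMillis()))
    try body()
    catch { case e: Throwable => post(ErrorReported(s"job $jobId failed", e)) }
    post(Completed(jobId, System.currentTimeMillis()))
  }
}
```

Because the events are posted back to the event loop rather than handled inline, the bookkeeping (batch times, listener notifications) stays on the single event thread even though jobs run on pool threads.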
The JobScheduler class has the following important member functions:
start()
It is only called by a new thread named "streaming-start" in the StreamingContext#start function
Here is the StreamingContext#start function:
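As a rough, self-contained sketch of the "streaming-start" idea: the helper below is hypothetical, written in the spirit of Spark's ThreadUtils.runInNewThread (the real code also manages context state and call sites):

```scala
// Run a block in a fresh, named thread and wait for it to finish, roughly
// how StreamingContext#start hands scheduler.start() to a "streaming-start"
// thread. This helper is illustrative, not Spark's actual implementation.
def runInNewThread(name: String)(body: => Unit): Unit = {
  val t = new Thread(name) { override def run(): Unit = body }
  t.start()
  t.join() // the caller blocks until the start-up block completes
}
```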
In the JobScheduler#start function, the 2 key things are to start an EventLoop that listens for JobSchedulerEvents and calls the related handler functions, and to start the JobGenerator to generate Spark Streaming Jobs
Here is the JobScheduler#start function:
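The start-up order can be sketched with a toy scheduler. The component names below are stand-ins recorded as strings; the real method also wires up the inputInfoTracker and other pieces:

```scala
// A toy scheduler that records the order its components are started in,
// mirroring the shape of JobScheduler#start: the event loop comes up first
// so that everything started afterwards can already post events to it.
class MiniScheduler {
  val startedOrder = scala.collection.mutable.ArrayBuffer[String]()
  private var started = false

  def start(): Unit = synchronized {
    if (!started) { // a second call is a no-op, like the real guard
      started = true
      startedOrder += "eventLoop"       // begin handling JobSchedulerEvents first
      startedOrder += "listenerBus"     // then the streaming listener bus
      startedOrder += "receiverTracker" // launch receivers on the executors
      startedOrder += "jobGenerator"    // finally begin generating jobs per batch
    }
  }
}
```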
processEvent(JobSchedulerEvent)
In the JobScheduler#processEvent function, each incoming JobSchedulerEvent posted by a JobHandler, such as JobStarted/JobCompleted/ErrorReported, leads to a call to the corresponding handler function, such as handleJobStart/handleJobCompletion/handleError
Here is the JobScheduler#processEvent function:
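The dispatch pattern can be sketched as a simple match on a sealed event trait (names shortened from the real ones, and handlers reduced to strings for illustration):

```scala
// Sealed trait plus pattern match: the same event-dispatch shape that
// processEvent uses to route each event type to its handler.
sealed trait Ev
case class JobStartedEv(jobId: Int) extends Ev
case class JobCompletedEv(jobId: Int) extends Ev
case class ErrorEv(msg: String) extends Ev

def processEvent(event: Ev): String = event match {
  case JobStartedEv(id)   => s"handleJobStart($id)"
  case JobCompletedEv(id) => s"handleJobCompletion($id)"
  case ErrorEv(msg)       => s"handleError($msg)"
}
```

Because the trait is sealed, the compiler can warn if a new event type is added without a matching case, which is a main reason to prefer this over a plain enum.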
submitJobSet(JobSet)
In the JobScheduler#submitJobSet function, the jobSet with its specific time is put into the jobSets map, and each Job in the jobSet is executed by a thread in the jobExecutor thread pool, which actually calls the JobHandler's run function
It also posts a StreamingListenerBatchSubmitted event to the streaming listener bus
Here is the JobScheduler#submitJobSet function:
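A minimal sketch of this flow, assuming simplified Job/JobSet types (these are illustrative stand-ins, not Spark's real classes):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Simplified stand-ins for Job and JobSet.
case class MiniJob(id: Int, body: () => Unit)
case class MiniJobSet(time: Long, jobs: Seq[MiniJob])

// time -> jobSet map, plus a fixed-size pool; pool size 1 plays the role of
// the spark.streaming.concurrentJobs default.
val jobSets = new ConcurrentHashMap[Long, MiniJobSet]()
val jobExecutor = Executors.newFixedThreadPool(1)

def submitJobSet(jobSet: MiniJobSet): Unit =
  if (jobSet.jobs.isEmpty) () // nothing to execute for this batch
  else {
    jobSets.put(jobSet.time, jobSet)             // register the batch first
    jobSet.jobs.foreach(job => jobExecutor.execute(() => job.body()))
  }
```

With one pool thread, batches execute strictly one job at a time; raising the thread count (as spark.streaming.concurrentJobs does) lets jobs from different batches overlap.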
handleJobStart(Job, Long)
Sets the processingStartTime of the jobSet if it is not already set
Sets the startTime of the job
It also posts StreamingListenerBatchStarted and StreamingListenerOutputOperationStarted events to the streaming listener bus
Here is the JobScheduler#handleJobStart function:
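The "set it only if not already set" logic can be sketched as follows (the field and type names are illustrative): only the first job of a batch to start stamps the batch's processing start time.

```scala
// A toy batch record: -1 marks "processingStartTime not set yet".
case class MiniBatch(time: Long, var processingStartTime: Long = -1L)

def handleJobStart(batch: MiniBatch, startedAt: Long): Unit =
  if (batch.processingStartTime < 0)
    batch.processingStartTime = startedAt // only the first job stamps it
```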
handleJobCompletion(Job, Long)
Sets the processingEndTime of the jobSet if it is not already set
Sets the completedTime of the job
If the jobSet has completed, removes it from the jobSets map and calls jobGenerator#onBatchCompletion to clear the related metadata
It also posts StreamingListenerOutputOperationCompleted and StreamingListenerBatchCompleted events to the streaming listener bus
Here is the JobScheduler#handleJobCompletion function:
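The "remove the jobSet once its last job completes" step can be sketched like this (the types are simplified stand-ins for JobSet and the jobSets map):

```scala
import scala.collection.mutable

// Count completions per batch and drop the batch from the pending map once
// its last job has completed, as handleJobCompletion does for jobSets.
final class PendingBatch(val totalJobs: Int) { var completedJobs = 0 }
val pendingBatches = mutable.Map[Long, PendingBatch]()

def handleJobCompletion(time: Long): Unit =
  pendingBatches.get(time).foreach { batch =>
    batch.completedJobs += 1
    if (batch.completedJobs == batch.totalJobs)
      pendingBatches -= time // whole batch done: clear it and its metadata
  }
```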
handleError(String, Throwable)
Here is the JobScheduler#handleError function:
stop(Boolean)
In the JobScheduler#stop function, please note the order of stopping/shutting down: receiverTracker#stop -> jobGenerator#stop -> jobExecutor#shutdown -> listenerBus#stop -> eventLoop#stop
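A toy sketch that records this shutdown order (the component calls are represented as strings): input stops first, then job generation, then the executor drains, and the event machinery that earlier steps still post to goes down last.

```scala
// Record the shutdown sequence JobScheduler#stop follows.
val stopOrder = scala.collection.mutable.ArrayBuffer[String]()

def stop(): Unit = {
  stopOrder += "receiverTracker.stop" // stop accepting new input records first
  stopOrder += "jobGenerator.stop"    // stop generating jobs for new batches
  stopOrder += "jobExecutor.shutdown" // let in-flight jobs finish draining
  stopOrder += "listenerBus.stop"
  stopOrder += "eventLoop.stop"       // last: the steps above may still post events
}
```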