oleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind())
这段代码的执行步骤如下。
1) 创建Spark状态跟踪器SparkStatusTracker。
2) 创建ConsoleProgressBar。可以配置spark.ui.showConsoleProgress属性为false取消对ConsoleProgressBar的创建,此属性默认为true。
3) 调用SparkUI的createLiveUI方法创建SparkUI。
4) 给SparkUI绑定端口。SparkUI继承自WebUI,因此调用了代码清单4-12中WebUI的bind方法启动SparkUI底层的Jetty服务。
上述步骤中,第1)、2)、4)步都很简单,所以着重来分析第3)步。SparkUI的createLiveUI的实现如下。
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}
可以看到SparkUI的createLiveUI方法中调用了create方法。create的实现如下。
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
}
可以看到create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener,例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener;用于构建RDD的DAG(有向无关图)的RDDOperationGraphListener等。这5个SparkListener的实现添加到listenerBus的监听器列表中。最后使用SparkUI的构造器创建SparkUI。
SparkUI的初始化
调用SparkUI的构造器创建SparkUI,实际也是对SparkUI的初始化过程。在介绍初始化之前,先来看看SparkUI中的两个成员属性。
- killEnabled:标记当前SparkUI能否提供杀死Stage或者Job的链接。
- appId:当前应用的ID。
SparkUI的构造过程中会执行initialize方法,其实现见代码清单13。
代码清单13 SparkUI的初始化
def initialize() {
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTa