设为首页 加入收藏

TOP

Spark2.1.0——内置Web框架详解(五)
2019-09-17 18:20:02 】 浏览:84
Tags:Spark2.1.0 内置 Web 框架 详解
oleProgress", true) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None } _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None } _ui.foreach(_.bind())

这段代码的执行步骤如下。

1)  创建Spark状态跟踪器SparkStatusTracker。

2)  创建ConsoleProgressBar。可以配置spark.ui.showConsoleProgress属性为false取消对ConsoleProgressBar的创建,此属性默认为true。

3)  调用SparkUI的createLiveUI方法创建SparkUI。

4)  给SparkUI绑定端口。SparkUI继承自WebUI,因此调用了代码清单4-12中WebUI的bind方法启动SparkUI底层的Jetty服务。

         上述步骤中,第1)、2)、4)步都很简单,所以着重来分析第3)步。SparkUI的createLiveUI的实现如下。

  def createLiveUI(
      sc: SparkContext,
      conf: SparkConf,
      listenerBus: SparkListenerBus,
      jobProgressListener: JobProgressListener,
      securityManager: SecurityManager,
      appName: String,
      startTime: Long): SparkUI = {
    create(Some(sc), conf, listenerBus, securityManager, appName,
      jobProgressListener = Some(jobProgressListener), startTime = startTime)
  }

可以看到SparkUI的createLiveUI方法中调用了create方法。create的实现如下。

  private def create(
      sc: Option[SparkContext],
      conf: SparkConf,
      listenerBus: SparkListenerBus,
      securityManager: SecurityManager,
      appName: String,
      basePath: String = "",
      jobProgressListener: Option[JobProgressListener] = None,
      startTime: Long): SparkUI = {

    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
      val listener = new JobProgressListener(conf)
      listenerBus.addListener(listener)
      listener
    }

    val environmentListener = new EnvironmentListener
    val storageStatusListener = new StorageStatusListener(conf)
    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
    val storageListener = new StorageListener(storageStatusListener)
    val operationGraphListener = new RDDOperationGraphListener(conf)

    listenerBus.addListener(environmentListener)
    listenerBus.addListener(storageStatusListener)
    listenerBus.addListener(executorsListener)
    listenerBus.addListener(storageListener)
    listenerBus.addListener(operationGraphListener)

    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
      executorsListener, _jobProgressListener, storageListener, operationGraphListener,
      appName, basePath, startTime)
  }

可以看到create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener,例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener;用于构建RDD的DAG(有向无关图)的RDDOperationGraphListener等。这5个SparkListener的实现添加到listenerBus的监听器列表中。最后使用SparkUI的构造器创建SparkUI。

SparkUI的初始化

  调用SparkUI的构造器创建SparkUI,实际也是对SparkUI的初始化过程。在介绍初始化之前,先来看看SparkUI中的两个成员属性。

  • killEnabled:标记当前SparkUI能否提供杀死Stage或者Job的链接。
  • appId:当前应用的ID。

SparkUI的构造过程中会执行initialize方法,其实现见代码清单13。

代码清单13         SparkUI的初始化

  def initialize() {
    val jobsTab = new JobsTab(this)
    attachTab(jobsTab)
    val stagesTa
首页 上一页 2 3 4 5 6 下一页 尾页 5/6/6
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇MySQL向数据库表的某字段追加数据 下一篇PHP+jQuery实现双击修改table表格

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目