设为首页 加入收藏

TOP

Spark天堂之门
2019-04-14 13:29:28 】 浏览:24
Tags:Spark 天堂

1, Spark程序在运行的时候分为Driver和Executors两部分;

2, Spark的程序编写是基于SparkContext的,具体来说包含两方面:

 a)Spark编程的核心基础---RDD,是由SparkContext来最初创建(第一个RDD一定是由SparkContext来创建的);
 b)Spark程序的调度优化也是基于SparkContext;

3, Spark程序的注册时通过SparkContext实例化时候生产的对象来完成的(其实是SchedulerBackend来注册程序)

4, Spark程序运行的时候要通过Cluster Manager获得具体的计算资源,计算资源的获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的);

5, SparkContext**崩溃或者结束**的时候整个Spark程序也结束啦!

总结:
SparkContext**开启**天堂之门:Spark程序是通过SparkContext发布到Spark集群的;
SparkContext**导演**天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;
SparkContext**关闭**天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束啦!

二:SparkContext使用案例鉴赏
这里写图片描述
三:SparkContext天堂内幕

1, SparkContext构建的顶级三大核心对象:DAGScheduler、TaskScheduler、ShedulerBackend,其中:

a) DAGScheduler是面向Job的Stage的高层

调度器;

b) TaskScheduler是一个接口,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl;

c) SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是SparkDeploySchedulerBackend;

2,从整个程序运行的角度来讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、ShedulerBackend、MapOutputTrackerMaster。

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference inDAGScheduler's
// constructor
_taskScheduler.start()

createTaskScheduler:

case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)

在sheduler.initialize调用的时候会创建ShedulerPool

this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
  schedulingMode match {
    case SchedulingMode.FIFO =>
      new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR =>
      new FairSchedulableBuilder(rootPool, conf)
  }
}
schedulableBuilder.buildPools()

SparkDeploySchedulerBackend有三大核心功能:
(1)负责与Master链接注册当前程序
(2)接收集群中为当前应用程序而分配的计算资源Executor的注册并管理Executors
(3)负责发送Task到具体的 Executor执行
补充说明的是:SparkDeploySchedulerBackend是被TaskSchedulerImpl来管理的!

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)

当通过SparkDeploySchedulerBackend注册程序给Master的时候会把上述command提交给Master,Master发指令给Worker去启动Executor所在的进程的时候加载的main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,当然你可以实现自己的ExecutorBackened,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册再实例化),Executor通过线程池并发执行Task

private[spark] case class ApplicationDescription(
    name: String,
    maxCores: Option[Int],
    memoryPerExecutorMB: Int,
    command: Command,
    appUiUrl: String,
    eventLogDir: Option[URI] = None,
    // short name of compression codec used when writing event logs, if any (e.g. lzf)
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}

编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Maven在一个项目中整合Java Spark.. 下一篇PySpark读取Mysql数据到DataFrame..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }