设为首页 加入收藏

TOP

spark SchedulerBackend源码解析
2019-03-29 01:19:56 】 浏览:88
Tags:spark SchedulerBackend 源码 解析

Spark 程序在运行的时候分为 Driver 和 Executor 两部分
Spark 程序编写是基于 SparkContext 的,具体来说包含两方面
Spark 编程的核心 基础-RDD 是由 SparkContext 来最初创建的(第一个RDD一定是由 SparkContext 来创建的)
Spark 程序的调度优化也是基于 SparkContext,首先进行调度优化。
Spark 程序的注册时通过 SparkContext 实例化时候生产的对象来完成的(其实是 SchedulerBackend 来注册程序)
Spark 程序在运行的时候要通过 Cluster Manager 获取具体的计算资源,计算资源获取也是通过 SparkContext 产生的对象来申请的(其实是 SchedulerBackend 来获取计算资源的)

SparkContext 崩溃或者结束的时候整个 Spark 程序也结束啦!

[下图是 SparkContext 在创建核心对象后的流程图]

  1. SparkContext 構建的頂級三大核心:DAGScheduler,TaskScheduler,SchedulerBackend,其中:
  • DAGScheduler 是面向 Job 的 Stage 的高層調度器;
  • TaskScheduler 是一個接口,是低層調度器,根據具體的 ClusterManager 的不同會有不同的實現,Standalone 模式下具體的實現 TaskSchedulerImpl;
  • SchedulerBackend 是一個接口,根據具體的 ClusterManager 的不同會有不同的實現,Standalone 模式下具體的實現是SparkDeploySchedulerBackend

從整個程序運行的角度來講,SparkContext 包含四大核心對象:DAGScheduler,TaskScheduler,SchedulerBackend,MapOutputTrackerMaster

SparkDeploySchedulerBackend 有三大核心功能:

  • 負責與 Master 連接注冊當前程序 RegisterWithMaster
  • 接收集群中為當前應用程序而分配的計算資源 Executor 的注冊並管理 Executors;
  • 負責發送 Task 到具體的 Executor 執行

補充說明的是SparkDeploySchedulerBackend 是被TaskSchedulerImpl 來管理的!

创建 SparkContext 的核心对象
  • 程序一开始运行时会实例化 SparkContext 里的东西,所以不在方法里的成员都会被实例化!一开始实例化的时候第一个关键的代码是createTaskScheduler,它是位于 SparkContext 的 Primary Constructor 中,当它实例化时会直接被调用,这个方法返回的是 taskScheduler 和 dagScheduler 的实例,然后基于这个内容又构建了 DAGScheduler,然后调用taskScheduler 的 start( ) 方法,要先创建taskScheduler然后再创建 dagScheduler,因为taskScheduler是受dagScheduler管理的。
    [下图是 SparkContext.scala 中的创建 schedulerBackend 和 taskSchdulerImpl 的实例对象]
  • 调用 createTaskSchedule,这个方法创建了 TaskSchdulerImpl 和 SparkDeploySchedulerBackend,接受第一个参数是 SparkContext 对象本身,然后是字符串,(这也是你平时转入 master 里的字符串)
    [下图是 HelloSpark.scala 中创建 SparkConf 和 SparkContext 的上下文信息]

    [下图是 SparkContext.scala 中的 createTaskScheduler 方法]
  • 它会判断一下你的 master 是什么然后具体进行不同的操作!假设我们是Spark 集群模式,它会:
    [下图是 SparkContext.scala 中的 SparkMasterRegex 静态对象]
  • 创建 TaskSchedulerImpl 实例然后把 SparkContext 传进去;
  • 匹配集群中 master 的地址 e.g. spark://
  • 创建 SparkDeploySchedulerBackend 实例,然后把 taskScheduler (这里是 TaskSchedulerImpl)、SparkContext 和 master 地址信息传进去;
  • 调用 taskScheduler (这里是 TaskSchedulerImpl) 的 initialize 方法最后返回 (SparkDeploySchedulerBackend,TaskSchedulerImpl) 的实例对象
  • SparkDeploySchedulerBackend 是被TaskSchedulerImpl 來管理的,所以这里要首先把 scheduler 创建,然后把scheduler 的实例传进去。
    [下图是 SparkContext.scala 中的调用模式匹配 SPARK_REGEX 的处理逻辑]
  • Task 默认失败后重新启动次数为 4 次
    [下图是 TaskSchedulerImpl.scala 中的类和主构造器的调用方法]

TaskSchedulerImpl.initialize( )方法是

  • 创建一个 Pool 来初定义资源分布的模式 Scheduling Mode,默认是先进先出的模式。

调用taskScheduler 的 start( ) 方法

  • 在这个方法中再调用 backend (SparkDeploySchedulerBackend) 的 start( ) 方法。

  • 當通過SparkDeploySchedulerBackend注冊程序給 Master 的時候會把以上的 command 提交給 Master

CoarseGrainedExecutorBackend
  • Master 發指令給 Worker 去啟動 Executor 所有的進程的時候加載的 Main 方法所在的入口類就是 command 中的CoarseGrainedExecutorBackend,當然你可以實現自己的 ExecutorBackend,在CoarseGrainedExecutorBackend 中啟動 Executor (Executor 是先注冊再實例化),Executor 通過线程池並發執行 Task。

  • 这里调用了它的 run 方法


  • 注冊成功后再实例化
SparkDeploySchedulerBackend 的 start 方法内幕
  • 然后创建一个很重要的对象,AppClient 对象,然后调用它的 client (AppClient) 的start( ) 方法,创建一个 ClientEndpoint 对象。

  • 它是一个 RpcEndPoint,然后接下来的故事就是向 Master注冊,首先调用自己的 onStart 方法

  • 然后再调用 registerWithMaster 方法

  • 从registerWithMaster 调用 tryRegisterAllMasters,开一条新的线程来注冊,然后发送一条信息(RegisterApplication 的case class )给 Master,注冊是通过 Thread 来完成的。


    ApplicationDescription 的 case class

Master 接受程序的注冊
  • Master 收到了这个信息便开始注冊,注冊后最后再次调用 schedule( ) 方法



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark2.0.2测试spark-shell 下一篇2014年spark开发者大赛火热进行中..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目