设为首页 加入收藏

TOP

MapReduce之Job提交流程源码和切片源码分析(一)
2019-09-30 16:46:46 】 浏览:84
Tags:MapReduce Job 提交 流程 源码 切片 分析

hadoop2.7.2 MapReduce Job提交源码及切片源码分析

  1. 首先从waitForCompletion函数进入
boolean result = job.waitForCompletion(true);
/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // 首先判断state,当state为DEFINE时可以提交,进入 submit() 方法
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
  1. 进入submit()方法
/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // 确认JobState状态为可提交状态,否则不能提交
    ensureState(JobState.DEFINE);
    // 设置使用最新的API
    setUseNewAPI();
    // 进入connect()方法,MapReduce作业提交时连接集群是通过Job类的connect()方法实现的,
    // 它实际上是构造集群Cluster实例cluster
    connect();
    // connect()方法执行完之后,定义提交者submitter
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        // 这里的核心方法是submitJobInternal(),顾名思义,提交job的内部方法,实现了提交job的所有业务逻辑
          // 进入submitJobInternal
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    // 提交之后state状态改变
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
  1. 进入connect()方法
  • MapReduce作业提交时连接集群通过Job的Connect方法实现,它实际上是构造集群Cluster实例cluster
  • cluster是连接MapReduce集群的一种工具,提供了获取MapReduce集群信息的方法
  • 在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol的实例client,它由ClientProtocolProvider的静态create()方法构造
  • 在create内部,Hadoop2.x中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {// cluster提供了远程获取MapReduce的方法
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     // 只需关注这个Cluster()构造器,构造集群cluster实例
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  1. 进入Cluster()构造器
// 首先调用一个参数的构造器,间接调用两个参数的构造器
public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // 最重要的initialize方法
    initialize(jobTrackAddr, conf);
  }
  
// cluster中要关注的两个成员变量是客户端通讯协议提供者ClientProtocolProvider和客户端通讯协议ClientProtocol实例client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug(&q
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇刷脸支付袭来,WeChat Pay & AliP.. 下一篇pt-archiver归档数据 源库和目标..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目