设为首页 加入收藏

TOP

MapReduce之Job提交流程源码和切片源码分析(二)
2019-09-30 16:46:46 】 浏览:92
Tags:MapReduce Job 提交 流程 源码 切片 分析
uot;Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { // 如果配置文件没有配置YARN信息,则构建LocalRunner,MR任务本地运行 // 如果配置文件有配置YARN信息,则构建YarnRunner,MR任务在YARN集群上运行 if (jobTrackAddr == null) { // 客户端通讯协议client是调用ClientProtocolProvider的create()方法实现 clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; } else { LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e); } } } if (null == clientProtocolProvider || null == client) { throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); } }
  1. 进入submitJobInternal(),job的内部提交方法,用于提交job到集群
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // 检查结果的输出路径是否已经存在,如果存在会报异常
    checkSpecs(job);

    // conf里边是集群的xml配置文件信息
    Configuration conf = job.getConfiguration();
    // 添加MR框架到分布式缓存中
    addMRFrameworkToDistributedCache(conf);

    // 获取提交执行时相关资源的临时存放路径
    // 参数未配置时默认是(工作空间根目录下的)/tmp/hadoop-yarn/staging/提交作业用户名/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {//记录提交作业的主机IP、主机名,并且设置配置信息conf
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // 获取JobId
    JobID jobId = submitClient.getNewJobID();
    // 设置jobId
    job.setJobID(jobId);
    // 提交作业的路径Path(Path parent, String child),会将两个参数拼接为一个路径
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // job的状态
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key&quo
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇刷脸支付袭来,WeChat Pay & AliP.. 下一篇pt-archiver归档数据 源库和目标..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目