uot;Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
// 如果配置文件没有配置YARN信息,则构建LocalRunner,MR任务本地运行
// 如果配置文件有配置YARN信息,则构建YarnRunner,MR任务在YARN集群上运行
if (jobTrackAddr == null) {
// 客户端通讯协议client是调用ClientProtocolProvider的create()方法实现
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: ", e);
}
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
- 进入
submitJobInternal()
,job的内部提交方法,用于提交job到集群
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
// 检查结果的输出路径是否已经存在,如果存在会报异常
checkSpecs(job);
// conf里边是集群的xml配置文件信息
Configuration conf = job.getConfiguration();
// 添加MR框架到分布式缓存中
addMRFrameworkToDistributedCache(conf);
// 获取提交执行时相关资源的临时存放路径
// 参数未配置时默认是(工作空间根目录下的)/tmp/hadoop-yarn/staging/提交作业用户名/.staging
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {//记录提交作业的主机IP、主机名,并且设置配置信息conf
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
// 获取JobId
JobID jobId = submitClient.getNewJobID();
// 设置jobId
job.setJobID(jobId);
// 提交作业的路径Path(Path parent, String child),会将两个参数拼接为一个路径
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// job的状态
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key&quo