Hadoop 2.7.2 MapReduce: Source Code Analysis of Job Submission and Input Splitting
- Start from `waitForCompletion`, which the driver calls like this:
```java
boolean result = job.waitForCompletion(true);
```

```java
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // First check the state: a job can be submitted only while it is in DEFINE; if so, enter submit()
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
```
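The non-verbose branch of `waitForCompletion` is a plain poll-and-sleep loop. A minimal sketch of that control flow follows, assuming a hypothetical `PollingJob` class whose `isComplete()` turns true after a fixed number of polls. No cluster is involved, and the interval is a constructor argument rather than the value returned by `Job.getCompletionPollInterval()`.

```java
// Hypothetical stand-in for Hadoop's Job: only the control flow of waitForCompletion() is modeled.
class PollingJob {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private final int pollsUntilDone;      // simulated: the job "finishes" after this many polls
    private final long pollIntervalMillis; // fixed interval instead of getCompletionPollInterval()
    private int polls = 0;

    PollingJob(int pollsUntilDone, long pollIntervalMillis) {
        this.pollsUntilDone = pollsUntilDone;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    void submit() {
        state = JobState.RUNNING;          // the real Job contacts the cluster here
    }

    boolean isComplete() {
        polls++;                           // count every status check
        return polls > pollsUntilDone;
    }

    boolean isSuccessful() {
        return true;                       // simulated outcome
    }

    int getPollCount() { return polls; }

    JobState getState() { return state; }

    boolean waitForCompletion() {
        // Submit only a job that has not been submitted yet
        if (state == JobState.DEFINE) {
            submit();
        }
        // Poll until the job reports completion, sleeping between checks
        while (!isComplete()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return isSuccessful();
    }
}
```

One deliberate difference: Hadoop's loop silently swallows `InterruptedException`, whereas the sketch restores the thread's interrupt flag and returns `false`, the more usual idiom in application code.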
- Step into the `submit()` method:
```java
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  // Make sure the job is still in the submittable DEFINE state; otherwise it cannot be submitted
  ensureState(JobState.DEFINE);
  // Configure the job to use the new API
  setUseNewAPI();
  // Enter connect(): at submission time a job connects to the cluster through Job.connect(),
  // which in effect constructs the Cluster instance `cluster`
  connect();
  // Once connect() has returned, create the JobSubmitter
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // The core method here is submitJobInternal(). As the name suggests, it is the internal
      // submission routine and implements all of the job-submission logic.
      // Step into submitJobInternal()
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // After submission, the state changes to RUNNING
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
```
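The state handling in `submit()` amounts to a guard followed by a transition. The sketch below models only that, using a hypothetical `StatefulJob` class: the `Supplier<String>` stands in for `submitter.submitJobInternal(Job.this, cluster)` and the returned string stands in for `JobStatus`. The exception message is modeled on the one Hadoop's `ensureState()` produces, but the class is an illustration, not the real `Job`.

```java
import java.util.function.Supplier;

// Hypothetical model of the state handling in Job.submit(); not Hadoop's class.
class StatefulJob {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private String status; // stands in for the JobStatus returned by submitJobInternal()

    // Plays the role of Job.ensureState(): reject calls made in the wrong state
    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    // submitter plays the part of submitter.submitJobInternal(Job.this, cluster)
    void submit(Supplier<String> submitter) {
        ensureState(JobState.DEFINE);
        status = submitter.get();
        // The job moves to RUNNING only after the submitter has returned normally
        state = JobState.RUNNING;
    }

    JobState getState() { return state; }

    String getStatus() { return status; }
}
```

Because `state` is assigned only after the submitter returns, a submission that throws leaves the job in `DEFINE`, while a second `submit()` on a job that is already `RUNNING` fails the guard.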
- Step into the `connect()` method:
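- When a MapReduce job is submitted, the client connects to the cluster through `Job.connect()`, which in effect constructs the `Cluster` instance `cluster`.
- `Cluster` is the client-side handle for a MapReduce cluster; it provides methods for retrieving information about that cluster.
- Inside `Cluster` is `client`, an instance of the client communication protocol `ClientProtocol`. It is obtained by calling the `create()` method of a `ClientProtocolProvider` implementation.
- Hadoop 2.x provides two `ClientProtocol` implementations, returned by the providers' `create()` methods: `YARNRunner` for YARN mode and `LocalJobRunner` for local mode. `Cluster` delegates all communication with the cluster to whichever of the two is created.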
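The provider mechanism described in these bullets can be sketched as follows. Everything here is a simplified, hypothetical model: `Protocol`, `ProtocolProvider`, `YarnProvider`, `LocalProvider` and `ClusterSketch` are illustrative names, and the configuration is a plain `Map`. What it mirrors from Hadoop is the selection rule: the YARN provider answers only when `mapreduce.framework.name` is `yarn`, the local provider answers when it is `local` (the default when the key is unset), and `Cluster.initialize()` keeps the first non-null client. Hadoop discovers the providers through `java.util.ServiceLoader` (the `frameworkLoader` field) and throws an `IOException` when none matches; the sketch takes a plain list and throws an unchecked exception to stay short.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ClientProtocol.
interface Protocol {
    String name();
}

// Hypothetical stand-in for ClientProtocolProvider: returns null if it does not handle the framework.
interface ProtocolProvider {
    Protocol create(Map<String, String> conf);
}

// Models the YARN provider: answers only for mapreduce.framework.name=yarn.
class YarnProvider implements ProtocolProvider {
    @Override
    public Protocol create(Map<String, String> conf) {
        if ("yarn".equals(conf.get("mapreduce.framework.name"))) {
            return () -> "YARNRunner";
        }
        return null;
    }
}

// Models the local provider: answers for "local", which is also the default.
class LocalProvider implements ProtocolProvider {
    @Override
    public Protocol create(Map<String, String> conf) {
        String framework = conf.getOrDefault("mapreduce.framework.name", "local");
        if ("local".equals(framework)) {
            return () -> "LocalJobRunner";
        }
        return null;
    }
}

class ClusterSketch {
    // Like Cluster.initialize(): ask each provider in turn and keep the first non-null client.
    static Protocol initialize(List<ProtocolProvider> providers, Map<String, String> conf) {
        for (ProtocolProvider provider : providers) {
            Protocol client = provider.create(conf);
            if (client != null) {
                return client;
            }
        }
        // Hadoop throws an IOException here; an unchecked exception keeps the sketch short.
        throw new IllegalStateException(
            "Cannot initialize cluster: check mapreduce.framework.name");
    }
}
```

Returning `null` instead of throwing lets every provider be asked safely, so adding a new execution mode only means adding a provider, with no change to the selection loop.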
```java
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) { // cluster provides methods for accessing the MapReduce cluster remotely
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   // The only thing to focus on here: the Cluster constructor builds the cluster instance
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
```
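`connect()` is lazy, `synchronized` initialization of a field: the `Cluster` is built on the first call and reused afterwards. A minimal generic sketch of that pattern follows, using a hypothetical `LazyConnector` class with a `Supplier` in place of `new Cluster(getConfiguration())`. It leaves out the `ugi.doAs(...)` wrapper that Hadoop uses to run the construction as the current user.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the lazy, synchronized initialization in Job.connect().
class LazyConnector<T> {
    private final Supplier<T> factory; // plays the role of "new Cluster(getConfiguration())"
    private T cluster;                 // stays null until the first connect()

    LazyConnector(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized, like Job.connect(), so concurrent callers cannot build two instances
    synchronized T connect() {
        if (cluster == null) {
            cluster = factory.get();
        }
        return cluster;
    }
}
```

Locking the whole method is the simplest thread-safe form of lazy initialization; the cost of taking a lock on every call is negligible for a method invoked only a few times per job.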
- Step into the `Cluster` constructor:
// The one-argument constructor is called first; it delegates to the two-argument constructor
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
// The most important step: initialize()
initialize(jobTrackAddr, conf);
}
// The two Cluster fields to watch: the client protocol provider (ClientProtocolProvider) and the client protocol instance client (ClientProtocol)
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug(&q