MapReduce的详细过程(十二)

2015-07-24 07:41:40 · 作者: · 浏览: 20
ontext, String name, String extension) { TaskID taskId = context.getTaskAttemptID().getTaskID(); int partition = taskId.getId(); StringBuilder result = new StringBuilder(); result.append(name); result.append('-'); result.append( TaskID.getRepresentingCharacter(taskId.getTaskType())); result.append('-'); result.append(NUMBER_FORMAT.format(partition)); result.append(extension); return result.toString(); }
这个方法提供了一种生成唯一输出文件名的功能,之所以需要这个,是因为多个task都输出,万一起名冲突就坏了。命名的规则是以用户提供的字符串开头,然后是task的类型(map-m, reduce-r),最后是这个task所属的partition。这种方式保证了在当前job生成的结果中文件名是唯一的。标示出task得类型和partition有很大的好处,我们在实际工作中就有过这种体会。我们有一个每小时都会运行的job,跑完一个小时的数据需要15分钟的样子,但是每天0点都会跑的特别慢,也不报错,通过查看生成文件,我们发现标示为r的partition号为3的那个task总是最后生成文件,而且比其他partition的都要明显大,最终确定了是交给这个partition的数据太多造成的。对于一开始用户提供的前缀,当然可以是任何形式,但是我们强烈建议缀上时间戳。在我们的实践中,当前小时生成的数据有可能需要拷贝回前面的文件夹,默认提供的命名方式只能保证在当前job生成的输出文件是唯一的,没法保证与之前的不冲突,我们的做法是在前缀上加时间戳,这样可以方便的分辨哪些是后来加入的文件。

OutputCommitter
从源码的注释,我们知道OutputCommitter负责下面的工作:
? 在job启动时setup job。例如,在job的启动期间建立临时的输出目录。
? 在job结束是clean up job。比如,job结束之后删掉临时输出目录。
? 建立task的临时输出
? 检测一个task需不需要提交自己的输出
? 提交task的输出
? 丢弃task的输出
这么列出来,感觉比较空洞,我讲一下我的理解。正如前文提到的,OutputCommitter的主要职责是建立起task执行的临时目录,然后验证这个task需不需要将自己的输出的结果提交,提交到哪里。对于产生的临时目录和写入的临时文件,也要负责清理干净。
OutputCommitter有很多需要实现的方法,我列一下:

?

public abstract class OutputCommitter {
public abstract void setupJob(JobContext jobContext) throws IOException;
public void cleanupJob(JobContext jobContext) throws IOException { }
public void commitJob(JobContext jobContext) throws IOException {
cleanupJob(jobContext);
}
public void abortJob(JobContext jobContext, JobStatus.State state)
throws IOException {
cleanupJob(jobContext);
}
public abstract void setupTask(TaskAttemptContext taskContext)
throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
throws IOException;
public abstract void commitTask(TaskAttemptContext taskContext)
throws IOException;
public abstract void abortTask(TaskAttemptContext taskContext)
throws IOException;
public boolean isRecoverySupported() {
return false;
}
public void recoverTask(TaskAttemptContext taskContext)
throwsIOException
{}

?

?

方法名比较准确的反应方法需要实现的功能。下面我们看一下与FileOutputFormat对应的Committer。

FileOutputCommitter

前面已经提到了,OutputCommitter最重要的就是目录的建立删除以及拷贝,那么要理解一个 Committer的行为,只要专注它是怎么操作目录的就可以了。

在FileOutputCommitter里,有三个四种目录,四种目录分别包括

· 最终的Job输出目录

· 临时的Job目录

· task的提交目录

· task的临时目录

task每次在task临时目录中工作,如果确定成功并且需要被提交,就会提交到task的提交目录中。 task的提交目录实际上跟临时Job的目录是一个目录,当一个job的所有task都顺利执行之后,会从临时 job目录提交到最终的输出目录。

之所以有这么多跳,其实还是基于task很可能会执行失败的假设,这种方式,在task失败的时候, 可以直接清掉它的目录重来,效率上肯定要差一些。因此我的同事写过一个DirectFileOutputCommitter,当task执行成功时,直接提交到最终的工作目录。这种方式虽然在一定程度上提高了效率,可有个风险, 当这个job失败需要重新执行的时候,就得事先清一下最终的输出目录。

在实践的时候,我们常常通过在一个目录下生成"_SUCCESS"文件来标记这个目录已经完成,一个很好的生成时机就是Committer的commitJob方法。

RecordWriter

这个类的介绍非常普通,它做的事情也很简单,就是将一对KeyValue的pair写入到输出文件中。他的接口很简单:

?

public abstract class RecordWriter
  
    {
/**
* Writes a key/value pair.
*
* @param key the key to write.
* @param value the value to write.
* @throws IOException
*/
public abstract void write(K key,V value
) throws IOException, InterruptedException;
/**