?
?
啊哈,InputSplit原来是RecordReader的一个参数啊。recordReader从InputSplit描述的输入里 取出一个KeyValue,作为mapper.map()方法的输入,跑一遍Map方法。打个比方,InputSplit像一 桌大餐,吃还是得一口一口吃,怎样算一口,就看RecordReader怎么实现了。
好了,如果我想自己实现InputSplit和RecordReader,应该写在哪呢?下面就讲InputFormat。
InputFormat
上文我们提到了InputFormat,这个类我们在配置Job的时候经常会指定它的实现类。先来看接口。
public abstract class InputFormat{ public abstract List getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
?
明白了吧,InputSplit是在getSplit函数里面算出来的,RecordReader也是在这里Create出来 的。如果你想以自己的方式读取输入,就可以自己写一个InputFormat的实现类,重写里面的方法。
当然,如果你说我很懒,不想自己写怎么办?好办,之所以要用框架,很重要的一点就是人家提供 了默认实现啦。WordCount里面一般用的是TextInputFormat,我们看一下它的实现。
public class TextInputFormat extends FileInputFormatimplements JobConfigurable { public RecordReader getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); String delimiter = job.get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } return new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes); } }
有没有一下明白了的感觉?它实现了自己的getRecordReader方法,里面从配置中取了Delimiter, 这个东西的默认值是"\n"!然后返回了以Delimiter划分的一个LineRecordReader,知道为什么你制定了InputFormat之后,Mapper里面读到的就是一行一行的输入了吧。
在我们加强版的WordCount里,也完全可以使用默认实现的TextInputFormat。关于Mapper的 输入暂时就讲这些,下面我们来看Mapper的输出。
Mapper的输出
注意到上文贴出的Mapper的默认实现的map方法中,是将Key和Value直接写入到context当中,我们已经知道了context是从MapContextImpl来的,那这个Write方法是怎么回事?
Context.Write的来历
Write方法是它从MapContextImpl父类TaskInputOutputContextImpl继承来的,看一下这个类 的部分代码:
?
public abstract class TaskInputOutputContextImplextendsTaskAttemptContextImpl implements TaskInputOutputContext { private RecordWriter output; private OutputCommitter committer; public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) { super(conf, taskid, reporter); this.output = output; this.committer = committer; } /** * Generate an output key/value pair. */ public void write(KEYOUT key,VALUEOUT value ) thro