MapReduce的详细过程(五)

2015-07-24 07:41:40 · 作者: · 浏览: 15
OException * @throws InterruptedException */ public abstract boolean nextKeyValue() throws IOException, InterruptedException; /** * Get the current key * @return the current key or null if there is no current key * @throws IOException * @throws InterruptedException */ public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; /** * Get the current value. * @return the object that was read * @throws IOException * @throws InterruptedException */ public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; /** * The current progress of the record reader through its data. * @return a number between 0.0 and 1.0 that is the fraction of the data read * @throws IOException * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; /** * Close the record reader. */ public abstract void close() throws IOException; }

?

?

啊哈,InputSplit原来是RecordReader的一个参数啊。recordReader从InputSplit描述的输入里 取出一个KeyValue,作为mapper.map()方法的输入,跑一遍Map方法。打个比方,InputSplit像一 桌大餐,吃还是得一口一口吃,怎样算一口,就看RecordReader怎么实现了。

好了,如果我想自己实现InputSplit和RecordReader,应该写在哪呢?下面就讲InputFormat。

InputFormat

上文我们提到了InputFormat,这个类我们在配置Job的时候经常会指定它的实现类。先来看接口。

public abstract class InputFormat
  
    { 
public abstract List
   
     getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader
    
      createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; } 
    
   
  


?

明白了吧,InputSplit是在getSplit函数里面算出来的,RecordReader也是在这里Create出来 的。如果你想以自己的方式读取输入,就可以自己写一个InputFormat的实现类,重写里面的方法。

当然,如果你说我很懒,不想自己写怎么办?好办,之所以要用框架,很重要的一点就是人家提供 了默认实现啦。WordCount里面一般用的是TextInputFormat,我们看一下它的实现。


public class TextInputFormat extends FileInputFormat
  
   
implements JobConfigurable {
public RecordReader
   
     getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); String delimiter = job.get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } return new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes); } } 
   
  

有没有一下明白了的感觉?它实现了自己的getRecordReader方法,里面从配置中取了Delimiter, 这个东西的默认值是"\n"!然后返回了以Delimiter划分的一个LineRecordReader,知道为什么你制定了InputFormat之后,Mapper里面读到的就是一行一行的输入了吧。

在我们加强版的WordCount里,也完全可以使用默认实现的TextInputFormat。关于Mapper的 输入暂时就讲这些,下面我们来看Mapper的输出。

Mapper的输出

注意到上文贴出的Mapper的默认实现的map方法中,是将Key和Value直接写入到context当中,我们已经知道了context是从MapContextImpl来的,那这个Write方法是怎么回事?

Context.Write的来历

Write方法是它从MapContextImpl父类TaskInputOutputContextImpl继承来的,看一下这个类 的部分代码:

?

public abstract class TaskInputOutputContextImpl
  
   
extendsTaskAttemptContextImpl
implements TaskInputOutputContext
   
     { private RecordWriter
    
      output; private OutputCommitter committer; public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter
     
       output, OutputCommitter committer, StatusReporter reporter) { super(conf, taskid, reporter); this.output = output; this.committer = committer; } /** * Generate an output key/value pair. */ public void write(KEYOUT key,VALUEOUT value ) thro