MapReduce的详细过程(六)

2015-07-24 07:41:40 · 作者: · 浏览: 14
ws IOException, InterruptedException { output.write(key, value); } }
注意到了有个RecordWriter类,跟我们在上文分析过的RecordReader一看就是兄弟嘛,作用你 也肯定猜到了,就是将一个Key value对写入到输出文件中。回过头来看它的输出类是怎么从MapTask 中传入的:

?

?

org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}


?

判断条件满足时,说明这个Job没有ReduceTask,这时RecordWriter被实例化成了 NewDirectOutputCollector,否则的话,实例化为NewOutputCollector。来具体看看这两个内部类。

?

private class NewDirectOutputCollector
  
   
extends org.apache.hadoop.mapreduce.RecordWriter
   
     { private final org.apache.hadoop.mapreduce.RecordWriter out; ... out = outputFormat.getRecordWriter(taskContext); ... out.write(key, value); } 
   
  


?

前者直接调用了OutputFormat来实例化自己,我们写Job的时候一般会指定Job的OutputFormat,这个类在MapTask中是通过反射的方式引入的。可见,第一个分支的逻辑是会直接把map的输出写入到 我们整个Job的输出当中。具体是怎么个写入的过程,我们留到reduce的输出中讲,毕竟那里才是最常规的会写输出文件的地方。

?

private class NewOutputCollector
  
   
extends org.apache.hadoop.mapreduce.RecordWriter
   
     { private final MapOutputCollector
    
      collector; private final org.apache.hadoop.mapreduce.Partitioner
     
       partitioner; private final int partitions; @SuppressWarnings("unchecked") NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner
      
       ) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner
       
        () { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } } @Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); } @Override public void close(TaskAttemptContext context ) throws IOException,InterruptedException { try { collector.flush(); } catch (ClassNotFoundException cnf) { throw new IOException("can't find class ", cnf); } collector.close(); } }
       
      
     
    
   
  


?

?

这个内部类有两个成员变量,一个是MapOutputCollector,一个是Partitioner。最终的写入调 用的是MapOutputCollector的Write方法完成的。Partitioner的名气更大一些,我们先来介绍。

Partitioner

但凡了解一点MapReduce的人应该都知道这个类,它的作用是根据Key将Map的输出分区,然后 发送给Reduce线程。有多少个Partition,就对应有多少个Reduce线程。Reduce线程的个数是可以 在配置文件中设定的。上面代码的逻辑就是先读一下这个配置,看一下需要分到少个分区,如果分区数 少于1,就实例化出一个Partitioner的默认实现,否则的话,用反射读取用户设置的实现类。

我们一般只重写它的一个方法:getPartition,参数是一个Key Value对以及Partition的总数,比 较常见的实现是取Key的hashcode再对总的分区数取模。

注意,为了提高整个job的运行速度,reduce task应该尽可能均匀的接收Map的输出。partition 作为Map输出分配的唯一参考标准,映射规则至关重要,partition返回值一样的Map的输出,将会交 给一个reducetask,在实际工作中,我们就遇到了partition返回值不合理,好多Mapper的输出都压 在一个reduce的task上,造成这个reducetask执行非常缓慢,整体的job一直结束不了的情况。尽可 能均匀的分配partition!


?

MapOutputCollector

这个Collector我们可以自己实现,不过不是很常见。它有一个默认实现,叫MapOutputBuffer。有关MapOutputBuffer的分析,文献[4]有非常清晰的解释,值得一看。

MapOutputBuffer

Combiner的意思是局部的reduce,它可以在job配置的时候指定,实现的逻辑也跟reduce一致, Combiner的作用是可以减少Mapper和Reducer之