MapReduce的详细过程(二)

2015-07-24 07:41:40 · 作者: · 浏览: 21
,比如家乡(location),力气( 内存),财产(CPU),君王(RM)通过锦衣卫(NM)来掌握各个地方(Node)壮丁的使用情况。 当有百姓提出一个要求(提交一个Job),比如兴修水利,君王不再事无巨细的过问这件事情,而是叫 一个合适的大臣(AM)过来,比如此例中的水利大臣,问他需要多少人,多少钱,然后衡量一下国力, 播一些壮丁给他用。水利大臣可以使用全国范围内的壮丁,对他们有绝对的领导权,让他们干嘛就得干 嘛。事情要么圆满完成,水利大臣给君王报喜,要么发现难度太大啊,尝试了好多办法都失败了(job尝 试次数到达一定数量),只好回去请罪。

君王遵循政务公开的原则,所有job的运行情况都可以通过50030端口查看:


\

\

好了,讲了这么一大通,我想关于Job怎么跑起来,task怎么来怎么没,应该有个概念了。用户将自 己的代码上传到集群的一个client Node上,运行代码,代码里会对自己的job进行配置,比如输入在 哪,有哪些依赖的jar包,输出写到哪,以什么格式写,然后提交给ResourceManager,ResourceManager 会在一个Node上启动ApplicationMaster负责用户的这个Job,AM申请资源,得到RM的批准和分配 后,在得到的Container里启动MapTask和ReduceTask,这两种task会调用我们编写的Mapper和Reducer等代码,完成任务。任务的运行情况可以通过web端口查看。

MapReduce计算框架最重要的两个类是Mapper和Reducer,用户可以继承这两个类完成自己的 业务逻辑,下面以这两个类的输入输出为主线详细讲解整个过程。例子总是最容易被人理解的,所以讲解过程有看不懂的,可以回来查看这个简单的job。用户想使用MapReduce的过程统计一组文件中每 个单词出现的次数,与经典的WordCount不同的是,要求大写字母开头的单词写到一个文件里面,小写 的写到另一个文件中。

Mapper的输入

\

?

所谓源码之前,了无秘密,先上mapper的源码。

Mapper的源码

?

?

public class Mapper
   
     { /*** The 
    Context passed on to the {@link Mapper} implementations. */ public abstract class Contextimplements MapContext
    
      { } /*** Called once at the beginning of the task. */ protected void setup(Context context) throws IOException, InterruptedException { // NOTHING } /*** Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function.*/ @SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /*** Called once at the end of the task. */ protected void cleanup(Context context) throws IOException, InterruptedException { // NOTHING } /*** Expert users can override this method for more complete control over the * execution of the Mapper.* @param context* @throws IOException*/ public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } } 
    
   

可以简单的说,Mapper的输入来自于Context。我们先看一下MapContext的实现:

?

?

public class MapContextImpl
  
   extends TaskInputOutputContextImpl
   
     implements MapContext
    
      { private RecordReader
     
       reader; private InputSplit split; public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader
      
        reader, RecordWriter
       
         writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { super(conf, taskid, writer, committer, reporter); this.reader = reader;this.split = split; } /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } } 
       
      
     
    
   
  

?

MapContextImpl类组合了两个类型的对象,即InputSplit和RecordReader,并封装了获取输入 的Key和Value的方法,在深入探讨InputSplit和RecordReader之前,我们先看一下这个Context是 怎么传递给我们编写的Mapper函数的。下面是我从MapTask类中摘出的一段代码:

public class MapTask extends Task {
private 
  
   
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
f