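The king believes in open government: the running status of every job is public and can be viewed in the web UI. (Port 50030 is the JobTracker web UI of Hadoop 1.x; under YARN, the architecture described here with its ResourceManager, the ResourceManager web UI listens on port 8088 by default.)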

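Well, after all that, you should now have a rough picture of how a job gets running and how tasks come and go. The user uploads their code to a client node in the cluster and runs it. The code configures the job: where the input is, which dependency jars it needs, where the output goes and in what format. The job is then submitted to the ResourceManager, which starts an ApplicationMaster on some node to take charge of this job. The AM requests resources; once the RM approves and allocates them, the AM starts MapTasks and ReduceTasks in the containers it was granted, and these tasks call the Mapper and Reducer code we wrote to get the work done. The progress of each job can be followed through the web UI.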
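The submission flow just described can be modeled as a toy, single-process Java sketch. `JobSpec`, `ResourceManager` and `ApplicationMaster` below are hypothetical classes that only mirror the order of events in the paragraph above; they are not YARN's API, and real allocation (NodeManagers, heartbeats, scheduling against cluster capacity) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-process model of the job submission flow (not the YARN API).
public class JobFlowSketch {

    // What the client configures before submitting; hypothetical fields.
    record JobSpec(String name, String inputPath, String outputPath,
                   int numMaps, int numReduces) {}

    // Shared event log so the order of steps can be inspected.
    static final List<String> log = new ArrayList<>();

    static class ResourceManager {
        private int nextContainer = 0;

        // Accepts a job and launches an ApplicationMaster to manage it.
        void submit(JobSpec spec) {
            log.add("RM: accepted " + spec.name());
            new ApplicationMaster(this, spec).run();
        }

        // Grants a container; a real RM schedules against cluster capacity.
        String allocateContainer(String purpose) {
            String id = "container_" + nextContainer++;
            log.add("RM: allocated " + id + " for " + purpose);
            return id;
        }
    }

    static class ApplicationMaster {
        private final ResourceManager rm;
        private final JobSpec spec;

        ApplicationMaster(ResourceManager rm, JobSpec spec) {
            this.rm = rm;
            this.spec = spec;
        }

        // Requests one container per task and "runs" the task in it.
        void run() {
            log.add("AM: started for " + spec.name());
            // Simplification: every map runs before any reduce. A real AM may
            // start reducers early so they can begin fetching map output.
            for (int i = 0; i < spec.numMaps(); i++) {
                String c = rm.allocateContainer("MapTask " + i);
                log.add("AM: ran MapTask " + i + " in " + c);
            }
            for (int i = 0; i < spec.numReduces(); i++) {
                String c = rm.allocateContainer("ReduceTask " + i);
                log.add("AM: ran ReduceTask " + i + " in " + c);
            }
            log.add("AM: job " + spec.name() + " done, output in " + spec.outputPath());
        }
    }

    public static void main(String[] args) {
        new ResourceManager().submit(
            new JobSpec("case-split-wordcount", "/input", "/output", 2, 1));
        log.forEach(System.out::println);
    }
}
```

The point of the sketch is the division of labor: the RM only grants resources, while the per-job AM decides which tasks to run and where.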
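The two most important classes in the MapReduce framework are Mapper and Reducer; users extend them to implement their own business logic. The rest of this article follows the inputs and outputs of these two classes to explain the whole process in detail. Examples are always the easiest thing to understand, so if any part of the explanation is unclear, come back to this simple job: the user wants MapReduce to count how many times each word appears in a set of files. Unlike the classic WordCount, words that begin with an uppercase letter must be written to one file, and words that begin with a lowercase letter to another.

Mapper input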
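Before reading the framework source, here is a plain-Java sketch of the example job's logic with no Hadoop dependency, assuming whitespace-separated tokens: map emits (word, 1) for each line it receives, a shuffle step groups the values by word, reduce sums them, and each result is routed to an "upper" or "lower" output by its first character. The method names and the two in-memory "files" are illustrative assumptions; in a real job the routing could be done in the Reducer, for example with Hadoop's `MultipleOutputs` class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java walk-through of the example job: map, shuffle/group, reduce,
// then route each result to an "upper" or "lower" output.
public class CaseSplitWordCount {

    record Pair(String key, int value) {}

    // Map phase: emit (word, 1) for every whitespace-separated token.
    static List<Pair> map(String line) {
        List<Pair> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new Pair(word, 1));
            }
        }
        return out;
    }

    // Shuffle: group values by key; TreeMap keeps keys sorted, as the framework does.
    static Map<String, List<Integer>> shuffle(List<Pair> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Pair p : pairs) {
            grouped.computeIfAbsent(p.key(), k -> new ArrayList<>()).add(p.value());
        }
        return grouped;
    }

    // Reduce phase: sum the counts for one word.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }

    // Runs the whole job; returns two "files" keyed by "upper" and "lower".
    // Words that start with anything other than an uppercase letter go to "lower".
    static Map<String, Map<String, Integer>> run(List<String> lines) {
        List<Pair> mapped = new ArrayList<>();
        for (String line : lines) mapped.addAll(map(line));

        Map<String, Map<String, Integer>> outputs = new TreeMap<>();
        outputs.put("upper", new TreeMap<>());
        outputs.put("lower", new TreeMap<>());
        for (Map.Entry<String, List<Integer>> e : shuffle(mapped).entrySet()) {
            String word = e.getKey();
            String file = Character.isUpperCase(word.charAt(0)) ? "upper" : "lower";
            outputs.get(file).put(word, reduce(e.getValue()));
        }
        return outputs;
    }

    public static void main(String[] args) {
        // prints {lower={hello=1, world=2}, upper={Hello=1, World=1}}
        System.out.println(run(List.of("Hello world", "hello World world")));
    }
}
```

Note that "Hello" and "hello" are different keys here, exactly as they would be in the real job, which is why they end up in different files.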

As the saying goes, with the source code in front of you there are no secrets, so let's start with the Mapper source.
Mapper source code
package org.apache.hadoop.mapreduce;

import java.io.IOException;

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      // Pull records from the context one at a time and hand each to map().
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      // cleanup() runs even if map() throws.
      cleanup(context);
    }
  }
}
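`Mapper.run()` is a template method: the framework owns the record loop, and the user only fills in `setup`, `map` and `cleanup`. The sketch below imitates that contract without Hadoop. `MiniMapper` and `MiniContext` are hypothetical names, and the context here simply iterates over an in-memory list of (key, value) entries instead of reading real input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hadoop-free imitation of Mapper.run(); MiniMapper and MiniContext are
// hypothetical stand-ins for Mapper and Mapper.Context.
public class MiniMapperDemo {

    static class MiniContext<K, V> {
        private final Iterator<Map.Entry<K, V>> input;
        private Map.Entry<K, V> current;
        final List<String> output = new ArrayList<>();

        MiniContext(Iterator<Map.Entry<K, V>> input) { this.input = input; }

        // Same contract as Context.nextKeyValue(): advance, report whether a record exists.
        boolean nextKeyValue() {
            if (!input.hasNext()) return false;
            current = input.next();
            return true;
        }

        K getCurrentKey() { return current.getKey(); }
        V getCurrentValue() { return current.getValue(); }

        // Collects output as "key<TAB>value" strings.
        void write(Object key, Object value) { output.add(key + "\t" + value); }
    }

    static class MiniMapper<K, V> {
        protected void setup(MiniContext<K, V> ctx) {}

        // Identity by default, like Hadoop's Mapper.map().
        protected void map(K key, V value, MiniContext<K, V> ctx) { ctx.write(key, value); }

        protected void cleanup(MiniContext<K, V> ctx) {}

        // Template method mirroring Mapper.run(): setup once, map per record, cleanup in finally.
        public void run(MiniContext<K, V> ctx) {
            setup(ctx);
            try {
                while (ctx.nextKeyValue()) {
                    map(ctx.getCurrentKey(), ctx.getCurrentValue(), ctx);
                }
            } finally {
                cleanup(ctx);
            }
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> records =
            List.of(Map.entry(0L, "Hello world"), Map.entry(12L, "hello"));
        MiniContext<Long, String> ctx = new MiniContext<>(records.iterator());
        // Override map and cleanup to show the hooks firing in order.
        new MiniMapper<Long, String>() {
            @Override protected void map(Long k, String v, MiniContext<Long, String> c) {
                c.write(v, v.length());
            }
            @Override protected void cleanup(MiniContext<Long, String> c) {
                c.write("cleanup", "done");
            }
        }.run(ctx);
        ctx.output.forEach(System.out::println);
    }
}
```

Because the loop lives in `run()`, a subclass that overrides only `map()` never has to know how records are fetched; that is exactly what lets the framework swap in different input sources through the Context.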
Put simply, the Mapper's input comes from the Context. Let's first look at MapContextImpl, the implementation of MapContext:
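package org.apache.hadoop.mapreduce.task;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  // Produces the input key/value pairs for this map task.
  private RecordReader<KEYIN, VALUEIN> reader;
  // Describes which portion of the input this map task processes.
  private InputSplit split;

  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                        RecordReader<KEYIN, VALUEIN> reader,
                        RecordWriter<KEYOUT, VALUEOUT> writer,
                        OutputCommitter committer,
                        StatusReporter reporter,
                        InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }

  /**
   * Get the input split for this map.
   */
  public InputSplit getInputSplit() {
    return split;
  }

  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }
}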
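MapContextImpl contributes no input logic of its own: it stores the split and forwards the three input calls to the RecordReader. The toy sketch below models that composition. `ToySplit`, `ToyLineReader` and `ToyMapContext` are hypothetical; the reader imitates a line-oriented reader whose key is the byte offset of each line, and it assumes the split starts and ends exactly on line boundaries, which Hadoop's own line reader does not require.

```java
import java.nio.charset.StandardCharsets;

// Toy model of MapContextImpl's composition (hypothetical classes, not Hadoop's).
public class ContextDelegationSketch {

    // Stand-in for InputSplit: the byte range of the input this map task owns.
    record ToySplit(long start, long length) {}

    // Stand-in for a line-oriented RecordReader: key = byte offset of the line,
    // value = the line text. Assumes the split is aligned to line boundaries.
    static class ToyLineReader {
        private final byte[] data;
        private final long end;
        private long pos;
        private long key;
        private String value;

        ToyLineReader(byte[] data, ToySplit split) {
            this.data = data;
            this.pos = split.start();
            this.end = Math.min(data.length, split.start() + split.length());
        }

        boolean nextKeyValue() {
            if (pos >= end) return false;
            int lineStart = (int) pos;
            int i = lineStart;
            while (i < end && data[i] != '\n') i++;
            key = lineStart;
            value = new String(data, lineStart, i - lineStart, StandardCharsets.UTF_8);
            pos = i + 1; // step past the newline (or past the end of the split)
            return true;
        }

        long getCurrentKey() { return key; }
        String getCurrentValue() { return value; }
    }

    // Mirrors MapContextImpl: holds the split, delegates every input call to the reader.
    static class ToyMapContext {
        private final ToyLineReader reader;
        private final ToySplit split;

        ToyMapContext(ToyLineReader reader, ToySplit split) {
            this.reader = reader;
            this.split = split;
        }

        ToySplit getInputSplit() { return split; }
        boolean nextKeyValue() { return reader.nextKeyValue(); }
        long getCurrentKey() { return reader.getCurrentKey(); }
        String getCurrentValue() { return reader.getCurrentValue(); }
    }

    public static void main(String[] args) {
        byte[] file = "Hello world\nhello\nWorld\n".getBytes(StandardCharsets.UTF_8);
        // A second "map task" whose split covers only the last two lines (bytes 12 to 23).
        ToySplit split = new ToySplit(12, 12);
        ToyMapContext ctx = new ToyMapContext(new ToyLineReader(file, split), split);
        while (ctx.nextKeyValue()) {
            System.out.println(ctx.getCurrentKey() + " -> " + ctx.getCurrentValue());
        }
    }
}
```

With the 24-byte input in `main`, this prints `12 -> hello` and `18 -> World`: the context never touches the bytes itself, so replacing the reader changes what the Mapper sees without changing the Mapper.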
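MapContextImpl composes two objects, an InputSplit and a RecordReader, and wraps the methods that fetch the input key and value; nextKeyValue, getCurrentKey and getCurrentValue all simply delegate to the RecordReader. Before digging into InputSplit and RecordReader, let's see how this Context is passed to the Mapper we write. Here is an excerpt from the MapTask class: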
public class MapTask extends Task {
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
f