从代码中可以看出,我们在客户端提交Job之前所进行的配置以JobContext的形式传递给了MapTask, 在MapTask的runNewMapper()的方法中,它使用反射实例化出了用户指定的Mapper类和inputFormat 类,并新建了InputSplit和RecorderReader用来实例化上面提交的MapContext,MapContext以参数的形式被传递给了包装类WrappedMapper,在对input进行初始化后,以
mapper.run(mapperContext);
的形式正式调用我们用户的代码。
InputSplit
源码中对InputSplit类的描述是:
?
/**
* InputSplit represents the data to be processed by an
* individual {@link Mapper}. *
*
Typically, it presents a byte-oriented view on the input and is the * responsibility of {@link RecordReader} of the job to process this and present * a record-oriented view.*/ public abstract class InputSplit {?
public abstract long getLength() throws IOException, InterruptedException;?
public abstractString[] getLocations() throws IOException, InterruptedException; }
?
?
更易于理解的表述是,InputSplit决定了一个Map Task需要处理的输入。更进一步的,有多少个 InputSplit,就对应了多少个处理他们的Map Task。从接口上来看,InputSplit中并没有存放文件的内 容,而是指定了文件的文件的位置以及长度。
既然inputsplit与MapTask之间是一一对应的,那么我们就可以通过控制InputSplit的个数来调整 MapTask的并行性。当文件量一定时,InputSplit越小,并行性越强。inputsplit的大小并不是任意的, 虽然最大值和最小值都可以通过配置文件来指定,但是最大值是不能超过一个block大小的。
Block是什么?用户通过HDFS的接口,看到的是一个完整文件层面,在HDFS底层,文件会被切成固 定大小的Block,并冗余以达到可靠存储的目的。一般默认大小是64MB,可以调节配置指定。
InputSplit是从字节的角度来描述输入的,回头查看一下Mapper,它里面没有这种东西啊,用到的 Key,Value是从哪来的?有请RecordReader。
RecordReader
?
RecordReader
按照惯例,先上源码:
?
public abstract class RecordReaderimplements Closeable { /** * Called once at initialization. * @param split the split that defines the range of records to read * @param context the information about the task * @throws IOException * @throws InterruptedException */ public abstract void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; /** * Read the next key, value pair. * @return true if a key/value pair was read * @throws I