?
init
public void init(Context context) throws IOException, ClassNotFoundException;
collect
?
public synchronized void collect(K key, V value, int partition) throws IOException;
对外最常调用的接口。判定参数传入的参数类型与用户对job的配置一不一致(”Type mismatch in key from map\value“),当缓冲区没有足够的位置存放当前键值对时,将缓冲区的内容溢出写 到磁盘,否则的话,序列化键值对,写入到缓冲区数组,并将这个键值对的位置信息连同partition编号 写入到index数组里。
?
flush
?
public synchronized void collect(K key, V value, int partition) throws IOException;
?
当map所有的输出都收集完了之后,处理残留在缓冲区,没有溢写到磁盘的数据。
sortAndSpill
?
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException;
?
?
溢写的关键逻辑,其中会调用排序函数和combiner。Combiner的逻辑与reducer的完全一样,相 当于每个map线程的局部预处理,通过对局部数据的合并,来起到减少shuffle阶段数据量的作用。
spillSingleRecord
?
private void spillSingleRecord(K key, V value, int partition) throws IOException;
?
当缓冲区没有达到溢出条件,并且放不下当前这条记录的时候会调用的方法,主要用来处理大键值 对的边界条件。这种情况直接写磁盘。
compare&&swap?
public int compare(int mi, int mj) {
int kvi = this.offsetFor(mi % this.maxRec);
int kvj = this.offsetFor(mj % this.maxRec);
int kvip = this.kvmeta.get(kvi + 2);
int kvjp = this.kvmeta.get(kvj + 2);
return kvip != kvjp?kvip - kvjp:this.comparator.compare(this.kvbuffer, this.kvmeta.get(kvi + 1), this.kvmeta.get(kvi + 0) - this.kvmeta.get
(kvi + 1), this.kvbuffer, this.kvmeta.get(kvj + 1), this.kvmeta.get(kvj + 0) - this.kvmeta.get(kvj + 1));
}
public void swap(int mi, int mj) {
int iOff = mi % this.maxRec * 16;
int jOff = mj % this.maxRec * 16;
System.arraycopy(this.kvbuffer, iOff, this.META_BUFFER_TMP, 0, 16);
System.arraycopy(this.kvbuffer, jOff, this.kvbuffer, iOff, 16);
System.arraycopy(this.META_BUFFER_TMP, 0, this.kvbuffer, jOff, 16);
}
?
?
从这两个函数可以猜出排序函数的行为。代码里出现的kvmeta就是上文中提到的index数组,他是 kvbuffer的一种int视角,比较的对象就是它的两个元素,如果有乱序情况,交换的也是这两个元素的位 置。
mergeParts
?
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException;
当文件全部溢出之后,会调用这个方法将小文件合并成大文件。
?

?
合并前后的示意图还是很形象的。最终在shuffle的时候,只要根据index查找对应的数据就可以了。?
业务场景
我一直没有想过MapTask是否会对输出自动排序,直到有一天我正真需要自己动手修改业务代码。
我在的组做的是数据处理,在我们的业务场景中,有两种数据结构,event和session,用户在电商网站上操作时,会在后台产生一系列的event,比如你查询了一件商品,后台就有一个查询event产生。event用guid和timestamp唯一标示,可能还含有其他的属性(比如ip等),guid可以简单的理解成用 户的一种标示,event说白了是某个用户在某一时刻产生的某种动作。session的意思某个用户在一段连 续时间内产生的动作集合,比event的抽象层次更高,它用sessionId和timestamp来标示,也有诸如这 个session一共包含了多少个event这种统计信息。sessionId跟guid一样,某个用户在一定时间内是唯 一的,session的timestamp取的是这段时间这个用户的第一个event的timestamp。
好了,我们需要写一个MapReduce的job,输入是event,输出是session。在map阶段,从event 里面提取出key,然后同一个用户产生的event,应该一起在reduce阶段统计。既然有时序的问题,是 不是在统计之前应该先排个序?可我翻遍了代码,都没有找到对key排序的逻辑,是前辈代码的巨大bug ?
当然不是,在我们将guid与timestamp作为key输出时,MapTask已经按照这两个字段做了排序。 注意,这种有序,指的只是当前MapTask局部输出的有序。从Mapper的输出,到真正Reducer的输入,还有很重要的一个过程要走。?
Shuffle
从语义上说,Shuffle应该是Map和Reduce中间的过程,从源码的代码结构上看,shuffle过程是 在reduceTask中得。前段时间在考公司的hadoop测试的时候,有这种变态的问题,说下面属于reduce 过程的操作有。。至今不知道正确答案是什么。
ReduceTask有三个Phase,即copyPhase,sortPhase和reducePhase,主流的做法应该是将前两个phase归为Shuffle阶段,reducephase作为狭义的reduce过程。
ShuffleConsumerPlugin
Shuffle过程通过调用抽象类ShuffleConsumerPlugin来完成,它有个实现类,就叫做“Shuffle”。下面是Shuffle类最主要的run方法的实现