纯干活:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程。
Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分。要想了解MR,Shuffle是必须要理解的。了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作有帮助,以及进一步加深我们队MR内部机理的了解。Shuffle到底是什么,自己在参考一位大牛两年前的博客,关于MR系列的文章中,才知道前辈什么时候已经开始相应的工作,真实佩服。这里通过对前辈的概念梳理,加上自己的见解,来尽可能的梳理清楚什么是Shuffle过程,什么是block,什么是split,揭开MR的神秘面纱。
在上篇博客中简单给出了Shuffle的概念,稍提了一下split,但没有谈block。在了解Shuffle之间我们要先了解一下block与split。这里的一片数据,你可以理解成一个split数据。但split和block的区别是什么?
1、Block块:
你把文件上传到HDFS中,第一步就是数据的划分,这个是真实物理上的划分,数据文件上传到HDFS后,要把文件划分成一块一块,每块的大小可以有hadoop-default.xml里配置选项进行划分。这里默认每块64MB,一个文件被分成多个64MB大小的小文件,最后一个可能于64MB。注意:64MB只是默认,是可以更改的,下面会谈到如何更改。
dfs.block.size
67108864
The default block size for new files.
数据的划分有冗余,冗余的概念来自哪儿?为了保证数据的安全,上传的文件是被复制成3份,当一份数据宕掉,其余的可以即刻补上。当然这只是默认。
dfs.replication
3
Default block replication.The actual number of replications can be specified
when the file is created.The default is used if replication is not specified in create time.
2、Split块:
Hadoop中,有另一种关于数据的划分。这里定义了一个InputFormat接口,其中一个方法就是getSplits方法。这里就谈到了split。由于Hadoop版本更新换代很快,不同版本中的split的划分是由不同的job任务来完成的。早早先的版本split是有JobTracker端弯沉的,后来的版本是由JobClient完成的,JobClient划分好后,把split.file写入HDFS中,到时候JobTracker端读这个文件,就知道split是怎样划分的了。这种数据的划分其实只是一种逻辑上的划分,目的是为了让Map Task更好的获取数据。
例如:
File1:Block11,Block12,Block13,Block14,Block15
File2:Block21,Block22,Block23
如果用户在程序中指定map tasks的个数,比如说是2,如果不指定maptasks个数默认是1,那么在FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(源码中定义,注意getSplits这个函数计算的单位是Block个数,而不是Byte个数,后面有个变量叫bytesremaining表示剩余的 Block个数,不要根据变量名就认为是度byte的字数),然后会计算goalSize=totalSize/numSplits=4,对于File1,计算一个Split 有多少个Block就是这样计算的。
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
protected long computeSplitSize(long goalSize, long minSize, long blockSize)
{
return Math.max(minSize, Math.min(goalSize, blockSize));
}
这里minSize是1(说明了1个Split至少包含1个Block,不会出现一个Split包含零点几个Block的情况),计算得出splitSize=4,所以接下来Split划分是这样分的:
Split 1: Block11, Block12, Block13,Block14
Split 2: Block15
Split 3: Block21, Block22, Block23
那用户指定的map个数是2,出现了三个split怎么办?在JobInProgress里其实maptasks的个数是根据Splits的长度来指定的,所以用户指定的map个数只是个参考。可以参看里的代码:
JobInProgress: initTasks()
try {
splits = JobClient.readSplitFile(splitFile);
}
finally {
splitFile.close();
}
numMapTasks = splits.length;
maps = new TaskInProgress[numMapTasks];
所以问题就很清晰了,如果用户指定了20个map作业,那么最后会有8个Split(每个Split一个Block),所以最后实际上就有8个MapTasks,也就是说maptask的个数是由splits的长度决定的。
几个简单的结论:
1)一个split大于等于1的整数个Block
2) 一个split不会包含两个File的Block,不会跨越File边界
3) split和Block的关系是一对多的关系,默认一对一
4) maptasks的个数最终决定于splits的长度
还有一点要说明:在FileSplit类中,有一项是private String[] hosts;看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其冗余所在的机器列表,也就是说 Block12,Block13,Block14这些块存在的那些机器上没有在FileSplit中记录,并包含Block与Split之间的关系记录。
FileSplit中的这个属性有利于调度作业时候的数据本地性问题(数据本地性:作用很大,更有利于MapReduce性能调优)。如果一个tasktracker前来索取task,jobtracker就会找个task给它,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。
简单的说了一下Block与Split之间的联系与区别。其实