Copyright notice: the joy of sharing. https://blog.csdn.net/baolibin528/article/details/50801604
Partitioner
HashPartitioner, TotalOrderPartitioner, KeyFieldBasedPartitioner, BinaryPartitioner
public abstract class Partitioner<KEY, VALUE> {
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
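1) HashPartitioner is the default partitioner in MapReduce. It picks the target reducer as which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The mask clears the sign bit, so a negative hashCode cannot produce a negative reducer index.
public class HashPartitioner<K, V> extends Partitioner<K, V> {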
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
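You can check the formula without a cluster. Below is a small standalone sketch in plain Java with no Hadoop dependency. The class name HashPartitionDemo is made up for illustration. The sketch reapplies the same arithmetic and shows why the & Integer.MAX_VALUE mask is needed: Java's % keeps the sign of the dividend, so a negative hashCode would otherwise give a negative reducer index.

```java
// Hypothetical demo class: reapplies HashPartitioner's formula outside Hadoop.
final class HashPartitionDemo {

    // Same arithmetic as HashPartitioner.getPartition: clear the sign bit, then take the remainder.
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        String[] keys = {"apple", "banana", "cherry"};
        for (String key : keys) {
            System.out.println(key + " -> reducer " + partition(key, numReduceTasks));
        }
        // Integer.hashCode() is the value itself, so the key -7 has a negative hash.
        System.out.println(-7 % numReduceTasks);           // -1: not a valid reducer index
        System.out.println(partition(-7, numReduceTasks)); // 1: the mask keeps the result in [0, 3)
    }
}
```

The same key always yields the same hashCode, so all records with that key meet at one reducer. That is the only guarantee a hash partitioner gives. It says nothing about ordering across reducers.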
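2) BinaryPartitioner extends Partitioner<BinaryComparable, V>, so it is a Partitioner specialized to BinaryComparable keys. It takes a leftOffset and a rightOffset. When choosing the reducer, it hashes only the bytes of the key in the range [leftOffset, rightOffset]. Negative offsets count from the end of the key. which reducer = (hash & Integer.MAX_VALUE) % numPartitions
public int getPartition(BinaryComparable key, V value, int numPartitions) {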
int length = key.getLength();
int leftIndex = (leftOffset + length) % length;
int rightIndex = (rightOffset + length) % length;
int hash = WritableComparator.hashBytes(key.getBytes(),
leftIndex, rightIndex - leftIndex + 1);
return (hash & Integer.MAX_VALUE) % numPartitions;
}
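The offset arithmetic is easiest to see on a concrete key. The sketch below is standalone Java, and the class BinaryPartitionDemo is hypothetical. Its hashBytes is my re-implementation of what WritableComparator.hashBytes is understood to compute: a 31-based polynomial starting from 1. Treat that as an assumption. With leftOffset = -3 and rightOffset = -1, only the last three bytes are hashed, so keys with the same suffix always share a partition.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone sketch of BinaryPartitioner's offset handling (no Hadoop dependency).
final class BinaryPartitionDemo {

    // Assumed to mirror WritableComparator.hashBytes: 31-based polynomial over the byte range.
    static int hashBytes(byte[] bytes, int offset, int length) {
        int hash = 1;
        for (int i = offset; i < offset + length; i++) {
            hash = 31 * hash + bytes[i];
        }
        return hash;
    }

    // Same index arithmetic as getPartition above: a negative offset counts back from the end.
    static int partition(byte[] key, int leftOffset, int rightOffset, int numPartitions) {
        int length = key.length;
        int leftIndex = (leftOffset + length) % length;
        int rightIndex = (rightOffset + length) % length;
        int hash = hashBytes(key, leftIndex, rightIndex - leftIndex + 1);
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] a = "user42-cn".getBytes(StandardCharsets.UTF_8);
        byte[] b = "user97-cn".getBytes(StandardCharsets.UTF_8);
        // Offsets -3..-1 select "-cn" in both keys, so the partitions match.
        System.out.println(partition(a, -3, -1, 5) + " " + partition(b, -3, -1, 5)); // prints "0 0"
    }
}
```

Offsets 0 and -1 cover the whole key. Narrowing the range lets you group records by a fixed-width prefix or suffix of a binary key.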
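3) KeyFieldBasedPartitioner is also hash-based. Unlike BinaryPartitioner, it can use several key ranges (key fields) to compute the hash. When no ranges are configured, it essentially degenerates to HashPartitioner and hashes key.toString().
public int getPartition(K2 key, V2 value, int numReduceTasks) {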
byte[] keyBytes;
List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
if (allKeySpecs.size() == 0) {
return getPartition(key.toString().hashCode(), numReduceTasks);
}
try {
keyBytes = key.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("The current system does not " +
"support UTF-8 encoding!", e);
}
// return 0 if the key is empty
if (keyBytes.length == 0) {
return 0;
}
int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
keyBytes.length);
int currentHash = 0;
for (KeyDescription keySpec : allKeySpecs) {
int startChar = keyFieldHelper.getStartOffset(keyBytes, 0,
keyBytes.length, lengthIndicesFirst, keySpec);
// no key found! continue
if (startChar < 0) {
continue;
}
int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
lengthIndicesFirst, keySpec);
currentHash = hashCode(keyBytes, startChar, endChar,
currentHash);
}
return getPartition(currentHash, numReduceTasks);
}
protected int hashCode(byte[] b, int start, int end, int currentHash) {
for (int i = start; i <= end; i++) {
currentHash = 31*currentHash + b[i];
}
return currentHash;
}
protected int getPartition(int hash, int numReduceTasks) {
return (hash & Integer.MAX_VALUE) % numReduceTasks;
}
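Here is a simplified, standalone sketch of the same idea. It assumes tab-separated fields, with each range selecting whole fields, roughly what an option like -k1,1 means. FieldRange and KeyFieldPartitionDemo are hypothetical names. The real class also supports character positions within fields. The hash polynomial and the chaining of currentHash across ranges follow the code above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, simplified sketch of KeyFieldBasedPartitioner: tab-separated keys, whole-field ranges only.
final class KeyFieldPartitionDemo {

    // A 1-based, inclusive range of fields, in the spirit of -kstart,end (hypothetical helper type).
    static final class FieldRange {
        final int startField;
        final int endField;

        FieldRange(int startField, int endField) {
            this.startField = startField;
            this.endField = endField;
        }
    }

    static int partition(String key, int numReduceTasks, FieldRange... ranges) {
        if (ranges.length == 0) {
            // No ranges configured: hash the whole key, as HashPartitioner would.
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
        if (key.isEmpty()) {
            return 0; // the code above also returns 0 for an empty key
        }
        String[] fields = key.split("\t", -1);
        int currentHash = 0;
        for (FieldRange range : ranges) {
            if (range.startField > fields.length) {
                continue; // field not present in this key: skip it, like the startChar < 0 case
            }
            int end = Math.min(range.endField, fields.length);
            // The bytes of fields start..end, including the tabs between them.
            String span = String.join("\t", Arrays.copyOfRange(fields, range.startField - 1, end));
            for (byte b : span.getBytes(StandardCharsets.UTF_8)) {
                currentHash = 31 * currentHash + b; // same polynomial as hashCode(byte[], int, int, int)
            }
        }
        return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        FieldRange firstField = new FieldRange(1, 1);
        // Both keys share the first field "2016", so both land in the same partition.
        System.out.println(partition("2016\t03\tfoo", 4, firstField)); // 3
        System.out.println(partition("2016\t11\tbar", 4, firstField)); // 3
    }
}
```

The typical use is to partition on a prefix of a composite key while sorting on the full key. All records for one group then reach the same reducer.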
4) TotalOrderPartitioner produces a totally ordered output. Unlike the three partitioners above, it is not hash-based.
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
extends Partitioner<K,V> implements Configurable {
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
Configuration conf) throws IOException {
...
}
}
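readPartitions() reads the split points (cut points) from the given partition file, which is a SequenceFile.
There are two ways to get a globally sorted output:
1. Send all the data to a single reducer. This works for small data sets but is not feasible for large ones.
2. Produce a series of sorted files that are globally sorted when concatenated in order. The largest key in part-r-00000 is smaller than the smallest key in part-r-00001, and so on.
The main idea of the second approach is to use a partitioner that encodes the global order of the output.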
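With R reducers, such a partitioner needs R - 1 sorted split points. Finding a key's reducer is then a binary search over them. The standalone sketch below uses a hypothetical class, TotalOrderLookupDemo, and int keys for simplicity. It follows the rule that, as I understand it, TotalOrderPartitioner applies to keys that are not BinaryComparable: a key equal to a split point goes to the higher partition.

```java
import java.util.Arrays;

// Hypothetical sketch: map a key to a partition by binary search over sorted split points.
final class TotalOrderLookupDemo {

    static int partition(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // Not found: binarySearch returns -(insertionPoint) - 1, and the insertion point is the partition.
        // Found at index i: the key equals split point i and goes to partition i + 1.
        return pos < 0 ? -pos - 1 : pos + 1;
    }

    public static void main(String[] args) {
        int[] splitPoints = {100, 200, 300}; // 3 split points -> 4 partitions (0..3)
        for (int key : new int[] {5, 100, 150, 299, 300, 1000}) {
            System.out.println(key + " -> partition " + partition(key, splitPoints));
        }
    }
}
```

In both branches the result equals the number of split points less than or equal to the key. Since that count never decreases as the key grows, every key in partition i is smaller than every key in partition i + 1.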
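From this we can summarize the steps for sorting a large data set with Hadoop:
1) Sample the data to be sorted;
2) Sort the samples to produce the split points (the "rulers");
3) In the map phase, determine for each input record which two split points it falls between, and send it to the reducer with the corresponding range ID;
4) Each reducer writes out the data it receives as is (the shuffle has already sorted it by key).
InputSampler:
public class InputSampler<K,V> extends Configured implements Tool {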
/**
* Samples the first n records from s splits.
* Inexpensive way to sample random data.
*/
public static class SplitSampler<K,V> implements Sampler<K,V> {
...
}
/**
* Sample from random points in the input.
* General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
* each split.
*/
public static class RandomSampler<K,V> implements Sampler<K,V> {
...
}
/**
* Sample from s splits at regular intervals.
* Useful for sorted data.
*/
public static class IntervalSampler<K,V> implements Sampler<K,V> {
...
}
/**
* Write a partition file for the given job, using the Sampler provided.
* Queries the sampler for a sample keyset, sorts by the output key
* comparator, selects the keys for each rank, and writes to the destination
* returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
...
}
}
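You can simulate the four steps in memory to check that they really produce a global order. This sketch uses a hypothetical class, TotalSortPipelineDemo, and no Hadoop. Step 2 follows the description of writePartitionFile above: sort the samples, then pick one key per rank. The duplicate handling and the uniform sampling with replacement are my own simplifications, not InputSampler's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical in-memory simulation of the four-step total sort (no Hadoop involved).
final class TotalSortPipelineDemo {

    // Step 2: sort the samples and take roughly evenly spaced ranks as split points.
    // Duplicate handling is a simplification: split points are kept strictly increasing.
    static int[] splitPoints(int[] samples, int numPartitions) {
        int[] sorted = samples.clone();
        Arrays.sort(sorted);
        float stepSize = sorted.length / (float) numPartitions;
        List<Integer> points = new ArrayList<>();
        int last = -1;
        for (int i = 1; i < numPartitions; i++) {
            int k = Math.max(Math.round(stepSize * i), last + 1);
            while (k < sorted.length && last >= 0 && sorted[k] == sorted[last]) {
                k++; // skip keys equal to the previous split point
            }
            if (k >= sorted.length) {
                break; // not enough distinct samples left for more split points
            }
            points.add(sorted[k]);
            last = k;
        }
        int[] result = new int[points.size()];
        for (int i = 0; i < result.length; i++) result[i] = points.get(i);
        return result;
    }

    // Step 3: range ID = number of split points <= key (binary search).
    static int partition(int key, int[] points) {
        int pos = Arrays.binarySearch(points, key);
        return pos < 0 ? -pos - 1 : pos + 1;
    }

    static int[] totalSort(int[] data, int sampleSize, int numPartitions, long seed) {
        Random random = new Random(seed);
        // Step 1: uniform sampling with replacement (a simplification of InputSampler's samplers).
        int[] samples = new int[sampleSize];
        for (int i = 0; i < sampleSize; i++) samples[i] = data[random.nextInt(data.length)];
        int[] points = splitPoints(samples, numPartitions);

        // Step 3: the "map" side routes every record to the bucket of its range ID.
        List<List<Integer>> buckets = new ArrayList<>();
        for (int p = 0; p <= points.length; p++) buckets.add(new ArrayList<>());
        for (int key : data) buckets.get(partition(key, points)).add(key);

        // Step 4: each "reducer" sorts its keys (the shuffle's job in Hadoop) and writes them out;
        // concatenating the outputs in partition order gives the global order.
        int[] out = new int[data.length];
        int n = 0;
        for (List<Integer> bucket : buckets) {
            Collections.sort(bucket);
            for (int key : bucket) out[n++] = key;
        }
        return out;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) data[i] = random.nextInt(1_000_000);
        int[] expected = data.clone();
        Arrays.sort(expected);
        System.out.println(Arrays.equals(totalSort(data, 200, 4, 7L), expected)); // true
    }
}
```

Sampling quality affects only how evenly the buckets are filled, not correctness. Any strictly increasing set of split points still yields sorted output, but a poor sample can leave one reducer with most of the data.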
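InputSampler is the input sampling class: it samples the data in the input directory. Its nested sampler classes implement the InputSampler.Sampler interface. The goal is to create a sequence file (SequenceFile) holding the keys that define the partitions. Three sampling methods are provided: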
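Class | Sampling method | Constructor parameters | Efficiency | Notes
SplitSampler<K,V> | samples the first n records | total number of samples, max number of splits sampled | highest |
RandomSampler<K,V> | scans the data and samples at random | sampling frequency, total number of samples, max number of splits sampled | lowest |
IntervalSampler<K,V> | samples at fixed intervals | sampling frequency, max number of splits sampled | medium | well suited to data that is already sorted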
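Why IntervalSampler fits sorted input: on sorted data, taking the first n records sees only the smallest keys, so all the split points would crowd into the low end. The in-memory sketch below contrasts the two approaches. The "kept / seen < freq" rule is how I understand IntervalSampler to choose records within a split. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch contrasting SplitSampler-style and IntervalSampler-style sampling.
final class SamplerDemo {

    // SplitSampler-style (simplified): just the first n records.
    static List<Integer> splitSample(List<Integer> records, int n) {
        return new ArrayList<>(records.subList(0, Math.min(n, records.size())));
    }

    // IntervalSampler-style: keep a record whenever the fraction kept so far drops below freq,
    // which spaces the samples evenly through the input.
    static List<Integer> intervalSample(List<Integer> records, double freq) {
        List<Integer> samples = new ArrayList<>();
        long seen = 0;
        long kept = 0;
        for (int record : records) {
            seen++;
            if ((double) kept / seen < freq) {
                samples.add(record);
                kept++;
            }
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Integer> sortedInput = new ArrayList<>();
        for (int i = 0; i < 100; i++) sortedInput.add(i);
        System.out.println(splitSample(sortedInput, 5));       // [0, 1, 2, 3, 4]
        System.out.println(intervalSample(sortedInput, 0.05)); // [0, 20, 40, 60, 80]
    }
}
```

Both samplers read records in order, which keeps them cheap. IntervalSampler just avoids the bias that a prefix sample has when the input is already ordered.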
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>( 0.1, 10000, 10);
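RandomSampler's three parameters are the sampling frequency (the probability of keeping a key), the maximum number of samples, and the maximum number of input splits to examine.
/**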
* Create a new RandomSampler.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* splits.
* @param maxSplitsSampled The maximum number of splits to examine.
*/
public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
this.freq = freq;
this.numSamples = numSamples;
this.maxSplitsSampled = maxSplitsSampled;
}
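Here is a sketch of what RandomSampler does with each record inside a sampled split, as I understand the Hadoop source. It is an approximation, not a verbatim copy. The class RandomSamplerDemo is hypothetical, and maxSplitsSampled is not modeled. Each record is kept with probability freq. Once numSamples keys are held, a new pick overwrites a random slot, and freq is reduced to reflect that existing samples may be pushed out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical approximation of RandomSampler's per-record sampling rule.
final class RandomSamplerDemo {

    static List<Integer> sample(Iterable<Integer> records, double freq, int numSamples, Random r) {
        List<Integer> samples = new ArrayList<>(numSamples);
        for (int record : records) {
            if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                    samples.add(record);
                } else {
                    // Sample set is full: replace a random slot, then lower the keep probability.
                    samples.set(r.nextInt(numSamples), record);
                    freq *= (numSamples - 1) / (double) numSamples;
                }
            }
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) input.add(i);
        // Same freq and numSamples as the RandomSampler(0.1, 10000, 10) example above.
        List<Integer> samples = sample(input, 0.1, 10_000, new Random(1));
        System.out.println(samples.size()); // 10000: the cap is reached long before the input ends
    }
}
```

Unlike the other two samplers, this one can draw keys from anywhere in the input. That robustness is why the table ranks it the most expensive: it reads through the selected splits instead of stopping early.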