import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
public class SortByMapReduce {
    /**
     * @param args args[0] is the input path, args[1] the output path
     * @throws URISyntaxException
     * @throws IOException
     */
    public static void main(String[] args) throws IOException, URISyntaxException {
        runJob(args);
    }
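
    // Overall flow of runJob(): configure the job, sample the input keys to
    // build a partition file, ship that file to every task through the
    // distributed cache, then submit the job. TotalOrderPartitioner routes
    // each key to the reducer whose range contains it, so the concatenated
    // reducer outputs form one globally sorted dataset.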
    private static void runJob(String[] args) throws IOException, URISyntaxException {
        JobConf conf = new JobConf(SortByMapReduce.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setJobName("SortByMapReduce");

        // Assumption not in the original listing: read a SequenceFile of
        // IntWritable/Text records, so the default identity mapper emits
        // IntWritable keys that match the output key class below.
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Five reducers, so the sampler must find four boundary keys that
        // split the key space into five ordered ranges.
        conf.setNumReduceTasks(5);
        conf.setPartitionerClass(TotalOrderPartitioner.class);

        // RandomSampler(freq, numSamples, maxSplitsSampled): sample roughly
        // 10% of records, capped at 10,000 samples from at most 10 splits.
        // These parameters are illustrative; the original listing omits them.
        InputSampler.Sampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

        // Sample the input and write the partition boundaries next to the
        // (fully qualified) input path.
        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));
        Path partitionFile = new Path(input, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        InputSampler.writePartitionFile(conf, sampler);

        // Make the partition file available to every task via the distributed
        // cache; the "#_partitions" fragment plus createSymlink() exposes it
        // as a local file named _partitions in each task's working directory.
        URI partitionURI = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionURI, conf);
        DistributedCache.createSymlink(conf);

        JobClient.runJob(conf);
}
}
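
To try the job end to end, the input has to match the SequenceFile assumption made above. The helper below is a minimal sketch, not part of the original code: the class name GenerateSortInput, the record count, and the value format are all illustrative.

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class GenerateSortInput {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        IntWritable key = new IntWritable();
        Text value = new Text();
        Random random = new Random();

        // Write 100,000 random IntWritable/Text records (count is arbitrary)
        // to the path given as args[0].
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, new Path(args[0]), IntWritable.class, Text.class);
        try {
            for (int i = 0; i < 100000; i++) {
                key.set(random.nextInt());
                value.set("record-" + i);
                writer.append(key, value);
            }
        } finally {
            writer.close();
        }
    }
}

Run the generator first, then point SortByMapReduce at the same path: each of the five reducer outputs is internally sorted, and because the partitioner assigns contiguous key ranges to reducers, concatenating the output files in partition order yields one totally sorted result.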