使用Hadoop MapReduce 进行排序(二)

2014-11-24 07:15:02 · 作者: · 浏览: 7
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
public class SortByMapReduce {

/**
* @param args
* @throws URISyntaxException
* @throws IOException
*/
public static void main(String[] args) throws IOException, URISyntaxException {
runJob(args);
}

private static void runJob(String[] args) throws IOException, URISyntaxException {

JobConf conf = new JobConf(SortByMapReduce.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setJobName(”SortByMapReduce”);

conf.setInputFormat(CxfInputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setNumReduceTasks(5);
conf.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.RandomSampler sampler =
new InputSampler.RandomSampler(0.1,10000,10);

Path input = FileInputFormat.getInputPaths(conf)[0];
input = input.makeQualified(input.getFileSystem(conf));
Path partitionFile = new Path(input,”_partitions”);
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
InputSampler.writePartitionFile(conf, sampler);

URI partitionURI = new URI(partitionFile.toString() + “#_partitions”);
DistributedCache.addCacheFile(partitionURI, conf);
DistributedCache.createSymlink(conf);
JobClient.runJob(conf);
}
}