);
//设置Mapper输出数据类型
job.setMapOutputKeyClass(FlowSortBean.class);
job.setMapOutputValueClass(Text.class);
//设置Reducer输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowSortBean.class);
//设置自定义分区
job.setPartitionerClass(FlowSortPartitioner.class);
job.setNumReduceTasks(5);
//设置文件输入输出类型
FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));
FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));
//提交任务
if (job.waitForCompletion(true)){
System.out.println("运行完成!");
}else {
System.out.println("运行失败!");
}
}
}
注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。
四、Combiner合并
Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。
Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。
例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:
public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
protected void reduce(Text key,Iterable<IntWritable> values,Context context){
//计数
int count = 0;
//累加求和
for(IntWritable v:values){
count += v.get();
}
//输出
context.write(key,new IntWritable(count));
}
}
然后再Driver类中设置使用Combiner类
job.setCombinerClass(WordCountCombiner.class);
如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:
job.setCombinerClass(WordCountReducer.class);
注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:
mapper输出两个分区:3,5,7 =>avg=5
2,6 =>avg=4
reducer合并输出: 5,4 =>avg=4.5 但是实际应该为4.6,错误!
所以在使用Combiner时要注意其不会影响最中的结果!!!