设为首页 加入收藏

TOP

Mapreduce的排序(全局排序、分区加排序、Combiner优化)(二)
2019-09-17 18:53:09 】 浏览:69
Tags:Mapreduce 排序 全局 分区 Combiner 优化
); //设置Mapper输出数据类型 job.setMapOutputKeyClass(FlowSortBean.class); job.setMapOutputValueClass(Text.class); //设置Reducer输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowSortBean.class); //设置自定义分区 job.setPartitionerClass(FlowSortPartitioner.class); job.setNumReduceTasks(5); //设置文件输入输出类型 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout")); //提交任务 if (job.waitForCompletion(true)){ System.out.println("运行完成!"); }else { System.out.println("运行失败!"); } } }

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
	protected void reduce(Text key,Iterable<IntWritable> values,Context context){
		//计数
		int count = 0;
		
		//累加求和
		for(IntWritable v:values){
			count += v.get();
		}
		//输出
		context.write(key,new IntWritable(count));
	}
}

  然后再Driver类中设置使用Combiner类

job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇mysql8.0版本skip-grant-tables出.. 下一篇Mapreduce的序列化和流量统计程序..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目