一、MR排序的分类
1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内部都是经过排序的;
2.全局排序;
3.辅助排序:在第一次排序后经过分区再排序一次;
4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。
二、MR排序的接口——WritableComparable
该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。
三、流量统计案例的排序与分区
/**
 * Map-output key holding an upstream counter, a downstream counter and their sum.
 *
 * <p>Instances order by {@code flowSum} descending, then by {@code upFlow} and
 * {@code dwFlow} descending, so two beans compare as equal only when all three
 * counters match. {@link #equals(Object)} and {@link #hashCode()} follow the same
 * rule.
 *
 * @author: PrincessHug
 * @date: 2019/3/24, 15:36
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowSortBean implements WritableComparable<FlowSortBean> {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    /** No-arg constructor; fields are filled in later by {@link #readFields(DataInput)} or the setters. */
    public FlowSortBean() {
    }

    /**
     * Creates a bean and derives {@code flowSum} as {@code upFlow + dwFlow}.
     *
     * @param upFlow upstream counter
     * @param dwFlow downstream counter
     */
    public FlowSortBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    /** Serializes the three counters in the order upFlow, dwFlow, flowSum. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /** Reads the three counters back in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    /** Returns the counters as {@code upFlow<TAB>dwFlow<TAB>flowSum}. */
    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }

    /**
     * Orders beans by total descending, breaking ties on the individual counters.
     *
     * <p>The previous implementation never returned 0 and returned 1 for equal sums in
     * both directions, which violates the {@link Comparable} contract. This version
     * returns 0 exactly when all three counters are equal.
     *
     * @param o the bean to compare against
     * @return negative if this bean sorts first (larger total), positive if it sorts
     *     after, 0 if all counters are equal
     */
    @Override
    public int compareTo(FlowSortBean o) {
        // Arguments are swapped (o first) to obtain descending order.
        int bySum = Long.compare(o.flowSum, this.flowSum);
        if (bySum != 0) {
            return bySum;
        }
        int byUp = Long.compare(o.upFlow, this.upFlow);
        if (byUp != 0) {
            return byUp;
        }
        return Long.compare(o.dwFlow, this.dwFlow);
    }

    /** Two beans are equal when all three counters match (consistent with {@link #compareTo}). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowSortBean)) {
            return false;
        }
        FlowSortBean other = (FlowSortBean) obj;
        return upFlow == other.upFlow && dwFlow == other.dwFlow && flowSum == other.flowSum;
    }

    /** Hash derived from the same three counters used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = Long.hashCode(upFlow);
        result = 31 * result + Long.hashCode(dwFlow);
        result = 31 * result + Long.hashCode(flowSum);
        return result;
    }
}

/**
 * Turns each tab-separated input line into a ({@link FlowSortBean}, {@link Text}) pair.
 *
 * <p>Column 0 is emitted unchanged as the value; columns 1 and 2 are parsed as
 * {@code long} and become the bean's upFlow and dwFlow.
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowSortBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read the line
        String line = value.toString();

        // Split it into columns
        String[] fields = line.split("\t");

        // Parse the counters
        long upFlow = Long.parseLong(fields[1]);
        long dwFlow = Long.parseLong(fields[2]);

        // Emit the bean as key so the framework sorts on it
        context.write(new FlowSortBean(upFlow, dwFlow), new Text(fields[0]));
    }
}

/**
 * Swaps key and value back: writes each incoming value as the output key and the
 * {@link FlowSortBean} as the output value.
 */
public class FlowSortReducer extends Reducer<FlowSortBean, Text, Text, FlowSortBean> {
    @Override
    protected void reduce(FlowSortBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Beans that compare equal have identical counters and arrive in one group,
        // so every value must be written; writing only the first would drop records.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

/**
 * Routes records to a partition based on the first three characters of the value.
 *
 * <p>Prefixes 135, 137, 138 and 139 map to partitions 0-3; everything else, including
 * values shorter than three characters, maps to partition 4.
 * NOTE(review): the returned index ignores {@code numPartitions}; this presumably
 * expects the job to be configured with at least five reduce tasks - confirm in the driver.
 */
public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
    @Override
    public int getPartition(FlowSortBean key, Text value, int numPartitions) {
        String phoneNum = value.toString();
        // startsWith avoids the StringIndexOutOfBoundsException that substring(0, 3)
        // raised for values shorter than three characters.
        if (phoneNum.startsWith("135")) {
            return 0;
        } else if (phoneNum.startsWith("137")) {
            return 1;
        } else if (phoneNum.startsWith("138")) {
            return 2;
        } else if (phoneNum.startsWith("139")) {
            return 3;
        }
        return 4;
    }
}

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and initialize the Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the job's main class
        job.setJarByClass(FlowSortDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class