Sorting in MapReduce (Global Sort, Partitioned Sort, Combiner Optimization) (Part 1)
2019-09-17 18:53:09 | Views: 66
Tags: MapReduce, sorting, global sort, partitioning, Combiner, optimization

I. Categories of sorting in MapReduce

  1. Partial sort: MapReduce sorts the key-value pairs of its output records, guaranteeing that the contents of each individual output file are sorted;

  2. Global sort: the output is totally ordered across all output files;

  3. Auxiliary (grouping) sort: after the first sort, the data is partitioned and then sorted once more within each partition;

  4. Secondary sort: after the first sort, the data is sorted again according to business logic.
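The secondary-sort idea can be illustrated in plain Java, outside of Hadoop. The record type and data below are made up for illustration: records are ordered first by total flow descending, and ties are broken by a second business rule, here the phone number ascending.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {
    // A tiny stand-in record: phone number plus its total flow.
    record FlowRecord(String phone, long flowSum) {}

    public static void main(String[] args) {
        List<FlowRecord> records = new ArrayList<>(List.of(
                new FlowRecord("13700000000", 500),
                new FlowRecord("13500000000", 900),
                new FlowRecord("13900000000", 500)));

        // First criterion: total flow, descending.
        // Second criterion (the "secondary" sort): phone number, ascending.
        records.sort(Comparator.comparingLong(FlowRecord::flowSum).reversed()
                .thenComparing(FlowRecord::phone));

        for (FlowRecord r : records) {
            System.out.println(r.phone() + "\t" + r.flowSum());
        }
    }
}
```

The two records with flow 500 tie on the first criterion, so the `thenComparing` clause decides their relative order.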

 

II. The MR sorting interface: WritableComparable

  This interface extends Hadoop's Writable interface and Java's Comparable interface; implementing it requires overriding three methods: write, readFields, and compareTo.
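The shape of those three methods can be sketched with plain java.io streams, without Hadoop on the classpath. The bean below is a hypothetical stand-in, not the article's FlowSortBean: write serializes the fields in a fixed order, readFields reads them back in exactly the same order, and compareTo defines the sort.

```java
import java.io.*;

public class WritableSketch implements Comparable<WritableSketch> {
    private long upFlow;
    private long dwFlow;

    public WritableSketch() {}
    public WritableSketch(long up, long dw) { this.upFlow = up; this.dwFlow = dw; }

    // Serialize the fields in a fixed order...
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
    }

    // ...and deserialize them in exactly the same order.
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
    }

    @Override
    public int compareTo(WritableSketch o) {
        // Descending by total flow.
        return Long.compare(o.upFlow + o.dwFlow, upFlow + dwFlow);
    }

    public static void main(String[] args) throws IOException {
        // Round-trip: write to a byte buffer, read back into a fresh object.
        WritableSketch original = new WritableSketch(1024, 2048);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        WritableSketch copy = new WritableSketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.compareTo(original)); // prints 0: same totals
    }
}
```

If write and readFields disagree on field order, deserialization silently produces wrong values, which is why the round-trip check above is worth doing.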

 

III. Sorting and partitioning in the traffic-statistics example

/**
 * @author: PrincessHug
 * @date: 2019/3/24, 15:36
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowSortBean implements WritableComparable<FlowSortBean> {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public FlowSortBean() {
    }

    public FlowSortBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }

    @Override
    public int compareTo(FlowSortBean o) {
        // Descending order by total flow; Long.compare also handles the
        // equal case, keeping the compareTo contract intact
        // (a.compareTo(a) must return 0).
        return Long.compare(o.getFlowSum(), this.flowSum);
    }
}

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowSortBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //read one line of input
        String line = value.toString();

        //split it into fields
        String[] fields = line.split("\t");

        //parse the flow values
        long upFlow = Long.parseLong(fields[1]);
        long dwFlow = Long.parseLong(fields[2]);

        //emit the bean as the key so the framework sorts by it; the phone number travels as the value
        context.write(new FlowSortBean(upFlow, dwFlow), new Text(fields[0]));
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowSortReducer extends Reducer<FlowSortBean, Text, Text, FlowSortBean> {
    @Override
    protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //several phone numbers may share the same flow totals and thus the
        //same key; write each of them out rather than only the first
        for (Text phone : values) {
            context.write(phone, key);
        }
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
    @Override
    public int getPartition(FlowSortBean key, Text value, int numPartitions) {
        //route by the first three digits of the phone number, which is
        //carried in the value at this stage
        String phonePrefix = value.toString().substring(0, 3);

        if ("135".equals(phonePrefix)) {
            return 0;
        } else if ("137".equals(phonePrefix)) {
            return 1;
        } else if ("138".equals(phonePrefix)) {
            return 2;
        } else if ("139".equals(phonePrefix)) {
            return 3;
        }
        //all other prefixes share the fifth partition, so the job
        //needs five reduce tasks
        return 4;
    }
}
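The routing above is easy to exercise without a cluster. The class below is a hypothetical extraction of the same prefix-to-partition logic into a plain static method, free of Hadoop types:

```java
public class PartitionSketch {
    // Same routing as FlowSortPartitioner.getPartition, minus Hadoop types.
    static int partitionFor(String phoneNum) {
        String prefix = phoneNum.substring(0, 3);
        switch (prefix) {
            case "135": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4; // everything else shares the last partition
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("13512345678")); // prints 0
        System.out.println(partitionFor("18812345678")); // prints 4
    }
}
```

Keeping the decision logic in a plain method like this makes the partitioner trivially unit-testable.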

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //create the configuration and initialize the Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //set the jar class
        job.setJarByClass(FlowSortDriver.class);

        //set the Mapper and Reducer classes
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
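The driver is cut off here at the article's page break; the rest appears in Part 2. A typical continuation, assuming the remaining wiring follows the standard pattern (output types, the custom partitioner with five reduce tasks, and input/output paths taken from the command line), might look like:

```java
        // Hypothetical completion -- the original article continues on its
        // second page, which is not captured here.
        job.setMapOutputKeyClass(FlowSortBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowSortBean.class);

        // Wire in the custom partitioner; since it can return partition 4,
        // five reduce tasks are required.
        job.setPartitionerClass(FlowSortPartitioner.class);
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This fragment only configures a Hadoop job, so it is not runnable on its own; it additionally assumes imports for Path, FileInputFormat, and FileOutputFormat from the Hadoop MapReduce libraries.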