Secondary Sort and the Overall MapReduce Flow (Part 1)
2019-09-17 19:03:29
Tags: secondary sort, MapReduce, overall flow

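I. Secondary Sort

  Requirement: given an order data file containing the order id, product id, and product price, sort the records by order id ascending and by product price descending. The number of result files must equal the number of distinct order ids, and each result file must contain exactly one record: the most expensive product of that order.

  Approach: 1. Wrap each record in an order class, OrderBean, that implements the WritableComparable interface;

     2. Write a custom Mapper class, fix its input and output types, and put the business logic there;

     3. Write a custom partitioner that returns a partition number derived from the order id;

     4. Write a custom Reducer class;

     5. Write a secondary-sort (grouping) class, OrderGroupingComparator, that extends WritableComparator, defines a no-argument constructor, and overrides the compare method;

     6. Write the Driver class;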
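
The requirement and steps above can be tried on a small scale without Hadoop. The following plain-Java sketch is only an illustration: the class name SecondarySortSketch, the helper topPerOrder, and the sample rows are assumptions and not part of the original job. It sorts rows by order id ascending and price descending, then keeps the first row seen for each order id, which is the effect the sort order and the grouping comparator produce together inside MapReduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone illustration; no Hadoop classes are involved.
final class SecondarySortSketch {

    // Minimal stand-in for OrderBean: only the two fields the job keeps.
    static final class Row {
        final int orderId;
        final double price;

        Row(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Returns the most expensive row of each order, ordered by order id.
    static List<Row> topPerOrder(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        // Order id ascending, then price descending.
        sorted.sort((a, b) -> {
            int byId = Integer.compare(a.orderId, b.orderId);
            return byId != 0 ? byId : Double.compare(b.price, a.price);
        });

        List<Row> result = new ArrayList<>();
        for (Row row : sorted) {
            // A change of order id starts a new group; keep only its first row.
            boolean newGroup = result.isEmpty()
                    || result.get(result.size() - 1).orderId != row.orderId;
            if (newGroup) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(2, 15.0), new Row(1, 20.5), new Row(1, 99.9), new Row(2, 8.0));
        for (Row r : topPerOrder(rows)) {
            System.out.println(r.orderId + "\t" + r.price);
        }
    }
}
```

For the sample rows, order 1 is reported with price 99.9 and order 2 with price 15.0.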

  The code is as follows:

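import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**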
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Primary key: orderId ascending.
        int rs = Integer.compare(this.orderId, o.getOrderId());
        if (rs == 0) {
            // Secondary key: orderPrice descending. Equal prices yield 0,
            // as the compareTo contract requires.
            rs = Double.compare(o.getOrderPrice(), this.orderPrice);
        }
        return rs;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}
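
The write and readFields methods must handle the fields in the same order, because the bytes written on the map side are read back field by field on the reduce side. A minimal sketch of that round trip, using only java.io streams and a hypothetical stand-in class (WritableRoundTrip) instead of the real Writable interface:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; mirrors the field order used by OrderBean.
final class WritableRoundTrip {

    // Serialize orderId first, then orderPrice.
    static byte[] write(int orderId, double orderPrice) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(orderId);
            out.writeDouble(orderPrice);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize in exactly the same order and format the result like toString().
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int orderId = in.readInt();
            double orderPrice = in.readDouble();
            return orderId + "\t" + orderPrice;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(7, 42.5);
        // An int takes 4 bytes and a double 8, so the record is 12 bytes long.
        System.out.println(data.length + " bytes: " + read(data));
    }
}
```

Swapping the two read calls would still consume 12 bytes but would reinterpret them, which is why the order in readFields has to mirror write.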

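import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();

        // Split it into fields: order id, product id, product price
        String[] fields = line.split("\t");

        // Wrap the order id and price in an OrderBean
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);

        // Emit the bean as the key; no value is needed
        context.write(orderBean, NullWritable.get());
    }
}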

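import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // numPartitions is the number of reduce tasks.
        // Masking with Integer.MAX_VALUE clears the sign bit, so the result is never negative.
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % numPartitions;
    }
}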
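
The partition formula can be tried in isolation. The sketch below is a hypothetical helper (PartitionSketch.partitionFor is not part of the job) that applies the same expression to a few sample ids, assuming three reduce tasks.

```java
// Hypothetical illustration of the partition expression used by OrderPartitioner.
final class PartitionSketch {

    // Same arithmetic as getPartition: clear the sign bit, then take the remainder.
    static int partitionFor(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // assumed value for the example
        int[] orderIds = {1, 2, 3, 4, -5};
        for (int id : orderIds) {
            System.out.println(id + " -> partition " + partitionFor(id, numReduceTasks));
        }
        // For comparison: the plain remainder of a negative id is negative in Java,
        // which would not be a valid partition number.
        System.out.println("-5 % 3 = " + (-5 % 3));
    }
}
```

With three reduce tasks, ids 1 and 4 both map to partition 1, so two orders would share one output file. Getting one file per order id therefore depends on the number of reduce tasks and on the ids mapping to distinct remainders.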

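import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // With the grouping comparator in place, all records of one order reach a single
        // reduce call, and key holds the first (most expensive) record. Write it once.
        context.write(key, NullWritable.get());
    }
}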

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    // The superclass constructor must be called to register OrderBean as the key class;
    // true asks it to create key instances for deserialization.
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        // Compare by orderId only, so all records of one order form one group.
        return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
    }
}
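
To see what the grouping comparator changes, the reduce-side grouping can be imitated in plain Java: walk the sorted keys and start a new group whenever the comparator reports a difference from the previous key. This is a simplified sketch under stated assumptions (GroupingSketch, Key, and countGroups are hypothetical names, and the keys are plain int/double pairs rather than OrderBean), not Hadoop's actual implementation.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of how the comparator used for grouping decides
// how many reduce calls a sorted key sequence produces.
final class GroupingSketch {

    static final class Key {
        final int orderId;
        final double price;

        Key(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Full sort order: orderId ascending, then price descending.
    static final Comparator<Key> SORT = (a, b) -> {
        int byId = Integer.compare(a.orderId, b.orderId);
        return byId != 0 ? byId : Double.compare(b.price, a.price);
    };

    // Grouping order: orderId only.
    static final Comparator<Key> GROUP = (a, b) -> Integer.compare(a.orderId, b.orderId);

    // Counts the groups formed when adjacent keys are compared with the given comparator.
    static int countGroups(List<Key> sortedKeys, Comparator<Key> grouping) {
        int groups = 0;
        Key previous = null;
        for (Key key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups++;
            }
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        // Keys already in sort order, as they would arrive at a reducer.
        List<Key> keys = Arrays.asList(
                new Key(1, 99.9), new Key(1, 20.5), new Key(2, 15.0), new Key(2, 8.0));
        System.out.println("grouping by full key: " + countGroups(keys, SORT));
        System.out.println("grouping by orderId:  " + countGroups(keys, GROUP));
    }
}
```

Grouping by the full key treats every distinct (orderId, price) pair as its own group, so a reducer that writes its key once would output every record. Grouping by orderId alone yields one group per order, and the first key of each group is the most expensive item.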

public class OrderDriver {
    public static void main(Strin