一、辅助排序
需求:先有一个订单数据文件,包含了订单id、商品id、商品价格,要求将订单id正序,商品价格倒序,且生成结果文件个数为订单id的数量,每个结果文件中只要一条该订单最贵商品的数据。
思路:1.封装订单类OrderBean,实现WritableComparable接口;
2.自定义Mapper类,确定输入输出数据类型,写业务逻辑;
3.自定义分区,根据不同的订单id返回不同的分区值;
4.自定义Reducer类;
5.辅助排序类OrderGroupingComparator继承WritableComparator类,并定义无参构成方法、重写compare方法;
6.书写Driver类;
代码如下:
/** * @author: PrincessHug * @date: 2019/3/25, 21:42 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class OrderBean implements WritableComparable<OrderBean> { private int orderId; private double orderPrice; public OrderBean() { } public OrderBean(int orderId, double orderPrice) { this.orderId = orderId; this.orderPrice = orderPrice; } public int getOrderId() { return orderId; } public void setOrderId(int orderId) { this.orderId = orderId; } public double getOrderPrice() { return orderPrice; } public void setOrderPrice(double orderPrice) { this.orderPrice = orderPrice; } @Override public String toString() { return orderId + "\t" + orderPrice; } @Override public int compareTo(OrderBean o) { int rs ; if (this.orderId > o.getOrderId()){ rs = 1; }else if (this.orderId < o.getOrderId()){ rs = -1; }else { rs = (this.orderPrice > o.getOrderPrice()) ? -1:1; } return rs; } @Override public void write(DataOutput out) throws IOException { out.writeInt(orderId); out.writeDouble(orderPrice); } @Override public void readFields(DataInput in) throws IOException { orderId = in.readInt(); orderPrice = in.readDouble(); } } public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切割数据 String[] fields = line.split("\t"); //封装数据 int orderId = Integer.parseInt(fields[0]); double orderPrice = Double.parseDouble(fields[2]); OrderBean orderBean = new OrderBean(orderId, orderPrice); //发送数据 context.write(orderBean,NullWritable.get()); } } public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> { @Override public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) { //构造参数中i的值为reducetask的个数 return (orderBean.getOrderId() & Integer.MAX_VALUE ) % i; } } public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean,NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } } public class OrderGrouptingComparator extends WritableComparator { //必须使用super调用父类的构造方法来定义对比的类为OrderBean protected OrderGrouptingComparator(){ super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean)a; OrderBean bBean = (OrderBean)b; int rs ; if (aBean.getOrderId() > bBean.getOrderId()){ rs = 1; }else if (aBean.getOrderId() < bBean.getOrderId()){ rs = -1; }else { rs = 0; } return rs; } } public class OrderDriver { public static void main(Strin