设为首页 加入收藏

TOP

Mapreduce的序列化和流量统计程序开发(一)
2019-09-17 18:53:07 】 浏览:62
Tags:Mapreduce 序列化 流量 统计 程序开发

一、Hadoop数据序列化的数据类型

  Java数据类型 => Hadoop数据类型

  int         IntWritable

  float        FloatWritable

  long        LongWritable

  double         DoubleWritable

  String       Text

  boolean      BooleanWritable

  byte        ByteWritable

  map          MapWritable

  array        ArrayWritable

二、Hadoop的序列化

  1.什么是序列化?

   在java中,序列化接口是Serializable,它下面又实现了很多的序列化接口,所以java的序列化是一个重量级的序列化框架,一个对象被java序列化之后会附带很多额外的信息(校验信息、header、继承体系等),不便于在网络中进行高效的传输,所以Hadoop开发了一套自己的序列化框架——Writable。

      序列化就是把内存当中的对象,转化为字节序列以便于存储和网络传输;

   反序列化是将收到的字节序列或硬盘当中的持续化数据,转换成内存中的对象。

  2.序列化的理解方法(自己悟的,不对勿喷~~)

    比如下面流量统计案例中,流量的封装类FlowBean实现了Writable接口,其中定义了变量upFlow、dwFlow、flowSum;

    在Mapper和Reducer类中初始化封装类FlowBean时,内存会分配空间加载这些对象,而这些对象不便于在网络中高效的传输,这是封装类FlowBean中的序列化方法将这些对象转换为字节序列,方便了存储和传输;

    当Mapper或Reducer需要将这些对象的字节序列写出到磁盘时,封装类FlowBean中的反序列化方法将字节序列转换为对象,然后写道磁盘中。

  3.序列化特点

   序列化与反序列化时分布式数据处理当中经常会出现的,比如hadoop通信是通过远程调用(rpc)实现的,这个过程就需要序列化。

  特点:1)紧凑;

     2)快速

     3)可扩展

     4)可互操作

三、Mapreduce的流量统计程序案例

  1.代码

/**
 * @author: PrincessHug
 * @date: 2019/3/23, 23:38
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    public FlowBean() {
    }

    public FlowBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    /**
     * 序列化
     * @param out 输出流
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /**
     * 反序列化
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }
}

public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取数据
        String line = value.toString();

        //切分数据
        String[] fields = line.split("\t");

        //封装数据
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);

        //发送数据
        context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow));
    }
}

public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        //聚合数据
        long upFlow_sum = 0;
        long dwFlow_sum = 0;
        for (FlowBean f:values){
            upFlow_sum += f.getUpFlow();
            dwFlow_sum += f.getDwFlow();
        }
        //发送数据
        context.write(key,new FlowBean(upFlow_sum,dwFlow_sum));
    }
}


public class FlowPartitioner extends Partitioner<Text,FlowBean> {
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Mapreduce的排序(全局排序、分区.. 下一篇LeetCode 178. 分数排名

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目