设为首页 加入收藏

TOP

Hadoop -实现自定义的Key类型
2018-11-29 08:36:15 】 浏览:47
Tags:Hadoop 实现 定义 Key 类型
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013080251/article/details/52843266
Hadoop MapReduce的key类型的实例应该可以进行互相比较来满足排序的目的。为了在一个MapReduce计算中用作键类型,Hadoop的Writable数据类型应该实现org.apache.hadoop.io.WritableComparable<T>接口。WritableComparable接口继承于org.apache.hadoop.io.Writable接口,并增加了CompareTo()方法来执行比较。

下面是以HTTP服务器日志条目实现自定义hadoop WritableComparable数据类型的步骤,它使用请求的主机名和时间戳进行比较。

具体代码示例如下:

public class LogPair implements WritableComparable<LogPair> {
    private Text userIP = new Text();//用户IP
    private Text timestamp = new Text();//时间戳
    private Text request = new Text();//请求页面
    private IntWritable responseSize = new IntWritable();//请求页面大小
    private IntWritable status = new IntWritable();//状态码

    public Text getUserIP() {
        return userIP;
    }

    public void setUserIP(Text userIP) {
        this.userIP = userIP;
    }

    public Text getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Text timestamp) {
        this.timestamp = timestamp;
    }

    public Text getRequest() {
        return request;
    }

    public void setRequest(Text request) {
        this.request = request;
    }

    public IntWritable getResponseSize() {
        return responseSize;
    }

    public void setResponseSize(IntWritable responseSize) {
        this.responseSize = responseSize;
    }

    public IntWritable getStatus() {
        return status;
    }

    public void setStatus(IntWritable status) {
        this.status = status;
    }
    //创建构造函数
    public LogPair() {

    }
    public LogPair(LogPair lp){
        this.userIP = lp.getUserIP();
        this.timestamp = lp.getTimestamp();
        this.request = lp.getRequest();
        this.responseSize = lp.getResponseSize();
        this.status = lp.getStatus();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        // 序列化处理
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
        //如果使用的是java的基础数据类型作为writable类型的字段,则可用DataOutput对象中的对应写入方法写入
        //out.writeUTF(userIP);;

    }


    @Override
    public void readFields(DataInput in) throws IOException {
        // 反序列化
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
        //如果使用的是java的基础数据类型作为Writable类型中的字段,则从基础流中读取
        //userIP = in.readUTF();

    }

    @Override
    public int hashCode() {
        //Hadoop使用默认的HashPartition作为默认Partition实现,来计算中间数据在reduce中的分布,HashPartition需要键对象的hashcode()
        //方法来满足以下两个属性
        return userIP.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if(o instanceof LogPair)
        {
            LogPair other = (LogPair)o;
            return userIP.equals(other.userIP) && timestamp.equals(other.timestamp);
        }
        return false;
    }

    @Override
    public int compareTo(LogPair o) {
        if(userIP.compareTo(o.userIP) == 0)//如果第一个字段相同
        {
            return timestamp.compareTo(o.timestamp);//则比较第二个字段
        }else
            return userIP.compareTo(o.userIP);
    }
    //定义输出格式
    @Override
    public String toString() {
        return userIP + " "+ timestamp + " "+request+" "+responseSize +" "+status;
    }
}

工作原理介绍:

除了Writable接口的readFields()和write()方法之外,WritableComparable接口还引入了compareTo()方法。compareTo()方法的返回值有三种类型:负整数,0或正整数,分别表示当前对象小于,等于或大于被比较对象。在LogPair实现中,如果两个用户的IP地址和时间戳是相同的,那么就认为这两个对象是相等的。如果对象不相等,我们就决定排序,首先根据用户的IP排序,然后再根据时间戳排序。
@Override
    public int compareTo(LogPair o) {
        if(userIP.compareTo(o.userIP) == 0)//如果第一个字段相同
        {
            return timestamp.compareTo(o.timestamp);//则比较第二个字段
        }else
            return userIP.compareTo(o.userIP);
    }

Hadoop使用默认的HashPartition作为默认Partition实现,来计算中间数据在reduce中的分布,HashPartition需要键对象的hashcode()方法来满足以下两个属性。

  • 在不同的JVM实例提供相同的哈希值
  • 提供哈希值的均匀分布。

因此,必须实现一个稳定的hashCode()方法,以使自定义的Hadoop key类型满足上述两个要求。在LogPair实现中,我们使用请求的主机名/IP地址的哈希值作为LogPair实例的哈希代码。这样就保证了中间LogPair数据能基于该请求的主机名/IP地址被正确的分区。

@Override
    public int hashCode() {
        return userIP.hashCode();
    }
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop ha 高可用实现原理 下一篇Alex 的 Hadoop 菜鸟教程: 第1课 ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目