Copyright notice: this is the author's original article; reproduction without the author's permission is prohibited. https://blog.csdn.net/u013080251/article/details/52843266
Instances of a Hadoop MapReduce key type must be comparable with one another so that the intermediate data can be sorted. To be usable as a key type in a MapReduce computation, a Hadoop Writable data type must implement the org.apache.hadoop.io.WritableComparable<T> interface. WritableComparable extends org.apache.hadoop.io.Writable and adds a compareTo() method to perform the comparison.
The following shows how to implement a custom Hadoop WritableComparable data type for HTTP server log entries, which compares records by the requesting host name/IP and the timestamp.
The full example:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class LogPair implements WritableComparable<LogPair> {

    private Text userIP = new Text();
    private Text timestamp = new Text();
    private Text request = new Text();
    private IntWritable responseSize = new IntWritable();
    private IntWritable status = new IntWritable();

    public Text getUserIP() {
        return userIP;
    }

    public void setUserIP(Text userIP) {
        this.userIP = userIP;
    }

    public Text getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Text timestamp) {
        this.timestamp = timestamp;
    }

    public Text getRequest() {
        return request;
    }

    public void setRequest(Text request) {
        this.request = request;
    }

    public IntWritable getResponseSize() {
        return responseSize;
    }

    public void setResponseSize(IntWritable responseSize) {
        this.responseSize = responseSize;
    }

    public IntWritable getStatus() {
        return status;
    }

    public void setStatus(IntWritable status) {
        this.status = status;
    }

    public LogPair() {
    }

    public LogPair(LogPair lp) {
        // copy the field values instead of sharing the mutable Writable objects
        this.userIP = new Text(lp.getUserIP());
        this.timestamp = new Text(lp.getTimestamp());
        this.request = new Text(lp.getRequest());
        this.responseSize = new IntWritable(lp.getResponseSize().get());
        this.status = new IntWritable(lp.getStatus().get());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // fields must be serialized and deserialized in the same order
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }

    @Override
    public int hashCode() {
        // hash only on the user IP so that HashPartitioner sends all
        // records of one host to the same reducer
        return userIP.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof LogPair) {
            LogPair other = (LogPair) o;
            return userIP.equals(other.userIP) && timestamp.equals(other.timestamp);
        }
        return false;
    }

    @Override
    public int compareTo(LogPair o) {
        // order by user IP first, then break ties with the timestamp
        int byIP = userIP.compareTo(o.userIP);
        if (byIP != 0) {
            return byIP;
        }
        return timestamp.compareTo(o.timestamp);
    }

    @Override
    public String toString() {
        return userIP + " " + timestamp + " " + request + " " + responseSize + " " + status;
    }
}
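Since the class above depends on Hadoop's Text and IntWritable, the serialization contract can be seen in isolation with a plain-Java sketch. Here writeUTF()/readUTF() stand in for Text.write()/Text.readFields() (an assumption for illustration only; Hadoop's actual wire format differs), and the round trip shows why write() and readFields() must visit the fields in exactly the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal sketch of the Writable field-by-field serialization pattern,
// using plain Strings in place of Hadoop's Text so it runs outside a cluster.
public class WritableSketch {
    String userIP, timestamp;

    void write(DataOutputStream out) throws IOException {
        out.writeUTF(userIP);      // serialize fields in a fixed order
        out.writeUTF(timestamp);
    }

    void readFields(DataInputStream in) throws IOException {
        userIP = in.readUTF();     // deserialize in the same order
        timestamp = in.readUTF();
    }

    static WritableSketch roundTrip(WritableSketch w) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        w.write(new DataOutputStream(buf));
        WritableSketch copy = new WritableSketch();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        return copy;
    }
}
```

If the read order ever diverged from the write order, the copy would come back with its fields swapped or the stream would fail mid-read, which is why both methods mirror each other field for field.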
How it works:
In addition to the readFields() and write() methods of the Writable interface, WritableComparable introduces a compareTo() method. Its return value is a negative integer, zero, or a positive integer, indicating that the current object is less than, equal to, or greater than the object it is compared with. In the LogPair implementation, two objects are considered equal only when both the user's IP address and the timestamp match. When the objects are not equal, the sort order is decided first by the user's IP and then by the timestamp.
@Override
public int compareTo(LogPair o) {
    // order by user IP first, then break ties with the timestamp
    int byIP = userIP.compareTo(o.userIP);
    if (byIP != 0) {
        return byIP;
    }
    return timestamp.compareTo(o.timestamp);
}
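This two-level ordering can be demonstrated without a cluster. The sketch below uses plain Strings as hypothetical stand-ins for the Text fields and sorts a few records: the IP is the primary sort key, and the timestamp only decides between records with an equal IP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrates the two-level comparison in LogPair.compareTo() with
// plain Strings instead of Hadoop's Text.
public class CompareSketch implements Comparable<CompareSketch> {
    final String userIP, timestamp;

    CompareSketch(String ip, String ts) {
        userIP = ip;
        timestamp = ts;
    }

    @Override
    public int compareTo(CompareSketch o) {
        int byIP = userIP.compareTo(o.userIP); // compare once, reuse the result
        return byIP != 0 ? byIP : timestamp.compareTo(o.timestamp);
    }
}
```

Sorting a list of such records with Collections.sort() groups all entries of one IP together and orders them by timestamp within the group, which is exactly the order the MapReduce framework produces for the reducer input keys.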
Hadoop uses HashPartitioner as the default Partitioner implementation to decide how the intermediate data is distributed across the reducers. HashPartitioner relies on the key's hashCode() method satisfying two properties:
- it must return the same hash value in different JVM instances, and
- it should distribute hash values uniformly.
Therefore a custom Hadoop key type must provide a stable hashCode() implementation that meets both requirements. In the LogPair implementation we use the hash of the requesting host name/IP address as the hash code of the LogPair instance, which guarantees that the intermediate LogPair data is partitioned correctly by the requesting host name/IP address.
@Override
public int hashCode() {
    return userIP.hashCode();
}