MapReduce: corrupted data when writing to an HBase table

Problem:

The data written to HBase is not what the job read.

The input data is:

hello   nihao   hadoop  hehe    byebye
hello   nihao   hadoop
spark   scale

After the job runs, the table contains:

hbase(main):038:0> scan 't_user2'
ROW                   COLUMN+CELL                                                  
 byebye               column=MR_column:wordcount, timestamp=1529670719617, value=1 
 hadoop               column=MR_column:wordcount, timestamp=1529670719617, value=2 
 heheop               column=MR_column:wordcount, timestamp=1529670719617, value=1 
 hellop               column=MR_column:wordcount, timestamp=1529670719617, value=2 
 nihaop               column=MR_column:wordcount, timestamp=1529670719617, value=2 
 scalep               column=MR_column:wordcount, timestamp=1529670719617, value=1 
 sparkp               column=MR_column:wordcount, timestamp=1529670719617, value=1 

The data is corrupted. Notice that all the row keys come out the same length: every word shorter than "hadoop" has been padded at the end with the trailing letters of "hadoop".

The cause is in the following code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class MRWriterToHbaseTableDriver {

    static class MRWriterToHbaseTableMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on tabs and emit (word, 1) pairs
            String line = value.toString();
            String[] arr = line.split("\t");
            for (String name : arr) {
                context.write(new Text(name), new IntWritable(1));
            }
        }
    }

    static class MRWriterToHbaseTableReducer extends
            TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value,
                Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable iw : value) {
                count += iw.get();
            }

            // Put put = new Put(key.getBytes());
            // The line above is where the corruption comes from;
            // replacing it with the statement below fixes it.

            Put put = new Put(key.toString().getBytes()); // row key
            put.addColumn("MR_column".getBytes(), "wordcount".getBytes(),
                    (count + "").getBytes()); // column family, qualifier and value

            // context.write(new ImmutableBytesWritable(key.getBytes()), put);
            // The ImmutableBytesWritable here can be replaced with another type,
            // e.g. null or a Text, as long as it matches the KEYOUT type
            // parameter declared on the reducer above.

            context.write(new ImmutableBytesWritable(key.toString().getBytes()), put);
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "min1");

        Job job = Job.getInstance(conf, "myjob");
        job.setJarByClass(MRWriterToHbaseTableDriver.class);

        job.setMapperClass(MRWriterToHbaseTableMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, "/aa");
        TableMapReduceUtil.initTableReducerJob("t_user2",
                MRWriterToHbaseTableReducer.class, job);

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
}
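An equivalent fix, sketched below on the assumption that you want to avoid the extra String allocation, is to keep the Text key but copy only its valid bytes. getLength() marks the end of the valid data, and on Hadoop 2.x and later Text.copyBytes() does the copy in one call:

            // Inside the same reduce() method, instead of key.toString().getBytes():
            byte[] rowkey = java.util.Arrays.copyOf(key.getBytes(), key.getLength());
            // or, on Hadoop 2.x+: byte[] rowkey = key.copyBytes();

            Put put = new Put(rowkey);
            put.addColumn("MR_column".getBytes(), "wordcount".getBytes(),
                    (count + "").getBytes());
            context.write(new ImmutableBytesWritable(rowkey), put);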

Analysis:

The byte arrays returned by String.getBytes() and Text.getBytes() differ in length. String.getBytes() always returns an array sized exactly to the current string's content. Text.getBytes(), however, returns the Text object's internal buffer, and because Hadoop reuses that buffer when it deserializes keys, the buffer only ever grows. When the current key is shorter than the previous one, the trailing bytes of the previous key are still sitting in the buffer and end up appended to the row key; only the first getLength() bytes are actually valid. When the current key is longer than the previous one, the buffer is regrown and the full current content is returned. The two scan results below confirm this.
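To see where the stale bytes come from, here is a small standalone sketch (hypothetical, not part of the job above) that simulates the framework reusing one Text instance across records; exact buffer sizes can differ between Hadoop versions, and the comments describe the typical Hadoop 2.x behaviour:

import org.apache.hadoop.io.Text;

public class TextGetBytesDemo {
    public static void main(String[] args) {
        // The framework reuses a single Text instance when it deserializes keys,
        // so set() is called on the same object again and again.
        Text key = new Text();

        key.set("hadoop");   // buffer now holds the 6 bytes of "hadoop"
        key.set("hehe");     // only 4 bytes are valid; the buffer is not shrunk

        System.out.println(key.getLength());            // 4
        System.out.println(key.getBytes().length);      // still 6 (or more): the old buffer
        System.out.println(new String(key.getBytes())); // "heheop" (plus any extra buffer bytes)
        System.out.println(key.toString());             // "hehe"
        System.out.println(new String(key.getBytes(), 0, key.getLength())); // "hehe"
    }
}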

Contents of the table after writing row keys with String.getBytes():

 byebye               column=MR_column:wordcount, timestamp=1529674870277, value=1 
 hadoop               column=MR_column:wordcount, timestamp=1529674870277, value=2 
 hehe                 column=MR_column:wordcount, timestamp=1529674870277, value=1 
 hello                column=MR_column:wordcount, timestamp=1529674870277, value=2 
 looperqu             column=MR_column:wordcount, timestamp=1529674870277, value=1 
 nihao                column=MR_column:wordcount, timestamp=1529674870277, value=2 
 scale                column=MR_column:wordcount, timestamp=1529674870277, value=1 
 spark                column=MR_column:wordcount, timestamp=1529674870277, value=1 
 zookeeper            column=MR_column:wordcount, timestamp=1529674870277, value=1 

Contents of the table after writing row keys with Text.getBytes():
 byebye               column=MR_column:wordcount, timestamp=1529675062950, value=1 
 hadoop               column=MR_column:wordcount, timestamp=1529675062950, value=2 
 heheop               column=MR_column:wordcount, timestamp=1529675062950, value=1 
 hellop               column=MR_column:wordcount, timestamp=1529675062950, value=2 
 looperqu             column=MR_column:wordcount, timestamp=1529675062950, value=1 
 nihaorqu             column=MR_column:wordcount, timestamp=1529675062950, value=2 
 scalerqu             column=MR_column:wordcount, timestamp=1529675062950, value=1 
 sparkrqu             column=MR_column:wordcount, timestamp=1529675062950, value=1 
 zookeeper            column=MR_column:wordcount, timestamp=1529675062950, value=1 



