问题 :
写入HBase的数据不对
如
读入的数据是
hello nihao hadoop hehe byebye
hello nihao hadoop
spark scale
存入数据库就成了
hbase(main):038:0> scan 't_user2'
ROW COLUMN+CELL
byebye column=MR_column:wordcount, timestamp=1529670719617, value=1
hadoop column=MR_column:wordcount, timestamp=1529670719617, value=2
heheop column=MR_column:wordcount, timestamp=1529670719617, value=1
hellop column=MR_column:wordcount, timestamp=1529670719617, value=2
nihaop column=MR_column:wordcount, timestamp=1529670719617, value=2
scalep column=MR_column:wordcount, timestamp=1529670719617, value=1
sparkp column=MR_column:wordcount, timestamp=1529670719617, value=1
数据出现了异常: 注意看这些 rowkey 的长度都被"拉齐"了 —— 凡是比前一个较长单词(如 hadoop)短的单词, 其 rowkey 末尾都被补上了那个较长单词剩余位置上的字母(如 hehe 变成了 heheop)。
原因如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class MRWriterToHbaseTableDriver {
static class MRWriterToHbaseTableMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] arr = line.split("\t");
for (String name : arr) {
context.write(new Text(name), new IntWritable(1));
}
}
}
static class MRWriterToHbaseTableReducer extends
TableReducer<Text, IntWritable, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> value,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable iw : value) {
count += iw.get();
}
// Put put = new Put(key.getBytes());
// 这里就是异常所在,换成下面语句即可解决
Put put = new Put(
key.toString().getBytes()); //添加rowkey
put.addColumn("MR_column".getBytes(), "wordcount".getBytes(),
(count + "").getBytes()); //添加列族,列限定符和值
// context.write(new ImmutableBytesWritable(key.getBytes()), put);
// 括号里面的ImmutableBytesWritable可以换成其他类型,例如null, Text类型都可以
// 只要和上面的keyout泛型一致即可
context.write(new ImmutableBytesWritable(
key.toString().getBytes()), put);
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "min1");
Job job = Job.getInstance(conf, "myjob");
job.setJarByClass(MRWriterToHbaseTableDriver.class);
job.setMapperClass(MRWriterToHbaseTableMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/aa");
TableMapReduceUtil.initTableReducerJob("t_user2",
MRWriterToHbaseTableReducer.class, job);
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion 0 : 1);
}
}
分析 :
String.getBytes() 和 Text.getBytes() 两个方法得到的字节数组含义不同: String.getBytes() 每次都新建一个长度恰好等于字符串字节数的数组, 该是多长就是多长; 而 Text 内部复用一个只增不减的字节缓冲区, Text.getBytes() 直接返回这个缓冲区本身, 其有效数据长度由 Text.getLength() 给出, 数组实际长度可能更长。因此当本次单词比之前处理过的单词短时, 缓冲区尾部仍残留着上一个较长单词的字节, 直接把 getBytes() 的整个数组当作 rowkey 就会把这些残留字节也写进去; 正确做法是使用 Text.copyBytes()(或按 getLength() 截取), 或者先 toString() 再 getBytes()。再看下面的表进行验证:
String.getBytes()写入到hbase后内容:
byebye column=MR_column:wordcount, timestamp=1529674870277, value=1
hadoop column=MR_column:wordcount, timestamp=1529674870277, value=2
hehe column=MR_column:wordcount, timestamp=1529674870277, value=1
hello column=MR_column:wordcount, timestamp=1529674870277, value=2
looperqu column=MR_column:wordcount, timestamp=1529674870277, value=1
nihao column=MR_column:wordcount, timestamp=1529674870277, value=2
scale column=MR_column:wordcount, timestamp=1529674870277, value=1
spark column=MR_column:wordcount, timestamp=1529674870277, value=1
zookeeper column=MR_column:wordcount, timestamp=1529674870277, value=1
Text.getBytes()写入到hbase后内容 :
byebye column=MR_column:wordcount, timestamp=1529675062950, value=1
hadoop column=MR_column:wordcount, timestamp=1529675062950, value=2
heheop column=MR_column:wordcount, timestamp=1529675062950, value=1
hellop column=MR_column:wordcount, timestamp=1529675062950, value=2
looperqu column=MR_column:wordcount, timestamp=1529675062950, value=1
nihaorqu column=MR_column:wordcount, timestamp=1529675062950, value=2
scalerqu column=MR_column:wordcount, timestamp=1529675062950, value=1
sparkrqu column=MR_column:wordcount, timestamp=1529675062950, value=1
zookeeper column=MR_column:wordcount, timestamp=1529675062950, value=1