设为首页 加入收藏

TOP

将HDFS中的数据通过MapReduce产生HFile,然后将HFile导入到HBase具体案例分析
2019-02-08 12:20:26 】 浏览:88
Tags:HDFS 数据 通过 MapReduce 产生 HFile 然后 导入 HBase 具体 案例分析
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011955252/article/details/50528002
Hadoop数据迁移:从Hadoop向HBase载入数据
Hadoop向HBase载入数据:
将数据从Hadoop中向HBase载入数据,该过程大致可以分为两步:
一、将Hadoop中普通文本格式的数据转化为可被HBase识别的HFile文件,HFile相当于Oracle中的DBF数据文件。
二、将HFile载入到HBase中,该过程实际就是将数据从一个地移动到HBase某表数据的存放地。
载入数据的优点:
利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。

配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

载入数据的缺点:
HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群
HBase表的情况:
public static void createTab(String tabName) throws Exception {
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tabName)) {
System.out.println(tabName + " exists!");
admin.close();
return;
}
HTableDescriptor table = new HTableDescriptor(tabName);
table.addFamily(new HColumnDescriptor("f1"));
table.addFamily(new HColumnDescriptor("f2"));

table.addFamily(new HColumnDescriptor("f3"));
table.getFamily(Bytes.toBytes("f1"));
admin.createTable(table);
admin.close();
}
HDFS数据:
rowkey3,f1,f1c1,f1c1 values
rowkey3,f1,f1c2,f1c2 values
rowkey3,f1,f1c3,f1c3 values
rowkey3,f2,f2c1,f2c1 values
rowkey3,f2,f2c2,f2c2 values
rowkey3,f2,f2c3,f2c3 values
rowkey4,f1,f1c1,f1c1 values
rowkey4,f1,f1c2,f1c2 values
rowkey4,f1,f1c3,f1c3 values
rowkey4,f2,f2c1,f2c1 values
rowkey5,f3,f3c1,f3c1 values
rowkey5,f3,f3c2,f3c2 values


数据转换Mapper:
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
protected void map(LongWritable key, Text value,Context context)

throws IOException, InterruptedException {
String[] values = value.toString().split(",");
byte[] rowKey = Bytes.toBytes(values[0]);
ImmutableBytesWritable rKey = new ImmutableBytesWritable(rowKey);
KeyValue kv = new KeyValue(rowKey, Bytes.toBytes(values[1]), Bytes.toBytes(values[2]), Bytes.toBytes(values[3]));
context.write(rKey, kv);
}
}
该Mapper用于将文本格式的HDFS文件转化为KeyValue对(ImmutableBytesWritable, KeyValue)


转换为HFile:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyDriver {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "toHFile");

job.setJarByClass(MyDriver.class);

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) 0 : 1);
}
}
job.setMapperClass(MyMapper.class);

载入HBase:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
public class LoadData {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.metrics.showTableName", "false");
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
HBaseAdmin admin = new HBaseAdmin(conf);
HTable table = new HTable(conf, "xtab");

SchemaMetrics.configureGlobally(conf);
loader.doBulkLoad(new Path(args[0]), table);
table.flushCommits();
table.close();
admin.close();
}
}
将生成的HFile加载到HBase中去,其真实的过程是对数据进行move操作,因此,输入的HFile路径在加载之后就没有了数据。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HDFS快照与配额 下一篇hive查看表在hdfs上的位置

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目