设为首页 加入收藏

TOP

HDFS 上的数据导入到Hbase
2018-11-30 00:18:40 】 浏览:81
Tags:HDFS 数据 导入 Hbase

需求:将HDFS上的文件中的数据导入到hbase中

实现上面的需求也有两种办法,一种是自定义mr,一种是使用hbase提供好的import工具

一、hdfs中的数据是这样的

每一行的数据是这样的id name age gender birthday

(my_python_env)[root@hadoop26 ~]# hadoop fs  -cat /t1/*
1    zhangsan    10    male    NULL
2    lisi    NULL    NULL    NULL
3    wangwu    NULL    NULL    NULL
4    zhaoliu    NULL    NULL    1993

二、自定义mr

复制代码
public class HdfsToHBase {
    public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop26:2181");
        conf.set("hbase.rootdir", "hdfs://hadoop26:9000/hbase");
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]);
        Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HdfsToHBase.class);
        
        job.setMapperClass(HdfsToHBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(HdfsToHBaseReducer.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }
    
    public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text>{
        private Text outKey = new Text();
        private Text outValue = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            outKey.set(splits[0]);
            outValue.set(splits[1]+"\t"+splits[2]+"\t"+splits[3]+"\t"+splits[4]);
            context.write(outKey, outValue);
        }
    }
    
    public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable>{
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
            Put put = new Put(k2.getBytes());
            for (Text v2 : v2s) {
                String[] splis = v2.toString().split("\t");
                if(splis[0]!=null && !"NULL".equals(splis[0])){
                    put.add("f1".getBytes(), "name".getBytes(),splis[0].getBytes());
                }
                if(splis[1]!=null && !"NULL".equals(splis[1])){
                    put.add("f1".getBytes(), "age".getBytes(),splis[1].getBytes());
                }
                if(splis[2]!=null && !"NULL".equals(splis[2])){
                    put.add("f1".getBytes(), "gender".getBytes(),splis[2].getBytes());
                }
                if(splis[3]!=null && !"NULL".equals(splis[3])){
                    put.add("f1".getBytes(), "birthday".getBytes(),splis[3].getBytes());
                }
            }
            context.write(NullWritable.get(),put);
        }
    }
}
复制代码

2.1打包运行

首先在hbase中创建一个表

hbase(main):006:0> create 'table1','f1'
0 row(s) in 0.4240 seconds

=> Hbase::Table - table1

然后运行

hadoop jar HdfsToHBase.jar com.lanyun.hadoop2.HdfsToHBase /t1/part* table1

最后查看table1中的数据

复制代码
hbase(main):014:0* scan 'table1'
ROW                                              COLUMN+CELL                                                                                                                                 
 1                                               column=f1:age, timestamp=1469069255119, value=10                                                                                            
 1                                               column=f1:gender, timestamp=1469069255119, value=male                                                                                       
 1                                               column=f1:name, timestamp=1469069255119, value=zhangsan                                                                                     
 2                                               column=f1:name, timestamp=1469069255119, value=lisi                                                                                         
 3                                               column=f1:name, timestamp=1469069255119, value=wangwu                                                                                       
 4                                               column=f1:birthday, timestamp=1469069255119, value=1993                                                                                     
 4                                               column=f1:name, timestamp=1469069255119, value=zhaoliu                                                                                      
4 row(s) in 0.0430 seconds
复制代码

三、使用habse提供的import工具

首先查看其用法

复制代码
(my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import 
ERROR: Wrong number of arguments: 0
Usage: Import [options] <tablename> <inputdir>
By default Import will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimport.bulk.output=/path/for/output
复制代码

在hbase中创建表table2

hbase(main):016:0> create 'table2','f1'
0 row(s) in 0.4080 seconds

=> Hbase::Table - table2

在命令中中使用命令进行导入

hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2

查看table2中的数据

复制代码
hbase(main):018:0> scan 'table2'
ROW                                              COLUMN+CELL                                                                                                                                 
 1                                               column=f1:age, timestamp=1468824267106, value=10                                                                                            
 1                                               column=f1:gender, timestamp=1468824289990, value=male                                                                                       
 1                                               column=f1:name, timestamp=1468824137463, value=zhangsan                                                                                     
 2                                               column=f1:name, timestamp=1468824236014, value=lisi                                                                                         
 3                                               column=f1:name, timestamp=1468824247109, value=wangwu                                                                                       
 4                                               column=f1:birthday, timestamp=1468825870158, value=1993                                                                                     
 4                                               column=f1:name, timestamp=1468825659207, value=zhaoliu                                                                                      
4 row(s) in 0.0440 seconds
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HDFS查看文件的前几行-后几行-行数 下一篇Hadoop系列-HDFS的Shell脚本命令..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目