
HBase Advanced Programming
2019-03-19
Tags: HBase, advanced, programming
Copyright notice: This is an original article by the author and may not be reproduced without permission. https://blog.csdn.net/qq_1018944104/article/details/85371629

Contents

1. HBase with MapReduce
1.1 HBaseToHDFS
1.2 HDFSToHBase
2. Data Transfer Between HBase and MySQL
2.1 Importing MySQL Data into HBase
2.2 Exporting HBase Data to MySQL
3. Integrating HBase with Hive
3.1 How It Works
3.2 Preparing the HBase Table and Data
3.3 Hive-Side Operations
3.4 Verification


1. HBase with MapReduce

Why access HBase data with MapReduce? To speed up analysis and to scale out analytical capacity.

Analyzing HBase data with MapReduce is strictly an offline-analysis scenario.

1.1 HBaseToHDFS

Read data from HBase, analyze it, and write the results to HDFS. Implementation:

package com.aura.mazh.hbase126.mapreduce;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * A MapReduce program that reads data from HBase and stores it in HDFS.
 */
public class HBaseDataToHDFSMR {

    public static final String ZK_CONNECT = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
    public static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    public static final String HDFS_CONNECT = "hdfs://myha01/";
    public static final String HDFS_CONNECT_KEY = "fs.defaultFS";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY, ZK_CONNECT);
        conf.set(HDFS_CONNECT_KEY, HDFS_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf);
        // The input comes from the HBase table user_info
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("user_info", scan,
                HBaseDataToHDFSMRMapper.class, Text.class, NullWritable.class, job);
        // RecordReader --- TableRecordReader
        // InputFormat  --- TableInputFormat
        // The output goes to HDFS
        FileOutputFormat.setOutputPath(job, new Path("/hbase2hdfs/output2"));
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    /**
     * The mapper's input key-value types are fixed: ImmutableBytesWritable, Result.
     * The mapper's output key-value types can be chosen freely by the user.
     */
    static class HBaseDataToHDFSMRMapper extends TableMapper<Text, NullWritable> {

        /**
         * keyType: LongWritable -> ImmutableBytesWritable: the rowkey
         * valueType: Text -> Result: all key-value pairs read for one rowkey of the HBase table
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {

            String rowkey = Bytes.toString(key.copyBytes());
            List<Cell> listCells = value.listCells();
            Text text = new Text();
            // Final output format: rowkey, base_info:name-huangbo, base_info:age-34
            for (Cell cell : listCells) {
                String family = new String(CellUtil.cloneFamily(cell));
                String qualifier = new String(CellUtil.cloneQualifier(cell));
                String v = new String(CellUtil.cloneValue(cell));
                long ts = cell.getTimestamp();
                text.set(rowkey + "\t" + family + "\t" + qualifier + "\t" + v + "\t" + ts);
                context.write(text, NullWritable.get());
            }
        }
    }
}
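To run the job, package the class into a jar and submit it with hadoop. A minimal sketch, assuming the jar is named hbase2hdfs.jar and that the HBase client jars are added to the classpath (the jar name is illustrative, not from the original post):

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase2hdfs.jar com.aura.mazh.hbase126.mapreduce.HBaseDataToHDFSMR
hdfs dfs -cat /hbase2hdfs/output2/part-*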

1.2 HDFSToHBase

Read data from HDFS, process it, and write it into HBase. Implementation:

package com.aura.mazh.hbase126.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Requirement: read data from HDFS and insert it into an HBase table.
 *
 * Two things must be done before running the program:
 * 1. Put the student.txt file into the /bigdata/student/input/ directory
 * 2. Create the HBase table: create 'student', 'info'
 */
public class HDFSDataToHBaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new HDFSDataToHBaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181,hadoop04:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(config, "HDFSDataToHBaseMR");
        job.setJarByClass(HDFSDataToHBaseMR.class);
        job.setMapperClass(HBaseMR_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Set the output component: the reducer writes Puts into the student table
        TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path("/bigdata/student/input"));

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }

    public static class HBaseMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass each input line through unchanged; parsing happens in the reducer
            context.write(value, NullWritable.get());
        }
    }

    public static class HBaseMR_Reducer extends TableReducer<Text, NullWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each line is expected to hold: id,name,sex,age,department
            String[] split = key.toString().split(",");
            Put put = new Put(split[0].getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());
            context.write(NullWritable.get(), put);
        }
    }
}
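The reducer splits each line on commas and expects five fields (id, name, sex, age, department). The original post does not show the contents of student.txt; the lines below are purely illustrative of that assumed format:

95001,liyong,male,20,CS
95002,liuchen,female,19,IS
95003,wangmin,female,22,MA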

2. Data Transfer Between HBase and MySQL

2.1 Importing MySQL Data into HBase

The Sqoop command:

sqoop import \
--connect jdbc:mysql://hadoop02/bigdata \
--username root \
--password root \
--table student \
--hbase-create-table \
--hbase-table studenttest \
--column-family info \
--hbase-row-key id

Option explanations:

--hbase-create-table: create the target table in HBase automatically
--column-family: the column family to write into (info here)
--hbase-row-key: the MySQL column to use as the HBase row key (id here)

Running the command above produces an error. It is caused by a version incompatibility: Sqoop's automatic HBase table creation does not work with this HBase version. The workaround is to create the table in advance:

create 'studenttest', 'info'

Once the table exists, run the following command (without --hbase-create-table):

sqoop import \
--connect jdbc:mysql://hadoop02/bigdata \
--username root \
--password root \
--table student \
--hbase-table studenttest \
--column-family info \
--hbase-row-key id

The import runs as a MapReduce job; once the import log shows it has completed, the imported rows can be checked in the HBase table.
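For example, a quick check from the HBase shell (a minimal verification step; the exact rows depend on the MySQL data):

scan 'studenttest'
count 'studenttest'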

2.2 Exporting HBase Data to MySQL

There is currently no direct command that exports HBase data to MySQL. The workaround is to first export the HBase data to HDFS or to Hive, and then load it from there into MySQL, as sketched below.
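A minimal sketch of that two-step path, reusing the HBaseToHDFS job from section 1.1 to dump the table as tab-separated files and then pushing them into MySQL with sqoop export. The jar name and the MySQL table student_from_hbase (pre-created with columns matching the exported fields) are assumptions for illustration, not from the original post:

# Step 1: dump the HBase table to HDFS (tab-separated), e.g. with the MR job from 1.1
hadoop jar hbase2hdfs.jar com.aura.mazh.hbase126.mapreduce.HBaseDataToHDFSMR

# Step 2: export the HDFS files into the pre-created MySQL table
sqoop export \
--connect jdbc:mysql://hadoop02/bigdata \
--username root \
--password root \
--table student_from_hbase \
--export-dir /hbase2hdfs/output2 \
--input-fields-terminated-by '\t'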

3. Integrating HBase with Hive

3.1 How It Works

Hive and HBase are integrated through each other's public APIs, with HBaseStorageHandler acting as the bridge. Through HBaseStorageHandler, Hive can obtain the HBase table name, column families, and columns that a Hive table maps to, the InputFormat and OutputFormat classes to use, and can create and drop HBase tables.

When Hive queries data in an HBase table, it actually reads it through MapReduce: inside the MR job, HiveHBaseTableInputFormat splits the HBase table and provides the RecordReader objects used to read the data.

The split rule is one Region per Split: the table's number of Regions determines the number of map tasks in the MapReduce job.

Data is read by building a Scanner and scanning the whole table; any predicate is converted into a Filter, and a predicate on the row key becomes a row-key filter. The Scanner fetches data from the RegionServer via RPC calls to next().

3.2 Preparing the HBase Table and Data

Create the HBase table:

create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

Insert some test data:

put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'

3.3 Hive-Side Operations

In the Hive client, the following settings are required first:

Specify the ZooKeeper quorum used by HBase (the default port 2181 can be omitted):

set hbase.zookeeper.quorum=hadoop02:2181,hadoop03:2181,hadoop04:2181;

Specify the root znode that HBase uses in ZooKeeper:

set zookeeper.znode.parent=/hbase;

Add the HBase handler jar:

add jar /home/hadoop/apps/apache-hive-2.3.3-bin/lib/hive-hbase-handler-2.3.3.jar;

Create Hive tables backed by the HBase table:

All column families:

create external table mingxing(rowkey string, base_info map<string, string>, extra_info map<string, string>)
row format delimited fields terminated by '\t'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,base_info:,extra_info:")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");

Selected columns from selected column families:

create external table mingxing1(rowkey string, name string, province string)
row format delimited fields terminated by '\t'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,base_info:name,extra_info:province")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");

org.apache.hadoop.hive.hbase.HBaseStorageHandler: the storage handler that handles the mapping between Hive and HBase

hbase.columns.mapping: defines how HBase column families and columns map to Hive columns

hbase.table.name: the name of the underlying HBase table

3.4 Verification

Query statements:

select * from mingxing;
select count(*) from mingxing;
select count(rowkey) as total from mingxing;
select count(base_info['name']) as total from mingxing;
select rowkey,base_info['name'] from mingxing;
select rowkey,extra_info['province'] from mingxing;
select rowkey,base_info['name'], extra_info['province'] from mingxing;
