设为首页 加入收藏

TOP

HBase Java 编程
2019-01-17 01:45:51 】 浏览:60
Tags:HBase Java 编程
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/yulei_qq/article/details/82250580

一、环境配置

1、引入Maven 库

   <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.3</version>
   </dependency>

2、将hbase-site.xml 和hdfs-site.xml 文件copy进本地项目的classpath路径中.

两个文件的个别参数配置内容视你搭建的HDFS 集群和HBASE集群而定.

hbase-site.xml

<xml version="1.0">
<configuration>
    <!-- 使用完全分布式 -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 指定hbase数据在hdfs上的存放路径 -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://mycluster/hbase</value>
    </property>
    <!-- 配置zk地址 -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>s201:2181,s202:2181,s203:2181</value>
    </property>
    <!-- zk的本地目录 -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/hadoop/zookeeper</value>
    </property>
</configuration>

hdfs-site.xml

<xml version="1.0" encoding="UTF-8">
<xml-stylesheet type="text/xsl" href="configuration.xsl">
<configuration>
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>s201:8020</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>s206:8020</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>s201:50070</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>s206:50070</value>
    </property>
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://s202:8485;s203:8485;s204:8485/mycluster</value>
    </property>
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/centos/.ssh/id_rsa</value>
    </property>
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/hadoop/hadoop/journal</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>s206:50090</value>
    </property>
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
</configuration>

二、Java 编程

1、创建名称空间

 /**
     * 创建名称空间
     * @throws Exception
     */
    @Test
    public void createNameSpace() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        //创建名字空间描述符
        NamespaceDescriptor nsd = NamespaceDescriptor.create("ns1").build();
        admin.createNamespace(nsd);

        NamespaceDescriptor[] ns = admin.listNamespaceDescriptors();
        for(NamespaceDescriptor n : ns){
            System.out.println(n.getName());
        }
    }

2、创建表

 /**
     * 创建表
     * @throws Exception
     */
    @Test
    public void createTable() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        //创建表名对象
        TableName tableName = TableName.valueOf("ns1:t1");
        //创建表描述符对象
        HTableDescriptor tbl = new HTableDescriptor(tableName);
        //创建列族描述符
        HColumnDescriptor col = new HColumnDescriptor("f1");
        tbl.addFamily(col);

        admin.createTable(tbl);
        System.out.println("over");
    }

创建表完成之后,我们可以在hbase shell 命令行,通过list 可以查看表.

hbase(main):007:0> list
TABLE                                                                                                                                    
ns1:t1                                                                                                                                   
1 row(s) in 0.0460 seconds

=> ["ns1:t1"]

3、添加数据到表中

    public void put() throws Exception {
        //创建conf对象
        Configuration conf = HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //通过连接查询tableName对象
        TableName tname = TableName.valueOf("ns1:t1");
        //获得table
        Table table = conn.getTable(tname);

        //通过bytes工具类创建字节数组(将字符串)
        byte[] rowid = Bytes.toBytes("row1");

        //创建put对象
        Put put = new Put(rowid);

        byte[] f1 = Bytes.toBytes("f1");
        byte[] id = Bytes.toBytes("id") ;
        byte[] value = Bytes.toBytes(102);
        put.addColumn(f1,id,value);

        //执行插入
        table.put(put);
    }

用scan 命令查看

hbase(main):011:0> scan 'ns1:t1'
ROW                                 COLUMN+CELL                                                                                          
 row1                               column=f1:id, timestamp=1535676987854, value=\x00\x00\x00f                                           
1 row(s) in 0.0760 seconds

每一个put操作实际上都是一个RPC操作,它将客户端数据传送到服务器然后返回。它只适合小数据量的操作,如果有个应用程序需要每秒存储上千行数据到HBase表中,这样处理就不合适了。

4、客户端的写缓冲区 (大表操作)

HBase 的API配备了一个客户端的写缓冲区(write buffer),缓冲区负责收集put操作,然后调用PRC操作一次性将put送往服务器。全局交换机控制着该缓冲区是否在用。以下是其方法:

void setAutoFlust(boolean autoFlush)

boolean isAutoFlush()

默认情况下,客户端缓冲区是禁用的。可以通过将autoflush 设置为false来激活缓冲区。table.setAutoFlush(false).

当需要强制把数据写到服务器时,可以调用另一个API函数:

void flushCommits() throw IOException

用户可以在hbase-site.xml 配置文件中添加一个属性配置缓冲区大小, 一旦超出缓冲区指定的大小限制,客户端就会隐士的调用刷写命令。

默认大小是2M

<property>
    <name>hbase.client.write.buffer</name>
    <value>20971520</value>
    <source>hbase-default.xml</source>
</property>

这会将缓冲区大小增加到20M.

    @Test
    public void bigInsert() throws Exception {

        DecimalFormat format = new DecimalFormat();
        format.applyPattern("0000");

        long start = System.currentTimeMillis() ;
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        HTable table = (HTable)conn.getTable(tname);
        //不要自动清理缓冲区
        table.setAutoFlush(false);

        for(int i = 1 ; i < 1000 ; i ++){
            Put put = new Put(Bytes.toBytes("row" + format.format(i))) ;
            //关闭写前日志
            put.setWriteToWAL(false);
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(i));
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"),Bytes.toBytes("tom" + i));
            put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("age"),Bytes.toBytes(i % 100));
            table.put(put);

            if(i % 200 == 0){
                table.flushCommits();
            }
        }
        //
        table.flushCommits();
        System.out.println(System.currentTimeMillis() - start );
    }

5、原子性put操作

HBase 还有一个特别的put调用,其能保证自身操作的原子性:检查写(check and put )。方法签名如下:

boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
  byte[] value, Put put) throws IOException;
    /**
     * 原子性操作,先检查,在插入
     * @throws IOException
     */
    @Test
    public void testCompareAndSet() throws IOException {

        Configuration conf = HBaseConfiguration.create();
        Connection conn =ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table =conn.getTable(tname);

        Put put = new Put(Bytes.toBytes("row1"));

        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(1));

        boolean res =table.checkAndPut(Bytes.toBytes("row1"),Bytes.toBytes("f1"),Bytes.toBytes("id"),Bytes.toBytes(1),put);
        System.out.println(res);
    }

6 、Get 操作

a、单行get

    public void get() throws Exception {
        //创建conf对象
        Configuration conf = HBaseConfiguration.create();
        //通过连接工厂创建连接对象
        Connection conn = ConnectionFactory.createConnection(conf);
        //        //通过连接查询tableName对象
        TableName tname = TableName.valueOf("ns1:t1");
        //获得table
        Table table = conn.getTable(tname);

        //通过bytes工具类创建字节数组(将字符串)
        byte[] rowid = Bytes.toBytes("row0001");
        Get get = new Get(rowid);
        Result r = table.get(get);
        byte[] idvalue = r.getValue(Bytes.toBytes("f1"),Bytes.toBytes("id"));
        System.out.println(Bytes.toInt(idvalue));
    }

b. get 列表

即传递多个get ,返回Result 数组.

Result[] get(List<Get> gets) throws IOException;

7、delete(删除操作)

a、单行删除

void delete(Delete delete) throws IOException;
    @Test
    public void deleteData() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");

        Table table = conn.getTable(tname);
        Delete del = new Delete(Bytes.toBytes("row0001"));
        del.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id"));
        del.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"));
        table.delete(del);
        System.out.println("over");
    }

b、列表删除

void delete(List<Delete> deletes) throws IOException;

8、扫描器

    public void scan() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table = conn.getTable(tname);
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row0001"));
        scan.setStopRow(Bytes.toBytes("row0003"));

        ResultScanner rs = table.getScanner(scan);
        Iterator<Result> it = rs.iterator();
        while (it.hasNext()) {
            Result r = it.next();
            byte[] name = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(name));
        }
        rs.close();
    }

如下是“HBase权威指南”的扫描器超时案例:

 public void testScanTimeOut() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn =ConnectionFactory.createConnection(conf);
        TableName tname = TableName.valueOf("ns1:t1");
        Table table =conn.getTable(tname);
        Scan scan = new Scan();
        ResultScanner scanner =table.getScanner(scan);
        int scannerTimeOut= (int) conf.getLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,-1);
        System.out.println(scannerTimeOut);
        Thread.sleep(scannerTimeOut+5000);
        while (true){
            try {
            Result result = scanner.next();
            if(result==null) break;
            System.out.println(result);
            }
            catch (Exception e){
                e.printStackTrace();
                break;
            }
        }
        scanner.close();
    }

不之为啥,并没有出现预期的ScannerTimeoutException ,发现问题所在的请留言,非常感谢

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇开发笔记 – Spring Boot集成HBase 下一篇Hbase中的列式表映射到hive的外表

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目