设为首页 加入收藏

TOP

Java 操作Hbase 简单案例 (Kerberos已开启)
2018-12-16 09:44:26 】 浏览:12
Tags:Java 操作 Hbase 简单 案例 Kerberos 开启
版权声明:小兄弟,慢慢熬吧...... https://blog.csdn.net/u013850277/article/details/77513130
package com.hbase;

/**
 * @time 2017年7月22日
 * @author YeChunBo 
 * 类说明: 操作 Hbase (Kerberos已开启)
 * Hbase 版本号:1.2.4
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

public class HBaseSimple {

    public static Configuration conf = null;
    public static Admin admin;
    public static Connection connection;
    public static Table table;

//  public static String principal = "hbase/hdp40@BMSOFT.COM";
//  public static String keytabPath = "./conf/bms/hbase.service.keytab"; // OK,奇怪的是这里如果在Ranger中将hbase对应的权限去掉,依然是可以有访问权限,笔者的理解可能是这个组件对应的keytab拥有最高权限。其他自己新建的keytab完全可通过Ranger控制其相关权限

     public static String principal = "project2/hdp39@BMSOFT.COM"; //
     public static String keytabPath = "./conf/bms/project2.keytab";

    static {
        try {
            conf = HBaseConfiguration.create();
            System.setProperty("java.security.krb5.conf", "C:/Windows/krbconf/bms/krb5.ini");
            conf.set("hadoop.security.authentication", "Kerberos");

            UserGroupInformation.setConfiguration(conf);
            try {
                UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
                        keytabPath);
                UserGroupInformation.setLoginUser(ugi);
            } catch (IOException e1) {
                throw new RuntimeException("Kerberos身份认证失败:" + e1.getMessage(), e1);
            }
            // 第二种连接方式
            // UserGroupInformation.loginUserFromKeytab("hbase/hdp40@BMSOFT.COM","./conf/bms/hbase.keytab");

            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建一张表
     * 
     * @param myTableName
     * @param colFamily
     * @param deleteFlag
     *            true:存在则删除再重建
     * @throws Exception
     */
    public static void creatTable(String myTableName, String[] colFamily, boolean deleteFlag) throws Exception {
        TableName tableName = TableName.valueOf(myTableName);
        if (admin.tableExists(tableName)) {
            if (!deleteFlag) {
                System.out.println(myTableName + " table exists!");
            } else {
                HBaseSimple.deleteTable(myTableName); // 先删除原先的表
                HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                for (String str : colFamily) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
                    hColumnDescriptor.setMaxVersions(10);
                    hTableDescriptor.addFamily(hColumnDescriptor);

                }
                admin.createTable(hTableDescriptor);
                System.out.println(myTableName + "表创建成功。。。");
            }

        } else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            for (String str : colFamily) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
                //hColumnDescriptor.setMaxVersions(10); 设置数据最大保存的版本数
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
            System.out.println(myTableName + "表创建成功。。。");
        }
        // close();
    }

    /**
     * 往表中添加数据(单条添加)
     */
    public static void inserData(String myTableName, String rowKey, String colFamily, String col, String val) {

        try {
            table = connection.getTable(TableName.valueOf(myTableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));//当然这里可以一次性设置多个列族,以及多个列
            table.put(put);
            System.out.println("数据插入成功。。。rowkey为:" + rowKey);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // close();
        }
    }

    /**
     * 往表中批量添加数据
     */
    public static void batchInserData(String myTableName, String colFamily, String col, int insertNum) {

        try {
            table = connection.getTable(TableName.valueOf(myTableName));
            List<Put> list = new ArrayList<Put>();
            Put put;
            for (int i = 1; i < insertNum; i++) {
                put = new Put(Bytes.toBytes("rowKey" + i));
                put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes( "B" +i));//当然这里可以一次性设置多个列族,以及多个列,如下被注释的代码
//              put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes("id"), Bytes.toBytes(String.valueOf(i)));// id
//              put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes("salary"), Bytes.toBytes(String.valueOf((int)((Math.random()*9+1)*100000)) + "元"));// salary
//              put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes("url"), Bytes.toBytes(String.valueOf("http://blog.csdn.net/s20082043/article/details/" + (int)((Math.random()*9+1)*1000000))));// salary
//              put.addColumn(Bytes.toBytes("other"), Bytes.toBytes("details"), Bytes.toBytes(String.valueOf("hello world " + "avz7qgu77wog3r6c5qw8426b4ape432523974591we9t5u314356hzy1kxj7x8g39a2l9tl7734mbxn3oa2192kaq938" + i)));// details

                list.add(put);
                if (i % 500000 == 0) { // 每500000条保存一次
                    table.put(list);
                    list.clear();
                    System.out.println(i + " :条数据插入成功。。。。 ");
                }
            }
            table.put(list);
            System.out.println("数据插入成功。。。");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // close();
        }
    }

    /**
     * 获取数据(根据行键获取其整行数据)
     */
    public static void getDataFromRowKey(String myTableName, String rowKey) {
        try {
            table = connection.getTable(TableName.valueOf(myTableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            get.setMaxVersions(10);// 设置获取多少个版本的数据,当填入的数据大于hbase中存储的量时,会取出目前所有版本的数据
            Result re = table.get(get);
            List<Cell> listCells = re.listCells();
            for (Cell cell : listCells) {
                System.out.println("getDataFromRowKey: " + new String(CellUtil.cloneRow(cell)) + "\t"
                        + new String(CellUtil.cloneFamily(cell)) + "\t" + new String(CellUtil.cloneQualifier(cell))
                        + "\t" + new String(CellUtil.cloneva lue(cell)) + "\t" + cell.getTimestamp());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // close();
        }
    }

    /**
     * 根据表名与行键及列簇获取数据
     * 
     * @param myTableName
     * @param rowKey
     * @param colFamily
     * @param Col
     * @throws IOException
     */
    private static void getData(String myTableName, String rowKey, String colFamily, String col) throws IOException {
        table = connection.getTable(TableName.valueOf(myTableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
        Result re = table.get(get);
        if (re.isEmpty()) {
            System.out.println("查询结果为空。。。。");
            return;
        }
        List<Cell> listCells = re.listCells();
        for (Cell cell : listCells) {
            System.out.println(new String(CellUtil.cloneRow(cell)) + "\t" + new String(CellUtil.cloneFamily(cell))
                    + "\t" + new String(CellUtil.cloneQualifier(cell)) + "\t" + new String(CellUtil.cloneva lue(cell))
                    + "\t" + cell.getTimestamp());
        }
        // close();
    }

    /**
     * 根据表名查询整张表的数据(当然同样可根据列簇,列分割符等进行scan的查询,这里不进行细写了)
     * 
     * @param tablename
     * @throws IOException
     */
    private static void getScanData(String tablename) throws IOException {
        table = connection.getTable(TableName.valueOf(tablename));
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("rowKey147658"));
        scan.setStopRow(Bytes.toBytes("rowKey147669"));
        //scan.setMaxResultsPerColumnFamily(1);
        //scan.setBatch(20);
        //scan.setMaxResultSize(1l);
        //scan.setMaxVersions(2);// 默认返回与时间戳最靠近的那一列数据,设置为n则返回n列数据
        //scan.setTimeRange(1503282422243l, 1503282457086l);// 设置扫描数据的时间范围
        ResultScanner scanner = table.getScanner(scan);

        Iterator<Result> it = scanner.iterator();
        while (it.hasNext()) {
            Result re = it.next();
            List<Cell> listCells = re.listCells();
            for (Cell cell : listCells) {
                System.out.println("getScanData: " + new String(CellUtil.cloneRow(cell)) + "\t"
                        + new String(CellUtil.cloneFamily(cell)) + "\t" + new String(CellUtil.cloneQualifier(cell))
                        + "\t" + new String(CellUtil.cloneva lue(cell)) + "\t" + cell.getTimestamp());
            }
        }

    }

    /**
     * 删除数据
     * 
     * @param tableName
     * @param rowKey
     * @throws IOException
     */
    private static void delDByRowKey(String tableName, String rowKey) {
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(tableName + " 表中rowKey为 " + rowKey + " 的数据已被删除....");
    }

    /**
     * 删除一张表
     * 
     * @param args
     */
    public static void deleteTable(String myTableName) {
        try {
            TableName tableName = TableName.valueOf(myTableName);
            admin.disableTable(tableName); // 删除表前先对表进行disable
            admin.deleteTable(tableName);
            System.out.println(tableName + " 表已被删除。。。");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //close();
        }

    }

    /**
     * 删除列簇
     * 
     * @param args
     */
    public static void deleteColumnFamily(String myTableName, byte[] colFamily) {
        try {
            TableName tableName = TableName.valueOf(myTableName);
            admin.disableTable(tableName); // 删除前先对表进行disable
            admin.deleteColumn(tableName, colFamily);
            System.out.println(tableName + " 表 " + colFamily + " 列已被删除。。。");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    // 关闭连接
    public static void close() {
        try {
            if (admin != null) {
                admin.close();
            }
            if (null != connection) {
                connection.close();
            }
            if (table != null) {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {
//------------------------------------------1、创建表----------------------------------------------          
//          String tablename = "t_hbase100";
//          // 创建表
//          String[] familys = { "info", "other" };
//          boolean booleanFlag = true;
//          HBaseSimple.creatTable(tablename, familys, booleanFlag);


// ---------------------------------------2、往表中插入数据-------------------------------------------------
             /**
             * 往表中插入数据:插入数据的时候指定数据所属的列簇与列分割符
             */
//           String tablename = "t_hbase1";
//           String rowKey = "ycb";
//           String colFamily = "course";
//           String col = "English";
//           String val = "77";
//          
//           String rowKey2 = "hehe";
//           String val2 = "79";
//          
//           HBaseSimple.inserData(tablename, rowKey, colFamily, col, val);
//           HBaseSimple.inserData(tablename, rowKey2, colFamily, col, val2);

// ------------------------------------3、根据表名与行键查询整行数据------------------------------------------------
//          /**
//           * 根据表名与行键查询整行数据
//           */
//          String tablename = "t_hbase1";
//          String rowKey = "ycb";
//          String rowKey2 = "hehe";
//          getDataFromRowKey(tablename, rowKey);
//          getDataFromRowKey(tablename, rowKey2);
//          System.out.println("------------------------------------");

// -------------------------------------4、查询整张表的数据---------------------------------------------------
            // * 查询整张表的数据
            // */
    //      String tablename = "t_hbase1";
//          getScanData(tablename);

// -----------------------------------5、批量插入数据-----------------------------------------------------------
            /**
             * 批量插入数据
             */
            String tablename = "t_hbase10";// 这张表t_hbase1, 100万条数据,t_hbase10 , 1000万数据
            String colFamily = "info";
            String col = "id";
            //String col = "salary";
            int inserNum = 10000000;
            batchInserData(tablename, colFamily, col, inserNum);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
} 

pom.xml ,因为笔者这里还有其他代码引用到Hive,所以这里多了Hive相关的jar包。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>HbaseSimple</groupId>
    <artifactId>HbaseSimple</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-httpclient/commons-httpclient -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

    </dependencies>

</project>

采用该方式创建Maven 项目,笔者没有通过代码将Hbase相关的配置文件配置上去,而是将其对应的配置文件add 进项目中。或者将配置文件放到src/main/resources目录下,在代码进行编译时会自动加载resource目录下的配置文件,如下图所示:

这里写图片描述

或者:
这里写图片描述

遇到的坑:

  • 其他人通过远程访问笔者集群Hbase时,发现运行上述代码时代码无法得出正确的执行结果,总是在运行到调用具体方法时就卡住了,而且并没有报出任何的错误。

  • 在排查问题时,发现是Hbase-site.xml配置文件中对于主机名配置的是别名,而在别人的机器上并没有配置对应的映射,所以是解析不了hbase集群中所对应的机器名。

  • 部分hbase-site.xml 如下所示:

    <property>
      <name>hbase.zookeeper.quorum</name>
      <value>hdp40,hdp41,hdp39</value>
    </property>

解决方法:知道原因之后解决方法就很简单了,直接将对应的主机映射名添加到远程访问的服务器上便可。
具体操作如下:
在要进行远程访问hbase主机的服务器上编辑hosts文件

一、vi /etc/hosts
10.164.166.39  hdp39.bmsoft.com hdp39
10.164.166.40  hdp40.bmsoft.com hdp40
10.164.166.41  hdp41.bmsoft.com hdp41

二、source /etc/hosts
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase最佳实践 – 客户端重试机制 下一篇java连接Hbase 1.4.0集群进行增删..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目