在学习使用Hbase的时候,因为Hbase中存放的是byte数组。所以大部分的输入类型都是byte数组格式的。这样方便了存储其它大型文件,但是对于初学者的我们来讲就是十分不友好的。因此,我在这里对一些东西做了简单的二次封装,提高了学习的效率。这样,我们可以有更多的时间去考虑如何利用Hbase来设计业务逻辑,降低学习的难度。同时,这里做的一些封装,也可以为以后开发的时候封装工具做一个前瞻性的探索。
HbaseUtil封装
封装了HbaseJavaAPI中一些最常用,最基本的的元素。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class HbaseUtil {
private static String host = "master"; //Hbase所在的主机的IP地址
private static String port = "2181"; //Hbase所使用端口
private static Configuration config;
private static ResultScanner results;
protected Admin admin;
protected Connection conn;
public HbaseUtil(){}
static{
File configFile = new File("src\\main\\resources\\config"); //有关远程Hbase服务器的相关配置信息
if(configFile.exists ()){
try {
FileReader fr = new FileReader (configFile);
BufferedReader br = new BufferedReader (fr);
String temp = null;
while((temp = br.readLine ())!=null){
if(temp.split ("=")[0].equals ("host")){
host = temp.split ("=")[1];
}else if(temp.split ("=")[0].equals ("port")){
port = temp.split ("=")[1];
}
}
} catch (FileNotFoundException e) {
e.printStackTrace ( );
} catch (IOException e) {
e.printStackTrace ( );
}
}
config = new Configuration ();
config.set("hbase.zookeeper.quorum",host);//配置连接哪个机器
config.set("hbase.zookeeper.property.clientPort", port);//配置端口
}
public Connection getConnection(){
try {
conn = ConnectionFactory.createConnection (config); //获得Hbase的连接
} catch (IOException e) {
e.printStackTrace ( );
}
return conn;
}
public Admin getAdmin(){
try {
admin = getConnection ().getAdmin (); //获得Hbase的admin对象
} catch (IOException e) {
e.printStackTrace ( );
}
return admin;
}
public static void showScanner(ResultScanner results){ //调用这个方法将Scan到的结果进行显示,看自己的操作是不是正确
boolean f = true;
for(Result result:results){
List<Cell> cells = result.listCells ();
if(f){
for(Cell cell:cells)
System.out.print ("\t\t" + new String (cell.getQualifierArray (),cell.getQualifierOffset (),cell.getQualifierLength ()) + "\t | ");
f = false;
System.out.println ( );
}
System.out.print (new String (result.getRow () )+ "\t--\t");
for(Cell cell:cells){
System.out.print ("\t" + new String (cell.getValueArray (),cell.getValueOffset(),cell.getValueLength()) + "\t | \t");
}
System.out.println ( );
}
}
public static List<List<String>> getResults(ResultScanner rs){ //返回一个list集合,这样在别的地方就可以任意使用了
List<List<String>> result = new ArrayList<List<String>> ();
for(Result ru:rs){
List<String> temp = new ArrayList<String> ();
boolean f= true;
for(Cell cell :ru.listCells ()){
if(f){
temp.add (new String(cell.getRowArray (),cell.getRowOffset (),cell.getRowLength ()));
f = false;
}
temp.add (new String(cell.getValueArray (),cell.getValueOffset (),cell.getValueLength ()));
}
result.add (temp);
}
return result;
}
public void close(){ //关闭相关资源
if(admin!=null){
try {
admin.close ();
} catch (IOException e) {
e.printStackTrace ( );
}
}
if(conn !=null){
try {
conn.close ();
} catch (IOException e) {
e.printStackTrace ( );
}
}
}
}
上面封装了HbaseJavaApi中操作Hbase的最基础的一些元素。可以直接继承这个类然后在操作的时候就不用每次都写创建连接等代码。并且在测试方法的时候,直接使用相关方法,也可以省去书写测试代码的精力。
FilterUtil封装
Filter是结合Scan使用的。使用Filter可以对Hbase进行一些较为精细化的操作。也可以通过Filter对Hbase总存储的大量数据进行一个比较精细的分析,已得到我们想要的分析结果。二在学习Filter的时候大量的因为多种情况,需要不同的解决方案。所以写了一个简单的封装。
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
import static com.oracle.hbase.tools.util.HbaseUtil.getResults;
public class FilterUtil {
/*******************过滤器*******************/
public static final int ROW_FILTER=1; //将几种常用的Filter通过常量配置起来。在以后的使用过程中直接选择这些常量就可以选择相应的过滤器
public static final int PREFIX_FILTER=2;
public static final int FAMILY_FILTER=3;
public static final int QUALIFIER_FILTER=4;
/*****************比较器*******************/
public static final int BINARY_COMPARATOR=24; //和上面类似,这是是将一些常用的比较器写了出来。
public static final int SUBSTRING_COMPARATOR=26;
public static final int REGEXSTRING_COMPARATOR = 28;
private static FilterUtil filterUtil = new FilterUtil ();
private Scan utilScan;
private static Table table;
//私有化构造器
private FilterUtil(){}
/*******************************初始化器********************************/
public static FilterUtil getInstance(String tableName) {
return getInstance(TableName.valueOf (tableName));
}
//既然是封装的工具类,那么就没有必要每一次使用都创建一个对象,这里使用了单例模式
//并且通过初始化器,将要扫描的Hbase的表明配置进去。
public static FilterUtil getInstance(TableName tableName) {
try {
table = new HbaseUtil ().getConnection ().getTable (tableName);
} catch (IOException e) {
e.printStackTrace ( );
}
return filterUtil;
}
private Filter utilFilter;
//通过过滤器返回结果的核心方法,提供了多个参数。分别指定过滤器类型,比较器类型,比较规则,列族,列等。进行精细的操作。
public List<List<String>> scanFilter(int filterType,String compareOp,int comparator,String filterParment,String family,String qualifier){
Filter filter = null;
switch (filterType){
case ROW_FILTER:filter = new RowFilter (getCompareOp (compareOp),getComparator(comparator,filterParment));break;
case PREFIX_FILTER:filter = new PrefixFilter (Bytes.toBytes (filterParment));break;
case FAMILY_FILTER:filter = new FamilyFilter (getCompareOp (compareOp),getComparator(comparator,filterParment));break;
case QUALIFIER_FILTER:filter = new QualifierFilter (getCompareOp (compareOp),getComparator(comparator,filterParment));break;
}
if(table==null){
throw new NullPointerException ("FilterUtil is not initialized!!");
}
Scan scan = new Scan ();
if(family!=null&&qualifier!=null){
scan.addColumn (Bytes.toBytes (family),Bytes.toBytes (qualifier));
}else if(family!=null && qualifier ==null){
scan.addFamily (Bytes.toBytes (family));
}
utilFilter = filter;
scan.setFilter (filter);
List<List<String>> list = null;
utilScan = scan;
try {
list = getResults (table.getScanner (scan));
} catch (IOException e) {
e.printStackTrace ( );
}
return list;
}
public List<List<String>> scanFilter(int filterType,String compareOp,int comparator,String filterParment){
return scanFilter (filterType,compareOp,comparator,filterParment,null,null);
}
public List<List<String>> scanFilter(int filterType,String compareOp,int comparator,String filterParment,String family){
return scanFilter (filterType,compareOp,comparator,filterParment,family,null);
}
/**
*
* @param comparator 比较器类型
* @param filterParment 比较内容
* @return 返回一个比较器
*/
public static ByteArrayComparable getComparator(int comparator,String filterParment){
ByteArrayComparable comparable;
if(comparator==BINARY_COMPARATOR){
comparable = new BinaryComparator (Bytes.toBytes (filterParment ));
}else if(comparator == SUBSTRING_COMPARATOR){
comparable = new SubstringComparator (filterParment);
}else if(comparator == REGEXSTRING_COMPARATOR) {
comparable = new RegexStringComparator (filterParment);
}else
{
comparable = new BinaryPrefixComparator (Bytes.toBytes (filterParment));
}
return comparable;
}
/**
*
* @param operator 运算符号直接输入<,><=,>=,!=,"",来决定是做何种操作
* @return 返回一个操作的枚举类型值
*/
public static CompareFilter.CompareOp getCompareOp(String operator){
if (">".equals (operator)) {
return CompareFilter.CompareOp.GREATER;
} else if (">=".equals (operator)) {
return CompareFilter.CompareOp.GREATER_OR_EQUAL;
} else if ("<=".equals (operator)) {
return CompareFilter.CompareOp.LESS_OR_EQUAL;
} else if ("<".equals (operator)) {
return CompareFilter.CompareOp.LESS;
} else if ("!=".equals (operator)) {
return CompareFilter.CompareOp.NOT_EQUAL;
} else if ("".equals (operator)) {
return CompareFilter.CompareOp.NO_OP;
} else {
return CompareFilter.CompareOp.EQUAL;
}
}
/**
* 进行一次过滤操作之后,可以将本次使用的过滤器返回。用做一盒的各种操作
* @return 返回过滤器
*/
public Filter getFilter(){
return utilFilter;
}
/**
* 通过传递一个过滤器和一个表明。这个静态方法在测试额时候大有用处。
* @param filter 过滤器
* @param tableName Hbase的表名
* @return 直接返回一盒List集合存放所有扫描的结果
*/
public static List<List<String>> scanFilter(Filter filter,String tableName){
Scan scan = new Scan ();
List<List<String>> list = null;
scan.setFilter (filter);
try {
Table table = new HbaseUtil().getConnection ().getTable (TableName.valueOf (tableName));
list = getResults(table.getScanner (scan));
} catch (IOException e) {
e.printStackTrace ( );
}
return list;
}
/**
* 这个方法,可以使用上一次的扫描器(Scan)。直接打印出来。这样加快了测试的速度。
*/
public void showFilter(){
if(utilFilter!=null){
try {
HbaseUtil.showScanner (table.getScanner (utilScan));
} catch (IOException e) {
e.printStackTrace ( );
}
}else{
throw new NullPointerException("No other items were scanned!!");
}
}
/**
* 关闭测试表的资源。
*/
public void close(){
if(table!=null){
try {
table.close ();
} catch (IOException e) {
e.printStackTrace ( );
}
}
}
}
这个方法的封装的还算比较彻底。不过之封装了一些常用的过滤器,要是还想分装其它的过滤器或者增加其它的功能的话也可以做其他的添加。感觉就算以后做成专业的工具,利用这样的思路也还是可以的吧。
对于一个数据库来讲,除了查询之外。其它的就是类似于创建,删除,修改等操作。这些操作比较简单。但是我也做了一些简单的封装。
MyDelete封装
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import java.io.IOException;
public class MyDelete extends Delete {
public MyDelete(byte[] row) {
super (row);
}
public MyDelete(byte[] row, long timestamp) {
super (row, timestamp);
}
public MyDelete(String row) {
this (Bytes.toBytes (row));
}
/**
* @param row 字符串类型的行键(在真正的开发中单独使用字符串可能会有不便,不过为了提高学习的效率,还是可以的。)
* @param timestamp 时间戳
*/
public MyDelete(String row, long timestamp) {
this (Bytes.toBytes (row), timestamp);
}
/**
* 使用这个方法直接将这个Delete对象设置好了,这样可以省去几步操作。
* @param row 字符串类型的行键
* @param family 列族
* @param qualifier 列
*/
public MyDelete(String row, String family, String qualifier) {
this (row);
this.addColumn (family, qualifier);
}
public Delete addColumns(String family, String qualifier) {
return this.addColumns (family, qualifier, -1);
}
/**
* 对Delete中的addColumns的一个简单二次封装
* @param family 列族
* @param qualifier 列
* @param timestamp 时间戳
* @return 返回一个Delete对象
*/
public Delete addColumns(String family, String qualifier, long timestamp) {
return (timestamp != -1) super.addColumns (Bytes.toBytes (family),
Bytes.toBytes (qualifier), timestamp) : super.addColumns (Bytes.toBytes (family),
Bytes.toBytes (qualifier));
}
public Delete addColumn(String family, String qualifier) {
return this.addColumn (family, qualifier, -1);
}
/**
* 对Delete中的addColumn的一个简单二次封装
* @param family 列族
* @param qualifier 列
* @param timestamp 时间戳
* @return 返回一个Delete对象
*/
public Delete addColumn(String family, String qualifier, long timestamp) {
return (timestamp != -1) super.addColumn (Bytes.toBytes (family), Bytes.toBytes (qualifier), timestamp) : super.addColumn (Bytes.toBytes (family), Bytes.toBytes (qualifier));
}
}
上面是对删除操作中的Delete的简单封装。这样可以极大的提高学习的效率。
数据库除了删除操作之外还有添加和修改操作。在Hbase中没有修改只有覆盖操作。所以只简单的封装了一个Put操作的实体。
MyPut封装
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class MyPut extends Put {
//put的封装类似于Delete的封装,因此就不写那么多的注释了。所以就是这些了。
public MyPut(byte[] row) {
super (row);
}
public MyPut(byte[] row, long ts) {
super (row, ts);
}
public MyPut(String row){
super(Bytes.toBytes (row ));
}
public MyPut(String row,long ts){
super(Bytes.toBytes (row),ts);
}
public MyPut(String row,String family, String qualifier,String value){
this(row);
this.addColumn (family,qualifier,value);
}
public MyPut(String row,String family, String qualifier,String value,long ts ){
this(row,ts);
this.addColumn (family,qualifier,ts,value);
}
public Put addColumn(String family, String qualifier, String value) {
return this.addColumn (family, qualifier, -1,value);
}
public Put addColumn(String family, String qualifier, long ts, String value) {
return (ts!=-1)super.addColumn (Bytes.toBytes
(family), Bytes.toBytes (qualifier), ts, Bytes.toBytes (value)):super.
addColumn(Bytes.toBytes (family), Bytes.toBytes (qualifier), Bytes.toBytes (value));
}
}
除了以上的单独操纵的对象以外,我们常做的还有对于各种操作的一个总和利用的学习。鉴于此,我也对添加和删除为他们封装了一个专门的工具类。
HbaseDMLUtil
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HbaseDMLUtil extends HbaseUtil{
private static Table table;
private static String tableNameS;
private static TableName tableName;
/**
* 直接传入一个TableName对象来设置表明。
* @param tableName
*/
public static void setTableName(TableName tableName){
MyPutUtil.tableName = tableName;
try {
table = new HbaseUtil ().getConnection ().getTable (MyPutUtil.tableName);
} catch (IOException e) {
e.printStackTrace ( );
}
}
/**
* 直接传入一个字符串来设置表名。
* @param tableName
*/
public static void setTableName(String tableName){
tableNameS = tableName;
setTableName (TableName.valueOf (tableName));
}
/**
* 通过传入大量的参数,来添加或者修改值
* @param tablename 表名
* @param row 行键值
* @param family 列族名
* @param column 列名
* @param value 添加或者要修改的值
* @param timestamp 时间戳
*/
public static void put(String tablename,String row,String family,String column,String value,long timestamp){
if(tablename!=null)
setTableName (tablename);
Put put = new Put (Bytes.toBytes (row));
if(timestamp!=-1)
put.addColumn (Bytes.toBytes (family),Bytes.toBytes (column),timestamp,Bytes.toBytes (value));
else{
put.addColumn (Bytes.toBytes (family),Bytes.toBytes (column),Bytes.toBytes (value));
}
try {
table.put (put);
} catch (IOException e) {
e.printStackTrace ( );
}
}
//上面那个函数的简单重载
public static void put(String tablename,String row,String family,String column,String value){
put(tablename,row,family,column,value,-1);
}
/**
* 通过传入一个Map来批量插入或修改值,、
* Hbase是存放大量数据的,通过批量插入。能够极其有效的添加数据。
* @param tablename 表名
* @param family 列族
* @param column 列
* @param keyValue KeyValue的Map
*/
public static void putList(String tablename,String family,String column,Map<String,String> keyValue){
setTableName (tablename);
List<Put> list = new ArrayList<Put> ();
Set<Map.Entry<String,String>> rkValues = keyValue.entrySet ();
for(Map.Entry<String,String> rkValue:rkValues){
Put put = new Put (Bytes.toBytes (rkValue.getKey ()));
put.addColumn (Bytes.toBytes (family),Bytes.toBytes (column),Bytes.toBytes (rkValue.getValue ()));
list.add (put);
}
try {
table.put (list);
} catch (IOException e) {
e.printStackTrace ( );
}
}
/**
* 传入一个Delete对象然后直接删除,该对象所表示的值。
* @param tableName
* @param del
*/
public static void delete(String tableName,Delete del){
setTableName (tableName);
try {
table.delete (del);
} catch (IOException e) {
e.printStackTrace ( );
}
}
/**
* 通过传入表名等参数,删除不需要的值
* @param tablename 表名
* @param family 列族
* @param column 列
* @param timestamp 实践戳
* @param rowKey 行键
*/
public static void delete(String tablename,String family,String column,long timestamp,String rowKey){
setTableName (tablename);
Delete del = null;
if(timestamp!=-1)
del = new Delete (Bytes.toBytes (rowKey),timestamp);
else
del = new Delete (Bytes.toBytes (rowKey));
if(column!=null) del.addColumn (Bytes.toBytes (family),Bytes.toBytes (column));
else del.addFamily (Bytes.toBytes (family));
}
/**
* 一个简单重载
* @param tablename
* @param family
* @param rowKey
*/
public static void delete( String tablename,String family,String rowKey){
delete (tablename,family,null,-1,rowKey);
}
/**
* 删除的批量操作
* @param tablename
* @param family
* @param column
* @param rowKeys 行键列表
*/
public static void deleteList(String tablename,String family,String column,List<String> rowKeys){
setTableName (tablename);
List<Delete> list = new ArrayList<Delete> ();
for(String rowKey:rowKeys){
Delete del = new Delete (Bytes.toBytes (rowKey));
del.addColumn (Bytes.toBytes (family),Bytes.toBytes (column));
list.add (del);
}
try {
table.delete (list);
} catch (IOException e) {
e.printStackTrace ( );
}
}
}
当然,上上面就是对HbaseJavaAPI的一些简单封装,写的很随意。当然也是一个初学者的水平。只是从初学者的角度为各位提供了一些比较快速的方法。
但是自我感觉还是有一些内容,可以对以后的开发中有所启示。