设为首页 加入收藏

TOP

Spring Boot中引入HBase数据库
2019-03-15 01:39:41 】 浏览:58
Tags:Spring Boot 引入 HBase 数据库

使用HBase API对数据库的基本操作,包括创建表、查找表数据等。

1、HBase配置

pom.xml

<!-- Apache HBase Client -->
<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-client</artifactId>
	<version>2.0.1</version>

 
			
perator"><
/dependency>

application.properties

HBase.nodes=10.xx.xx.43
HBase.maxsize=500000

HBaseConfig.java

@Configuration
public class HBaseConfig {
	//HBase相关配置
    @Value("${HBase.nodes}")
    private String nodes;
    @Value("${HBase.maxsize}")
    private String maxsize;

    @Bean
    public HBaseService getHbaseService(){
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",nodes );
        conf.set("hbase.client.keyvalue.maxsize",maxsize);
        return new HBaseService(conf);
    }
}

2、业务模拟
HBaseService.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import .....
/**
 * HBase数据库基本操作
 */
public class HBaseService {
	private Logger log = LoggerFactory.getLogger(HBaseService.class);

	// The administrative API for HBase
	// Admin can be used to create, drop, list, enable and disable and
	// otherwise modify tables,
	// as well as perform other administrative operations.
	private Admin admin = null;
	private Connection connection = null;

	public HBaseService(Configuration conf) {
		try {
			connection = ConnectionFactory.createConnection(conf);
			admin = connection.getAdmin();
		} catch (IOException e) {
			log.error("获取HBase连接失败");
		}
	}

	/**
	 * 创建表
	 * create <table>, {NAME => <column family>, VERSIONS => <VERSIONS>}
	 * shell command: create ‘user’, ‘cf1’
	 */
	public boolean creatTable(String tableName, List<String> columnFamily) {
		try {
			//列族 column family
			List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
			columnFamily.forEach(cf -> {
				cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
						Bytes.toBytes(cf)).build());
			});
			//表 table
			TableDescriptor tableDesc = TableDescriptorBuilder
					.newBuilder(TableName.valueOf(tableName))
					.setColumnFamilies(cfDesc).build();

			if (admin.tableExists(TableName.valueOf(tableName))) {
				log.debug("table Exists!");
			} else {
				admin.createTable(tableDesc);
				log.debug("create table Success!");
			}
		} catch (IOException e) {
			log.error(MessageFormat.format("创建表{0}失败", tableName), e);
			return false;
		} finally {
			close(admin, null, null);
		}
		return true;
	}

	/**
	 * 查询库中所有表的表名
	 * shell command: list
	 */
	public List<String> getAllTableNames() {
		List<String> result = new ArrayList<>();		
		try {
			TableName[] tableNames = admin.listTableNames();
			for (TableName tableName : tableNames) {
				result.add(tableName.getNameAsString());
			}
		} catch (IOException e) {
			log.error("获取所有表的表名失败", e);
		} finally {
			close(admin, null, null);
		}
		return result;
	}

	/**
	 * 遍历查询指定表中的所有数据
	 * shell command: scan 'user'
	 */
	public Map<String, Map<String, String>> getResultScanner(String tableName) {
		Scan scan = new Scan();
		return this.queryData(tableName, scan);
	}
	
	/**
	 * 通过表名以及过滤条件查询数据
	 */
	private Map<String, Map<String, String>> queryData(String tableName,
			Scan scan) {
		// <rowKey,对应的行数据>
		Map<String, Map<String, String>> result = new HashMap<>();

		ResultScanner rs = null;
		// 获取表
		Table table = null;
		try {
			table = getTable(tableName);
			rs = table.getScanner(scan);
			for (Result r : rs) {
				// 每一行数据
				Map<String, String> columnMap = new HashMap<>();
				String rowKey = null;
				// 行键,列族和列限定符一起确定一个单元(Cell)
				for (Cell cell : r.listCells()) {
					if (rowKey == null) {
						rowKey = Bytes.toString(cell.getRowArray(),
								cell.getRowOffset(), cell.getRowLength());
					}
					columnMap.put(
							// 列限定符
							Bytes.toString(cell.getQualifierArray(),
									cell.getQualifierOffset(),
									cell.getQualifierLength()),
							// 列族
							Bytes.toString(cell.getValueArray(),
									cell.getValueOffset(),
									cell.getValueLength()));
				}

				if (rowKey != null) {
					result.put(rowKey, columnMap);
				}
			}
		} catch (IOException e) {
			log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}",
					tableName), e);
		} finally {
			close(null, rs, table);
		}

		return result;
	}

	/**
	 * 根据tableName和rowKey精确查询行数据
	 */
	public Map<String, String> getRowData(String tableName, String rowKey) {
		// 返回的键值对
		Map<String, String> result = new HashMap<>();

		Get get = new Get(Bytes.toBytes(rowKey));
		// 获取表
		Table table = null;
		try {
			table = getTable(tableName);
			Result hTableResult = table.get(get);
			if (hTableResult != null && !hTableResult.isEmpty()) {
				for (Cell cell : hTableResult.listCells()) {
					result.put(
							Bytes.toString(cell.getQualifierArray(),
									cell.getQualifierOffset(),
									cell.getQualifierLength()),
							Bytes.toString(cell.getValueArray(),
									cell.getValueOffset(),
									cell.getValueLength()));
				}
			}
		} catch (IOException e) {
			log.error(MessageFormat.format(
					"查询一行的数据失败,tableName:{0},rowKey:{1}", tableName, rowKey), e);
		} finally {
			close(null, null, table);
		}

		return result;
	}

	/**
	 * 为表添加 or 更新数据
	 */
	public void putData(String tableName, String rowKey, String familyName,
			String[] columns, String[] values) {
		// 获取表
		Table table = null;
		try {
			table = getTable(tableName);

			putData(table, rowKey, tableName, familyName, columns, values);
		} catch (Exception e) {
			log.error(MessageFormat.format(
					"为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
					tableName, rowKey, familyName), e);
		} finally {
			close(null, null, table);
		}
	}
	
	private void putData(Table table, String rowKey, String tableName,
			String familyName, String[] columns, String[] values) {
		try {
			// 设置rowkey
			Put put = new Put(Bytes.toBytes(rowKey));

			if (columns != null && values != null
					&& columns.length == values.length) {
				for (int i = 0; i < columns.length; i++) {
					if (columns[i] != null && values[i] != null) {
						put.addColumn(Bytes.toBytes(familyName),
								Bytes.toBytes(columns[i]),
								Bytes.toBytes(values[i]));
					} else {
						throw new NullPointerException(MessageFormat.format(
								"列名和列数据都不能为空,column:{0},value:{1}", columns[i],
								values[i]));
					}
				}
			}

			table.put(put);
			log.debug("putData add or update data Success,rowKey:" + rowKey);
			table.close();
		} catch (Exception e) {
			log.error(MessageFormat.format(
					"为表添加 or 更新数据失败,tableName:{0},rowKey:{1},familyName:{2}",
					tableName, rowKey, familyName), e);
		}
	}

	/**
	 * 根据表名 获取table
	 * Used to communicate with a single HBase table.
	 * Table can be used to get, put, delete or scan data from a table.
	 */
	private Table getTable(String tableName) throws IOException {
		return connection.getTable(TableName.valueOf(tableName));
	}

	/**
	 * 关闭流
	 */
	private void close(Admin admin, ResultScanner rs, Table table) {
		if (admin != null) {
			try {
				admin.close();
			} catch (IOException e) {
				log.error("关闭Admin失败", e);
			}
		}

		if (rs != null) {
			rs.close();
		}

		if (table != null) {
			try {
				table.close();
			} catch (IOException e) {
				log.error("关闭Table失败", e);
			}
		}
	}
}

3、测试

HBaseTest.java

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class HBaseTest {

	@Autowired
	private HBaseService hbaseService;

	/**
	 * 测试删除、创建表
	 */
	@Test
	public void testCreateTable() {

		// 创建表
		hbaseService.creatTable("test_base", Arrays.asList("f", "back"));

		// 插入三条数据
		hbaseService.putData("test_base", "66804_000001", "f", new String[] {
				"project_id", "varName", "coefs", "pvalues", "tvalues",
				"create_time" }, new String[] { "40866", "mob_3", "0.9416",
				"0.0000", "12.2293", "null" });
		hbaseService.putData("test_base", "66804_000002", "f", new String[] {
				"project_id", "varName", "coefs", "pvalues", "tvalues",
				"create_time" }, new String[] { "40866", "idno_prov", "0.9317",
				"0.0000", "9.8679", "null" });
		hbaseService.putData("test_base", "66804_000003", "f", new String[] {
				"project_id", "varName", "coefs", "pvalues", "tvalues",
				"create_time" }, new String[] { "40866", "education", "0.8984",
				"0.0000", "25.5649", "null" });

		// 查询数据
		// 1. 根据rowKey查询
		Map<String, String> result1 = hbaseService.getRowData("test_base",
				"66804_000001");
		System.out.println("+++++++++++根据rowKey查询+++++++++++");
		result1.forEach((k, value) -> {
			System.out.println(k + "---" + value);
		});
		System.out.println();

		// 2. 遍历查询
		Map<String, Map<String, String>> result2 = hbaseService
				.getResultScanner("test_base");
		System.out.println("+++++++++++遍历查询+++++++++++");
		result2.forEach((k, value) -> {
			System.out.println(k + "---" + value);
		});
	}
}

shell命令行查看表:
list
eclipse运行结果:
查询结果
参考资料:https://www.zifangsky.cn/1286.html


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇python访问hbase数据 下一篇基于HBase的入库方案效率对比验证..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }