设为首页 加入收藏

TOP

Hbase分页查询代码实现
2018-11-28 17:32:19 浏览:65
Tags:Hbase 查询 代码 实现

所有代码都写在了一个类里,后期可以把各个方法拆分出来再做优化。

package com.ruif.hbase.dao;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.ruif.hbase.service.PageData;

/**
 * Paged HBase query helper built on the legacy {@code HConnection} / {@code HTableInterface}
 * client API.
 *
 * <p>{@link #getDataMap} scans a row-key range once to count the matching rows and to collect
 * the row keys that belong to the requested page, then loads only those rows with a batch
 * {@code Get}. The whole range is walked on every call because the total row count is part of
 * the returned {@link PageData}.
 */
public class asdasd {

	private static final Logger LOGGER = Logger.getLogger(asdasd.class);

	private static Configuration conf;

	// Sample table name; not referenced by any method in this class.
	private static final String TABLE_NAME = "a1";

	// Connection shared by every call; stays null when the static initializer fails.
	private static HConnection conn = null;

	// Build the client configuration and open the shared connection once, at class-load time.
	static {
		conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
		try {
			conn = HConnectionManager.createConnection(conf);
		}
		catch (IOException e) {
			// Do not abort class loading; openTable() reports the missing connection on first use.
			LOGGER.error("Failed to create the HBase connection", e);
		}
	}

	public static void main(String[] args) {
		// Intentionally empty: placeholder entry point.
	}

	/**
	 * Runs a paged query over a row-key range.
	 *
	 * @param tableName
	 *            name of the table to query
	 * @param startRow
	 *            first row key of the range; {@code null} is sent as an empty key
	 * @param stopRow
	 *            row key at which the scan stops; {@code null} is sent as an empty key
	 * @param objKey
	 *            optional row-key filter; when not blank it is handed to
	 *            {@code RegexStringComparator} as the pattern of a {@code RowFilter}
	 * @param currentPage
	 *            1-based page number
	 * @param pageSize
	 *            number of rows per page, at least 1
	 * @return the requested page together with the total row and page counts
	 * @throws IOException
	 *             if the shared connection is unavailable or an HBase call fails
	 * @throws IllegalArgumentException
	 *             if {@code currentPage} or {@code pageSize} is {@code null} or less than 1
	 */
	public PageData getDataMap(String tableName, String startRow, String stopRow, String objKey, Integer currentPage, Integer pageSize) throws IOException {
		if (currentPage == null || currentPage < 1) {
			throw new IllegalArgumentException("currentPage must be >= 1, got " + currentPage);
		}
		if (pageSize == null || pageSize < 1) {
			throw new IllegalArgumentException("pageSize must be >= 1, got " + pageSize);
		}

		// Half-open window [firstIndex, endIndex) of scan positions that make up the page.
		// long arithmetic keeps very large page numbers from overflowing into a negative window.
		long firstIndex = (currentPage - 1L) * pageSize;
		long endIndex = firstIndex + pageSize;

		List<Map<String, String>> mapList = new LinkedList<Map<String, String>>();
		int totalCount = 0;

		HTableInterface table = openTable(tableName);
		try {
			Scan scan = getScan(startRow, stopRow);
			if (!StringUtils.isBlank(objKey)) {
				// NOTE(review): the original comment described this as "row key ends with objKey",
				// but the pattern is neither anchored nor quoted here - confirm the intended match.
				Filter rowKeyFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(objKey));
				scan.setFilter(rowKeyFilter);
			}
			// Scanner caching of 10000 rows; block caching is switched off for this scan.
			scan.setCaching(10000);
			scan.setCacheBlocks(false);

			// First pass: count every matching row and remember the keys that fall inside the page.
			List<byte[]> rowList = new LinkedList<byte[]>();
			ResultScanner scanner = table.getScanner(scan);
			try {
				for (Result result : scanner) {
					if (totalCount >= firstIndex && totalCount < endIndex) {
						// Keep the raw bytes: a String round-trip would corrupt non-UTF-8 row keys.
						rowList.add(result.getRow());
					}
					totalCount++;
				}
			}
			finally {
				closeScanner(scanner);
			}

			// Second pass: fetch the selected columns for the rows of this page only.
			if (!rowList.isEmpty()) {
				Result[] results = table.get(getList(rowList));
				for (Result result : results) {
					if (result == null || result.getRow() == null) {
						// The row holds none of the requested columns, so there is nothing to show.
						continue;
					}
					Map<byte[], byte[]> familyMap = packFamilyMap(result);
					mapList.add(packRowMap(familyMap, result));
				}
			}
		}
		finally {
			// Always hand the table back, including on failure.
			table.close();
		}

		PageData tbData = new PageData();
		tbData.setCurrentPage(currentPage);
		tbData.setPageSize(pageSize);
		tbData.setTotalCount(totalCount);
		tbData.setTotalPage(getTotalPage(pageSize, totalCount));
		tbData.setResultList(mapList);
		return tbData;
	}

	/**
	 * Obtains a table handle from the shared connection.
	 *
	 * @throws IOException
	 *             if the connection could not be created at class-load time or the lookup fails
	 */
	private static HTableInterface openTable(String tableName) throws IOException {
		if (conn == null) {
			throw new IOException("HBase connection is not available; see the startup log for the cause");
		}
		return conn.getTable(tableName);
	}

	/**
	 * Computes the number of pages needed for {@code totalCount} rows, rounding up.
	 *
	 * @throws IllegalArgumentException
	 *             if {@code pageSize} is not positive
	 */
	private static int getTotalPage(int pageSize, int totalCount) {
		if (pageSize <= 0) {
			throw new IllegalArgumentException("pageSize must be >= 1, got " + pageSize);
		}
		int fullPages = totalCount / pageSize;
		return (totalCount % pageSize == 0) ? fullPages : fullPages + 1;
	}

	/** Builds a scan over the given start/stop row keys; null keys become empty keys via getBytes. */
	private static Scan getScan(String startRow, String stopRow) {
		Scan scan = new Scan();
		scan.setStartRow(getBytes(startRow));
		scan.setStopRow(getBytes(stopRow));
		return scan;
	}

	/**
	 * Sample filter builder (not called by any method in this class): two column-value
	 * conditions combined with AND, plus a FirstKeyOnlyFilter when {@code isPage} is true.
	 * The family, column and condition names are placeholders.
	 */
	private static FilterList packageFilters(boolean isPage) {
		// MUST_PASS_ALL = conditions are ANDed; MUST_PASS_ONE would OR them.
		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		filterList.addFilter(newFilter(getBytes("family1"), getBytes("column1"), CompareOp.EQUAL, getBytes("condition1")));
		filterList.addFilter(newFilter(getBytes("family2"), getBytes("column1"), CompareOp.LESS, getBytes("condition2")));
		if (isPage) {
			filterList.addFilter(new FirstKeyOnlyFilter());
		}
		return filterList;
	}

	/** Creates a single-column value filter for family {@code f}, qualifier {@code c}. */
	private static Filter newFilter(byte[] f, byte[] c, CompareOp op, byte[] v) {
		return new SingleColumnValueFilter(f, c, op, v);
	}

	/** Closes the scanner if one was opened. */
	private static void closeScanner(ResultScanner scanner) {
		if (scanner != null) {
			scanner.close();
		}
	}

	/**
	 * Converts one row into a String map: every qualifier/value pair of {@code dataMap}
	 * followed by the row key under the entry name "key".
	 *
	 * @param dataMap
	 *            qualifier-to-value bytes of the row
	 * @param result
	 *            the row the map was taken from; supplies the row key
	 */
	private static Map<String, String> packRowMap(Map<byte[], byte[]> dataMap, Result result) {
		Map<String, String> map = new LinkedHashMap<String, String>();
		for (Map.Entry<byte[], byte[]> entry : dataMap.entrySet()) {
			map.put(toStr(entry.getKey()), toStr(entry.getValue()));
		}
		map.put("key", Bytes.toString(result.getRow()));
		return map;
	}

	/** Builds one Get per row key, restricted to info:tag, info:timestamp and info:value. */
	private static List<Get> getList(List<byte[]> rowList) {
		List<Get> list = new LinkedList<Get>();
		for (byte[] row : rowList) {
			Get get = new Get(row);
			get.addColumn(getBytes("info"), getBytes("tag"));
			get.addColumn(getBytes("info"), getBytes("timestamp"));
			get.addColumn(getBytes("info"), getBytes("value"));
			list.add(get);
		}
		return list;
	}

	/**
	 * Copies the qualifier/value pairs of the "info" family out of a row. Returns an empty
	 * map when the row carries no cells for that family.
	 */
	private static Map<byte[], byte[]> packFamilyMap(Result result) {
		Map<byte[], byte[]> dataMap = new LinkedHashMap<byte[], byte[]>();
		Map<byte[], byte[]> infoFamily = result.getFamilyMap(getBytes("info"));
		if (infoFamily != null) {
			dataMap.putAll(infoFamily);
		}
		return dataMap;
	}

	/** Decodes bytes to a String with the HBase Bytes helper. */
	private static String toStr(byte[] bt) {
		return Bytes.toString(bt);
	}

	/**
	 * Fetches the rows described by the given Get objects.
	 *
	 * @param getList
	 *            the Get requests to execute
	 * @param tableName
	 *            name of the table to read from
	 * @return the results returned by the batch get
	 * @throws IOException
	 *             if the shared connection is unavailable or the read fails
	 */
	public Result[] getList(List<Get> getList, String tableName) throws IOException {
		HTableInterface table = openTable(tableName);
		try {
			return table.get(getList);
		}
		finally {
			// Release the table handle once the batch get has finished.
			table.close();
		}
	}

	/** Encodes a String with the HBase Bytes helper; {@code null} is treated as the empty string. */
	public static byte[] getBytes(String str) {
		if (str == null) {
			str = "";
		}
		return Bytes.toBytes(str);
	}

}


【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇如何用java操作hbase数据库(增,.. 下一篇HBase 健康检查工具

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目