设为首页 加入收藏

TOP

HBase分页查询---指定PageNumber和PageSize
2019-03-29 01:44:26 】 浏览:198
Tags:HBase 查询 --- 指定 PageNumber PageSize

问题:通过指定的页码和每页记录的条数来进行分页查询。

分析问题:

要实现分页查询,因为查询的是一个范围,所以使用Scan的查询方式。然后还使用PageFilter,这样能够获取指定数量的记录。

那么问题来了,要使用scan的方式进行区间查询那么startRow是什么???

所以现在将问题转换成了求每一页的startRow !!!

当我们使用scan查询又没有指定StartRow的时候,结果是什么样的呢???

很明显,结果是从第一条记录开始的。

OK,可以确定第一页的StartRow了不是吗 这个StartRow就是 null

那么问题是不是有思路了?

通过给定的PageNumber进行循环遍历,以获取指定页的StartRow。

通过上一页的EndRow来获取当前页的StartRow。大体有两种实现方式(这两种方式根本的思路都是通过从第一页循环遍历知道获取到上一页的EndRow为止):

1、通过把上一页的最后一条记录的EndRow向后移动一点点的位置,比如上一页最后一条记录的RowKey是user_info_0001,那么将当前页的StartRow可不可以设置成user_info_00010000000000000000呢??? 这样是不是就可以查当前页的数据了,而且设置的这个RowKey和上一页最后一个RowKey非常接近,视乎不会遗漏数据。

2、当然,会有人说不怕一万就怕万一,万一要是有遗漏的数据呢? OK,你赢了 那就使用第二种方式,我们在遍历的时候每次查询 PageSize + 1 条记录,这样每次查询的EndRow就是下一页的StartRow不是吗? OK, 问题解决了,话不多说 直接上代码。

import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class GetDataByPage {
	private static Configuration conf = null;
	private static Connection conn = null;
	private static HTable table = null;
	private static TableName tn = null;
	private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
	private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";
	
	static{
		try {
			conf = HBaseConfiguration.create();
			conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
			conn = ConnectionFactory.createConnection(conf);
			tn = TableName.valueOf("users");
			table = (HTable) conn.getTable(tn);
			
			
		} catch (Exception e) {
			System.err.println("初始化参数失败");
		}
	}
	
	@Test
	public void tests() throws Exception{
		ResultScanner pageData = null;
		pageData = getPageData(0,20);
		if(pageData != null)
			printResultScanner(pageData);
	}
	
	/**
	 * 通过传入页码数和每一页的记录条数来获取当页的数据
	 * @param page
	 * @param pagesize
	 * @return
	 * @throws Exception 
	 */
	public ResultScanner getPageData(int page, int pagesize) throws Exception{
		
		//如果每一页的记录数在少或者太多都设置为一个标准值 5
		if(pagesize < 3 || pagesize > 15) pagesize = 5;

		String startRowkey = getStartRowKey(page,pagesize);
		ResultScanner pagedate = getDate(startRowkey,pagesize);
		return pagedate;
	}
	
	/**
	 * 获取指定页的startRowKey
	 * @param page
	 * @param pagesize
	 * @return
	 * @throws Exception 
	 */
	public String getStartRowKey(int page, int pagesize) throws Exception{
		
		//如果page不标准,比如是0 或者-1那么就按照第一页处理
		if(page <= 1){
			return null;
		} else{
			//解决思路 我们每页获取pagesize+1条记录
			//每次遍历记录下最后一条记录的rowkey,就是下一页的开始rowkey
			String startRowKey = null;
			
			for (int i = 1; i < page; i++) {
				
				ResultScanner date = getDate(startRowKey,pagesize+1);
				Iterator<Result> iterator = date.iterator();
				Result next = null;
				while (iterator.hasNext()) {
					 next = iterator.next();
				}
				
				startRowKey = Bytes.toString(next.getRow());
			}
			
			return startRowKey;
		}
	}
	
	
	/**
	 * 通过pageFilter获取指定页的数据
	 * @param startRowKey
	 * @param pagesize
	 * @return
	 * @throws Exception
	 */
	public ResultScanner getDate(String startRowKey,int pagesize) throws Exception{
		
		Scan scan = new Scan();
		if(!StringUtils.isBlank(startRowKey)){
			scan.setStartRow(startRowKey.getBytes());
		}
		
		Filter pageFilter = new PageFilter(pagesize);
		
		scan.setFilter(pageFilter);
		
		ResultScanner scanner = table.getScanner(scan);
		
		return scanner;
	}
	
	/**
	 *  打印结果集
	 */
	public static void printResultScanner(ResultScanner resultScann) {
		for (Result result : resultScann) {
			printResult(result);
		}
	}
	
	public static void printResult(Result result) {
		List<Cell> cells = result.listCells();
		for (int i = 0; i < cells.size(); i++) {
			Cell cell = cells.get(i);
			printCell(cell);
		}
	}
	
	public static void printCell(Cell cell) {
		System.out.println(Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t" + Bytes.toString(cell.getQualifier())
				+ "\t" + Bytes.toString(cell.getValue()) + "\t" + cell.getTimestamp());
	}
	
	public static void printKeyValye(KeyValue kv) {
		System.out.println(Bytes.toString(kv.getRow()) + "\t" + Bytes.toString(kv.getFamily()) + "\t" + Bytes.toString(kv.getQualifier()) + "\t"
				+ Bytes.toString(kv.getValue()) + "\t" + kv.getTimestamp());
	}
	
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hbase regionserver的region到300.. 下一篇MapReduce统计结果输出到hbase

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目