HBase Coprocessor Endpoint in Practice: Querying HBase Data After Salting

1. Introduction

The previous article covered how to salt HBase row keys and sketched the idea behind querying salted data, but it did not give a concrete implementation. This article walks through an implementation based on an HBase coprocessor endpoint.

Coprocessors come in two types. A system coprocessor is loaded globally on the region servers and applies to all tables, while a table coprocessor is attached by the user to a specific table. To keep coprocessor behavior flexible, the framework offers two kinds of plugins: the observer, which is similar to a trigger in a relational database, and the endpoint, which is loaded dynamically and behaves more like a stored procedure. The implementation described here uses an endpoint.

2. Implementation

2.1 Example

First look at the row-count example that ships with HBase, RowCountEndpoint.java; the source lives in the hbase-examples module under the package org.apache.hadoop.hbase.coprocessor.example:

public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
                        RpcCallback<ExampleProtos.CountResponse> done) {
  Scan scan = new Scan();
  scan.setFilter(new FirstKeyOnlyFilter());
  ExampleProtos.CountResponse response = null;
  InternalScanner scanner = null;
  try {
    scanner = env.getRegion().getScanner(scan);
    List<Cell> results = new ArrayList<Cell>();
    boolean hasMore = false;
    byte[] lastRow = null;
    long count = 0;
    do {
      hasMore = scanner.next(results);
      for (Cell kv : results) {
        byte[] currentRow = CellUtil.cloneRow(kv);
        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
          lastRow = currentRow;
          count++;
        }
      }
      results.clear();
    } while (hasMore);
    response = ExampleProtos.CountResponse.newBuilder()
        .setCount(count).build();
  } catch (IOException ioe) {
    ResponseConverter.setControllerException(controller, ioe);
  } finally {
    if (scanner != null) {
      try {
        scanner.close();
      } catch (IOException ignored) {}
    }
  }
  done.run(response);
}

The implementation is straightforward: each region scans all of its rows and returns its own row count, and the client sums the counts from all regions to get the total row count of the table.
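For reference, here is a minimal client-side sketch of that summation, assuming the generated ExampleProtos classes from hbase-examples (service RowCountService with rpc getRowCount) and the same HTable.coprocessorService API used by the full client in section 2.3; the table name is illustrative.

// Minimal client sketch for RowCountEndpoint (HBase 0.98/1.x client APIs).
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

public class RowCountClient {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "demo_table"); // table name is illustrative
    try {
      final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
      // null start/stop keys: the call fans out to every region of the table
      Map<byte[], Long> results = table.coprocessorService(
          ExampleProtos.RowCountService.class, null, null,
          new Batch.Call<ExampleProtos.RowCountService, Long>() {
            public Long call(ExampleProtos.RowCountService counter) throws IOException {
              ServerRpcController controller = new ServerRpcController();
              BlockingRpcCallback<ExampleProtos.CountResponse> callback =
                  new BlockingRpcCallback<ExampleProtos.CountResponse>();
              counter.getRowCount(controller, request, callback);
              if (controller.failedOnException()) {
                throw controller.getFailedOn();
              }
              ExampleProtos.CountResponse response = callback.get();
              return response == null ? 0L : response.getCount();
            }
          });
      // each map entry is one region's row count; the table total is their sum
      long total = 0;
      for (Long regionCount : results.values()) {
        total += regionCount;
      }
      System.out.println("total rows: " + total);
    } finally {
      table.close();
    }
  }
}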

2.2 Server-side implementation

The following mirrors RowCountEndpoint to implement a query method for salted HBase data.

1) Protocol definition

Because HBase uses protobuf for its internal RPC, the first step is to generate the protocol classes (like ExampleProtos above). Define our own protocol file for DataProtos:

package generated;

option java_package = "com.bigdata.coprocessor.endpoint.generated";
option java_outer_classname = "DataProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message DataQueryRequest {
  optional string tableName = 1;
  optional string startRow = 2;
  optional string endRow = 3;
  optional string rowKey = 4;
  optional bool incluedEnd = 5;
  optional bool isSalting = 6;
}

message DataQueryResponse {
  message Cell {
    required bytes value = 1;
    required bytes family = 2;
    required bytes qualifier = 3;
    required bytes row = 4;
  }
  message Row {
    optional bytes rowKey = 1;
    repeated Cell cellList = 2;
  }
  repeated Row rowList = 1;
}

service QueryDataService {
  rpc queryByStartRowAndEndRow(DataQueryRequest) returns (DataQueryResponse);
  rpc queryByRowKey(DataQueryRequest) returns (DataQueryResponse);
}

The file defines a request object DataQueryRequest, a response object DataQueryResponse, and a service QueryDataService with two RPC methods: one queries by a start/stop row key range, and the other by a single row key. The corresponding Java classes are then generated with protoc.exe.

Running protoc.exe DataProtos.proto --java_out=e:\hbase\protoc-2.4.1 produces DataProtos.java. (The protoc.exe tool has also been uploaded and is available for download.)
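On Linux the equivalent invocation would look something like the following (the output directory is illustrative); note that the protoc version used for generation should generally match the protobuf-java version bundled with your HBase release, or the generated classes may not load correctly:

protoc --proto_path=. --java_out=src/main/java DataProtos.proto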

2) Coprocessor implementation

Server-side code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bigdata.coprocessor.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryRequest;
import com.bigdata.coprocessor.endpoint.generated.DataProtos.DataQueryResponse;

public class QueryEndpoint extends DataProtos.QueryDataService implements
    Coprocessor, CoprocessorService {

  private RegionCoprocessorEnvironment env;

  public QueryEndpoint() {
  }

  /**
   * Just returns a reference to this object, which implements the
   * QueryDataService interface.
   */
  @Override
  public Service getService() {
    return this;
  }

  /**
   * Scans the rows between startRow and endRow in the region where this
   * coprocessor is loaded. If the table is salted, the region's own salt
   * prefix is prepended to the requested keys first.
   */
  @Override
  public void queryByStartRowAndEndRow(RpcController controller,
      DataProtos.DataQueryRequest request,
      RpcCallback<DataProtos.DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    InternalScanner scanner = null;
    try {
      String startRow = request.getStartRow();
      String endRow = request.getEndRow();
      String regionStartKey = Bytes.toString(this.env.getRegion()
          .getRegionInfo().getStartKey());

      if (request.getIsSalting()) { // if the data is salted, prepend the salt to the keys
        String startSalt = null;
        if (null != regionStartKey && !regionStartKey.isEmpty()) {
          // keys are salted as salt + "_" + key, so take the part before the "_"
          startSalt = regionStartKey.split("_")[0];
        }
        if (null != startSalt) {
          if (null != startRow) {
            startRow = startSalt + "_" + startRow;
            endRow = startSalt + "_" + endRow;
          }
        }
      }

      Scan scan = new Scan();
      if (null != startRow) {
        scan.setStartRow(Bytes.toBytes(startRow));
      }
      if (null != endRow) {
        if (request.getIncluedEnd()) {
          Filter filter = new InclusiveStopFilter(Bytes.toBytes(endRow));
          scan.setFilter(filter);
        } else {
          scan.setStopRow(Bytes.toBytes(endRow));
        }
      }

      scanner = this.env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse
          .newBuilder();
      do {
        hasMore = scanner.next(results);
        DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row
            .newBuilder();
        if (null != results && results.size() > 0) {
          rowBuilder.setRowKey(ByteString.copyFrom(results.get(0).getRow()));
          for (Cell kv : results) {
            DataProtos.DataQueryResponse.Cell.Builder cellBuilder = DataProtos.DataQueryResponse.Cell
                .newBuilder();
            cellBuilder.setFamily(ByteString.copyFrom(kv.getFamily()));
            cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
            cellBuilder.setRow(ByteString.copyFrom(kv.getRow()));
            cellBuilder.setValue(ByteString.copyFrom(kv.getValue()));
            rowBuilder.addCellList(cellBuilder);
          }
        }
        responseBuilder.addRowList(rowBuilder);
        results.clear();
      } while (hasMore);
      response = responseBuilder.build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {
        }
      }
    }
    done.run(response);
  }

  /**
   * Looks up a single row by key in the region where this coprocessor is
   * loaded, prepending the region's salt prefix when the table is salted.
   */
  @Override
  public void queryByRowKey(RpcController controller,
      DataQueryRequest request, RpcCallback<DataQueryResponse> done) {
    DataProtos.DataQueryResponse response = null;
    try {
      String rowKey = request.getRowKey();
      String regionStartKey = Bytes.toString(this.env.getRegion()
          .getRegionInfo().getStartKey());

      if (request.getIsSalting()) { // if the data is salted, prepend the salt to the key
        String startSalt = null;
        if (null != regionStartKey && !regionStartKey.isEmpty()) {
          // keys are salted as salt + "_" + key, so take the part before the "_"
          startSalt = regionStartKey.split("_")[0];
        }
        if (null != startSalt) {
          if (null != rowKey) {
            rowKey = startSalt + "_" + rowKey;
          }
        }
      }

      Get get = new Get(Bytes.toBytes(rowKey));
      Result result = this.env.getRegion().get(get);

      DataProtos.DataQueryResponse.Builder responseBuilder = DataProtos.DataQueryResponse
          .newBuilder();
      DataProtos.DataQueryResponse.Row.Builder rowBuilder = DataProtos.DataQueryResponse.Row
          .newBuilder();
      if (null != result && !result.isEmpty()) {
        List<KeyValue> list = result.list();
        if (null != list && !list.isEmpty()) {
          rowBuilder.setRowKey(ByteString.copyFrom(list.get(0).getRow()));
          for (KeyValue kv : list) {
            DataProtos.DataQueryResponse.Cell.Builder cellBuilder = DataProtos.DataQueryResponse.Cell
                .newBuilder();
            cellBuilder.setFamily(ByteString.copyFrom(kv.getFamily()));
            cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
            cellBuilder.setRow(ByteString.copyFrom(kv.getRow()));
            cellBuilder.setValue(ByteString.copyFrom(kv.getValue()));
            rowBuilder.addCellList(cellBuilder);
          }
        }
      }
      responseBuilder.addRowList(rowBuilder);
      response = responseBuilder.build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    }
    done.run(response);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from
   * the region where this coprocessor is loaded. Since this is a coprocessor
   * endpoint, it always expects to be loaded on a table region, so always
   * expects this to be an instance of {@link RegionCoprocessorEnvironment}.
   *
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *           {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}

2.3 Client-side query implementation

package com.bigdata.coprocessor.client;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.bigdata.coprocessor.endpoint.generated.DataProtos;
import com.google.protobuf.ServiceException;

public class QueryDemo {

  protected static Configuration conf = null;

  static {
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.175.55.138,10.175.55.139,10.175.55.140");
  }

  public static List<DataProtos.DataQueryResponse.Row> queryByStartRowAndStopRow(
      String tableName, String startRow, String stopRow,
      boolean isIncludeEnd, boolean isSalting) {
    final DataProtos.DataQueryRequest.Builder requestBuilder = DataProtos.DataQueryRequest.newBuilder();
    requestBuilder.setTableName(tableName);
    requestBuilder.setStartRow(startRow);
    requestBuilder.setEndRow(stopRow);
    requestBuilder.setIncluedEnd(isIncludeEnd);
    requestBuilder.setIsSalting(isSalting);

    try {
      HTable table = new HTable(HBaseConfiguration.create(conf), tableName);
      // fan the request out to every region; each region returns its own rows
      Map<byte[], List<DataProtos.DataQueryResponse.Row>> result = table.coprocessorService(
          DataProtos.QueryDataService.class, null, null,
          new Batch.Call<DataProtos.QueryDataService, List<DataProtos.DataQueryResponse.Row>>() {
            public List<DataProtos.DataQueryResponse.Row> call(DataProtos.QueryDataService counter)
                throws IOException {
              ServerRpcController controller = new ServerRpcController();
              BlockingRpcCallback<DataProtos.DataQueryResponse> rpcCallback =
                  new BlockingRpcCallback<DataProtos.DataQueryResponse>();
              counter.queryByStartRowAndEndRow(controller, requestBuilder.build(), rpcCallback);
              DataProtos.DataQueryResponse response = rpcCallback.get();
              if (controller.failedOnException()) {
                throw controller.getFailedOn();
              }
              return response.getRowListList();
            }
          });

      // merge the per-region results into a single list
      List<DataProtos.DataQueryResponse.Row> rets = new LinkedList<DataProtos.DataQueryResponse.Row>();
      for (Map.Entry<byte[], List<DataProtos.DataQueryResponse.Row>> entry : result.entrySet()) {
        if (null != entry.getValue()) {
          rets.addAll(entry.getValue());
        }
      }
      return rets;
    } catch (ServiceException e) {
      e.printStackTrace();
    } catch (Throwable e) {
      e.printStackTrace();
    }
    return null;
  }

  public static void main(String[] args) throws Throwable {
    List<DataProtos.DataQueryResponse.Row> rows = queryByStartRowAndStopRow("test", "00", "01", true, true);
    if (null != rows) {
      for (DataProtos.DataQueryResponse.Row row : rows) {
        System.out.println(row.getRowKey().toStringUtf8());
      }
    }
  }
}

3. Deployment

3.1 Packaging

Package the code into a jar; the name is arbitrary, for example query.jar.
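For example, if the classes (including the generated DataProtos) have already been compiled into a bin/ directory, a plain jar command is enough (a Maven mvn package works just as well; the paths are illustrative):

jar cf query.jar -C bin .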

3.2 Adding via hbase-site.xml

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>com.bigdata.coprocessor.endpoint.QueryEndpoint</value>
</property>

To configure more than one coprocessor, separate the class names with commas.

The jar containing the class must be on HBase's classpath.

A coprocessor loaded this way applies to every table. If you only want it on specific tables, use the method below instead.

3.3 Adding via the HBase shell

Upload query.jar to the HDFS directory /coprocessor/.
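For example (run from the directory containing the jar; the paths are illustrative):

hadoop fs -mkdir -p /coprocessor
hadoop fs -put query.jar /coprocessor/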
Then register it on the table:

hbase(main):005:0> alter 't1', METHOD => 'table_att',
'coprocessor' => 'hdfs:///coprocessor/query.jar|com.bigdata.coprocessor.endpoint.QueryEndpoint|1001|arg1=1,arg2=2'

Updating all regions with the new schema...

1/1 regions updated.

Done.

0 row(s) in 1.0730 seconds

The coprocessor attribute has the format:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+

FilePath is an HDFS path, for example /coprocessor/query.jar.

ClassName is the fully qualified name of the endpoint implementation class.

Priority is an integer; the framework uses it to decide the execution order when multiple coprocessors are loaded.

Arguments are the parameters passed to the coprocessor.

If the class is already on HBase's classpath, FilePath can be left empty.
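As an alternative to the shell, the same coprocessor table attribute can be set through the Java admin API. A minimal sketch, assuming an HBase 0.98/1.x client and the jar path and class name used above (the table name, priority, and argument are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class AttachCoprocessor {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      TableName table = TableName.valueOf("t1");
      HTableDescriptor desc = admin.getTableDescriptor(table);
      Map<String, String> cpArgs = new HashMap<String, String>();
      cpArgs.put("arg1", "1");
      // register the endpoint: jar path on HDFS, class name, priority, arguments
      desc.addCoprocessor("com.bigdata.coprocessor.endpoint.QueryEndpoint",
          new Path("hdfs:///coprocessor/query.jar"),
          Coprocessor.PRIORITY_USER, cpArgs);
      admin.disableTable(table);
      admin.modifyTable(table, desc);
      admin.enableTable(table);
    } finally {
      admin.close();
    }
  }
}

Disabling and re-enabling the table around modifyTable is the conservative route; as the shell output above shows, recent versions can also apply the schema change online.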

3.4 Unloading

First run describe 'tableName' to find the attribute number of the coprocessor you want to unload,

then run alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$3' (the coprocessor$N attribute name varies; use the one reported by describe).
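An example session (the table name and the coprocessor attribute number are illustrative; use whatever number describe reports for your table):

hbase(main):006:0> describe 't1'            # note the coprocessor$N attribute in the output
hbase(main):007:0> alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'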

