Using the Coprocessor Proxy in an HBase Endpoint Coprocessor: Example and Troubleshooting (Part 1)

2015-07-24 09:23:36

I. Caveats first:

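1. A coprocessor can be loaded in three ways: through the configuration file, through the HBase shell, or programmatically. I load it programmatically: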

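    static {
        // Describe table "epTest" with a single in-memory column family "_tis".
        EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
        HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
        family.setInMemory(true);
        family.setMaxVersions(1);
        EP_TABLE_DISCRIPTOR.addFamily(family);
        try {
            // Register the endpoint class on this table's descriptor.
            EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
        } catch (IOException ioe) {
            // Fail fast instead of silently swallowing the registration error.
            throw new ExceptionInInitializerError(ioe);
        }
    }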

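The addCoprocessor call in the code above tells HBase to run the coprocessor on this table. This only works, however, once HBase has been restarted so that the jar is loaded.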

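2. If the client connects and then fails with an error such as: No matching handler **** for protocol in *** region, the jar has not yet been loaded into HBase. Make sure HBase has been restarted, and check that the class name "RowCountServer" in addCoprocessor("ict.wde.test.RowCountServer"); is spelled correctly.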
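A quick way to rule out a misspelled class name is to ask the JVM whether the name resolves at all. The following is only a minimal sketch: CoprocessorClassCheck and isLoadable are hypothetical names, not part of HBase, and the check only looks at the classpath of the JVM that runs it. It has to be run with the coprocessor jar on the classpath, and a positive result does not prove that the region servers have loaded the jar.

```java
// Hypothetical helper, not part of HBase: reports whether a fully qualified
// class name resolves on the classpath of the JVM running this code.
final class CoprocessorClassCheck {

    /** Returns true if the given class name can be located and linked. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, CoprocessorClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class name used in this article.
        String name = args.length > 0 ? args[0] : "ict.wde.test.RowCountServer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

If the name passed to addCoprocessor does not resolve here even with the jar on the classpath, the spelling or the package name is the likely culprit.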

II. Steps

2.1 Write the server-side code:

1) The interface class (fixed format)

package ict.wde.test;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/22.
 */
public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {

    public long getRowCount() throws IOException;

    public long getRowCount(Filter filter) throws IOException;

    public String getStr() throws IOException;

    //public long getKeyValue() throws IOException;
}

2) The class that does the actual work

package ict.wde.test;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/27.
 */
public class RowCountServer implements RowCountProtocol {

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {

    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {

    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
                                                  long clientVersion, int clientMethodsHash) throws IOException {
        return new ProtocolSignature(3, null);
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 3;
    }

    @Override
    public long getRowCount() throws IOException {
        return this.getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return this.getRowCount(filter, false);
    }

    @Override
    public String getStr() throws IOException {
        String name = "Hello Doctor Michael Zhang, again!";
        return name;
    }

//    @Override
//    public long getKeyValueCount() {
//        return 0;
//    }

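    // Shared helper behind both getRowCount overloads. It only prepares the Scan:
    // the scan is never executed here, so the method always returns 1.
    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }

        return 1;
    }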

}

Package the two classes above into a jar and place it in HBase's lib directory.
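The getRowCount(Filter, boolean) method in RowCountServer builds a Scan but returns a fixed value. As a rough illustration of what the counting step could look like once a scanner is available, here is a sketch in plain Java. RowScannerSketch and RowCountSketch are hypothetical stand-ins, not HBase types; the assumed contract (next fills a list with one row's cells and returns whether more rows follow) is loosely modeled on a region-level scanner. How the endpoint would obtain a real scanner from its environment is not shown in the original and is left out here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a region-level scanner; NOT an HBase type. */
interface RowScannerSketch {
    /** Fills 'cells' with the next row's cells; returns true if more rows follow. */
    boolean next(List<String> cells) throws IOException;

    void close() throws IOException;
}

final class RowCountSketch {

    /** Counts rows, or individual cells when countKeyValue is true. */
    static long count(RowScannerSketch scanner, boolean countKeyValue) throws IOException {
        long result = 0;
        List<String> cells = new ArrayList<String>();
        try {
            boolean more;
            do {
                cells.clear();
                more = scanner.next(cells);
                if (!cells.isEmpty()) { // ignore a call that produced no row
                    result += countKeyValue ? cells.size() : 1;
                }
            } while (more);
        } finally {
            scanner.close(); // always release the scanner
        }
        return result;
    }

    /** In-memory scanner over fixed rows, used only to exercise the loop. */
    static RowScannerSketch inMemory(List<List<String>> rows) {
        final Iterator<List<String>> it = rows.iterator();
        return new RowScannerSketch() {
            public boolean next(List<String> cells) {
                if (it.hasNext()) {
                    cells.addAll(it.next());
                }
                return it.hasNext();
            }

            public void close() {
                // nothing to release for the in-memory case
            }
        };
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("a:1", "a:2"),
                Arrays.asList("b:1"),
                Arrays.asList("c:1", "c:2", "c:3"));
        System.out.println("rows:  " + count(inMemory(rows), false));
        System.out.println("cells: " + count(inMemory(rows), true));
    }
}
```

The countKeyValue flag mirrors the parameter of the same name in RowCountServer: when it is false each row contributes 1, otherwise each row contributes its number of cells.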

2.2 Client code

import ict.wde.test.RowCountProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.clien