设为首页 加入收藏

TOP

HBase协处理器入门示例(1.2.0-cdh5.8.0版本)
2019-02-19 13:47:00 】 浏览:128
Tags:HBase 处理器 入门 示例 1.2.0-cdh5.8.0 版本

出自《HBase不睡觉书》,转载请注明出处。

当我们使用传统的关系型数据库的时候,如果有一些作业对性能要求比较高,并且需求不会经常变动,我们往往会采用存储过程来实现;还有一些作业我们需要当数据达到某种条件的时候自动触发,我们往往会采用触发器来实现。在HBase中也有类似存储过程和触发器的功能,它叫协处理器)(0.92版本之后)。

HBase的协处理器涵盖了两种类似关系型数据库中的应用场景:存储过程和触发器。协处理器也分两种:用来实现存储过程的中断程序(EndPoint)和用来实现触发器功能的观察者(Obsevers)。
=================================================
需求:

当插入数据时,如果遇到单元格为mycf:name=JACK,则在mycf:message这个列插入一句话:Hello World! Welcome back!。这个需求在关系型数据库中使用触发器来实现。在HBase中我们使用协处理器来实现。
step1
新建maven项目,打包方式选择jar。原生版本只需添加hbase-server依赖(不要同时添加hbase-client依赖)。而cdh版本需要同时添加hbase-server、hbase-client,hbase-common以及guava包。
pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.idayan</groupId>
  <artifactId>coprocessor</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>coprocessor</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.0-cdh5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0-cdh5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>1.2.0-cdh5.8.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>12.0.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

step 2

新建类:HelloWorldObserver虽然所有的协处理器都要实现接口Coprocessor,但是实现接口需要实现很多不必要的方法。所以我选择直接继承BaseRegionObserver类,这样很多方法都有了基本你的实现,我们只需要重写需要的方法即可。

step 3

编码:

HelloWorldObserver.java

package com.idayan;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

/**
 * @author: xianghu.wang
 * @Date: 2018/3/15
 * @Description:
 */
public class HelloWorldObserver extends BaseRegionObserver{

    public static final String JACK = "JACK";

    /**
     * 重写prePut方法。
     * 这个方法会在Put动作之前进行操作
     * @param e
     * @param put
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
                       Put put, WALEdit edit, Durability durability) throws IOException {

        //获取mycf:name的单元格
        List<Cell> name = put.get(Bytes.toBytes("mycf"),Bytes.toBytes("name"));

        //如果该put中不存在mycf:name,则不作操作,这个单元格直接返回
        if (name == null || name.size() == 0) {
            return;
        }

        //如果该put中存在mycf:name,则判断mycf:name是否为JACK
        if (JACK.equals(Bytes.toString(CellUtil.cloneva lue(name.get(0))))) {
            //如果mycf:name是JACK,则在mycf:message中添加一句话
            put.addColumn(Bytes.toBytes("mycf"),Bytes.toBytes("message"),Bytes.toBytes("Hello World! Welcome back !"));
        }

    }
}

step 4
打包,放到HDFS上。我的jar包名是coprocessor-1.0-SNAPSHOT.jar,上传到HDFS目录下。

[maxiu@idayan00 hadoop-2.6.0-cdh5.8.0]$ bin/hdfs dfs -put /home/maxiu/myData/coprocessor-1.0-SNAPSHOT.jar /user/maxiu
18/03/15 11:09:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[maxiu@idayan00 hadoop-2.6.0-cdh5.8.0]$ bin/hdfs dfs -ls /user/maxiu
18/03/15 11:10:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r--   1 maxiu supergroup       2844 2018-03-15 11:09 /user/maxiu/coprocessor-1.0-SNAPSHOT.jar

我的jar:链接:https://pan.baidu.com/s/1vUw66lCjypE0EO8O3aO-5Q 密码:8wdm
step5
在HBase中启用这个观察者,登录服务器,使用hbase shell 执行以下命令:

hbase(main):002:0> create 'mytable','mycf' //创建表
0 row(s) in 1.5240 seconds

=> Hbase::Table - mytable
hbase(main):003:0> alter 'mytable' , METHOD =>'table_att','coprocessor'=>'hdfs://idayan00//user/maxiu/coprocessor-1.0-SNAPSHOT.jar|com.idayan.HelloWorldObserver||'
Updating all regions with the new schema... //启用协处理器
1/1 regions updated.
Done.
0 row(s) in 2.5410 seconds

hbase(main):004:0> desc 'mytable' //查看表属性
Table mytable is ENABLED                                                                                                                                                                                                                                                      
mytable, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://idayan00//user/maxiu/coprocessor-1.0-SNAPSHOT.jar|com.idayan.HelloWorldObserver||'}                                                                                                                                   
COLUMN FAMILIES DESCRIPTION                                                                                                                                                                                                                                                   
{NAME => 'mycf', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'
}                                                                                                                                                                                                                                                                             
1 row(s) in 0.0490 seconds

hbase(main):005:0>
  • table_att是固定词组,意思就是调用setValue()方法给表设置属性
  • 属性名为 coprocessor,就是协处理器的意思
  • 属性值为<包所在路径>|<协处理器类全名>||,并且中间不能有空格。

如果想删除该协处理器可以执行以下命令:

hbase(main):005:0> alter 'mytable',METHOD =>'table_att_unset',NAME => 'coprocessor$1'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.5900 seconds

测试协处理器

现在尝试用Java API来添加包含有mycf:name=JACK的数据

Put put = new Put(Bytes.toBytes("row1"));

put.addColumn(Bytes.toBytes("mycf"),Bytes.toBytes("name"),Bytes.toBytes("JACK"));

table.put(put);

查看结果:

通过hbase shell 添加数据:

put 'mytable','row2','mycf:name','JACK'


可以看到通过java api 和hbase shell添加数据 协处理器都生效了。

下面我们再添加一条名字不是JACK的

put 'mytable','row3','mycf:name','MAXIU'


可以看到第三行只有一条记录,并没有在message那一列添加数据。

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇HBase学习之五:HBase的RowKey设计.. 下一篇位图快速转化成区域

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目