设为首页 加入收藏

TOP

HBase如何存取多个版本的值
2019-02-09 01:48:43 】 浏览:9
Tags:HBase 如何 存取 多个 版本

HBase如何存取多个版本的值?

废话少说,一般情况下使用Put的这个方法保存一个版本:

/**
   * Add the specified column and value to this Put operation.
   * @param family family name
   * @param qualifier column qualifier
   * @param value column value
   */
  public Put add(byte [] family, byte [] qualifier, byte [] value) {
    return add(family, qualifier, this.timestamp, value);
  }

然后获取HTable的实例,调用Put方法,更新入库。

这里我要刻意说明一下时间戳 ,可以看到上面的add方面实现里面调用了另外一个方法,增加了个参数:默认时间戳

this.timestamp

它的值是:

  private long timestamp = HConstants.LATEST_TIMESTAMP;

  static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

那么获取的时候,获取一个值 也很简单,使用下面的方法:

创建一个Put的实例
/**
   * Create a Get operation for the specified row.
   * <p>
   * If no further operations are done, this will get the latest version of
   * all columns in all families of the specified row.
   * @param row row key
   */
  public Get(byte [] row) {
    this(row, null);
  }

然后获取HTable实例调用方法 
public Result get(final Get get) throws IOException

返回的Result对象 调用 
 public byte [] getValue(byte [] family, byte [] qualifier)

就可以获取值。

一般的应用可以像上面这样来解决,服务器正常的话不会碰到什么问题,可是要存取多个版本 的话,有可能你会遇到我遇到的问题。 看下面一段代码是否有问题

(里面有如何插入多个版本 ,如何获取多个版本 的方法,顺带说明一个问题)

/**
 * Test get value of multi versions
 * 
 * @author dev-cjj
 * 
 */
public class GetRowVersionsTest extends TestCase
{
    private final byte[] family     = Bytes.toBytes("log");

    private final byte[] qualifier  = Bytes.toBytes("q1");

    private final byte[] qualifier2 = Bytes.toBytes("q2");

    private final byte[] rowKey     = Bytes.toBytes(10001);

    private final HTable table      = IDMHBaseConfiguration.getTable("testTable");

    private final long   ts1        = 1298529542218L;

    private final long   ts2        = ts1 + 100;

    private final long   ts3        = ts1 + 1000;

    private final long   ts4        = ts1 + 2000;

    private final long   ts5        = ts1 + 3000;

    private final byte[] value1     = Bytes.toBytes("value1");

    private final byte[] value2     = Bytes.toBytes("value2");

    private final byte[] value3     = Bytes.toBytes("value3");

    private final byte[] value4     = Bytes.toBytes("value4");

    private final byte[] value5     = Bytes.toBytes("value5");

    private void insert(final long ts, final byte[] value) throws IOException
    {
        //        table.setAutoFlush(false);
        final Put put = new Put(rowKey);
        put.add(family, qualifier, ts, value);
        put.add(family, qualifier2, ts, value);
        table.put(put);
    }

    private void sleep()
    {
        try
        {
            Thread.sleep(1000);
        }
        catch (final InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    @Test
    public void testGetRowMultipleVersions() throws Exception
    {
        insert(ts1, value1);
        sleep();
        insert(ts2, value2);
        sleep();
        insert(ts3, value3);
        sleep();
        insert(ts4, value4);
        sleep();
        insert(ts5, value5);
        sleep();

        // check getRow with multiple versions
        final Get get = new Get(rowKey);
        get.setMaxVersions();
        final Result r = table.get(get);

        // one way get values of all version
        //        final List<KeyValue> list = r.list();
        //        for (final KeyValue kv : list)
        //        {
        //            System.err.println(Bytes.toString(kv.getKey()));
        //            System.err.println(kv.getTimestamp());
        //            System.err.println(Bytes.toString(kv.getValue()));
        //        }

        // another way get values of all version
        final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = r.getMap();
        final NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);
        final NavigableMap<Long, byte[]> versionMap = familyMap.get(qualifier);
        for (final Map.Entry<Long, byte[]> entry : versionMap.entrySet())
        {
            System.err.println(Bytes.toString(qualifier) + " -> " + Bytes.toString(entry.getValue()));
        }

        final NavigableMap<Long, byte[]> versionMap2 = familyMap.get(qualifier2);
        for (final Map.Entry<Long, byte[]> entry : versionMap2.entrySet())
        {
            System.err.println(Bytes.toString(qualifier2) + " -> " + Bytes.toString(entry.getValue()));
        }

        //        assertTrue(versionMap.size() == 5);

        //        assertTrue(value1 == versionMap.get(ts1));
        //        assertTrue(value2 == versionMap.get(ts2));
        //        assertTrue(value3 == versionMap.get(ts3));

        //        table.delete(new Delete(rowKey));

        //        assertTrue(table.get(get).size() == 0);
        //        table.close();
    }
}

这里我只是打印输出结果 ,不用断言, 结果如你所期望的是:

q1 -> value5
q1 -> value4
q1 -> value3
q1 -> value2
q1 -> value1
q2 -> value5
q2 -> value4
q2 -> value3
q2 -> value2
q2 -> value1

上面的代码中,我注视掉了一个关键行

        //        table.delete(new Delete(rowKey));

如果取消注释,再运行几次你会发现一个大问题! 保存不上了,一个版本都没有。

final Result r = table.get(get); 里面什么都不会有! 很奇怪!

问题原因 (针对上面的代码和问题):

1、插入的时候使用的时间戳都是过去的时间戳。

2、删除的时候没有指定时间戳(如果没有指定则默认一个当前的时间戳)

3、在HBase中删除操作并没有删除对应的文件,只是做一个带有时间戳的删除标记(任何在这个时间戳之前的列值都会被认为是删除的)

那么知道上面三条规则,问题就简单了,新插入的列值的时间戳要在删除标记的时间戳之后,才会被认为是不被删除的列值。

补充一点,创建/添加列族时候默认是三个版本,可以修改为1个版本或者更多个版本,我上面5个版本是因为我指定了5个版本。

{NAME => 'testTable', FAMILIES => [{NAME => 'log', COMPRESSION => 'NON true                      
 E', VERSIONS => '5', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEM                           
 ORY => 'false', BLOCKCACHE => 'true'}]}  


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala spark hbase 操作案例 下一篇SparkSQL+Hbase+HDFS实现SQL完全..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }