设为首页 加入收藏

TOP

HBase的Scan实现源码分析
2019-03-05 01:41:43 】 浏览:72
Tags:HBase Scan 实现 源码 分析
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/bryce123phy/article/details/52638988
我们从接口InternalScanner开始分析,实现该接口的类表示其是使用于HBase内部的scanner,不暴露给客户端使用。实现了这个接口的类如下所示:
KeyValueHeap、StoreScanner、RegionScanner

下面再看KeyValueScanner,KeyValueScanner也是一个接口,它是一个可以向外迭代出KeyValue的scanner。它定义的主要方法包括了peek()、next()、seek(KeyValue key)等等,其中next和peek都能获取scanner中的下一个KeyValue,但是next会移动iterator,而peek不会,而seek就是将iterator定位到指定的KeyValue,如果不存在该KeyValue则定位到其后面的那个KeyValue,在scanner初始化的时候会调用下seek接口。

需要注意的是KeyValueScanner是可以排序的,其大小由peek()获取到KeyValue的大小决定,即如果KeyValueScanner1.peek() < KeyValueScanner2.peek(),则KeyValueScanner1 < KeyValueScanner2。KeyValue的大小比较依字典序进行,比较的优先级依次是RowKey cf+cq timestamp type,当比较两个keyValue时,先比较RowKey的大小('row1' < 'row2'),相同的情况下比较cf+cq的大小('cf1:q1' < 'cf2:q1' < 'cf2:q2'),还相同则比较时间戳,时间戳值越大则数据越新,在队列中的位置越靠前。最后比较TYPE('DeleteFamily' < 'DeleteColumn' < 'Delete' < 'Put')。

再看第三个关键的类KeyValueHeap,该类实现了上述两个接口,并且包含了三个主要的成员变量,分别是由KeyValueScanner组成的堆heap(优先级队列,内部实现就是堆),heap的堆顶元素current(注意该元素是个KeyValueScanner),以及用于比较KeyValueScanner元素的comparator,comparator的比较方法同上文所述,即取KeyValueScanner的堆顶元素做cell比较。

接着是KeyValueHeap中的几个重要方法,首先是peek和next,他们都是返回堆顶元素(实际上是堆顶KeyValueScanner的堆顶Cell),不同在于next会将堆顶元素出堆,并重新调整堆,对外来说就是迭代器向前移动了,而peek()不会将堆顶出堆,堆顶不变。peek和next返回的都是指向KeyValueHeap堆顶scanner的元素current的堆顶cell,如下是KeyValueHeap中的peek实现,可见其调用KeyValueScanner(current指向)的peek方法。

public Cell peek() {
   if (this.current == null) {
      return null;
   }
   return this.current.peek();
}

讲完了上述三个重要的数据结构,回归到hbase系统,HBase的表数据分为多个层次,分别是HRegion->HStore->[HFile,HFile,....,MemStoreFile]。一个表首先会水平分片形成多个HRegion,一个HRegion内不同的Column Family对应着不同的HStore,一个HStore下包含多个HFile和一个Memstore,数据写入时先写入MemstoreFile,MemStoreFile会不断刷新形成新的HFile。

复杂的数据结构形成了复杂的Scanner,在一个scan流程中,会形成如下描述的scanner对象:每个region的数据读取由一个RegionScanner对象负责,RegionScanner有一个scanner的优先队列,里面放的是storeScanner(这个优先队列在实现上是由多个StoreScanner组成的堆,使用RegionScanner的成员变量KeyValueHeap storeHeap来表示)。

每个StoreScanner对象对应着一个Column Family内的数据读取,其也有一个KeyValueHeap类型的成员变量heap,保存的是隶属于该store的MemStoreScanner和StoreFileScanner。

storeHeap&StoreScanner的构造代码如下所示,在RegionScannerImpl中,会遍历该Region下的所有store,并针对每个store建立对应的StoreScanner。

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {         //遍历该region下的各store
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);  //new一个该store的StoreScanner
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);        //将不同的StoreScanner归入不同的scanner list中
        } else {
          joinedScanners.add(scanner);
        }
}
initializeKVHeap(scanners, joinedScanners, region);   //然后用这些StoreScanner初始化一个KeyValueHeap

store.getScanner就是针对每个store创建一个StoreScanner,最后一步的initializeKVHeap则将这些StoreScanner构建成一个堆保存在RegionScanner的成员变量storeHeap中,用于遍历取该region下所有store中的数据,而storeScanner同样是一个由FileScanner组成的heap,其在store.getScanner中完成对每个store的storeScanner构造,构造函数中的关键两步如下所示:

// Pass columns to try to filter out unnecessary StoreFiles.
    List<KeyValueScanner> scanners = getScannersNoCompaction();  //返回该Store下对应的MemStore/StoreFile Scanner

    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
        && lazySeekEnabledGlobally, isParallelSeekEnabled);     //对这些StoreFileScanner和MemStoreScanner分别进行seek
                                                                //seekKey是matcher.getStartKey()

getScannersNoCompaction()返回这个Store下包含的HFileScanner和memstoreScanner,保存在scanners中。

seekScanners就是在memstore或者hfile中定位到指定的keyValue(通常是scan时startKey&endKey指定的keyValue),如果指定的keyValue不存在,则seek到指定keyValue的下一个元素。实际实现时这里采用了lazy seek优化,优化的目的是为了不需要对所有的HFile进行seek寻找目标keyValue,而只需对keyValue真实存在的HFile进行seek。

典型的客户端发起scan请求的代码如下所示:

Scan scan = new Scan();
scan.setStartRow(........);
scan.setStopRow(........);
Result result;
try (ResultScanner rs = table.getScanner(scan)) {
   while ((result=rs.next()) != null) {
       //your code here
   }
}

进入上述代码的getScanner方法,会发现其new一个ClientScanner对象,该对象包含了用户传入的Scan对象以及缓存、连接、重试次数、表名、region信息等参数。ClientScanner构造函数的最后调用initializeScannerInConstruction(),这个函数实际上包装了一个如下的调用:
nextScanner(this.caching, false);
其中,this.caching是一个int型变量,表示一次scan的rpc请求返回的结果数量,返回结果保存在客户端的cache中。

进入nextScanner函数,其首先检查是否已scan至表尾,如果已scan至表尾则关闭scan并返回false给客户端,否则更新localStartKey作为本次scan的开始位置,并将输入参数this.caching赋值给nbRows以表示本次scan返回的数据量,以上述两个变量作为参数调用getScannerCallable方法,该方法会返回一个ScannerCallableWithReplicas类型的对象callable,接着调用callable对象中的call方法向服务端发起一次rpc调用,调用路径如下:
ScannerCallableWithReplicas.call->ScannerCallable.call
服务调用是通过构造一个ScanRequest类型的对象request,并将其发往服务端来实现的,核心代码如下:
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
controller = controllerFactory.newController();
response = getStub().scan(controller, request);
这里需要注意的是构造request时包装了一个long型的变量nextCallSeq。

至此客户端发起scan请求的流程结束,下面介绍服务端是如何处理这些scan请求,并与前面的知识建立起联系。

客户端调用的getStub().scan向服务端发起了一次scan的rpc请求,服务端scan的实现在RSRpcServices中,首先其申请一个租约Lease.lease,用于客户端和服务端之间的心跳连接,然后对照客户端和服务端的nextCallSeq字段(目的是保证客户端顺序得到所有数据而不漏),代码如下:

if (request.hasNextCallSeq()) {
          if (rsh == null) {
            rsh = scanners.get(scannerName);
          }
          if (rsh != null) {
            if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
              throw new OutOfOrderScannerNextException(
                "Expected nextCallSeq: " + rsh.getNextCallSeq()
                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
                "; request=" + TextFormat.shortDebugString(request));
            }
            // Increment the nextCallSeq value which is the next expected from client.
            rsh.incNextCallSeq();  //保证客户端顺序得到所有数据不漏,Client和RS都维护一个nextCallSeq字段
          }
}

RSRpcServices中包含一个ConcurrentHashMap<String, RegionScannerHolder>类型的变量scanners,String是region的名字,也就是说每一个region都租用一个RegionScanner。回到scan函数,参数request中可以获得scannerName,凭借scannerName从scanners中获取对应的RegionScanner对象scanner。

接着从request中提取本次scan的信息,如是否是small scan、reverse scan等等,根据这些信息构造ScannerContext类型的对象scannerContext,以此为参数调用RegionScanner的nextRaw方法,这样就与前面介绍的RegionScanner建立起联系,返回结果存放在List<KeyValue>类型的变量values中:
moreRows = scanner.nextRaw(values, scannerContext)

newRaw方法的调用路径如下:
nextRaw() -> RegionScanner.nextInternal() -> populateResult()
其中,在RegionScanner.nextInternal()会进行一些对stopRow/filterRow的检查,populateResult函数开始迭代取数据,调用语句如下:
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
this.storeHeap是前面我们说的RegionScanner中维护的一个由StoreScanner组成的堆。populateResult的主要逻辑简化如下:
do{
      heap.next(results, scannerContext);
                    
      nextKv= heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
} while (moreCellsInRow)

populateResult中真正返回数据调用的是heap的next方法,这里还记得前面说的heap是由storescanner组成的堆,在next中用current变量记住当前正在处理的storescanner,然后调用next()函数返回了该storescanner中可能存在的结果。

到StoreScanner的next方法,StoreScanner维护着一个由StoreFileScanner/memstoreScanner构造的堆,next实际是从它的scanner堆中peek出一个StoreFileScanner或者是MemStoreScanner,然后调用next()取得数据,再将该scanner添加回队列中。

在StoreScanner的next方法有下面一段代码需要注意:
LOOP: while((cell = this.heap.peek()) != null) {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreva lues();
        }
      }
      。。。。。。。。
}
这段代码维护了服务端和客户端的一个心跳,是为了防止服务端scan到较大的数据时长时间没有给客户端返回响应,而造成客户端误以为服务端挂掉而产生超时错误。其中cellsPerHeartbeatCheck定义了心跳发送的周期,该值由"hbase.cells.scanned.per.heartbeat.check"配置,默认是10000,表示的是每scan出10000个cell,则服务端向客户端发送一条心跳。




参考资料:
https://blogs.apache.org/hbase/page=1

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇分布式存储系统-HBASE 下一篇flink实战--读写Hbase

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目