public Cell peek() {
if (this.current == null) {
return null;
}
return this.current.peek();
}
讲完了上述三个重要的数据结构,
回归到hbase系统,HBase的表数据分为多个层次,分别是HRegion->HStore->[HFile,HFile,....,MemStoreFile]。一个表首先会水平分片形成多个HRegion,一个HRegion内不同的Column
Family对应着不同的HStore,一个HStore下包含多个HFile和一个Memstore,数据写入时先写入MemstoreFile,MemStoreFile会不断刷新形成新的HFile。
复杂的数据结构形成了复杂的Scanner,在一个scan流程中,会形成如下描述的scanner对象:每个region的数据读取由一个RegionScanner对象负责,RegionScanner有一个scanner的优先队列,里面放的是storeScanner(这个优先队列在实现上是由多个StoreScanner组成的堆,使用RegionScanner的成员变量KeyValueHeap storeHeap来表示)。
每个StoreScanner对象对应着一个Column Family内的数据读取,其也有一个KeyValueHeap类型的成员变量heap,保存的是隶属于该store的MemStoreScanner和StoreFileScanner。
storeHeap&StoreScanner的构造代码如下所示,在RegionScannerImpl中,会遍历该Region下的所有store,并针对每个store建立对应的StoreScanner。
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) { //遍历该region下的各store
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); //new一个该store的StoreScanner
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner); //将不同的StoreScanner归入不同的scanner list中
} else {
joinedScanners.add(scanner);
}
}
initializeKVHeap(scanners, joinedScanners, region); //然后用这些StoreScanner初始化一个KeyValueHeap
store.getScanner就是针对每个store创建一个StoreScanner,最后一步的initializeKVHeap则将这些StoreScanner构建成一个堆保存在RegionScanner的成员变量storeHeap中,用于遍历取该region下所有store中的数据,而storeScanner同样是一个由FileScanner组成的heap,其在store.getScanner中完成对每个store的storeScanner构造,构造函数中的关键两步如下所示:
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScannersNoCompaction(); //返回该Store下对应的MemStore/StoreFile Scanner
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
&& lazySeekEnabledGlobally, isParallelSeekEnabled); //对这些StoreFileScanner和MemStoreScanner分别进行seek
//seekKey是matcher.getStartKey()
getScannersNoCompaction()返回这个Store下包含的HFileScanner和memstoreScanner,保存在scanners中。
seekScanners就是在memstore或者hfile中定位到指定的keyValue(通常是scan时startKey&endKey指定的keyValue),如果指定的keyValue不存在,则seek到指定keyValue的下一个元素。实际实现时这里采用了lazy seek优化,优化的目的是为了不需要对所有的HFile进行seek寻找目标keyValue,而只需对keyValue真实存在的HFile进行seek。
典型的客户端发起scan请求的代码如下所示:
Scan scan = new Scan();
scan.setStartRow(........);
scan.setStopRow(........);
Result result;
try (ResultScanner rs = table.getScanner(scan)) {
while ((result=rs.next()) != null) {
//your code here
}
}
进入上述代码的getScanner方法,会发现其new一个ClientScanner对象,该对象包含了用户传入的Scan对象以及缓存、连接、重试次数、表名、region信息等参数。ClientScanner构造函数的最后调用initializeScannerInConstruction(),这个函数实际上包装了一个如下的调用:
nextScanner(this.caching, false);
其中,this.caching是一个int型变量,表示一次scan的rpc请求返回的结果数量,返回结果保存在客户端的cache中。
进入nextScanner函数,其首先检查是否已scan至表尾,如果已scan至表尾则关闭scan并返回false给客户端,否则更新localStartKey作为本次scan的开始位置,并将输入参数this.caching赋值给nbRows以表示本次scan返回的数据量,以上述两个变量作为参数调用getScannerCallable方法,该方法会返回一个ScannerCallableWithReplicas类型的对象callable,接着调用callable对象中的call方法向服务端发起一次rpc调用,调用路径如下:
ScannerCallableWithReplicas.call->ScannerCallable.call
服务调用是通过构造一个ScanRequest类型的对象request,并将其发往服务端来实现的,核心代码如下:
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
controller = controllerFactory.newController();
response = getStub().scan(controller, request);
这里需要注意的是构造request时包装了一个long型的变量nextCallSeq。
至此客户端发起scan请求的流程结束,下面介绍服务端是如何处理这些scan请求,并与前面的知识建立起联系。
客户端调用的getStub().scan向服务端发起了一次scan的rpc请求,服务端scan的实现在RSRpcServices中,首先其申请一个租约Lease.lease,用于客户端和服务端之间的心跳连接,然后对照客户端和服务端的nextCallSeq字段(目的是保证客户端顺序得到所有数据而不漏),代码如下:
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
throw new OutOfOrderScannerNextException(
"Expected nextCallSeq: " + rsh.getNextCallSeq()
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
"; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.incNextCallSeq(); //保证客户端顺序得到所有数据不漏,Client和RS都维护一个nextCallSeq字段
}
}
RSRpcServices中包含一个ConcurrentHashMap<String, RegionScannerHolder>类型的变量scanners,String是region的名字,也就是说每一个region都租用一个RegionScanner。回到scan函数,参数request中可以获得scannerName,凭借scannerName从scanners中获取对应的RegionScanner对象scanner。
接着从request中提取本次scan的信息,如是否是small scan、reverse scan等等,根据这些信息构造ScannerContext类型的对象scannerContext,以此为参数调用RegionScanner的nextRaw方法,这样就与前面介绍的RegionScanner建立起联系,返回结果存放在List<KeyValue>类型的变量values中:
moreRows = scanner.nextRaw(values, scannerContext)
newRaw方法的调用路径如下:
nextRaw() -> RegionScanner.nextInternal() -> populateResult()
其中,在RegionScanner.nextInternal()会进行一些对stopRow/filterRow的检查,populateResult函数开始迭代取数据,调用语句如下:
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
this.storeHeap是前面我们说的RegionScanner中维护的一个由StoreScanner组成的堆。populateResult的主要逻辑简化如下:
do{
heap.next(results, scannerContext);
nextKv= heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
} while (moreCellsInRow)
populateResult中真正返回数据调用的是heap的next方法,这里还记得前面说的heap是由storescanner组成的堆,在next中用current变量记住当前正在处理的storescanner,然后调用next()函数返回了该storescanner中可能存在的结果。
到StoreScanner的next方法,StoreScanner维护着一个由StoreFileScanner/memstoreScanner构造的堆,next实际是从它的scanner堆中peek出一个StoreFileScanner或者是MemStoreScanner,然后调用next()取得数据,再将该scanner添加回队列中。
在StoreScanner的next方法有下面一段代码需要注意:
LOOP: while((cell = this.heap.peek()) != null) {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreva lues();
}
}
。。。。。。。。
}
这段代码维护了服务端和客户端的一个心跳,是为了防止服务端scan到较大的数据时长时间没有给客户端返回响应,而造成客户端误以为服务端挂掉而产生超时错误。其中cellsPerHeartbeatCheck定义了心跳发送的周期,该值由"hbase.cells.scanned.per.heartbeat.check"配置,默认是10000,表示的是每scan出10000个cell,则服务端向客户端发送一条心跳。
参考资料: