HRegionServer issues a compaction request. The main logic is as follows:
- CompactSplitThread#requestCompaction(){
- for (Store s : r.getStores().values()) {
- CompactionRequest cr = s.requestCompaction(priority, request);
- ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
- ? largeCompactions : smallCompactions;
- pool.execute(cr);
- ret.add(cr);
- }
- }
- Store#throttleCompaction(){
- long throttlePoint = conf.getLong(
- "hbase.regionserver.thread.compaction.throttle",
- 2 * this.minFilesToCompact * this.region.memstoreFlushSize);
- return compactionSize > throttlePoint;
- }
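The throttle check above only decides which of two pools a request goes to, by size. Below is a minimal standalone sketch of that routing; the pool sizes and the assumed defaults (minFilesToCompact = 3, memstoreFlushSize = 128 MB) are illustrative, not values read from a real cluster:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThrottleDemo {
    // Assumed defaults for illustration only.
    static final long MIN_FILES_TO_COMPACT = 3;
    static final long MEMSTORE_FLUSH_SIZE = 128L * 1024 * 1024;
    // Same formula as the default of hbase.regionserver.thread.compaction.throttle.
    static final long THROTTLE_POINT = 2 * MIN_FILES_TO_COMPACT * MEMSTORE_FLUSH_SIZE;

    static final ThreadPoolExecutor largeCompactions =
        (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    static final ThreadPoolExecutor smallCompactions =
        (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

    static ThreadPoolExecutor poolFor(long compactionSize) {
        // Requests above the throttle point go to the "large" pool so that
        // big (typically major) compactions cannot starve small ones.
        return compactionSize > THROTTLE_POINT ? largeCompactions : smallCompactions;
    }

    public static void main(String[] args) {
        System.out.println(poolFor(100L * 1024 * 1024) == smallCompactions);  // true
        System.out.println(poolFor(1024L * 1024 * 1024) == largeCompactions); // true
        largeCompactions.shutdown();
        smallCompactions.shutdown();
    }
}
```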
- Store#compactSelection(){
- if (storefile.maxTimeStamp + store.ttl < now_timestamp) {
- }
- while (pos < compactSelection.getFilesToCompact().size() &&
- compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize &&
- !compactSelection.getFilesToCompact().get(pos).isReference()) {
- ++pos;
- }
- if (pos != 0) {
- compactSelection.clearSubList(0, pos);
- }
- if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
- return;
- }
- int countOfFiles = compactSelection.getFilesToCompact().size();
- long[] fileSizes = new long[countOfFiles];
- long[] sumSize = new long[countOfFiles];
- for (int i = countOfFiles - 1; i >= 0; --i) {
- StoreFile file = compactSelection.getFilesToCompact().get(i);
- fileSizes[i] = file.getReader().length();
- int tooFar = i + this.maxFilesToCompact - 1;
- sumSize[i] = fileSizes[i] + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
- }
- int start = 0;
- while (countOfFiles - start >= this.minFilesToCompact && fileSizes[start] >
- Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
- ++start;
- }
- int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
- long totalSize = fileSizes[start] + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
- compactSelection = compactSelection.getSubList(start, end);
- if (majorcompaction && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
- compactSelection.getFilesToCompact().subList(0, pastMax).clear();
- }
- }
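To make the ratio-based selection above concrete, here is a self-contained sketch of the sliding window. The ratio R, the file counts, and the file sizes are illustrative assumptions mirroring the hbase.hstore.compaction.* settings, not real configuration:

```java
import java.util.Arrays;

public class CompactSelectionDemo {
    static final int MIN_FILES = 3, MAX_FILES = 10;
    static final long MIN_COMPACT_SIZE = 0;
    static final double R = 1.2; // analogous to hbase.hstore.compaction.ratio

    // Returns {start, end} indexes into fileSizes (candidates ordered oldest..newest).
    static int[] select(long[] fileSizes) {
        int count = fileSizes.length;
        long[] sumSize = new long[count];
        // sumSize[i] = total size of up to MAX_FILES files starting at i.
        for (int i = count - 1; i >= 0; --i) {
            int tooFar = i + MAX_FILES - 1;
            sumSize[i] = fileSizes[i]
                + ((i + 1 < count) ? sumSize[i + 1] : 0)
                - ((tooFar < count) ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        // Skip a file while it is more than R times the sum of the files after
        // it in the window: rewriting such a large file gains little.
        while (count - start >= MIN_FILES
               && fileSizes[start] > Math.max(MIN_COMPACT_SIZE,
                                              (long) (sumSize[start + 1] * R))) {
            ++start;
        }
        int end = Math.min(count, start + MAX_FILES);
        return new int[] { start, end };
    }

    public static void main(String[] args) {
        // One big old compacted file followed by smaller, newer flushes.
        long[] sizes = { 500, 80, 60, 40, 20, 10 };
        System.out.println(Arrays.toString(select(sizes))); // [1, 6]: the 500 is skipped
    }
}
```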
The CompactionRequest thread (executes both major and minor compactions). The compaction-related class diagram is referenced here (figure not reproduced).
The difference between major and minor compaction is actually quite small. If the total size of the files selected for compaction is greater than 2 * minFilesToCompact * memstoreFlushSize, the request is treated as a major compaction and placed in the major (large) thread pool for execution; otherwise it is treated as minor. In addition, when the StoreScanner constructor runs, the ScanType tells it whether this is a major or minor compaction; later, ScanQueryMatcher uses the ScanType (there are three: user scan, minor compaction, and major compaction) to decide which match codes to return. The main logic is as follows:
- CompactionRequest#run(){
- boolean completed = HRegion.compact(this);
- if (completed) {
- if (s.getCompactPriority() <= 0) {
- server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
- } else {
- server.getCompactSplitThread().requestSplit(r);
- }
- }
- }
- HRegion#compact(){
- Preconditions.checkArgument(cr.getHRegion().equals(this));
- lock.readLock().lock();
- cr.getStore().compact(cr);
- lock.readLock().unlock();
- }
- Store#compact(){
- List<StoreFile> filesToCompact = request.getFiles();
- StoreFile.Writer writer = this.compactor.compact(cr, maxId);
- if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
- sf = completeCompaction(filesToCompact, writer);
- } else {
- sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- sf.createReader();
- }
- }
- Store#completeCompaction(){
- Path origPath = compactedFile.getPath();
- Path destPath = new Path(homedir, origPath.getName());
- HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);
- StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- passSchemaMetricsTo(result);
- result.createReader();
- }
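completeCompaction commits the new file by renaming it from the tmp area into the live store directory, so readers only ever see complete files. A minimal sketch of the same write-to-tmp-then-rename pattern, using java.nio instead of the Hadoop FileSystem API that HBase actually goes through (paths and contents are made up for the demo):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RenameCommitDemo {
    public static void main(String[] args) throws IOException {
        Path tmpDir = Files.createTempDirectory("store-tmp");
        Path storeDir = Files.createTempDirectory("store");

        // 1. Write the compacted output under .tmp, invisible to readers.
        Path tmpFile = tmpDir.resolve("compacted-001");
        Files.write(tmpFile, "merged cell data".getBytes());

        // 2. Atomically move it into the live store directory. Only after this
        //    rename can scanners open the new file; a crash before it leaves
        //    the old files untouched.
        Path dest = storeDir.resolve(tmpFile.getFileName());
        Files.move(tmpFile, dest, StandardCopyOption.ATOMIC_MOVE);

        System.out.println("committed: " + dest + ", exists=" + Files.exists(dest));
    }
}
```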
- Compactor#compact(){
- for (StoreFile file : filesToCompact) {
- StoreFile.Reader r = file.getReader();
- long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
- ? r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- }
- int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
- Compression.Algorithm compression = store.getFamily().getCompression();
- List<StoreFileScanner> scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
- smallestReadPoint, earliestPutTs);
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
- writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
- maxMVCCReadpoint >= smallestReadPoint);
- }
- for (KeyValue kv : kvs) {
- writer.append(kv);
- }
- } while (hasMore);
- scanner.close();
- StoreFile$Writer.appendMetadata(maxId, majorCompaction);
- StoreFile$Writer.close();
- }
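The do/while above drains the merge scanner in batches of at most hbase.hstore.compaction.kv.max cells, so a compaction never materializes the whole data set in memory. A stripped-down sketch of the same batching loop; the Scanner interface here is a stand-in invented for the demo, not HBase's InternalScanner:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchedDrainDemo {
    // Stand-in for InternalScanner#next(List, int): fills up to `limit`
    // entries and reports whether more remain.
    interface Scanner {
        boolean next(List<String> out, int limit);
    }

    public static void main(String[] args) {
        Iterator<String> data = List.of("a", "b", "c", "d", "e").iterator();
        Scanner scanner = (out, limit) -> {
            while (out.size() < limit && data.hasNext()) out.add(data.next());
            return data.hasNext();
        };

        int kvMax = 2; // analogous to hbase.hstore.compaction.kv.max
        List<String> batch = new ArrayList<>();
        boolean hasMore;
        do {
            hasMore = scanner.next(batch, kvMax);
            for (String kv : batch) System.out.println("append " + kv);
            batch.clear(); // reuse the buffer so memory stays bounded
        } while (hasMore);
    }
}
```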
Class diagram illustrating the core logic of the compaction algorithm (figure not reproduced). The candidate files are ordered from oldest to newest and suitable ones are selected; the sliding window starts at index 0 and first filters out oversized files, which improves compaction efficiency.
Some important classes. The class diagram of the classes used during the internal scan is referenced here (figure not reproduced). The key classes:
- HBase's implementation of this algorithm centers on the following five classes.
- 1. org.apache.hadoop.hbase.regionserver.Store
- 2. org.apache.hadoop.hbase.regionserver.StoreScanner
- 3. org.apache.hadoop.hbase.regionserver.StoreFileScanner
- 4. org.apache.hadoop.hbase.regionserver.KeyValueHeap
- 5. org.apache.hadoop.hbase.regionserver.ScanQueryMatcher
- The relationships among these five classes:
- 1. Store calls StoreScanner's next method in a loop and writes the returned KeyValues to the compacted output file;
- 2. StoreScanner creates and holds a StoreFileScanner for each input file, iterates over them internally, and uses a KeyValueHeap to order the head records of the input files;
- 3. StoreFileScanner iterates a single input file, maintaining and exposing that file's current head record;
- 4. KeyValueHeap orders the head record of every input file with a heap (see the toy sketch after this list);
- 5. ScanQueryMatcher decides, as each head record arrives, whether that record should be emitted or skipped, according to the scan policy.
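The cooperation of StoreFileScanner and KeyValueHeap is essentially a k-way merge: each scanner exposes the head record of one sorted file, and a priority queue repeatedly yields the scanner whose head sorts first. A toy version with plain strings instead of KeyValues, invented for illustration:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeDemo {
    // Stand-in for StoreFileScanner: iterates one sorted "file" and exposes
    // its current head record via peek().
    static class FileScanner {
        private final Iterator<String> it;
        private String head;
        FileScanner(List<String> sortedFile) { it = sortedFile.iterator(); advance(); }
        String peek() { return head; }
        String next() { String cur = head; advance(); return cur; }
        private void advance() { head = it.hasNext() ? it.next() : null; }
    }

    public static void main(String[] args) {
        // Three sorted input "files", as after three memstore flushes.
        List<FileScanner> scanners = List.of(
            new FileScanner(List.of("a", "d", "g")),
            new FileScanner(List.of("b", "e")),
            new FileScanner(List.of("c", "f", "h")));

        // Stand-in for KeyValueHeap: orders the scanners by their head record.
        PriorityQueue<FileScanner> heap =
            new PriorityQueue<>(Comparator.comparing(FileScanner::peek));
        for (FileScanner s : scanners)
            if (s.peek() != null) heap.add(s);

        // Repeatedly emit the globally smallest head, then re-heap that scanner.
        while (!heap.isEmpty()) {
            FileScanner current = heap.poll();
            System.out.print(current.next() + " "); // prints: a b c d e f g h
            if (current.peek() != null) heap.add(current);
        }
    }
}
```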
The main logic of StoreScanner and related classes is as follows:
- StoreScanner#constructor(){
- ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
- smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
- List<? extends KeyValueScanner> scanners = selectScannersFrom(scanners);
- for (KeyValueScanner scanner : scanners) {
- scanner.seek(matcher.getStartKey());
- }
- KeyValueHeap heap = new KeyValueHeap(scanners, store.comparator);
- }
- StoreFile$Writer#append(){
- appendGeneralBloomfilter(kv);
- appendDeleteFamilyBloomFilter(kv);
- HFileWriterV2.append(kv);
- trackTimestamps(kv);
- }
- StoreScanner#next(){
- KeyValue peeked = this.heap.peek();
- if (peeked == null) {
- close();
- return false;
- }
- LOOP:
- while ((kv = this.heap.peek()) != null) {
- ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
- switch (qcode) {
- case INCLUDE:
- case INCLUDE_AND_SEEK_NEXT_ROW:
- case INCLUDE_AND_SEEK_NEXT_COL:
- Filter f = matcher.getFilter();
- outResult.add(f == null ? kv : f.transform(kv));
- count++;
- if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
- if (!matcher.moreRowsMayExistAfter(kv)) {
- return false;
- }
- reseek(matcher.getKeyForNextRow(kv));
- } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
- reseek(matcher.getKeyForNextColumn(kv));
- } else {
- this.heap.next();
- }
- cumulativeMetric += kv.getLength();
- if (limit > 0 && (count == limit)) {
- break LOOP;
- }
- continue;
- case DONE:
- return true;
- case DONE_SCAN:
- close();
- return false;
- case SEEK_NEXT_ROW:
- if (!matcher.moreRowsMayExistAfter(kv)) {
- return false;
- }
- reseek(matcher.getKeyForNextRow(kv));
- break;
- case SEEK_NEXT_COL:
- reseek(matcher.getKeyForNextColumn(kv));
- break;
- case SKIP:
- this.heap.next();
- break;
- case SEEK_NEXT_USING_HINT:
- KeyValue nextKV = matcher.getNextKeyHint(kv);
- if (nextKV != null) {
- reseek(nextKV);
- } else {
- heap.next();
- }
- break;
- default:
- throw new RuntimeException("UNEXPECTED");
- }
- }
- }
- KeyValueHeap#constructor(){
- this.comparator = new KVScannerComparator(comparator);
- heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
- this.comparator);
- for (KeyValueScanner scanner : scanners) {
- if (scanner.peek() != null) {
- this.heap.add(scanner);
- } else {
- scanner.close();
- }
- }
- this.current = pollRealKV();
- }
- KeyValueHeap#next(){
- InternalScanner currentAsInternal = (InternalScanner) this.current;
- boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
- KeyValue pee = this.current.peek();
- if (pee == null || !mayContainMoreRows) {
- this.current.close();
- } else {
- this.heap.add(this.current);
- }
- this.current = pollRealKV();
- return (this.current != null);
- }
- ScanQueryMatcher#constructor(){
- this.isUserScan = scanType == ScanType.USER_SCAN;
- this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
- }
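The two flags set in this constructor are what make minor and major compactions differ in output: a minor compaction must keep delete markers, because it may not see all the cells they mask, while a major compaction sees the whole store and can drop them. A toy sketch of that decision (the enum and policy method are invented for the demo):

```java
public class ScanTypeDemo {
    // Mirrors the three ScanType cases mentioned above.
    enum ScanType { USER_SCAN, MINOR_COMPACT, MAJOR_COMPACT }

    // A minor compaction merges only a subset of files, so a delete marker
    // must be written out again; only a major compaction, which reads every
    // file of the store, may discard it together with the cells it masks.
    static boolean retainDeletesInOutput(ScanType scanType, boolean rawScan) {
        return scanType == ScanType.MINOR_COMPACT || rawScan;
    }

    public static void main(String[] args) {
        System.out.println(retainDeletesInOutput(ScanType.MINOR_COMPACT, false)); // true
        System.out.println(retainDeletesInOutput(ScanType.MAJOR_COMPACT, false)); // false
        System.out.println(retainDeletesInOutput(ScanType.USER_SCAN, true));      // true (raw scan)
    }
}
```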
HRegionServer issues a split request. The execution logic is as follows:
- HRegionServer#splitRegion(){
- HRegion region = getRegion(regionInfo.getRegionName());
- region.flushcache();
- region.forceSplit(splitPoint);
- compactSplitThread.requestSplit(region, region.checkSplit());
- }
- CompactSplitThread#requestSplit(){
- ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));
- }
Split thread execution flow (figure not reproduced).
The moment the META table is updated. The main logic is as follows:
- SplitRequest#run(){
- SplitTransaction st = new SplitTransaction(parent, midKey);
- if (!st.prepare()) {
- return;
- }
- st.execute(this.server, this.server);
- }
- SplitTransaction#execute(){
- PairOfSameType<HRegion> regions = createDaughters(server, services);
- openDaughters(server, services, regions.getFirst(), regions.getSecond());
- transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
- }
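SplitTransaction records each completed step in a journal (visible in the JournalEntry.* calls of createDaughters below) so that a failure can be rolled back by undoing the steps in reverse order. A compact sketch of that pattern; the step names are a subset of the real ones and the undo actions are invented placeholders:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class JournalDemo {
    enum JournalEntry { SET_SPLITTING_IN_ZK, CREATE_SPLIT_DIR, CLOSED_PARENT_REGION }

    public static void main(String[] args) {
        Deque<JournalEntry> journal = new ArrayDeque<>();
        try {
            journal.push(JournalEntry.SET_SPLITTING_IN_ZK); // step 1 done
            journal.push(JournalEntry.CREATE_SPLIT_DIR);    // step 2 done
            throw new RuntimeException("simulated failure before closing parent");
        } catch (RuntimeException e) {
            // Roll back the completed steps in reverse order.
            while (!journal.isEmpty()) {
                switch (journal.pop()) {
                    case CLOSED_PARENT_REGION: System.out.println("undo: reopen parent"); break;
                    case CREATE_SPLIT_DIR:     System.out.println("undo: delete split dir"); break;
                    case SET_SPLITTING_IN_ZK:  System.out.println("undo: delete ZK node"); break;
                }
            }
        }
    }
}
```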
- SplitTransaction#prepare(){
- HRegionInfo hri = parent.getRegionInfo();
- hri_a = new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);
- hri_b = new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);
- }
- SplitTransaction#createDaughters(){
- createNodeSplitting();
- this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
- transitionNodeSplitting();
- createSplitDir();
- List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
- HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
- splitStoreFiles(this.splitdir, hstoreFilesToSplit);
- this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
- this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
- MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
- this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
- return new PairOfSameType<HRegion>(a, b);
- }
- SplitTransaction#splitStoreFiles(){
- for (StoreFile sf : hstoreFilesToSplit) {
- StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
- futures.add(threadPool.submit(sfs));
- }
- }
- SplitTransaction$StoreFileSplitter#call(){
- splitStoreFile(sf, splitdir);
- }
- SplitTransaction#splitStoreFile(){
- FileSystem fs = this.parent.getFilesystem();
- byte[] family = sf.getFamily();
- String encoded = this.hri_a.getEncodedName();
- Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
- StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
- encoded = this.hri_b.getEncodedName();
- storedir = Store.getStoreHomedir(splitdir, encoded, family);
- StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
- }
- StoreFile#split(){
- if (range == Reference.Range.bottom) {
- KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
- byte[] firstKey = f.createReader().getFirstKey();
- if (f.getReader().getComparator().compare(splitKey.getBuffer(),
- splitKey.getKeyOffset(), splitKey.getKeyLength(),
- firstKey, 0, firstKey.length) < 0) {
- return null;
- }
- } else {
- KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
- byte[] lastKey = f.createReader().getLastKey();
- if (f.getReader().getComparator().compare(splitKey.getBuffer(),
- splitKey.getKeyOffset(), splitKey.getKeyLength(),
- lastKey, 0, lastKey.length) > 0) {
- return null;
- }
- }
- Reference r = new Reference(splitRow, range);
- String parentRegionName = f.getPath().getParent().getParent().getName();
- Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
- return r.write(fs, p);
- }
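A split does not copy data: each parent store file gets two small Reference files, one covering the bottom half (keys before the split row) and one covering the top half, and until the daughters are compacted, reads go through the reference back into the parent file. A toy model of half-file filtering; string keys and the halfRead method are invented for the demo:

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReferenceDemo {
    enum Range { BOTTOM, TOP }

    // Stand-in for reading a parent file through a Reference: the same
    // underlying data, restricted to one side of the split row.
    static List<String> halfRead(List<String> parentKeys, String splitRow, Range range) {
        return parentKeys.stream()
            .filter(k -> range == Range.BOTTOM
                ? k.compareTo(splitRow) < 0    // bottom half: keys before the split row
                : k.compareTo(splitRow) >= 0)  // top half: the split row and after
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> parent = List.of("a", "c", "m", "q", "z");
        System.out.println(halfRead(parent, "m", Range.BOTTOM)); // [a, c]
        System.out.println(halfRead(parent, "m", Range.TOP));    // [m, q, z]
    }
}
```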
- SplitTransaction#createDaughterRegion(){
- FileSystem fs = this.parent.getFilesystem();
- Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
- this.splitdir, hri);
- HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
- this.parent.getLog(), fs, this.parent.getBaseConf(),
- hri, this.parent.getTableDesc(), rsServices);
- long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
- r.readRequestsCount.set(halfParentReadRequestCount);
- r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
- long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
- r.writeRequestsCount.set(halfParentWriteRequest);
- r.setOpMetricsWriteRequestCount(halfParentWriteRequest);
- HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
- return r;
- }
- MetaEditor#offlineParentInMeta(){
- HRegionInfo copyOfParent = new HRegionInfo(parent);
- copyOfParent.setOffline(true);
- copyOfParent.setSplit(true);
- Put put = new Put(copyOfParent.getRegionName());
- addRegionInfo(put, copyOfParent);
- put.add("info", "splitA", Writables.getBytes(a));
- put.add("info", "splitB", Writables.getBytes(b));
- putToMetaTable(catalogTracker, put);
- }
- SplitTransaction#openDaughters(){
- DaughterOpener aOpener = new DaughterOpener(server, a);
- DaughterOpener bOpener = new DaughterOpener(server, b);
- aOpener.start();
- bOpener.start();
- aOpener.join();
- bOpener.join();
- HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);
- HRegionServer.addToOnlineRegions(b);
- HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);
- HRegionServer.addToOnlineRegions(a);
- }
- HRegionServer#postOpenDeployTasks(){
- for (Store s : r.getStores().values()) {
- if (s.hasReferences() || s.needsCompaction()) {
- getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
- }
- }
- if (r.getRegionInfo().isRootRegion()) {
- RootLocationEditor.setRootLocation(getZooKeeper(),
- this.serverNameFromMasterPOV);
- } else if (r.getRegionInfo().isMetaRegion()) {
- MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- } else {
- if (daughter) {
- MetaEditor.addDaughter(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- } else {
- MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
- this.serverNameFromMasterPOV);
- }
- }
- }
- SplitTransaction#transitionZKNode(){
- transitionNodeSplit();
- tickleNodeSplit();
- }
Some auxiliary logic:
- HRegion#close(){
- waitForFlushesAndCompactions();
- internalFlushcache();
- ThreadPoolExecutor storeCloserThreadPool =
- getStoreOpenAndCloseThreadPool("StoreCloserThread-"
- + this.regionInfo.getRegionNameAsString());
- CompletionService<ImmutableList<StoreFile>> completionService =
- new ExecutorCompletionService<ImmutableList<StoreFile>>(
- storeCloserThreadPool);
- for (final Store store : stores.values()) {
- completionService.submit(new Callable<ImmutableList<StoreFile>>() {
- public ImmutableList<StoreFile> call() throws IOException {
- return store.close();
- }
- });
- }
- }
- Store#close(){
- for (final StoreFile f : result) {
- completionService.submit(new Callable<Void>() {
- public Void call() throws IOException {
- f.closeReader(true);
- return null;
- }
- });
- }
- }
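Both HRegion#close and Store#close fan work out to a thread pool through an ExecutorCompletionService and then collect the results. A minimal generic sketch of that fan-out/collect pattern; the "store" names are placeholders:

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelCloseDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);

        List<String> stores = List.of("cf1", "cf2", "cf3");
        for (String store : stores) {
            // Each store closes independently; submission order does not matter.
            completion.submit(() -> "closed " + store);
        }
        // take() returns futures in completion order, not submission order.
        for (int i = 0; i < stores.size(); i++) {
            System.out.println(completion.take().get());
        }
        pool.shutdown();
    }
}
```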
The compactionChecker thread. This class periodically checks whether the regions on the region server need a compaction. The main logic is as follows:
- CompactionChecker#chore(){
- for (HRegion r : this.instance.onlineRegions.values()) {
- for (Store s : r.getStores().values()) {
- if (s.needsCompaction()) {
- this.instance.compactSplitThread.requestCompaction(r, s, getName());
- } else if (s.isMajorCompaction()) {
- if (majorCompactPriority == DEFAULT_PRIORITY
- || majorCompactPriority > r.getCompactPriority()) {
- this.instance.compactSplitThread.requestCompaction(r, s, getName());
- } else {
- this.instance.compactSplitThread.requestCompaction(r, s, getName(),
- this.majorCompactPriority);
- }
- }
- }
- }
- }
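CompactionChecker runs as a periodically woken chore thread. Outside HBase, the same behavior can be sketched with a ScheduledExecutorService; the check body below is a placeholder for the region/store scan shown above:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ChoreDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService chore = Executors.newSingleThreadScheduledExecutor();
        // Wake up at a fixed period and scan all "regions" for work,
        // mirroring CompactionChecker#chore.
        chore.scheduleAtFixedRate(() -> {
            System.out.println("checking stores for needed compactions...");
            // placeholder: HBase iterates onlineRegions and their Stores here
        }, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(3500); // let the chore fire a few times, then stop
        chore.shutdown();
    }
}
```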