HBase: Compaction and Split Internals

HRegionServer issues a compaction request

The main logic is as follows:

  // Iterate over every Store, compute the files that need compaction,
  // build a CompactionRequest and submit it to a thread pool.
  // throttleCompaction() decides whether the request goes to the
  // largeCompactions pool or the smallCompactions pool.
  CompactSplitThread#requestCompaction() {
    for (Store s : r.getStores().values()) {
      CompactionRequest cr = s.requestCompaction(priority, request);
      ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
          ? largeCompactions : smallCompactions;
      pool.execute(cr);
      ret.add(cr);
    }
  }

  // If the total size of the CompactionRequest is greater than
  // minFilesToCompact * 2 * memstoreFlushSize, the request is treated as a major
  // (large) compaction and goes to the largeCompactions pool; otherwise it is
  // treated as a minor (small) compaction.
  Store#throttleCompaction() {
    long throttlePoint = conf.getLong(
        "hbase.regionserver.thread.compaction.throttle",
        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;
  }

  Store#compactSelection() {
    // Pick out the StoreFiles that have already expired.
    if (storefile.maxTimeStamp + store.ttl < now_timestamp) {
      // return the set of expired StoreFiles
    }
    // Walk from index 0 upward: advance pos past every file that is larger than
    // maxCompactSize, then drop those oversized files from the selection.
    while (pos < compactSelection.getFilesToCompact().size() &&
        compactSelection.getFilesToCompact().get(pos).getReader().length() > maxCompactSize &&
        !compactSelection.getFilesToCompact().get(pos).isReference()) {
      ++pos;
    }
    if (pos != 0) {
      compactSelection.clearSubList(0, pos);
    }
    if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
      return;
    }
    // Build the sumSize array; its length equals the number of files in the Store.
    // sumSize[i] = fileSizes[i] + sumSize[i + 1] (or 0) - fileSizes[tooFar] (or 0),
    // i.e. sumSize[i] is the total size of the window of at most maxFilesToCompact
    // files starting at i.
    int countOfFiles = compactSelection.getFilesToCompact().size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = compactSelection.getFilesToCompact().get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i, i + maxFilesToCompact - 1) for the algo
      int tooFar = i + this.maxFilesToCompact - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }
    // While fileSizes[start] > Math.max(minCompactSize, sumSize[start + 1] * r),
    // advance start. This skips files that are too large relative to the files
    // behind them, so one huge file does not blow up the compaction time.
    while (countOfFiles - start >= this.minFilesToCompact &&
        fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
      ++start;
    }
    int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
    long totalSize = fileSizes[start] + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);
    compactSelection = compactSelection.getSubList(start, end);
    // For a major compaction with too many candidate files, drop some of them.
    if (majorcompaction &&
        compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
      int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
      compactSelection.getFilesToCompact().subList(0, pastMax).clear();
    }
  }
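To make the selection rule above concrete, here is a minimal, self-contained sketch of the ratio-based selection using plain file sizes (in the same order HBase walks its candidate list, index 0 first) instead of StoreFile objects. The method name selectForCompaction and the parameter defaults are illustrative, not HBase API.

  import java.util.Arrays;

  public class CompactionSelectionSketch {
      // Returns the [start, end) range of files that would be compacted
      // under the sliding-window ratio rule sketched above.
      static int[] selectForCompaction(long[] fileSizes, int minFilesToCompact,
                                       int maxFilesToCompact, long minCompactSize,
                                       double ratio) {
          int countOfFiles = fileSizes.length;
          long[] sumSize = new long[countOfFiles];
          for (int i = countOfFiles - 1; i >= 0; --i) {
              int tooFar = i + maxFilesToCompact - 1;
              sumSize[i] = fileSizes[i]
                  + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
                  - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
          }
          int start = 0;
          // Skip files that are too large compared to the window that follows them.
          while (countOfFiles - start >= minFilesToCompact
                  && fileSizes[start] > Math.max(minCompactSize,
                         (long) (sumSize[start + 1] * ratio))) {
              ++start;
          }
          int end = Math.min(countOfFiles, start + maxFilesToCompact);
          return new int[] { start, end };
      }

      public static void main(String[] args) {
          // One oversized file followed by several similar small ones.
          long[] sizes = { 900L << 20, 20L << 20, 18L << 20, 15L << 20, 12L << 20 };
          int[] range = selectForCompaction(sizes, 3, 10, 16L << 20, 1.2);
          // Prints [1, 5]: the 900 MB file is skipped, the four small files are compacted.
          System.out.println(Arrays.toString(range));
      }
  }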

The CompactionRequest thread (executes both major and minor compactions)

The compaction-related class diagram is as follows:



The difference between a major and a minor compaction is actually quite small: if the total size of the files to be compacted is greater than 2 * minFilesToCompact * memstoreFlushSize, the request is treated as a major compaction and executed in the large-compaction thread pool; otherwise it is treated as a minor compaction.
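As a rough worked example of that threshold (assuming a memstore flush size of 128 MB and minFilesToCompact = 3, which are common defaults but may differ in your configuration):

  public class ThrottlePointExample {
      public static void main(String[] args) {
          // Assumed values; check hbase.hregion.memstore.flush.size and
          // hbase.hstore.compaction.min in your own configuration.
          long memstoreFlushSize = 128L * 1024 * 1024; // 128 MB
          int minFilesToCompact = 3;
          long throttlePoint = 2L * minFilesToCompact * memstoreFlushSize; // 768 MB
          long compactionSize = 900L * 1024 * 1024;

          boolean useLargePool = compactionSize > throttlePoint;
          // Prints: throttlePoint = 768 MB, large pool? true
          System.out.println("throttlePoint = " + (throttlePoint >> 20) + " MB, "
              + "large pool? " + useLargePool);
      }
  }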

In addition, the StoreScanner constructor uses the ScanType to distinguish a major from a minor compaction; ScanQueryMatcher then returns different match results depending on the ScanType (there are three types: user scan, minor compaction and major compaction).

The main logic is as follows:

  // The compaction runs in its own thread.
  CompactionRequest#run() {
    boolean completed = HRegion.compact(this);
    if (completed) {
      if (s.getCompactPriority() <= 0) {
        server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
      } else {
        // see if the compaction has caused us to exceed max region size
        server.getCompactSplitThread().requestSplit(r);
      }
    }
  }

  // Delegates to the Store to perform the compaction.
  HRegion#compact() {
    Preconditions.checkArgument(cr.getHRegion().equals(this));
    lock.readLock().lock();
    CompactionRequest.getStore().compact(cr);
    lock.readLock().unlock();
  }

  // Finish the compaction: Compactor#compact() does the core work, then the
  // compacted file is moved to its final directory and the old files are deleted.
  Store#compact() {
    List<StoreFile> filesToCompact = request.getFiles();
    StoreFile.Writer writer = this.compactor.compact(cr, maxId);
    if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      sf = completeCompaction(filesToCompact, writer);
    } else {
      // Create storefile around what we wrote with a reader on it.
      sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
          this.family.getBloomFilterType(), this.dataBlockEncoder);
      sf.createReader();
    }
  }

  // Move /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
  // to   /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/value/9c8614a6bd0d4833b419a13abfde5ac1,
  // wrap the new target file in a StoreFile object,
  // delete the old files (their underlying HFiles have already been merged into one file),
  // and finally compute the new StoreFile's size and other metadata before returning.
  Store#completeCompaction() {
    Path origPath = compactedFile.getPath();
    Path destPath = new Path(homedir, origPath.getName());
    HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);
    StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
        this.family.getBloomFilterType(), this.dataBlockEncoder);
    passSchemaMetricsTo(result);
    result.createReader();
  }

  // The core compaction logic:
  // merge multiple StoreFiles using a StoreScanner, which iterates over all
  // StoreFiles, merges them via heap sort, and writes the output through
  // StoreFile$Writer#append().
  Compactor#compact() {
    for (StoreFile file : filesToCompact) {
      StoreFile.Reader r = file.getReader();
      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
      maxKeyCount += keyCount;
    }
    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
    Compression.Algorithm compression = store.getFamily().getCompression();
    List<StoreFileScanner> scanners = StoreFileScanner
        .getScannersForStoreFiles(filesToCompact, false, false, true);
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    // The ScanType is chosen according to the kind of compaction being run;
    // ScanQueryMatcher later returns different results depending on the ScanType.
    InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
        majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
        smallestReadPoint, earliestPutTs);
    do {
      hasMore = scanner.next(kvs, compactionKVMax);
      if (writer == null && !kvs.isEmpty()) {
        // Create a temporary file under the .tmp directory, e.g.
        // /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
        writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
            maxMVCCReadpoint >= smallestReadPoint);
      }
      for (KeyValue kv : kvs) {
        writer.append(kv);
      }
    } while (hasMore);
    scanner.close();
    StoreFile$Writer.appendMetadata(maxId, majorCompaction);
    StoreFile$Writer.close();
  }
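The listing above reads several tunables from the Configuration. A minimal sketch of reading (and inspecting) the same keys; the fallback values shown are illustrative and should be checked against your own cluster configuration:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class CompactionConfigSketch {
      public static void main(String[] args) {
          Configuration conf = HBaseConfiguration.create();

          // Upper bound on KeyValues fetched per scanner.next() call during compaction.
          int kvMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);

          // Requests larger than this go to the largeCompactions pool.
          long throttle = conf.getLong("hbase.regionserver.thread.compaction.throttle",
              2L * 3 * 128 * 1024 * 1024); // illustrative fallback: 2 * minFiles * flushSize

          System.out.println("kv.max=" + kvMax + ", throttle=" + throttle);
      }
  }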

Class diagram illustrating the core logic of the compaction selection algorithm

Files are ordered from newest to oldest and suitable candidates are selected among them.

The sliding window starts at index 0 and filters out files that are too large, which keeps compactions fast.


Important classes involved

The class diagram of the classes used during the internal scan is as follows:


The key classes are:

  The HBase implementation of this algorithm relies mainly on the following five classes:
  1. org.apache.hadoop.hbase.regionserver.Store
  2. org.apache.hadoop.hbase.regionserver.StoreScanner
  3. org.apache.hadoop.hbase.regionserver.StoreFileScanner
  4. org.apache.hadoop.hbase.regionserver.KeyValueHeap
  5. org.apache.hadoop.hbase.regionserver.ScanQueryMatcher
  They are related as follows:
  1. Store calls StoreScanner's next method in a loop and writes the returned KeyValues to the compacted file.
  2. StoreScanner creates and holds one StoreFileScanner per input file; it iterates over these StoreFileScanners
     and uses a KeyValueHeap to order the head record of each input file (see the heap-merge sketch after this list).
  3. StoreFileScanner iterates over a single input file and exposes that file's current head record.
  4. KeyValueHeap orders the head records of the input files with a heap.
  5. ScanQueryMatcher decides, for each head record handed to it, whether the record should be emitted or skipped.
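Forward-referenced from item 2 above: the KeyValueHeap idea boils down to a k-way merge in which the heap orders scanners by their current head element. A minimal sketch over plain iterators of sorted longs; the names here (PeekingScanner, etc.) are illustrative stand-ins, not HBase classes.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Comparator;
  import java.util.Iterator;
  import java.util.List;
  import java.util.PriorityQueue;

  public class HeapMergeSketch {
      // Stand-in for a StoreFileScanner: exposes the current head element of one sorted input.
      static final class PeekingScanner {
          private final Iterator<Long> it;
          private Long head;
          PeekingScanner(Iterator<Long> it) { this.it = it; advance(); }
          Long peek() { return head; }
          void advance() { head = it.hasNext() ? it.next() : null; }
      }

      public static void main(String[] args) {
          List<List<Long>> inputs = Arrays.asList(
              Arrays.asList(1L, 4L, 9L),
              Arrays.asList(2L, 3L, 10L),
              Arrays.asList(5L, 6L, 7L, 8L));

          // The heap orders scanners by their head element, like KVScannerComparator.
          PriorityQueue<PeekingScanner> heap =
              new PriorityQueue<PeekingScanner>(inputs.size(),
                  new Comparator<PeekingScanner>() {
                      public int compare(PeekingScanner a, PeekingScanner b) {
                          return a.peek().compareTo(b.peek());
                      }
                  });
          for (List<Long> input : inputs) {
              PeekingScanner s = new PeekingScanner(input.iterator());
              if (s.peek() != null) heap.add(s);
          }

          List<Long> merged = new ArrayList<Long>();
          while (!heap.isEmpty()) {
              PeekingScanner top = heap.poll(); // scanner with the smallest head record
              merged.add(top.peek());
              top.advance();
              if (top.peek() != null) heap.add(top); // re-insert if it still has data
          }
          System.out.println(merged); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
      }
  }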

The main logic of StoreScanner and its related classes is as follows:

  // Holds the list of StoreFileScanners internally; creates a ScanQueryMatcher to decide
  // whether each value is filtered out or emitted, and a KeyValueHeap for heap sorting,
  // taking one element from the top of the heap at a time.
  // Note the ScanType constructor parameter: the scan type (MAJOR_COMPACT, MINOR_COMPACT
  // or USER_SCAN) makes the matcher return different results, which is what gives a scan
  // major or minor compaction semantics.
  StoreScanner#constructor() {
    ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
        smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
    List<? extends KeyValueScanner> scanners = selectScannersFrom(scanners);
    for (KeyValueScanner scanner : scanners) {
      scanner.seek(matcher.getStartKey());
    }
    KeyValueHeap heap = new KeyValueHeap(scanners, store.comparator);
  }

  // Optionally updates the Bloom filters, then calls HFileWriterV2#append()
  // to write the KeyValue.
  StoreFile$Writer#append() {
    appendGeneralBloomfilter(kv);
    appendDeleteFamilyBloomFilter(kv);
    HFileWriterV2.append(kv);
    trackTimestamps(kv);
  }

  // Wraps the logic that processes the records taken from the heap:
  // the matcher decides whether each value is emitted or skipped.
  StoreScanner#next() {
    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }
    LOOP:
    while ((kv = this.heap.peek()) != null) {
      ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
      switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:
          Filter f = matcher.getFilter();
          outResult.add(f == null ? kv : f.transform(kv));
          count++;
          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(kv)) {
              return false;
            }
            reseek(matcher.getKeyForNextRow(kv));
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            reseek(matcher.getKeyForNextColumn(kv));
          } else {
            this.heap.next();
          }
          cumulativeMetric += kv.getLength();
          if (limit > 0 && (count == limit)) {
            break LOOP;
          }
          continue;
        case DONE:
          return true;
        case DONE_SCAN:
          close();
          return false;
        case SEEK_NEXT_ROW:
          if (!matcher.moreRowsMayExistAfter(kv)) {
            return false;
          }
          reseek(matcher.getKeyForNextRow(kv));
          break;
        case SEEK_NEXT_COL:
          reseek(matcher.getKeyForNextColumn(kv));
          break;
        case SKIP:
          this.heap.next();
          break;
        case SEEK_NEXT_USING_HINT:
          KeyValue nextKV = matcher.getNextKeyHint(kv);
          if (nextKV != null) {
            reseek(nextKV);
          } else {
            heap.next();
          }
          break;
        default:
          throw new RuntimeException("UNEXPECTED");
      } // end switch
    } // end while
  }

  // KeyValueHeap produces sorted output via heap sort:
  // internally it uses a PriorityQueue with KVScannerComparator as the comparator.
  KeyValueHeap#constructor() {
    this.comparator = new KVScannerComparator(comparator);
    heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
        this.comparator);
    for (KeyValueScanner scanner : scanners) {
      if (scanner.peek() != null) {
        this.heap.add(scanner);
      } else {
        scanner.close();
      }
    }
    this.current = pollRealKV();
  }

  // The most important method on the heap is next(). Note that its main job is not to
  // compute the next KeyValue but to work out the next scanner; the caller then has to
  // call peek() again to obtain the next KeyValue, which is not very concise.
  KeyValueHeap#next() {
    InternalScanner currentAsInternal = (InternalScanner) this.current;
    boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
    KeyValue pee = this.current.peek();
    if (pee == null || !mayContainMoreRows) {
      this.current.close();
    } else {
      this.heap.add(this.current);
    }
    this.current = pollRealKV();
    return (this.current != null);
  }

  // Other parts omitted; note the two assignments below.
  // For a minor compaction, DELETE-type KeyValues (delete markers) are NOT removed,
  // whereas a major compaction drops DELETE markers from the final output.
  ScanQueryMatcher#constructor() {
    // .....
    /* how to deal with deletes */
    this.isUserScan = scanType == ScanType.USER_SCAN;
    this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
    // ..
  }
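A toy illustration of the retainDeletesInOutput flag above: it decides whether delete markers survive the rewrite. The classes here are simplified stand-ins, not HBase types (a real major compaction also drops the deleted cells themselves); only the MINOR_COMPACT/MAJOR_COMPACT distinction mirrors ScanQueryMatcher.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  public class DeleteMarkerSketch {
      enum ScanType { USER_SCAN, MINOR_COMPACT, MAJOR_COMPACT }

      // Simplified cell: just a row key and a flag saying whether it is a delete marker.
      static final class Cell {
          final String row; final boolean isDelete;
          Cell(String row, boolean isDelete) { this.row = row; this.isDelete = isDelete; }
          public String toString() { return row + (isDelete ? "(DELETE)" : ""); }
      }

      // Mirrors ScanQueryMatcher's decision in spirit: a minor compaction keeps delete
      // markers (they may still mask cells in files not part of this compaction),
      // a major compaction rewrites everything and can drop them.
      static List<Cell> rewrite(List<Cell> input, ScanType scanType) {
          boolean retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT;
          List<Cell> out = new ArrayList<Cell>();
          for (Cell c : input) {
              if (c.isDelete && !retainDeletesInOutput) continue; // drop the marker
              out.add(c);
          }
          return out;
      }

      public static void main(String[] args) {
          List<Cell> cells = Arrays.asList(
              new Cell("row1", false), new Cell("row2", true), new Cell("row3", false));
          System.out.println(rewrite(cells, ScanType.MINOR_COMPACT)); // keeps row2(DELETE)
          System.out.println(rewrite(cells, ScanType.MAJOR_COMPACT)); // drops row2(DELETE)
      }
  }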

HRegionServer issues a split request


The execution logic is as follows:

  // Split a region.
  HRegionServer#splitRegion() {
    HRegion region = getRegion(regionInfo.getRegionName());
    region.flushcache();
    region.forceSplit(splitPoint);
    compactSplitThread.requestSplit(region, region.checkSplit());
  }

  // Create a SplitRequest object and submit it to the thread pool.
  CompactSplitThread#requestSplit() {
    ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));
  }
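For reference, the split request above can also be initiated from a client. A minimal sketch using the 0.9x-era HBaseAdmin API; the table name and split row are placeholders, and the exact method signatures may differ between HBase versions.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HBaseAdmin;

  public class SplitTriggerSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create();
          HBaseAdmin admin = new HBaseAdmin(conf);
          try {
              // Ask the RegionServer to split "mytable" around the given row key;
              // this ends up in HRegionServer#splitRegion() as shown above.
              admin.split("mytable", "splitrow");
          } finally {
              admin.close();
          }
      }
  }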

The split thread's execution flow

The moment the META table is updated

The main logic is as follows:

  // Runs in its own thread.
  SplitRequest#run() {
    SplitTransaction st = new SplitTransaction(parent, midKey);
    if (!st.prepare()) {
      return;
    }
    st.execute(this.server, this.server);
  }

  // Core logic: first create the two daughter regions and a temporary ZK node,
  // then split the parent region: create a temporary directory, close the region,
  // split the StoreFiles into that directory, create daughter regions A and B,
  // open both of them, update META, update ZK, and take the parent region offline.
  SplitTransaction#execute() {
    PairOfSameType<HRegion> regions = createDaughters(server, services);
    openDaughters(server, services, regions.getFirst(), regions.getSecond());
    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
  }

  // Pre-create the two daughter regions.
  SplitTransaction#prepare() {
    HRegionInfo hri = parent.getRegionInfo();
    hri_a = new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);
    hri_b = new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);
  }

  SplitTransaction#createDaughters() {
    // Create a ZK node such as /hbase/unassigned/fad11edf1e6e0a842b7fd3ad87f25053,
    // where the encoded region name is the region being split.
    createNodeSplitting();
    // Record the transaction's progress in the journal.
    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
    // Use this node as the transaction node; it is deleted once the split completes.
    transitionNodeSplitting();
    // Create an HDFS directory such as /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits
    // used as a temporary location for the split files.
    createSplitDir();
    // Close the region being split.
    List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
    HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
    splitStoreFiles(this.splitdir, hstoreFilesToSplit);
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
    // Update the META table.
    MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
    // Return the two daughter regions A and B.
    return new PairOfSameType<HRegion>(a, b);
  }

  SplitTransaction#splitStoreFiles() {
    for (StoreFile sf : hstoreFilesToSplit) {
      // splitStoreFile(sf, splitdir);
      StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
      futures.add(threadPool.submit(sfs));
    }
    // Wait for the tasks in the thread pool to finish, then return.
  }

  // Start splitting a file.
  SplitTransaction$StoreFileSplitter#call() {
    splitStoreFile(sf, splitdir);
  }

  SplitTransaction#splitStoreFile() {
    FileSystem fs = this.parent.getFilesystem();
    byte[] family = sf.getFamily();
    String encoded = this.hri_a.getEncodedName();
    // The path looks like
    // /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits/1977310abc183fac9aba3dc626b01a2d
    //   /value/92e897822d804d3bb4805548e9a80bd2.fad11edf1e6e0a842b7fd3ad87f25053
    Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
    // Two reference files are created according to splitRow: one covering the range
    // from the beginning of the file up to splitRow, and one from splitRow to the end.
    // They are written to the underlying file system directly through the HDFS API.
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
    encoded = this.hri_b.getEncodedName();
    storedir = Store.getStoreHomedir(splitdir, encoded, family);
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
  }

  // Depending on the Range argument, the reference covers either the range from the
  // beginning of the file to splitRow, or from splitRow to the end of the file.
  // For the bottom half: if the file's first key is already greater than splitRow,
  // that half would be empty, so the file needs no reference and null is returned.
  StoreFile#split() {
    if (range == Reference.Range.bottom) {
      KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
      byte[] firstKey = f.createReader().getFirstKey();
      if (f.getReader().getComparator().compare(splitKey.getBuffer(),
          splitKey.getKeyOffset(), splitKey.getKeyLength(),
          firstKey, 0, firstKey.length) < 0) {
        return null;
      }
    } else {
      KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
      byte[] lastKey = f.createReader().getLastKey();
      if (f.getReader().getComparator().compare(splitKey.getBuffer(),
          splitKey.getKeyOffset(), splitKey.getKeyLength(),
          lastKey, 0, lastKey.length) > 0) {
        return null;
      }
    }
    Reference r = new Reference(splitRow, range);
    String parentRegionName = f.getPath().getParent().getParent().getName();
    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);
  }

  // Create an HRegion for a daughter.
  SplitTransaction#createDaughterRegion() {
    FileSystem fs = this.parent.getFilesystem();
    Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
        this.splitdir, hri);
    HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
        this.parent.getLog(), fs, this.parent.getBaseConf(),
        hri, this.parent.getTableDesc(), rsServices);
    long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
    r.readRequestsCount.set(halfParentReadRequestCount);
    r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
    long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
    r.writeRequestsCount.set(halfParentWriteRequest);
    r.setOpMetricsWriteRequestCount(halfParentWriteRequest);
    HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
    return r;
  }

  // Mark the parent region's info:regioninfo column as offline
  // and add two extra columns, info:splitA and info:splitB.
  MetaEditor#offlineParentInMeta() {
    HRegionInfo copyOfParent = new HRegionInfo(parent);
    copyOfParent.setOffline(true);
    copyOfParent.setSplit(true);
    Put put = new Put(copyOfParent.getRegionName());
    addRegionInfo(put, copyOfParent);
    put.add("info", "splitA", Writables.getBytes(a));
    put.add("info", "splitB", Writables.getBytes(b));
    putToMetaTable(catalogTracker, put);
  }

  // DaughterOpener wraps an HRegion and calls HRegion#open() in a new thread.
  // META is then updated, so for a short while META contains both the parent
  // region (already offline) and the two daughter regions.
  SplitTransaction#openDaughters() {
    DaughterOpener aOpener = new DaughterOpener(server, a);
    DaughterOpener bOpener = new DaughterOpener(server, b);
    aOpener.start();
    bOpener.start();
    aOpener.join();
    bOpener.join();
    HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);
    // Should add it to OnlineRegions
    HRegionServer.addToOnlineRegions(b);
    HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);
    HRegionServer.addToOnlineRegions(a);
  }

  // If a Store has reference files or needs compaction, request one;
  // then update ZK, or the ROOT and META tables.
  HRegionServer#postOpenDeployTasks() {
    for (Store s : r.getStores().values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
      }
    }
    // Update ZK, or the ROOT and META tables.
    if (r.getRegionInfo().isRootRegion()) {
      RootLocationEditor.setRootLocation(getZooKeeper(),
          this.serverNameFromMasterPOV);
    } else if (r.getRegionInfo().isMetaRegion()) {
      MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
          this.serverNameFromMasterPOV);
    } else {
      if (daughter) {
        // If daughter of a split, update whole row, not just location.
        MetaEditor.addDaughter(ct, r.getRegionInfo(),
            this.serverNameFromMasterPOV);
      } else {
        MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
            this.serverNameFromMasterPOV);
      }
    }
  }

  // Delete the node for the region that was split, e.g.
  // fad11edf1e6e0a842b7fd3ad87f25053 under /hbase/unassigned in ZK.
  SplitTransaction#transitionZKNode() {
    transitionNodeSplit();
    tickleNodeSplit();
  }
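To make the reference-file idea above concrete: a daughter region does not copy the parent's HFile data at split time; it writes a tiny Reference marker (split row plus a top/bottom range) into its store and keeps reading the parent file through that half-file view until a later compaction rewrites real files. A simplified sketch of the idea in plain Java; this is not the HBase Reference class.

  public class ReferenceSketch {
      enum Range { TOP, BOTTOM } // TOP: [splitRow, end), BOTTOM: [start, splitRow)

      // Minimal stand-in for org.apache.hadoop.hbase.io.Reference:
      // it records only the split row and which half of the parent file it covers.
      static final class Reference {
          final byte[] splitRow; final Range range;
          Reference(byte[] splitRow, Range range) { this.splitRow = splitRow; this.range = range; }

          // Decide whether a row from the parent file belongs to this half-file view.
          boolean contains(byte[] row) {
              int cmp = compare(row, splitRow);
              return range == Range.BOTTOM ? cmp < 0 : cmp >= 0;
          }

          private static int compare(byte[] a, byte[] b) {
              int n = Math.min(a.length, b.length);
              for (int i = 0; i < n; i++) {
                  int d = (a[i] & 0xff) - (b[i] & 0xff);
                  if (d != 0) return d;
              }
              return a.length - b.length;
          }
      }

      public static void main(String[] args) {
          Reference bottom = new Reference("row500".getBytes(), Range.BOTTOM);
          Reference top = new Reference("row500".getBytes(), Range.TOP);
          System.out.println(bottom.contains("row123".getBytes())); // true  -> daughter A sees it
          System.out.println(top.contains("row123".getBytes()));    // false -> daughter B skips it
      }
  }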

Some auxiliary logic:

  // Wait for flushes and compactions to finish, flush the memstore,
  // then close all Stores in a thread pool.
  HRegion#close() {
    waitForFlushesAndCompactions();
    internalFlushcache();
    ThreadPoolExecutor storeCloserThreadPool =
        getStoreOpenAndCloseThreadPool("StoreCloserThread-"
            + this.regionInfo.getRegionNameAsString());
    CompletionService<ImmutableList<StoreFile>> completionService =
        new ExecutorCompletionService<ImmutableList<StoreFile>>(
            storeCloserThreadPool);
    for (final Store store : stores.values()) {
      completionService.submit(new Callable<ImmutableList<StoreFile>>() {
        public ImmutableList<StoreFile> call() throws IOException {
          return store.close();
        }
      });
    }
  }

  // Submit tasks to a thread pool to close all open StoreFiles.
  Store#close() {
    for (final StoreFile f : result) {
      completionService.submit(new Callable<Void>() {
        public Void call() throws IOException {
          f.closeReader(true);
          return null;
        }
      });
    }
  }
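The close logic above submits one Callable per Store, but the listing stops before the results are collected; the usual pattern is to take() one future per submitted task. A self-contained sketch of that pattern with plain strings instead of StoreFile lists:

  import java.util.Arrays;
  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.CompletionService;
  import java.util.concurrent.ExecutorCompletionService;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class CompletionServiceSketch {
      public static void main(String[] args) throws Exception {
          List<String> stores = Arrays.asList("cf1", "cf2", "cf3");
          ExecutorService pool = Executors.newFixedThreadPool(2);
          CompletionService<String> completionService =
              new ExecutorCompletionService<String>(pool);

          // Submit one "close" task per store, as HRegion#close does per Store.
          for (final String store : stores) {
              completionService.submit(new Callable<String>() {
                  public String call() {
                      return "closed " + store;
                  }
              });
          }
          // Collect exactly one result per submitted task, in completion order.
          for (int i = 0; i < stores.size(); i++) {
              System.out.println(completionService.take().get());
          }
          pool.shutdown();
      }
  }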

The CompactionChecker thread

This class periodically checks whether the regions on the region server need to be compacted.

The main logic is as follows:

  // Keep iterating over all Regions on the current RegionServer
  // and check whether a compaction is needed.
  CompactionChecker#chore() {
    for (HRegion r : this.instance.onlineRegions.values()) {
      for (Store s : r.getStores().values()) {
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          this.instance.compactSplitThread.requestCompaction(r, s, getName());
        } else if (s.isMajorCompaction()) {
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            this.instance.compactSplitThread.requestCompaction(r, s, getName());
          } else {
            // In the actual HBase source the two branches differ only in the
            // compaction priority passed to requestCompaction().
            this.instance.compactSplitThread.requestCompaction(r, s, getName());
          }
        }
      }
    }
  }
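CompactionChecker runs as a Chore, i.e. a periodic background task. A minimal stand-in using ScheduledExecutorService; the check itself is only a placeholder comment, not the HBase implementation:

  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  public class CompactionCheckerSketch {
      public static void main(String[] args) throws InterruptedException {
          ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

          // Roughly what a Chore does: wake up at a fixed interval and run chore().
          Runnable chore = new Runnable() {
              public void run() {
                  // Placeholder for: for each online region, for each store,
                  // check needsCompaction() / isMajorCompaction() and enqueue a request.
                  System.out.println("checking stores for compaction...");
              }
          };
          scheduler.scheduleAtFixedRate(chore, 0, 10, TimeUnit.SECONDS);

          Thread.sleep(25_000); // let it run a couple of times, then stop
          scheduler.shutdown();
      }
  }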


