设为首页 加入收藏

TOP

PUT服务器端写入过程+源码分析
2019-05-12 13:47:15 】 浏览:128
Tags:PUT 服务器 写入 过程 源码 分析
版权声明:本文为博主原创文章,未经允许,请勿转载! https://blog.csdn.net/yulin_Hu/article/details/83063475

PUT服务器端写入过程+源码分析

本文主要内容:
mem写入+wal写入 过程和源码分析

前言

HBase是一种基于LSM模型的分布式数据库。LSM的全称是Log-Structured Merge-Trees,即日志-结构化合并-树。LSM模型的最大特点就是,在读写之间采取一种平衡,牺牲部分读数据的性能,来大幅度的提升写数据的性能。通俗的讲,HBase写数据如此快,正是由于基于LSM模型,将数据写入内存和日志文件后即立即返回。

但是,数据始终在内存和日志中是不妥当的,首先内存毕竟是有限的稀缺资源,持续的写入会造成内存的溢出,而日志的写入仅是由于内存数据系统宕机或进程退出后立刻消失而采取的一种保护性措施,而不是作为最终的数据持久化。原因是写入日志时仅仅是简单的追加(append),这样的文件会导致读数据时效率会非常非常的低。而HBase本身采用的HFile的数据是严格排序的,这样在读取的时候可以采用二分查找等来提升读取的效率。

MemStore的flush就是为了解决上述问题而采取的一种有效措施

以上内容参考自:https://blog.csdn.net/lipeng_bigdata/article/details/50427147utm_source=copy

WAL介绍

WAL(Write-Ahead Logging)是一种高效的日志算法,几乎是所有非内存数据库提升写性能的不二法门,基本原理是在数据写入之前首先顺序写入日志,然后再写入缓存,等到缓存写满之后统一落盘。之所以能够提升写性能,是因为WAL将一次随机写转化为了一次顺序写加一次内存写。提升写性能的同时,WAL可以保证数据的可靠性,即在任何情况下数据不丢失。假如一次写入完成之后发生了宕机,即使所有缓存中的数据丢失,也可以通过恢复日志还原出丢失的数据。

HBase中可以通过设置WAL的持久化等级决定是否开启WAL机制、以及HLog的落盘方式。WAL的持久化等级分为如下四个等级:

  • SKIP_WAL:只写缓存,不写HLog日志。这种方式因为只写内存,因此可以极大的提升写入性能,但是数据有丢失的风险。在实际应用过程中并不建议设置此等级,除非确认不要求数据的可靠性。

  • ASYNC_WAL:异步将数据写入HLog日志中。

  • SYNC_WAL:同步将数据写入日志文件中,需要注意的是数据只是被写入文件系统中,并没有真正落盘。

  • FSYNC_WAL:同步将数据写入日志文件并强制落盘。最严格的日志写入等级,可以保证数据不会丢失,但是性能相对比较差。

  • USER_DEFAULT:默认如果用户没有指定持久化等级,HBase使用SYNC_WAL等级持久化数据。

用户可以通过客户端设置WAL持久化等级,代码:put.setDurability(Durability. SYNC_WAL );

简单来说WAL就是预先把要写到内存中的数据先写入到一个log文件中,HBase中WAL的实现就是HLog,每个Region Server拥有一个HLog日志,所有region的写入都是写到同一个HLog(目前我们暂时理解为一个regionSrver只有一个wal,据说(1.x版本可以开启MultiWAL功能,允许多个HLog))

HLog应该是一个sequenceFile,当数据写入时,是将数据对<HLogKey,WALEdit>按照顺序追加到HLog中,这一个数据对是一个WalEntry。WAL是按照修改时间的顺序进行存储,包括了同一个服务器上的所有region。

在这里插入图片描述

HlogKey主要存储了log sequence number,更新时间 write time,region name,表名table name以及cluster ids。其中log sequncece number作为HFile中一个重要的元数据,和HLog的生命周期息息相关;region name和table name分别表征该段日志属于哪个region以及哪张表;cluster ids用于将日志复制到集群中其他机器上。

WALEdit一个事物的更新集合,里面会包含很多个操作。

参考:https://www.cnblogs.com/163yun/p/9020608.html

写入

HBase是针对行级别的ACID操作。在上一篇文章中(HBase put过程客户端+服务端初步解析),我们简述了整个put的大体流程。对于写入的所有行,需要获取对应的锁,在写完之后进行释放。

1.写入过程

代码如下:

  • processRowsWithLocks
// processRowsWithLocks方法
public void processRowsWithLocks(RowProcessor<,> processor, long timeout,
  long nonceGroup, long nonce){
WALEdit walEdit = new WALEdit();
MultiVersionConsistencyControl.WriteEntry writeEntry = null;// 这是一个事物
boolean locked;
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
List<Mutation> mutations = new ArrayList<Mutation>();
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();//从processer中取出需要上锁的行
long mvccNum = 0;
WALKey walKey = null;
try {
  // 2. Acquire the row lock(s)
  acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
  for (byte[] row : rowsToLock) {
    // Attempt to lock all involved rows, throw if any lock times out
    acquiredRowLocks.add(getRowLock(row));//获取行锁
  }
  // 3. Region lock
  lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0  1 : acquiredRowLocks.size());
  // Get a mvcc write number
  // 这个方法返回的是:return sequenceId.incrementAndGet() + 1000000000;
  // 也就是seqid 先自增1,然后加上10亿的值作为mvccnum
  // 为什么加上10亿 前面已经解释过
  // 具体需要集合mvcc机制 和读取的流程
  mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);

  try {
    // doProcessRowWithTimeout方法这里面会把processor里面的action都取出来,分别放到mutations和walEdit中,代表了要写到memstore和wal中的数据
    //取得时候,如果action是put或事delete的话,就放到mutations,但是对于要写到wal的action
    // 则要多一步and
    // 判断如下:
    // for (Mutation m : mutations) {
   //    for (List<Cell> cells : m.getFamilyCellMap().values()) {
   //    boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
   //    for (Cell cell : cells) {
   //      if (writeToWAL) walEdit.add(cell);
   //    }
  //   }
  // }
  // 可以看到是判断每一个列族Durability,如果列族是SKIP_WAL,
  // 那么该列族下的所有cell都不会放放入
    doProcessRowWithTimeout(
        processor, now, this, mutations, walEdit, timeout);
    if (!mutations.isEmpty()) {
      // 5. Start mvcc transaction
      // 根据mvccnum创建writeEntry
      // 传进去的mvccNum应该是一个事物id
      // 但是目前的用处是什么 还不太清楚?????
      writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
      // 6. Call the preBatchMutate hook
      processor.preBatchMutate(this, walEdit);
      // 7. Apply to memstore
      //之前已经把需要写入mem的放入了mutations
    
      for (Mutation m : mutations) {
        // Handle any tag based cell features
        rewriteCellTags(m.getFamilyCellMap(), m);

        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
          Cell cell = cellScanner.current();
          //----------------------------------
            //----------------------------
            // 这里为每一个cell设置的seqid是mvccnum
          CellUtil.setSequenceId(cell, mvccNum);
          // 得到对应HStore
          // 每一个Hregion维护一个 protected final Map<byte[], Store> stores
          // HStore是store的一个实现,他是一个memstore和一系列的storefile
          Store store = getStore(cell);
          
          if (store == null) {
            checkFamily(CellUtil.cloneFamily(cell));
            // unreachable
          }
          // 调用store的add方法,这个add方法调用了this.memstore.add(cell)
          Pair<Long, Cell> ret = store.add(cell);
           // public Pair<Long, Cell> add(final Cell cell) { 这是add方法的实现
           //    lock.readLock().lock(); //此时不可读 这个lock是hstore的
           //     try {
           //       return this.memstore.add(cell);
           //     } finally {
           //       lock.readLock().unlock();
           //     }
           //   }
           // 一个HStore对应的是一个memstore
           // 全局维护了long addedSize 应该是表示当前整个region在mem中的大小。
           // 代表的是整个region,那单个mem的大小呢??????????
          addedSize += ret.getFirst();
          memstoreCells.add(ret.getSecond());
        }
      }

      long txid = 0;
      // 8. Append no sync
      // 如果wal不为空,则写入wal
      if (!walEdit.isEmpty()) {
        // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
        // 构建walkey,这个时候walkey里面传入的logSeqNum是WALKey.NO_SEQUENCE_ID,值为-1
        // WALKey.NO_SEQUENCE_ID表示这个时候我们不关心这个号
        // 在真正写入wal的时候,这个值会被重新赋值
        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
          processor.getClusterIds(), nonceGroup, nonce);
          // 前面已经构建了walEdit 这里构造了walKey
          
          // 调用wal的append
          // 这个append方法会去构建FSWALEntry
          // wal的append方法并不是直接就放到我们的log文件里了
          // 他有一个缓冲区,append的应该是先放到这个缓冲区里面
          // 再写到文件里面
          // 返回事物id
          // 前面写入cell的seqid是mvcum
          // wal.append 见后文wal写入部分
          // 返回的序列号作为事务id
        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
          walKey, walEdit, getSequenceId(), true, memstoreCells);
      }
      // 9. Release region lock
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      // 10. Release row lock(s)
      // 释放行锁
      releaseRowLocks(acquiredRowLocks);

      // 11. Sync edit log
      // 这里就是同步我们的log文件
      // 前面说了 wal会先写到一个缓冲区里面
      // 采用的应该是批量刷新
      // 因为这个过程还是应该挺费时间的
      if (txid != 0) {
        syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
      }  
      walSyncSuccessful = true;
      // 12. call postBatchMutate hook
      processor.postBatchMutate(this);
    }
  } finally {
    // ......

} finally {
  closeRegionOperation(); 
  // 这里需要对memstore进行刷新进行判断,具体的刷新逻辑,在memstore flush部分
  if (!mutations.isEmpty() &&
      isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
    requestFlush();
  }
}
  • HRegion.syncOrDefer

    在上面的方法中调用了这个方法。

    这是同步wal操作,之前的walEntry是写入到我们的buffer里,那就需要有wal的操作。上面调用的syncOrDefer就是来进行这个操作的。

    但是调用这个方法并不是说马上就会进行同步的操作。因为这个同步的操作是挺费时的,所以不会立即执行,而是和append操作一样,向我们的ringbuffer里面添加一个sync的操作。当ringbuffer的消费线程读到这个sync的时候会进行相应的操作。具体在消费线程的代码那里讲

    在RingBufferTruck中阐述了ringbuffer里面的数据类型有两种,一是SyncFuture,二是FSWALEntry,此时这里放入的就是SyncFuture。当然会进行Durability类型的判断。

//传入的txid其实是这个数据在ringbbuffer种的序列号
private void syncOrDefer(long txid, Durability durability) throws IOException {
    if (this.getRegionInfo().isMetaRegion()) {
     // 调用wal 也就是FHlog的sync方法 在FHlog处进行说明
        this.wal.sync(txid);
    } else {
      switch(durability) {
      case USE_DEFAULT:
        // do what table defaults to
        if (shouldSyncWAL()) {
          this.wal.sync(txid);
        }
        break;
      case SKIP_WAL:
        // nothing do to
        break;
      case ASYNC_WAL:
        // nothing do to
        break;
      case SYNC_WAL:
      case FSYNC_WAL:
        // sync the WAL edit (SYNC and FSYNC treated the same for now)
        this.wal.sync(txid);
        break;
      }
    }
  }

WAL写入相关

1.FSHlog

在FSHlog的实现中,有一个private final Disruptor<RingBufferTruck> disruptor,disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue。disruptor主要用于大规模低延迟的高并发业务场景,其核心disruptor是一个基于事件源驱动机制的业务逻辑处理器,整个业务逻辑处理器完全运行在内存中,disruptor在无锁的网络情况下,实现了Queue的并发。disruptor适用于大规模低延迟的并发场景。可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)、或者是基于内存的事件流处理机制的场景(参考https://blog.csdn.net/jeffsmish/article/details/53572043)。Disruptor一般用于线程间消息的传递

Disruptor内部有一个ringbuffer。所有的读写操作都是基于这个ringbuffer的。

对于Disruptor的生产者,很明显的是:当用一个简单队列来发布事件的时候会牵涉更多的细节,这是因为事件对象还需要预先创建。发布事件最少需要两步:获取下一个事件槽并发布事件(发布事件的时候要使用try/finnally保证事件一定会被发布)。如果我们使用RingBuffer.next()获取一个事件槽,那么一定要发布对应的事件。 如果不能发布事件,那么就会引起Disruptor状态的混乱。尤其是在多个事件生产者的情况下会导致事件消费者失速,从而不得不重启应用才能会恢复。

事件的发布

 //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
 long sequence = ringBuffer.next();
 try {
    //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
    LongEvent event = ringBuffer.get(sequence);
    //3.获取要通过事件传递的业务数据
    event.setValue(bb.getLong(0));
 } finally {
   //4.发布事件
   //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
   ringBuffer.publish(sequence);
 }

选自博客https://www.cnblogs.com/sigm/p/6251910.html
一个disruptor还需要绑定一个事件的消费者

  • FSHLog
// 在FSHlog的实现中

// 初始化disruptor
// 数据类型是RingBufferTruck 这个是hbase自己实现的,代表了buffer里面的数据类型
// 见后面对RingBufferTruck这个类的说明
// preallocatedEventCount是这个ringbuffer的size
final int preallocatedEventCount =
      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
this.disruptor =
      new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
        this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy())
// ringbuffer前进1,使得起从1开始而不是0
this.disruptor.getRingBuffer().next();
this.ringBufferEventHandler =
      new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
        maxHandlersCount);
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
// 这一步应该是很关键的吧 给这个disruptor关联时间消费者处理线程
// 见后面这个线程具体的解释
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
// Presize our map of SyncFutures by handler objects.
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
// Starting up threads in constructor is a no no; Interface should have an init call.
// 启动
this.disruptor.start();

// 以上的这些操作在wal构建的时候就会执行

// append方法 Hregion中调用
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
      final List<Cell> memstoreCells) {
    // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
    // single consuming thread.  Don't have to worry about it.
    TraceScope scope = Trace.startSpan("FSHLog.append");
    
    FSWALEntry entry = null;
    long sequence = this.disruptor.getRingBuffer().next();//得到ringbuffer的在一个事件槽
    try {
        // 根据序号得到RingBufferTruck 然后设置值 
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri,
        (memstoreCells != null) memstoreCells: edits == null null: edits.getCells());
      truck.loadPayload(entry, scope.detach());
    } finally {
        // 发布
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return sequence; // 返回序列号
    
  }


// sync方法:
  @Override
  public void sync(long txid) throws IOException {
    if (this.highestSyncedSequence.get() >= txid){
      // Already sync'd.
      return;
    }
    TraceScope scope = Trace.startSpan("FSHLog.sync");
    try {
        // 放进ringbuffer 并阻塞线程 等待唤醒
        // publishSyncOnRingBuffer方法里面会分别调用blockOnSyn 和 publishSyncOnRingBuffer方法
        // publishSyncOnRingBuffer会构造一个SyncFuture并放到ringbuffer中,和append方法类似。
        // 而blockOnSyn 则是阻塞线程的作用
      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
    } finally {
      assert scope == NullScope.INSTANCE || !scope.isDetached();
      scope.close();
    }
  }


// publishSyncOnRingBuffer方法会将得到的syncFuture返回回来
// 而blockOnSyn的阻塞也是基于这个syncFuture进行的
// 调用syncFuture的get方法实现阻塞,如下:
 public synchronized long get() throws InterruptedException, ExecutionException {
    // syncFuture有个属性doneSequence ,构建的时候会设置为0
    // 只要doneSequence==0 isDone都会返回false 进行阻塞
     while (!isDone()) { 
      wait(1000);
    }
    if (this.throwable != null) throw new ExecutionException(this.throwable);
    return this.doneSequence;
  }


// 在FSHLog维护了一个private final Map<Thread, SyncFuture> syncFuturesByHandler
// 在每次getSyncFuture的时候都会执行syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
// 这个map应该是在线程唤醒的时候用
// 在disruptor的消费者线程里面会讲到线程唤醒
  • RingBufferTruck

    RingBufferTruck里面的数据类型有两种:一是FSWALEntry,对应我们的append操作。二是SyncFuture,代表我们的同步操作,但是一个RingBufferTruck只会是这两种类型中的一种。

    
    class RingBufferTruck {
      /**
       * Either this syncFuture is set or entry is set, but not both.
       */
      private SyncFuture syncFuture;
      private FSWALEntry entry;
    
      /**
       * The tracing span for this entry.  Can be null.
       * TODO: Fix up tracing.
       */
      private Span span;
      void loadPayload(final FSWALEntry entry, final Span span) {
        this.entry = entry;
        this.span = span;
        this.syncFuture = null;
      }
      void loadPayload(final SyncFuture syncFuture) {
        this.syncFuture = syncFuture;
        this.entry = null;
        this.span = null;
      }
    
      boolean hasFSWALEntryPayload() {
        return this.entry != null;
      }
      boolean hasSyncFuturePayload() {
        return this.syncFuture != null;
      }
      FSWALEntry unloadFSWALEntryPayload() {
        FSWALEntry ret = this.entry;
        this.entry = null;
        return ret;
      }
      SyncFuture unloadSyncFuturePayload() {
        SyncFuture ret = this.syncFuture;
        this.syncFuture = null;
        return ret;
      }
      Span unloadSpanPayload() {
        Span ret = this.span;
        this.span = null;
        return ret;
      }
      final static EventFactory<RingBufferTruck> EVENT_FACTORY = new EventFactory<RingBufferTruck>() {
        public RingBufferTruck newInstance() {
          return new RingBufferTruck();
        }
      };
    }
    

2. Disruptor消费线程,即wal刷新

到目前,整个写入的流程个wal其实已经基本清楚了。在前面数据写入和sync操作都是利用了disruptor,在前面也介绍过这个,在FSHlog的构造方法中,它绑定了一个消费者线程。这个消费者线程就是用来处理wal写入和刷新的。wal的刷新是一个费时的操作,所以HBase采用的是批量刷新的操作。

这个批量刷新是说,虽然调用了sync的操作,向队列中写入了一个SyncFuture,但是消费线程并不是读到一个SyncFuture就会执行sync的操作,二是批量读到一定数量的SyncFuture才会进行同步操作。

在FSHlog的里RingBufferEventHandler实现了 EventHandler,这就是前面绑定的消费者线程。

// 先看这个类的说明
  /**
   * Handler that is run by the disruptor ringbuffer consumer. 这个线程就是 disruptor的消费者
   Consumer is a SINGLE 'writer/appender' thread.  Appends edits and starts up sync runs.  消费者是一个线程,数据追加和同步操作都由它运行
   Tries its best to batch up syncs. 批量同步 There is no discernible benefit batching appends so we just append as they come in because it simplifies the below implementation.  See metrics for batching effectiveness
   * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10
   * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers,
   * YMMV).
   * <p>Herein, we have an array into which we store the sync futures as they come in.  When we
   * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the
   * filesystem sync.  When it completes, it will then call
   * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the batch to release
   * blocked Handler threads.会有SyncRunner线程去执行文件系统的同步,完成后会调用SyncFuture的done结束等待的线程
  .,..........
  */

  class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    private final SyncRunner [] syncRunners;// 使用多个线程刷新的目的是保持较低的延迟
    private final SyncFuture [] syncFutures;
    // Had 'interesting' issues when this was non-volatile.  On occasion, we'd not pass all
    // syncFutures to the next sync'ing thread.
    private volatile int syncFuturesCount = 0;
    private volatile SafePointZigZagLatch zigzagLatch;
    /**
     * Set if we get an exception appending or syncing so that all subsequence appends and syncs
     * on this WAL fail until WAL is replaced.
     */
    private Exception exception = null;
    /**
     * Object to block on while waiting on safe point.
     */
    private final Object safePointWaiter = new Object();
    private volatile boolean shutdown = false;

    /**
     * Which syncrunner to use next.
     */
    private int syncRunnerIndex;
  
 // 主要看这个onEvent方法:     
  public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
    throws Exception {
      try {
          // 如果是sync的操作
        if (truck.hasSyncFuturePayload()) { 
            // 这个类开始的时候维护了一个数组 SyncFuture [] syncFutures
            // 如果是SyncFuture操作,则加入到数组中
          this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
          // 如果数组满了 一批到结束了
          if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
        } else if (truck.hasFSWALEntryPayload()) {// 如果编辑操作
          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
          try {
            FSWALEntry entry = truck.unloadFSWALEntryPayload();
            if (this.exception != null) {
              entry.stampRegionSequenceId();
              return;
            }
            // 这个append方法
            // 这个append方法里面重新为每个key和cell设置seqid  
            // append方法调用   下面讲这个方法
            append(entry);
          } catch (Exception e) {
          } finally {
            assert scope == NullScope.INSTANCE || !scope.isDetached();
            scope.close(); // append scope is complete
          }
        }
          
        if (this.exception == null) {
          // If not a batch, return to consume more events from the ring buffer before proceeding;
          // we want to get up a batch of syncs and appends before we go do a filesystem sync.
          if (!endOfBatch || this.syncFuturesCount <= 0) return;

          // 如果批处理到结尾了
          // 前面说了 有一个SyncRunner的数组 取一个出来
         int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
          try {
              //调用SyncRunner的offer方法
            this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
          } catch (Exception e) {
            // Should NEVER get here.
            requestLogRoll();
            this.exception = new DamagedWALException("Failed offering sync", e);
          }
        }
        // We may have picked up an exception above trying to offer sync
        if (this.exception != null) {
          cleanupOutstandingSyncsOnException(sequence,
            this.exception instanceof DamagedWALException
              this.exception:
              new DamagedWALException("On sync", this.exception));
        }
        attainSafePoint(sequence);
        this.syncFuturesCount = 0;

      }
      
      
   void append(final FSWALEntry entry) throws Exception {
      long regionSequenceId = WALKey.NO_SEQUENCE_ID;
      try {
          // stampRegionSequenceId会重新为这个entry的每一个cell和walkey设置seqid
        regionSequenceId = entry.stampRegionSequenceId();
        if (entry.getEdit().isEmpty()) {
          return;
        }
         //FSHlog维护了一个Writer,代表了当前的log file
        writer.append(entry);
        assert highestUnsyncedSequence < entry.getSequence();
        // 重新设置highestUnsyncedSequence 这是还未sync的最大的事务id
        highestUnsyncedSequence = entry.getSequence();
        Long lRegionSequenceId = Long.valueOf(regionSequenceId);
        //highestRegionSequenceIds是维护的一个map,代表了每一个region和最大的seqid的对应关系
        highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
        if (entry.isInMemstore()) {
          updateOldestUnflushedSequenceIds(encodedRegionName,
              entry.getFamilyNames(), lRegionSequenceId);
        } 
  }

在FSHlog里面的SyncRunner类,也就是上面提到的会调用这个方法进行同步操作

private class SyncRunner extends HasThread {
  //SyncRunner本身又维护了一个BlockingQueue
   private final BlockingQueue<SyncFuture> syncFutures;
    
    
    // RingBufferEventHandler中调用的是这个方法
    // 会传入当前SyncFuture数组中的所有的SyncFuture
    // 会把这些SyncFuture添加到BlockingQueue里面,run会从这个BlockingQueue里面取
   void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
      // Set sequence first because the add to the queue will wake the thread if sleeping.
      this.sequence = sequence;
      this.syncFutures.addAll(Arrays.asList(syncFutures).subList(0, syncFutureCount));
    }
    
   // run方法应该是负责具体的刷写文件缓存到磁盘的工作
   public void run() {
      long currentSequence;
      while (!isInterrupted()) {
        int syncCount = 0;
        SyncFuture takeSyncFuture;
        try {
          while (true) {  // 循环
            // We have to process what we 'take' from the queue
            takeSyncFuture = this.syncFutures.take();//get一个
            currentSequence = this.sequence;
            // 得到这个元素在ringbuffer中的序列号 
            long syncFutureSequence = takeSyncFuture.getRingBufferSequence();  
            if (syncFutureSequence > currentSequence) {
              throw new IllegalStateException("currentSequence=" + syncFutureSequence +
                ", syncFutureSequence=" + syncFutureSequence);
            }
            // See if we can process any syncfutures BEFORE we go sync
            // 和同步SyncFuture最大的序列号相比
            long currentHighestSyncedSequence = highestSyncedSequence.get();
            if (currentSequence < currentHighestSyncedSequence) {
                //  释放调当前syncFuture持有线程的等待
              syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
              // Done with the 'take'.  Go around again and do a new 'take'.
              continue;
            }
            break;
          }
          // I got something.  Lets run.  Save off current sequence number in case it changes
          // while we run.
          // 也就是说取到了最大的SyncFuture才会进行同步操作
          TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
          long start = System.nanoTime();
          Throwable lastException = null;
          try {
            Trace.addTimelineAnnotation("syncing writer");
            // 执行同步,其实就是流的flush操作
            writer.sync();
            Trace.addTimelineAnnotation("writer synced");
            currentSequence = updateHighestSyncedSequence(currentSequence);
          }finally {
            // reattach the span to the future before releasing.
            takeSyncFuture.setSpan(scope.detach());
            // First release what we 'took' from the queue.
            syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
            // Can we release other syncs
             // 释放等待线程 
            syncCount += releaseSyncFutures(currentSequence, lastException);
            if (lastException != null) requestLogRoll();
            else checkLogRoll();
          }
          postSync(System.nanoTime() - start, syncCount);
        } 
      }
    }
  }    
}

3. wal写入总结

从上面wal的写入代码的流程也可以知道wal写入的逻辑模型,这里借助两张图来表示wal的逻辑模型:
在这里插入图片描述

在这里插入图片描述
以上两张图分别选自https://www.cnblogs.com/ohuang/p/5807543.html

https://blog.csdn.net/tengxy_cloud/article/details/53579795

MemStore刷新

前面介绍完了wal的写入部分,对于这一块还有一个MemStore的刷新,将在下一篇文章进行描述

其他参考:
https://blog.csdn.net/tengxy_cloud/article/details/53579795

https://www.cnblogs.com/163yun/p/9020608.html

https://blog.csdn.net/lipeng_bigdata/article/category/6049175

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇JVM GC优化 下一篇SQL Server 2000数据库数据参数大..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目