设为首页 加入收藏

TOP

disruptor 源码解读(一)
2018-04-29 06:06:35 】 浏览:692
Tags:disruptor 源码 解读

disruptor经过几年的发展,似乎已经成为性能优化的大杀器,几乎每个想优化性能的项目宣称自己用上了disruptor,性能都会呈现质的跃进。毕竟,最好的例子就是LMAX自己的架构设计,支撑了600w/s的吞吐。

本文试图从代码层面将关键问题做些解答。

基本概念

Disruptor: 实际上就是整个基于ringBuffer实现的生产者消费者模式的容器。

RingBuffer: 著名的环形队列,可以类比为BlockingQueue之类的队列,ringBuffer的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。

 

EventProcessor: 事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnable的run方法,将循环判断等操作封在了里面。

EventHandler: 事件处置器,与前面处理器的不同是,事件处置器不负责框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点罢了。

Sequencer: 作为RingBuffer生产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。

EventTranslator: 事件转换器。实际上就是新事件向旧事件覆盖的接口定义。

SequenceBarrier: 消费者路障。规定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。

WaitStrategy: 当生产者生产得太快而消费者消费得太慢时的等待策略。

把上面几个关键概念画个图,大概长这样:

所以接下来主要也就从生产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。

生产者

生产者发布消息的过程从disruptor的publish方法为入口,实际调用了ringBuffer的publish方法。publish方法主要做了几件事,一是先确保能拿到后面的n个sequence;二是使用translator来填充新数据到相应的位置;三是真正的声明这些位置已经发布完成。

    public void publishEvent(EventTranslator<E> translator)  
   {  
     final long sequence = sequencer.next();  
     translateAndPublish(translator, sequence);  
   }  
    public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)  
    {  
        checkBounds(translators, batchStartsAt, batchSize);  
        final long finalSequence = sequencer.next(batchSize);  
        translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);  
    }

获取生产者下一个sequence的方法,细节已经注释,实际上最终目的就是确保生产者和消费者互相不越界。

public long next(int n)  
{  
    if (n < 1)  
    {  
        throw new IllegalArgumentException("n must be > 0");  
    }  
    //该生产者发布的最大序列号  
    long nextValue = this.nextValue;  
    //该生产者欲发布的序列号  
    long nextSequence = nextValue + n;  
    //覆盖点,即该生产者如果发布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理以后的值,最终目的是统一计算,否则就要去判绝对值以及取模等麻烦操作  
    long wrapPoint = nextSequence - bufferSize;  
    //所有消费者中消费得最慢那个的前一个序列号  
    long cachedGatingSequence = this.cachedValue;  
  
    //这里两个判断条件:一是看生产者生产是不是超过了消费者,所以判断的是覆盖点是否超过了最慢消费者;二是看消费者是否超过了当前生产者的最大序号,判断的是消费者是不是比生产者还快这种异常情况  
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)  
    {  
        cursor.setVolatile(nextValue);  // StoreLoad fence  
  
        long minSequence;  
        //覆盖点是不是已经超过了最慢消费者和当前生产者序列号的最小者(这两个有点难理解,实际上就是覆盖点不能超过最慢那个生产者,也不能超过当前自身,比如一次发布超过bufferSize),gatingSequences的处理也是类似算术处理,也可以看成是相对于原点是正还是负  
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  
        {  
            //唤醒阻塞的消费者  
            waitStrategy.signalAllWhenBlocking();  
            //等上1纳秒  
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?  
        }  
        //把这个最慢消费者缓存下来,以便下一次使用  
        this.cachedValue = minSequence;  
    }  
    //把当前序列号更新为欲发布序列号  
    this.nextValue = nextSequence;  
  
    return nextSequence;  
}

translator由用户在调用时自己实现,其实就是预留的一个扩展点,将覆盖事件预留出来。大部分实现都是将ByteBuffer复制到Event中,参考disruptor github官方例子。

最后声明新序列号发布完成,实际上就是设置了cursor,并且通知可能阻塞的消费者,这里已经发布完新的Event了,快来消费吧。

public void publish(long sequence)  
{  
    cursor.set(sequence);  
    waitStrategy.signalAllWhenBlocking();  
}

以上就是单生产者的分析,MultiProducerSequencer可以类似分析。

等待策略

等待策略实际上就是用来同步生产者和消费者的方法。SequenceBarrier只有一个实现ProcessingSequenceBarrier,中间就用到了WaitStrategy

BlockingWaitStrategy就是真正的加锁阻塞策略,采用的就是ReentrantLock以及Condition来控制阻塞与唤醒。

TimeoutBlockingWaitStrategy是BlockingWaitStr

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Java基础教程反射详解 下一篇java (bitmap bitvector)的解析..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目