设为首页 加入收藏

TOP

高性能线程间队列 DISRUPTOR 简介(二)
2017-12-29 06:06:38 】 浏览:1121
Tags:高性能 线程 队列 DISRUPTOR 简介
handler2,handler3三个消费者处理一批消息,每个消息都要被三个消费者处理到,三个消费者无依赖关系,则如下所示即可
disruptor.handleEventsWith(handler1,handler2,handler3);

  • 消费者之间有依赖关系

假设handler3必须在handler1,handler2处理完成后进行处理
disruptor.handleEventsWith(handler1,handler2).then(handler3);
其他情况可视为以上两种情况的排列组合

分组

分组情况稍微不同,对于消费者,需要实现WorkHandler而不是EventHandler,借口定义分别如下所示:

public interface EventHandler<T>
{
    /**
     * Called when a publisher has published an event to the {@link RingBuffer}
     *
     * @param event      published to the {@link RingBuffer}
     * @param sequence   of the event being processed
     * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
     * @throws Exception if the EventHandler would like the exception handled further up the chain.
     */
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
public interface WorkHandler<T>
{
    /**
     * Callback to indicate a unit of work needs to be processed.
     *
     * @param event published to the {@link RingBuffer}
     * @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.
     */
    void onEvent(T event) throws Exception;
}

假设handler1,handler2,handler3都实现了WorkHandler,则调用以下代码就可以实现分组

disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);

广播和分组之间也是可以排列组合的

tips

disruptor也提供了函数让你自定义消费者之间的关系,如
public EventHandlerGroup<T> handleEventsWith(final EventProcessor… processors)
当然,必须对disruptor有足够的了解才能正确的在EventProcessor中实现多消费者正确的逻辑

实现原理

为何高效

事件预分配

在定义disruptor的时候我们需要指定事件工厂EventFactory的逻辑,disruptor内部的ringbuffer的数据结构是数组,EventFactory就用于disruptor初始化时数组每个元素的填充。生产者开始后,是通过获取对应位置的Event,调用Event的setter函数更新Event达到生产数据的目的的。为什么这样?假设使用LinkedList,在生产消费的场景下生产者会产生大量的新节点,新节点被消费后又需要被回收,频繁的生产消费给GC带来很大的压力。使用数组后,在内存中存在的是一块大小稳定的内存,频繁的生产消费对GC并没有什么影响,大大减小了系统的最慢响应时间,更不会因为消费者的滞后导致OOM的发生。因此这种事件预分配的方法对于减轻GC压力可以说是一种简单有效的方法,日常工作中的借鉴意义还是很大的。

无锁算法

先看一段ABQ put算法的实现:

  • 每个对象一个锁,首先加锁
  • 如果数组是满的,加入锁的notFull条件等待队列。(notFull的具体机制可以看这里的一篇文章wait、notify与Condition | forever
  • 元素加入数组
  • 释放锁
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

通过以上代码说明两点:

  • ABQ是通过lock机制实现的线程同步
  • ABQ的所有操作共用同一个lock,故所有操作均是互斥的

这篇文章中讲述了一个实验, 测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次,在2.4G 6核机器上得到了如下的实验数据:

METHOD TIME (MS)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

实验数据说明,使用CAS机制比使用lock机制快了一个数量级

另一方面,ABQ的所有操作都是互斥的,这点其实不是必要的,尤其像put和get操作,没必要共享一个lock,完全可以降低锁的粒度提高性能。

disruptor则与之不同:

disruptor使用了CAS机制同步线程,线程同步代价小于lock
disruptor遵守single writer原则,一块内存对应单个线程,不仅produce和consume不是互斥的,多线程的produce也不是互斥的

伪共享

伪共享一直是一个比较高级的话题,Doug lea在JDK的Concurrent使用了大量的缓存行机制避免伪共享,disruptor也是用了这样的机制。但是对于广大的码农而言,实际工作中我们可能很少会需要使用这样的机制。毕竟对于大部分人而言,与避免伪共享带来的性能提升而言,优化工程架构,算法,io等可能会给我们带来更大的性能提升。所以本文只简单提到这个话题,并不深入讲解,毕竟我也没有实际的应用经验去讲解这个话题。

单生产者模式

如图所示,图中数组代表ringbuffer,红色元素代表已经发布过的事件槽,绿色元素代表将要发布的事件槽,白色元素代表尚未利用的事件槽。disruptor生产时间包括三个阶段:申请事件槽,更新数据,发布事件槽。单生产者相对简单,

  • 申请事件槽:此时,ringbuffer会将cursor后的一个事件槽返
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇代码生成利器:IDEA 强大的 Live .. 下一篇JVM 堆内存和非堆内存

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目