handler2,handler3三个消费者处理一批消息,每个消息都要被三个消费者处理到,三个消费者无依赖关系,则如下所示即可
disruptor.handleEventsWith(handler1,handler2,handler3);
假设handler3必须在handler1,handler2处理完成后进行处理
disruptor.handleEventsWith(handler1,handler2).then(handler3);
其他情况可视为以上两种情况的排列组合
分组
分组情况稍微不同,对于消费者,需要实现WorkHandler而不是EventHandler,借口定义分别如下所示:
public interface EventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
public interface WorkHandler<T>
{
/**
* Callback to indicate a unit of work needs to be processed.
*
* @param event published to the {@link RingBuffer}
* @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.
*/
void onEvent(T event) throws Exception;
}
假设handler1,handler2,handler3都实现了WorkHandler,则调用以下代码就可以实现分组
disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3);
广播和分组之间也是可以排列组合的
tips
disruptor也提供了函数让你自定义消费者之间的关系,如
public EventHandlerGroup<T> handleEventsWith(final EventProcessor… processors)
当然,必须对disruptor有足够的了解才能正确的在EventProcessor中实现多消费者正确的逻辑
实现原理
为何高效
事件预分配
在定义disruptor的时候我们需要指定事件工厂EventFactory的逻辑,disruptor内部的ringbuffer的数据结构是数组,EventFactory就用于disruptor初始化时数组每个元素的填充。生产者开始后,是通过获取对应位置的Event,调用Event的setter函数更新Event达到生产数据的目的的。为什么这样?假设使用LinkedList,在生产消费的场景下生产者会产生大量的新节点,新节点被消费后又需要被回收,频繁的生产消费给GC带来很大的压力。使用数组后,在内存中存在的是一块大小稳定的内存,频繁的生产消费对GC并没有什么影响,大大减小了系统的最慢响应时间,更不会因为消费者的滞后导致OOM的发生。因此这种事件预分配的方法对于减轻GC压力可以说是一种简单有效的方法,日常工作中的借鉴意义还是很大的。
无锁算法
先看一段ABQ put算法的实现:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
通过以上代码说明两点:
- ABQ是通过lock机制实现的线程同步
- ABQ的所有操作共用同一个lock,故所有操作均是互斥的
这篇文章中讲述了一个实验, 测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次,在2.4G 6核机器上得到了如下的实验数据:
METHOD |
TIME (MS) |
Single thread |
300 |
Single thread with CAS |
5,700 |
Single thread with lock |
10,000 |
Single thread with volatile write |
4,700 |
Two threads with CAS |
30,000 |
Two threads with lock |
224,000 |
实验数据说明,使用CAS机制比使用lock机制快了一个数量级
另一方面,ABQ的所有操作都是互斥的,这点其实不是必要的,尤其像put和get操作,没必要共享一个lock,完全可以降低锁的粒度提高性能。
disruptor则与之不同:
disruptor使用了CAS机制同步线程,线程同步代价小于lock
disruptor遵守single writer原则,一块内存对应单个线程,不仅produce和consume不是互斥的,多线程的produce也不是互斥的
伪共享
伪共享一直是一个比较高级的话题,Doug lea在JDK的Concurrent使用了大量的缓存行机制避免伪共享,disruptor也是用了这样的机制。但是对于广大的码农而言,实际工作中我们可能很少会需要使用这样的机制。毕竟对于大部分人而言,与避免伪共享带来的性能提升而言,优化工程架构,算法,io等可能会给我们带来更大的性能提升。所以本文只简单提到这个话题,并不深入讲解,毕竟我也没有实际的应用经验去讲解这个话题。
单生产者模式
如图所示,图中数组代表ringbuffer,红色元素代表已经发布过的事件槽,绿色元素代表将要发布的事件槽,白色元素代表尚未利用的事件槽。disruptor生产时间包括三个阶段:申请事件槽,更新数据,发布事件槽。单生产者相对简单,
- 申请事件槽:此时,ringbuffer会将cursor后的一个事件槽返