disruptor简介
背景
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍The LMAX Architecture。同年它还获得了Oracle官方的Duke大奖。其他关于disruptor的背景就不在此多言,可以自己google。
官方资料
disruptor github wiki有关于disruptor相关概念和原理的介绍,该wiki已经很久没有更新。像Design and Implementation,对于想了解disruptor的人是很有吸引力的,但是只有题目没有内容,还是很遗憾的。本文稍后会对其内部原理做一个介绍性的描述。
disruptor github wiki:
Home · LMAX-Exchange/disruptor Wiki
disruptor github:
LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
这个地方也有很多不错的资料:Disruptor by LMAX-Exchange
性能
disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,本文列出其中一组数据,数据中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:
完整的官方性能测试数据在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能测试的代码已经包含在disruptor的代码中,你完全可以git下来在自己的主机上测试一下看看
如何使用
单生产者,单消费者
//声明disruptor中事件类型及对应的事件工厂 private class LongEvent { private long value; public LongEvent() { this.value = 0L; } public void set(long value) { this.value = value; } public long get() { return this.value; } } private EventFactory<LongEvent> eventFactory = new EventFactory<LongEvent>() { public LongEvent newInstance() { return new LongEvent(); } }; //声明disruptor, private int ringBufferSize = 1024; private Executor executor = Executors.newFixedThreadPool(8); private Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor); //pubisher逻辑,将原始数据转换为event,publish到ringbuffer private class Publisher implements EventTranslatorOneArg<LongEvent , String> { public void translateTo(LongEvent event, long sequence, String arg0) { event.set(Long.parseLong(arg0)); } } //consumer逻辑,获取event进行处理 private class Consumer implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { long value = event.get(); int index = (int) (value % Const.NUM_OF_FILE); fileWriter[index].write("" + value + "\n"); if(value == Long.MAX_VALUE) { isFinish = true; } } } //注册consumer启动disruptor disruptor.handleEventsWith(new Consumer()); disruptor.start(); //获取disruptor的ringbuffer,用于生产数据 private RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ringBuffer.publishEvent(new Publisher(), line);
多生产者
多生产者的改动相对简单,只需将disruptor的声明换一个构造函数即可,但是多生产者ringbuffer的处理逻辑完全不同,只是这些不同对使用者透明,本文将在后边讨论单生产者,多生产者ringbuffer逻辑的不同
private Disruptor<LongEvent> disruptor1 = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());
多消费者
多消费者的情况分为两类:
- 广播:对于多个消费者,每条信息会达到所有的消费者,被多次处理,一般每个消费者业务逻辑不通,用于同一个消息的不同业务逻辑处理
- 分组:对于同一组内的多个消费者,每条信息只会被组内一个消费者处理,每个消费者业务逻辑一般相同,用于多消费者并发处理一组消息
广播
- 消费者之间无依赖关系
假设目前有handler1,