设为首页 加入收藏

TOP

高性能线程间队列 DISRUPTOR 简介(一)
2017-12-29 06:06:38 】 浏览:87
Tags:高性能 线程 队列 DISRUPTOR 简介

disruptor简介

背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍The LMAX Architecture。同年它还获得了Oracle官方的Duke大奖。其他关于disruptor的背景就不在此多言,可以自己google。

官方资料

disruptor github wiki有关于disruptor相关概念和原理的介绍,该wiki已经很久没有更新。像Design and Implementation,对于想了解disruptor的人是很有吸引力的,但是只有题目没有内容,还是很遗憾的。本文稍后会对其内部原理做一个介绍性的描述。

disruptor github wiki:
Home · LMAX-Exchange/disruptor Wiki

disruptor github:
LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

这个地方也有很多不错的资料:Disruptor by LMAX-Exchange

性能

disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。

官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,本文列出其中一组数据,数据中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:

完整的官方性能测试数据在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能测试的代码已经包含在disruptor的代码中,你完全可以git下来在自己的主机上测试一下看看

如何使用

单生产者,单消费者

//声明disruptor中事件类型及对应的事件工厂
private class LongEvent {
		private long value;
		
		public LongEvent() {
			this.value = 0L;
		}
		
		public void set(long value) {
			this.value = value;
		}
		
		public long get() {
			return this.value;
		}
	}
private EventFactory<LongEvent> eventFactory = new EventFactory<LongEvent>() {		
		public LongEvent newInstance() {
			return new LongEvent();
		}
};
//声明disruptor,
private int ringBufferSize = 1024;
private Executor executor = Executors.newFixedThreadPool(8);
private Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor);

//pubisher逻辑,将原始数据转换为event,publish到ringbuffer
private class Publisher implements EventTranslatorOneArg<LongEvent , String> {

		public void translateTo(LongEvent event, long sequence, String arg0) {
			event.set(Long.parseLong(arg0));
		}		
	}
//consumer逻辑,获取event进行处理
private class Consumer implements EventHandler<LongEvent> {

		public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
			long value = event.get();			
			int index = (int) (value % Const.NUM_OF_FILE);
			fileWriter[index].write("" + value + "\n");
			
			if(value == Long.MAX_VALUE) {
				isFinish = true;
			}
		}
		
	}
//注册consumer启动disruptor
disruptor.handleEventsWith(new Consumer());
disruptor.start();

//获取disruptor的ringbuffer,用于生产数据
private RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new Publisher(), line);

多生产者

多生产者的改动相对简单,只需将disruptor的声明换一个构造函数即可,但是多生产者ringbuffer的处理逻辑完全不同,只是这些不同对使用者透明,本文将在后边讨论单生产者,多生产者ringbuffer逻辑的不同

private Disruptor<LongEvent> disruptor1 = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy());

多消费者

多消费者的情况分为两类:

  • 广播:对于多个消费者,每条信息会达到所有的消费者,被多次处理,一般每个消费者业务逻辑不通,用于同一个消息的不同业务逻辑处理
  • 分组:对于同一组内的多个消费者,每条信息只会被组内一个消费者处理,每个消费者业务逻辑一般相同,用于多消费者并发处理一组消息

广播

  • 消费者之间无依赖关系

假设目前有handler1,
编程开发网

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇代码生成利器:IDEA 强大的 Live .. 下一篇JVM 堆内存和非堆内存

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

最新文章

热门文章

C 语言

C++基础

windows编程基础

linux编程基础

C/C++面试题目