D大小,BUFFER_PAD分别在头尾
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
//初始化整个buffer
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
//sequence & indexMask即对sequence取模, 最终算出来的就是基地址+偏移地址
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
主体代码基本如上。其他代码可以自行参考。
下面介绍下一些常见问题。
1. disruptor应该如何用才能发挥最大功效?
disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。
2. disruptor为啥这么快?
这个问题参考之前的一篇文章 disruptor框架为什么这么强大
3. 多生产者如何写入消息?
多生产者的消息写入实际上是通过availableBuffer与消费者来同步最后一个生产者写入的位置,这样,消费者永远不能超越最慢的那个生产者。见如下代码段
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
@Override
public boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
@Override
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
可以参考这篇文章 RingBuffer多生产者写入
4. 除了多个消费者重复处理生产者发送的消息,是否可以多消费者不重复处理生产者发送的消息,即各处理各的?
若要多消费者重复处理生产者的消息,则使用disruptor.handleEventsWith方法将消费者传入;而若要消费者不重复的处理生产者的消息,则使用disruptor.handleEventsWithWorkerPool方法将消费者传入。