设为首页 加入收藏

TOP

Disruptor-源码解读(一)
2023-07-25 21:31:39 】 浏览:62
Tags:Disruptor- 解读

前言

Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:

  • 锁和CAS
  • 伪共享和缓存行
  • volatile和内存屏障

原理

此节结合demo来看更容易理解:传送门

添加了中文注释的源码:Disruptor

下图来自官方文档

Untitled

官方原图有点乱,我翻译一下

Untitled

在讲原理前,先了解 Disruptor 定义的术语

  • Event

    存放数据的单位,对应 demo 中的 LongEvent

  • Ring Buffer

    环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据

  • Sequence

    序列:用于跟踪进度(生产进度、消费进度)

  • Sequencer

    Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。

  • Sequence Barrier

    序列屏障,消费者之间的依赖关系就靠序列屏障实现

  • Wait Strategy

  • 等待策略,消费者等待生产者将发布的策略

  • Event Processor

    事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。

  • Event Handler

    事件处理程序,也就是消费者

  • Producer

    生产者

Ring Buffer

环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示Object[]。Disruptor生产者发布分两步

  • 步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
  • 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取下一个可用位置的下标(步骤1)
long sequence = ringBuffer.next();
try {
    // 返回可用位置的元素
    LongEvent event = ringBuffer.get(sequence);
    // 设置该位置元素的值
    event.set(l);
} finally {
    // 发布
    ringBuffer.publish(sequence);
}

这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现

Sequencer

单生产者

如果申请 2 个元素,则如下图所示(圆表示 RingBuffer)

// 一般不会有以下写法,这里为了讲解源码才使用next(2)
// 向RingBuffer申请两个元素
long sequence = ringBuffer.next(2);
for (long i =  sequence-1; i <= sequence; i++) {
    try {
        // 返回可用位置的元素
        LongEvent event = ringBuffer.get(i);
        // 设置该位置元素的值
        event.set(1);
    } finally {
        ringBuffer.publish(i);
    }
}

Untitled

next 申请成功的序列,cursor 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。申请相当于占位置,发布需要一个一个按顺序发布

如果 RingBuffer 满了呢,在上图步骤二的基础上,生产者发布了3个元素,消费者消费1个。此时生产者再申请 2个元素,就会变成下图所示

Untitled

只剩下 1 个空间,但是要申请 2个元素,此时程序会自旋等待空间足够。

接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法

abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    long nextValue = Sequence.INITIAL_VALUE;
    long cachedValue = Sequence.INITIAL_VALUE;
}

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用ClassLayout打印内存布局看看

Untitled

接下来看如何获取序列号(也就是步骤一)

// 调用路径
// RingBuffer#next()
// SingleProducerSequencer#next()
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;

    //生产者当前序号值+期望获取的序号数量后达到的序号值
    long nextSequence = nextValue + n;
    //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
    long wrapPoint = nextSequence - bufferSize;
    //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
    //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
    long cachedGatingSequence = this.cachedValue;

    //(wrapPoin
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇static和final关键字 下一篇大话CAS

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目