写点什么

Disruptor 源码解读

用户头像
lich0079
关注
发布于: 2 小时前
Disruptor 源码解读

本文适合对 Disruptor 框架和源码有初步了解的读者。


Disruptor 版本 3.4.3


转载请注明来源 https://github.com/lich0079


RingBuffer


Disruptor 框架中的核心数据结构,event 的容器。内部实现是

Object[] entries;
复制代码


创建 RingBuffer 的时候要给出一个 bufferSize,必须是 2 的次方(数据访问优化)。


但实际上在内部会给这个数组开出的实际空间是 bufferSize + 2*BUFFER_PAD,在我的电脑上这个 PAD 是 32,所以最后的数组其实是这样的


entries=[null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
例子里event装的是long
复制代码


为什么说它是 ring 呢,就是因为访问它的数据用的是 sequence 来定位,sequence 是一个只会增加的 long 类型。


举个例子,给定 bufferSize 16, cursor 是数组实际的下标,那么对应关系就是如下所示。

sequence   cursor-----------------0          01          12          23          3 ..........15.........1516.........017.........118.........219.........3 ..........32.........033.........134.........235.........3 ..........
复制代码


EventFactoryEventFactory


EventFactory 是一次性使用的类,在最开始的时候用来给 RingBuffer 预填充数据。


为了避免 JAVA GC 带来的性能影响,Disruptor 采用的设计是在数组上预填充好对象,每次 publish event 的时候,只是通过 RingBuffer.get(seq)拿到对象,更新对象的值,然后就发布出去了。整个生产消费过程中再也不会有 event 对象的创建和销毁。


Sequence

用来表达 event 序例号的对象,但这里为什么不直接用 long 呢 ?


为了高并发下的可见性,肯定不能直接用 long 的,至少也是 volatile long。但 Disruptor 觉得 volatile long 还是不够用,所以创造了 Sequence 类。它的内部实现主要是 volatile long,


volatile long value;
复制代码


除此以外还支持以下特性:


  • CAS 更新

  • order writes (Store/Store barrier,改动不保证立即可见) vs volatile writes (Store/Load barrier,改动保证立即可见)

  • 在 volatile 字段 附近添加 padding 解决伪共享问题


简单理解就是高并发下优化的 long 类型。 这里有另一篇解释volatile的文章


在整个框架中可以看到在不同的类里,不同场景下对 sequence 的表达,有时用 long,有时用的 Sequence 类,这其实是背后对于效率和高并发可见性的考量。


比如在对 EventProcessor.sequence 的更新中都是用的 order writes,不保证立即可见,但速度快很多。在这个场景里,造成的结果是显示的消费进度可能比实际上慢,导致生产者有可能在可以生产的情况下没有去生产。但生产者看的是多个消费者中最慢的那个消费进度,所以影响可能没有那么大。


Sequencer

Sequencer 是 Disruptor 框架的核心类。


生产者发布 event 的时候首先需要预定一个 sequence,Sequencer 就是计算和发布 sequence 的。它主要有 2 个实现类:SingleProducerSequencer 和 MultiProducerSequencer。


SingleProducerSequencer

生产者每次通过 Sequencer.next(n) 来预定下面 n 个可以写入的数据位,然后修改数据,发布 event。


但因为 RingBuffer 是环形的,一个 size 16 的 RingBuffer 当你拿到 Sequence 为 16 时相当于又要去写 RingBuffer[0]的位置了,假如之前的数据还没被消费过就会被覆盖了。


Sequencer 是这样解决这个问题的,它在内部维护了一个


volatile Sequence[] gatingSequences = new Sequence[0];
复制代码


每个消费者会维护一个自己的 Sequence 对象,来记录自己已经消费到的序例位置。


每添加一个消费者,都会把消费者的 Sequence 引用添加到 gatingSequences 中。


通过访问 gatingSequences,Sequencer 可以得知消费的最慢的消费者消费到了哪个位置。


gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11]
8个消费者的例子,最慢的消费完了3,此时可以写seq 19的数据,但不能写seq 20。
复制代码


在 next(n)方法里,如果计算出的下一个 event 的 Sequence 值减去 bufferSize


long nextValue = this.nextValue;long nextSequence = nextValue + n;long wrapPoint = nextSequence - bufferSize;
复制代码


得出来的 wrapPoint > min(gatingSequences),说明即将写入的位置上,之前的 event 还有消费者没有消费,这时 SingleProducerSequencer 会等待并自旋。


while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))    {        LockSupport.parkNanos(1L);    }
复制代码


举个例子,gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11], RingBuffer size 16 的情况下,如果算出来的 nextSequence 是 20,wrapPoint 是 20-16=4, 这时 gatingSequences 里最小的是 3。


说明下一个打算写入的位置是 wrapPoint 4,但最慢的消费者才消费到 3,你不能去覆盖之前 4 上的数据,这时只能等待,等消费者把之前的 4 消费掉。


为什么 wrapPoint = nextSequence - bufferSize,而不是 bufferSize 的 n 倍呢,因为消费者只能落后生产者一圈,不然就已经存在数据覆盖了。


等到 SingleProducerSequencer 自旋到下一个位置所有人都消费过的时候,它就可以从 next 方法中返回,生产者拿着 sequence 就可以继续去发布。


MultiProducerSequencer

MultiProducerSequencer 是在多个生产者的场合使用的,多个生产者的情况下存在竞争,导致它的实现更加复杂。


int[] availableBuffer;int indexMask;int indexShift;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy){ super(bufferSize, waitStrategy); availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer();}
复制代码


数据结构上多出来的主要就是这个 availableBuffer,用来记录 RingBuffer 上哪些位置有数据可以读。


还是从 Sequencer.next(n)说起,计算下一个数据位 Sequence 的逻辑是一样的,包括消费者落后导致 Sequencer 自旋等待的逻辑。不同的是因为有多个 publisher 同时访问 Sequencer.next(n)方法,所以在确定最终位置的时候用了一个 CAS 操作,如果失败了就自旋再来一次。


cursor.compareAndSet(current, next)
复制代码


另一个不同的地方是 publish(final long sequence) 方法,SingleProducer 的版本很简单,就是移动了一下 cursor。


public void publish(long sequence){    cursor.set(sequence);    waitStrategy.signalAllWhenBlocking();}
复制代码


MultiProducer 的版本则是


public void publish(final long sequence){    setAvailable(sequence);    waitStrategy.signalAllWhenBlocking();}
复制代码


setAvailable 做了什么事呢,它去设置 availableBuffer 的状态位了。给定一个 sequence,先计算出对应的数组下标 index,然后计算出在那个 index 上要写的数据 availabilityFlag,最后执行


availableBuffer[index]=availabilityFlag
复制代码


根据 calculateAvailabilityFlag(sequence) 方法计算出来的 availabilityFlag 实际上是该 sequence 环绕 RingBuffer 的圈数。


availableBuffer=[6, 6, 6, 6, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
例子:前4个已经走到第6圈。
复制代码


availableBuffer 主要用于判断一个 sequence 下的数据是否可用


public boolean isAvailable(long sequence){    int index = calculateIndex(sequence);    int flag = calculateAvailabilityFlag(sequence);    long bufferAddress = (index * SCALE) + BASE;    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;}
复制代码


作为比较,来看一下 SingleProducer 的方法


public boolean isAvailable(long sequence){    return sequence <= cursor.get();}
复制代码


在单个生产者的场景下,publishEvent 的时候才会推进 cursor,所以只要 sequence<=cursor,就说明数据是可消费的。


多个生产者的场景下,在 next(n)方法中,就已经通过 cursor.compareAndSet(current, next) 移动 cursor 了,此时 event 还没有 publish,所以 cursor 所在的位置不能保证 event 一定可用。


在 publish 方法中是去 setAvailable(sequence)了,所以 availableBuffer 是数据是否可用的标志。那为什么值要写成圈数呢,应该是避免把上一轮的数据当成这一轮的数据,错误判断 sequence 是否可用。


另一个值得一提的是 getHighestPublishedSequence 方法,这个是消费者用来查询最高可用 event 数据的位置。


public long getHighestPublishedSequence(long lowerBound, long availableSequence){    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)    {        if (!isAvailable(sequence))        {            return sequence - 1;        }    }
return availableSequence;}
复制代码


给定一段 range,依次查询 availableBuffer,直到发现不可用的 Sequence,那么该 Sequence 之前的都是可用的。或全部都是可用的。


单生产者的版本:


    public long getHighestPublishedSequence(long lowerBound, long availableSequence)    {        return availableSequence;    }
复制代码


说完了生产者,下面来看看消费者


EventProcessor

EventProcessor extends Runnable, 可以理解它是一个消费者线程。


EventProcessor 本身也只是个 interface。


BatchEventProcessor<T>

主要属性有


DataProvider<T> dataProvider;   // 就是RingBuffer, event容器SequenceBarrier sequenceBarrier; // 用来获取可用event的sequenceEventHandler<? super T> eventHandler;  // 真正消费event的业务代码Sequence sequence = new Sequence(-1);      // 该消费线程消费完的sequence位置
复制代码


SequenceBarrier

ProcessingSequenceBarrier 内部持有 Sequencer 的 cursor 引用,知道生产者生产到哪个位置了。BatchEventProcessor.sequence 是当前消费线程消费到的位置。sequence + 1 就是下一个打算消费的位置 nextSequence,sequenceBarrier.waitFor(nextSequence) 会去获取下一个可以消费的 availableSequence。


拿到的 availableSequence 可能比要求的 nextSequence 大,意味着生产者生产出了很多可以消费的 event。这时就会一个个去消费,并且更新 BatchEventProcessor 的 sequence 至 availableSequence。此时 Sequencer 上的 gatingSequences 因为是引用的关系也会被更新。


WaitStrategy

调用 sequenceBarrier.waitFor(nextSequence) 时可能不会立即有新的 event,这时的行为由 waitStrategy 决定,有多种实现,比如 BlockingWaitStrategy。


Sequencer 在构造的时候就会传入一个 waitStrategy,sequenceBarrier 是由 Sequencer 创建的,创建的时候把 Sequencer 的 waitStrategy 传递给 sequenceBarrier。Sequencer 和 SequenceBarrier 持有同样的 waitStrategy,相当于在两者间起到了 传递信息和回调 的作用。


当消费者调用 sequenceBarrier.waitFor(nextSequence) 时因为没有新 event,方法会被 waitStrategy 调用


processorNotifyCondition.await();
复制代码


来停止。


这时如果生产者调用 Sequencer.publish 会调用 waitStrategy.signalAllWhenBlocking 然后间接调用


processorNotifyCondition.signalAll();
复制代码


从而唤醒 sequenceBarrier.waitFor 方法继续执行,看有没有新 event 可以用。有的话就可以继续消费。没有就再次陷入等待。


上面介绍的是通过 EventProcessor,EventHandler<T> 来消费 event 的场景,每一组 EventProcessor + EventHandler 是一个独立的消费者,会消费全量的 event 数据。如果你通过


handleEventsWith(final EventHandler<? super T>... handlers)
复制代码


接口添加了 10 个 EventHandler,那么 event 就会被消费 10 遍。


Disruptor 还支持一种多个线程共同消费 event 的模式。相关的类就是 WorkerPool,WorkProcessor,WorkHandler。


WorkerPool

Sequence workSequence = new Sequence(-1);WorkProcessor<?>[] workProcessors
复制代码


WorkerPool 内部维护了一个 workSequence,代表着整个 pool 分配出去的 event 位置。<=workSequence 的 event 已经被分配给某个 workProcessors 了,但是不是一定已经被消费完。这个设计和多生产者的情况下,先分配 sequence 到具体的某个生产者,然后再去填充,提交是一样的道理。


WorkProcessor

WorkProcessor 是基本的消费者线程,它保有 workSequence 的引用。


在它的 run loop 中,它会首先尝试 CAS 去抢 workSequence 的下一个位置,抢到了就会去消费。


如果没有可消费的 event 了,它就会调用 sequenceBarrier.waitFor(nextSequence) 陷入等待。但即使有了新的 event 被唤醒,它还是要靠 CAS 去抢下一个 event 的消费权。


while (true){    try    {        // if previous sequence was processed - fetch the next sequence and set        // that we have successfully processed the previous sequence        // typically, this will be true        // this prevents the sequence getting too far forward if an exception        // is thrown from the WorkHandler        if (processedSequence)   // 这个if里面的代码都是为了CAS拿event        {            processedSequence = false;            do            {                nextSequence = workSequence.get() + 1L; // 拿到下一个sequence                sequence.set(nextSequence - 1L);                 // 更新这个WorkProcessor的消费位置,这个位置主要是反映到Sequencer的gatingSequence从而影响生产者是否继续生产。                // 但实际上(nextSequence - 1L)这个位置很有可能不是这个WorkProcessor消费掉的            }            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));        }
if (cachedAvailableSequence >= nextSequence) // 如果该nextSequence已经被生产出来 { event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); // 没有被生产出来就在这等待 } } // exception handler}
复制代码


转载请注明来源 https://github.com/lich0079

发布于: 2 小时前阅读数: 6
用户头像

lich0079

关注

还未添加个人签名 2018.09.17 加入

https://github.com/lich0079

评论

发布
暂无评论
Disruptor 源码解读