写点什么

Disruptor 源码解读

用户头像
lich0079
关注
发布于: 2021 年 04 月 20 日
Disruptor 源码解读

本文对 Disruptor 的运行机制在源码级别上做出解读, 适合对 Disruptor 框架和源码有初步了解的读者。


Disruptor 版本 3.4.3


转载请注明来源 https://github.com/lich0079/notes/blob/master/knowledge/disruptor.md


RingBuffer


Disruptor 框架中的核心数据结构,event 的容器。内部实现是

Object[] entries;
复制代码


创建 RingBuffer 的时候要给出一个 bufferSize,必须是 2 的次方(数据访问优化,bit mask)。


但实际上在内部会给这个数组开出的实际空间是 bufferSize + 2*BUFFER_PAD,在我的电脑上这个 PAD 是 32,所以最后的数组其实是这样的


entries=[null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
例子里event装的是long
复制代码


为什么说它是 ring 呢,就是因为访问它的数据用的是 sequence 来定位,sequence 是一个只会增加的 long 类型。


举个例子,给定 bufferSize 16, cursor 是数组实际的下标,那么对应关系就是如下所示。

sequence   cursor-----------------0          01          12          23          3 ..........15.........1516.........017.........118.........219.........3 ..........32.........033.........134.........235.........3 ..........
复制代码


EventFactoryEventFactory


EventFactory 是一次性使用的类,在最开始的时候用来给 RingBuffer 预填充数据。


为了避免 JAVA GC 带来的性能影响,Disruptor 采用的设计是在数组上预填充好对象,每次 publish event 的时候,只是通过 RingBuffer.get(seq)拿到对象,更新对象的值,然后就发布出去了。整个生产消费过程中再也不会有 event 对象的创建和销毁。


Sequence

用来表达 event 序例号的对象,但这里为什么不直接用 long 呢 ?


为了高并发下的可见性,肯定不能直接用 long 的,至少也是 volatile long。但 Disruptor 觉得 volatile long 还是不够用,所以创造了 Sequence 类。它的内部实现主要是 volatile long,


volatile long value;
复制代码


除此以外还支持以下特性:


  • CAS 更新

  • order writes (Store/Store barrier,改动不保证立即可见) vs volatile writes (Store/Load barrier,改动保证立即可见)

  • 在 volatile 字段 附近添加 padding 解决伪共享问题


简单理解就是高并发下优化的 long 类型。 这里有另一篇解释volatile的文章


在整个框架中可以看到在不同的类里,不同场景下对 sequence 的表达,有时用 long,有时用的 Sequence 类,这其实是背后对于效率和高并发可见性的考量。


比如在对 EventProcessor.sequence 的更新中都是用的 order writes,不保证立即可见,但速度快很多。在这个场景里,造成的结果是显示的消费进度可能比实际上慢,导致生产者有可能在可以生产的情况下没有去生产。但生产者看的是多个消费者中最慢的那个消费进度,所以影响可能没有那么大。


Sequencer

Sequencer 是 Disruptor 框架的核心类。


生产者发布 event 的时候首先需要预定一个 sequence,Sequencer 就是计算和发布 sequence 的。它主要有 2 个实现类:SingleProducerSequencer 和 MultiProducerSequencer。


SingleProducerSequencer

生产者每次通过 Sequencer.next(n) 来预定下面 n 个可以写入的数据位,然后修改数据,发布 event。


但因为 RingBuffer 是环形的,一个 size 16 的 RingBuffer 当你拿到 Sequence 为 16 时相当于又要去写 RingBuffer[0]的位置了,假如之前的数据还没被消费过就会被覆盖了。


Sequencer 是这样解决这个问题的,它在内部维护了一个


volatile Sequence[] gatingSequences = new Sequence[0];
复制代码


每个消费者会维护一个自己的 Sequence 对象,来记录自己已经消费到的序例位置。


每添加一个消费者,都会把消费者的 Sequence 引用添加到 gatingSequences 中。


通过访问 gatingSequences,Sequencer 可以得知消费的最慢的消费者消费到了哪个位置。


gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11]
8个消费者的例子,最慢的消费完了3,此时可以写seq 19的数据,但不能写seq 20。
复制代码


在 next(n)方法里,如果计算出的下一个 event 的 Sequence 值减去 bufferSize


long nextValue = this.nextValue;long nextSequence = nextValue + n;long wrapPoint = nextSequence - bufferSize;
复制代码


得出来的 wrapPoint > min(gatingSequences),说明即将写入的位置上,之前的 event 还有消费者没有消费,这时 SingleProducerSequencer 会等待并自旋。


while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))    {        LockSupport.parkNanos(1L);    }
复制代码


举个例子,gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11], RingBuffer size 16 的情况下,如果算出来的 nextSequence 是 20,wrapPoint 是 20-16=4, 这时 gatingSequences 里最小的是 3。


说明下一个打算写入的位置是 wrapPoint 4,但最慢的消费者才消费到 3,你不能去覆盖之前 4 上的数据,这时只能等待,等消费者把之前的 4 消费掉。


为什么 wrapPoint = nextSequence - bufferSize,而不是 bufferSize 的 n 倍呢,因为消费者只能落后生产者一圈,不然就已经存在数据覆盖了。


等到 SingleProducerSequencer 自旋到下一个位置所有人都消费过的时候,它就可以从 next 方法中返回,生产者拿着 sequence 就可以继续去发布。


MultiProducerSequencer

MultiProducerSequencer 是在多个生产者的场合使用的,多个生产者的情况下存在竞争,导致它的实现更加复杂。


int[] availableBuffer;int indexMask;int indexShift;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy){ super(bufferSize, waitStrategy); availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer();}
复制代码


数据结构上多出来的主要就是这个 availableBuffer,用来记录 RingBuffer 上哪些位置有数据可以读。


还是从 Sequencer.next(n)说起,计算下一个数据位 Sequence 的逻辑是一样的,包括消费者落后导致 Sequencer 自旋等待的逻辑。不同的是因为有多个 publisher 同时访问 Sequencer.next(n)方法,所以在确定最终位置的时候用了一个 CAS 操作,如果失败了就自旋再来一次。


cursor.compareAndSet(current, next)
复制代码


另一个不同的地方是 publish(final long sequence) 方法,SingleProducer 的版本很简单,就是移动了一下 cursor。


public void publish(long sequence){    cursor.set(sequence);    waitStrategy.signalAllWhenBlocking();}
复制代码


MultiProducer 的版本则是


public void publish(final long sequence){    setAvailable(sequence);    waitStrategy.signalAllWhenBlocking();}
复制代码


setAvailable 做了什么事呢,它去设置 availableBuffer 的状态位了。给定一个 sequence,先计算出对应的数组下标 index,然后计算出在那个 index 上要写的数据 availabilityFlag,最后执行


availableBuffer[index]=availabilityFlag
复制代码


根据 calculateAvailabilityFlag(sequence) 方法计算出来的 availabilityFlag 实际上是该 sequence 环绕 RingBuffer 的圈数。


availableBuffer=[6, 6, 6, 6, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
例子:前4个已经走到第6圈。
复制代码


availableBuffer 主要用于判断一个 sequence 下的数据是否可用


public boolean isAvailable(long sequence){    int index = calculateIndex(sequence);    int flag = calculateAvailabilityFlag(sequence);    long bufferAddress = (index * SCALE) + BASE;    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;}
复制代码


作为比较,来看一下 SingleProducer 的方法


public boolean isAvailable(long sequence){    return sequence <= cursor.get();}
复制代码


在单个生产者的场景下,publishEvent 的时候才会推进 cursor,所以只要 sequence<=cursor,就说明数据是可消费的。


多个生产者的场景下,在 next(n)方法中,就已经通过 cursor.compareAndSet(current, next) 移动 cursor 了,此时 event 还没有 publish,所以 cursor 所在的位置不能保证 event 一定可用。


在 publish 方法中是去 setAvailable(sequence)了,所以 availableBuffer 是数据是否可用的标志。那为什么值要写成圈数呢,应该是避免把上一轮的数据当成这一轮的数据,错误判断 sequence 是否可用。


另一个值得一提的是 getHighestPublishedSequence 方法,这个是消费者用来查询最高可用 event 数据的位置。


public long getHighestPublishedSequence(long lowerBound, long availableSequence){    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)    {        if (!isAvailable(sequence))        {            return sequence - 1;        }    }
return availableSequence;}
复制代码


给定一段 range,依次查询 availableBuffer,直到发现不可用的 Sequence,那么该 Sequence 之前的都是可用的。或全部都是可用的。


单生产者的版本:


    public long getHighestPublishedSequence(long lowerBound, long availableSequence)    {        return availableSequence;    }
复制代码


说完了生产者,下面来看看消费者


EventProcessor

EventProcessor extends Runnable, 可以理解它是一个消费者线程。


EventProcessor 本身也只是个 interface。


BatchEventProcessor<T>

主要属性有


DataProvider<T> dataProvider;   // 就是RingBuffer, event容器SequenceBarrier sequenceBarrier; // 用来获取可用event的sequenceEventHandler<? super T> eventHandler;  // 真正消费event的业务代码Sequence sequence = new Sequence(-1);      // 该消费线程消费完的sequence位置
复制代码


SequenceBarrier

ProcessingSequenceBarrier 内部持有 Sequencer 的 cursor 引用,知道生产者生产到哪个位置了。BatchEventProcessor.sequence 是当前消费线程消费到的位置。sequence + 1 就是下一个打算消费的位置 nextSequence,sequenceBarrier.waitFor(nextSequence) 会去获取下一个可以消费的 availableSequence。


拿到的 availableSequence 可能比要求的 nextSequence 大,意味着生产者生产出了很多可以消费的 event。这时就会一个个去消费,并且更新 BatchEventProcessor 的 sequence 至 availableSequence。此时 Sequencer 上的 gatingSequences 因为是引用的关系也会被更新。


WaitStrategy

调用 sequenceBarrier.waitFor(nextSequence) 时可能不会立即有新的 event,这时的行为由 waitStrategy 决定,有多种实现,比如 BlockingWaitStrategy。


Sequencer 在构造的时候就会传入一个 waitStrategy,sequenceBarrier 是由 Sequencer 创建的,创建的时候把 Sequencer 的 waitStrategy 传递给 sequenceBarrier。Sequencer 和 SequenceBarrier 持有同样的 waitStrategy,相当于在两者间起到了 传递信息和回调 的作用。


消费者在没有可消费的 event 时会调用 waitStrategy.waitFor 陷入等待,生产者会在生产出新 event 后调用 waitStrategy.signalAllWhenBlocking 来唤醒消费者。


不同的 WaitStrategy 的实现会有不同的效率和性能。

BlockingWaitStrategy

该实现依赖 Lock 来设置等待和唤醒。 系统吞吐量和低延迟的表现比较差,好处是对 CPU 的消耗比较少。


Lock lock = new ReentrantLock();Condition processorNotifyCondition = lock.newCondition();
复制代码


processorNotifyCondition.await();
等待
复制代码


processorNotifyCondition.signalAll();
唤醒
复制代码


SleepingWaitStrategy

该实现是在性能和 CPU 占用之间的一种折中。


inal int DEFAULT_RETRIES = 200;long DEFAULT_SLEEP = 100;
int retries;long sleepTimeNs;
复制代码


if (counter > 100){    --counter;}else if (counter > 0){    --counter;    Thread.yield();}else{    LockSupport.parkNanos(sleepTimeNs);}
等待的实现,上面的counter就是retries。
复制代码


该实现对负责调用唤醒方法的生产者比较友好,因为啥都不用做。相当于完全依赖消费者端的自旋 retry。


public void signalAllWhenBlocking(){}
复制代码


YieldingWaitStrategy

该实现和 SleepingWaitStrategy 很类似,只是它在等待的时候会吃掉 100%的 CPU。


if (0 == counter){    Thread.yield();}else{    --counter;}
等待的实现, 只有 counter==0 的时候才让出CPU,其他时候都在自旋。
复制代码


和 SleepingWaitStrategy 一样,唤醒的时候啥都不用做。


public void signalAllWhenBlocking(){}
复制代码


BusySpinWaitStrategy

该实现的唤醒也是啥都不做。性能最好的实现,但对部署环境的要求也最高。消费者线程数应该要少于 CPU 的实际物理核心数。


ThreadHints.onSpinWait();
等待的实现。
复制代码


上面介绍的是通过 EventProcessor,EventHandler<T> 来消费 event 的场景,每一组 EventProcessor + EventHandler 是一个独立的消费者,会消费全量的 event 数据。如果你通过


handleEventsWith(final EventHandler<? super T>... handlers)
复制代码


接口添加了 10 个 EventHandler,那么 event 就会被消费 10 遍。


Disruptor 还支持一种多个线程共同消费 event 的模式。相关的类就是 WorkerPool,WorkProcessor,WorkHandler。


WorkerPool

Sequence workSequence = new Sequence(-1);WorkProcessor<?>[] workProcessors
复制代码


WorkerPool 内部维护了一个 workSequence,代表着整个 pool 分配出去的 event 位置。<=workSequence 的 event 已经被分配给某个 workProcessors 了,但是不是一定已经被消费完。这个设计和多生产者的情况下,先分配 sequence 到具体的某个生产者,然后再去填充,提交是一样的道理。


WorkProcessor

WorkProcessor 是基本的消费者线程,它保有 workSequence 的引用。


在它的 run loop 中,它会首先尝试 CAS 去抢 workSequence 的下一个位置,抢到了就会去消费。


如果没有可消费的 event 了,它就会调用 sequenceBarrier.waitFor(nextSequence) 陷入等待。但即使有了新的 event 被唤醒,它还是要靠 CAS 去抢下一个 event 的消费权。


while (true){    try    {        // if previous sequence was processed - fetch the next sequence and set        // that we have successfully processed the previous sequence        // typically, this will be true        // this prevents the sequence getting too far forward if an exception        // is thrown from the WorkHandler        if (processedSequence)   // 这个if里面的代码都是为了CAS拿event        {            processedSequence = false;            do            {                nextSequence = workSequence.get() + 1L; // 拿到下一个sequence                sequence.set(nextSequence - 1L);                 // 更新这个WorkProcessor的消费位置,这个位置主要是反映到Sequencer的gatingSequence从而影响生产者是否继续生产。                // 但实际上(nextSequence - 1L)这个位置很有可能不是这个WorkProcessor消费掉的            }            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));        }
if (cachedAvailableSequence >= nextSequence) // 如果该nextSequence已经被生产出来 { event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); // 没有被生产出来就在这等待 } } // exception handler}
复制代码


一些思考

从 Disruptor 的实现可以看到因为采用环形数据结构,它会覆盖老数据,生产新数据必须等到老数据都被消费后,生产速度和消费速度是紧耦合的,一旦消费者们中有一个人慢了一点、卡顿、异常,会影响整个系统的生产以至消费。


唯一可以缓冲的就是 Buffer,即数组长度 bufferSize。


比如,官方文档就建议生产 event 的时候使用 EventTranslator API。因为外层有 try finally 兜底。


    private void translateAndPublish(EventTranslator<E> translator, long sequence)    {        try        {            translator.translateTo(get(sequence), sequence);        }        finally        {            sequencer.publish(sequence);        }    }
复制代码


2021.6 更新

定义 Event model 的时候可以加个 long seq 字段,把当时 disruptor 分配的 sequence 记住,方便以后 debug


Exception Handle

 

Producer

如果可能在生产的时候出异常,必须在 publishEvent 方法外面包一层 try catch, 否则会导致程序崩坏


// translateTo 方法中会出异常public static EventTranslatorTwoArg<StubEvent, String, Integer> eventTranslator = new EventTranslatorTwoArg<StubEvent, String, Integer>() {     @Override    public void translateTo(StubEvent event, long sequence, String arg0, Integer arg1) {        event.setSeq(sequence);        if (sequence % 3 == 0) {            throw new RuntimeException("translateTo exception," + arg0 + ",arg1=" + arg1 + ",seq=" + sequence);        }        event.setMsg(arg0);        event.setValue(arg1);    }};   //  publishEvent 需要 try catchfor (long i = 0; i < count ; i++) {    try {        disruptor.publishEvent(StubEvent.eventTranslator, "msg " + i, r.nextInt(100));    } catch (Exception e) {        log.error("publishEvent exception", e);    }}
复制代码


有一点需要注意的是 如果在生产的时候出了异常,导致 event 属性的填充出了问题,你又没有 clean 之前的属性的话, 因为是环形数据结构,消费者有可能消费上一次的属性。


比如 下面的log中, seq 6 的 event 在生产阶段出错了,消费阶段消费的是上一次的属性值


java.lang.RuntimeException: translateTo exception,msg 6,value=2,seq=6 // 生产时 seq 6 的event, value 是 2


journal StubEvent(value=84, msg=msg 2, seq=6) // 消费seq 6的时候, value 是 84, 其实是上一次的
复制代码


所以最好在 event 上设置一个 flag(比如: produceCompleted),放在 translateTo 方法的最后一行执行。 这样消费者可以在消费的时候检查这个 flag,防止误把以前的 event 消费 2 次。


// 生产者在开始生产的时候需要先把flag 设为false, 在最后一行设为true@Overridepublic void translateTo(StubEvent event, long sequence, String arg0, Integer arg1) {    event.setProduceCompleted(false);    event.setSeq(sequence);     if (sequence % 3 == 0) {        throw new RuntimeException("translateTo exception, arg0=" + arg0 + ",arg1=" + arg1 + ",seq=" + sequence);    }     event.setMsg(arg0);    event.setValue(arg1);     event.setProduceCompleted(true);}
复制代码


Consumer

在消费端,可通过


disruptor.setDefaultExceptionHandler
复制代码


方法设定异常处理, 如果没调用过这个方法的话,默认实现是 FatalExceptionHandler, log 并抛出异常, 会导致整个 disruptor 崩坏。


private ExceptionHandler<? super T> getExceptionHandler(){    ExceptionHandler<? super T> handler = exceptionHandler;    if (handler == null)    {        return ExceptionHandlers.defaultHandler();    }    return handler;}
复制代码


框架提供了一个 IgnoreExceptionHandler 只会输出 log, 不会抛出。建议采用或自己实现。


转载请注明来源 https://github.com/lich0079/notes/blob/master/knowledge/disruptor.md


发布于: 2021 年 04 月 20 日阅读数: 157
用户头像

lich0079

关注

还未添加个人签名 2018.09.17 加入

https://github.com/lich0079

评论

发布
暂无评论
Disruptor 源码解读