Disruptor 源码解读
本文对 Disruptor 的运行机制在源码级别上做出解读, 适合对 Disruptor 框架和源码有初步了解的读者。
Disruptor 版本 3.4.3
转载请注明来源 https://github.com/lich0079/notes/blob/master/knowledge/disruptor.md
RingBuffer
Disruptor 框架中的核心数据结构,event 的容器。内部实现是
创建 RingBuffer 的时候要给出一个 bufferSize,必须是 2 的次方(数据访问优化,bit mask)。
但实际上在内部会给这个数组开出的实际空间是 bufferSize + 2*BUFFER_PAD,在我的电脑上这个 PAD 是 32,所以最后的数组其实是这样的
为什么说它是 ring 呢,就是因为访问它的数据用的是 sequence 来定位,sequence 是一个只会增加的 long 类型。
举个例子,给定 bufferSize 16, cursor 是数组实际的下标,那么对应关系就是如下所示。
EventFactoryEventFactory
EventFactory 是一次性使用的类,在最开始的时候用来给 RingBuffer 预填充数据。
为了避免 JAVA GC 带来的性能影响,Disruptor 采用的设计是在数组上预填充好对象,每次 publish event 的时候,只是通过 RingBuffer.get(seq)拿到对象,更新对象的值,然后就发布出去了。整个生产消费过程中再也不会有 event 对象的创建和销毁。
Sequence
用来表达 event 序例号的对象,但这里为什么不直接用 long 呢 ?
为了高并发下的可见性,肯定不能直接用 long 的,至少也是 volatile long。但 Disruptor 觉得 volatile long 还是不够用,所以创造了 Sequence 类。它的内部实现主要是 volatile long,
除此以外还支持以下特性:
CAS 更新
order writes (Store/Store barrier,改动不保证立即可见) vs volatile writes (Store/Load barrier,改动保证立即可见)
在 volatile 字段 附近添加 padding 解决伪共享问题
简单理解就是高并发下优化的 long 类型。 这里有另一篇解释volatile的文章
在整个框架中可以看到在不同的类里,不同场景下对 sequence 的表达,有时用 long,有时用的 Sequence 类,这其实是背后对于效率和高并发可见性的考量。
比如在对 EventProcessor.sequence 的更新中都是用的 order writes,不保证立即可见,但速度快很多。在这个场景里,造成的结果是显示的消费进度可能比实际上慢,导致生产者有可能在可以生产的情况下没有去生产。但生产者看的是多个消费者中最慢的那个消费进度,所以影响可能没有那么大。
Sequencer
Sequencer 是 Disruptor 框架的核心类。
生产者发布 event 的时候首先需要预定一个 sequence,Sequencer 就是计算和发布 sequence 的。它主要有 2 个实现类:SingleProducerSequencer 和 MultiProducerSequencer。
SingleProducerSequencer
生产者每次通过 Sequencer.next(n) 来预定下面 n 个可以写入的数据位,然后修改数据,发布 event。
但因为 RingBuffer 是环形的,一个 size 16 的 RingBuffer 当你拿到 Sequence 为 16 时相当于又要去写 RingBuffer[0]的位置了,假如之前的数据还没被消费过就会被覆盖了。
Sequencer 是这样解决这个问题的,它在内部维护了一个
每个消费者会维护一个自己的 Sequence 对象,来记录自己已经消费到的序例位置。
每添加一个消费者,都会把消费者的 Sequence 引用添加到 gatingSequences 中。
通过访问 gatingSequences,Sequencer 可以得知消费的最慢的消费者消费到了哪个位置。
在 next(n)方法里,如果计算出的下一个 event 的 Sequence 值减去 bufferSize
得出来的 wrapPoint > min(gatingSequences),说明即将写入的位置上,之前的 event 还有消费者没有消费,这时 SingleProducerSequencer 会等待并自旋。
举个例子,gatingSequences=[7, 8, 9, 10, 3, 4, 5, 6, 11], RingBuffer size 16 的情况下,如果算出来的 nextSequence 是 20,wrapPoint 是 20-16=4, 这时 gatingSequences 里最小的是 3。
说明下一个打算写入的位置是 wrapPoint 4,但最慢的消费者才消费到 3,你不能去覆盖之前 4 上的数据,这时只能等待,等消费者把之前的 4 消费掉。
为什么 wrapPoint = nextSequence - bufferSize,而不是 bufferSize 的 n 倍呢,因为消费者只能落后生产者一圈,不然就已经存在数据覆盖了。
等到 SingleProducerSequencer 自旋到下一个位置所有人都消费过的时候,它就可以从 next 方法中返回,生产者拿着 sequence 就可以继续去发布。
MultiProducerSequencer
MultiProducerSequencer 是在多个生产者的场合使用的,多个生产者的情况下存在竞争,导致它的实现更加复杂。
数据结构上多出来的主要就是这个 availableBuffer,用来记录 RingBuffer 上哪些位置有数据可以读。
还是从 Sequencer.next(n)说起,计算下一个数据位 Sequence 的逻辑是一样的,包括消费者落后导致 Sequencer 自旋等待的逻辑。不同的是因为有多个 publisher 同时访问 Sequencer.next(n)方法,所以在确定最终位置的时候用了一个 CAS 操作,如果失败了就自旋再来一次。
另一个不同的地方是 publish(final long sequence) 方法,SingleProducer 的版本很简单,就是移动了一下 cursor。
MultiProducer 的版本则是
setAvailable 做了什么事呢,它去设置 availableBuffer 的状态位了。给定一个 sequence,先计算出对应的数组下标 index,然后计算出在那个 index 上要写的数据 availabilityFlag,最后执行
根据 calculateAvailabilityFlag(sequence) 方法计算出来的 availabilityFlag 实际上是该 sequence 环绕 RingBuffer 的圈数。
availableBuffer 主要用于判断一个 sequence 下的数据是否可用
作为比较,来看一下 SingleProducer 的方法
在单个生产者的场景下,publishEvent 的时候才会推进 cursor,所以只要 sequence<=cursor,就说明数据是可消费的。
多个生产者的场景下,在 next(n)方法中,就已经通过 cursor.compareAndSet(current, next) 移动 cursor 了,此时 event 还没有 publish,所以 cursor 所在的位置不能保证 event 一定可用。
在 publish 方法中是去 setAvailable(sequence)了,所以 availableBuffer 是数据是否可用的标志。那为什么值要写成圈数呢,应该是避免把上一轮的数据当成这一轮的数据,错误判断 sequence 是否可用。
另一个值得一提的是 getHighestPublishedSequence 方法,这个是消费者用来查询最高可用 event 数据的位置。
给定一段 range,依次查询 availableBuffer,直到发现不可用的 Sequence,那么该 Sequence 之前的都是可用的。或全部都是可用的。
单生产者的版本:
说完了生产者,下面来看看消费者
EventProcessor
EventProcessor extends Runnable, 可以理解它是一个消费者线程。
EventProcessor 本身也只是个 interface。
BatchEventProcessor<T>
主要属性有
SequenceBarrier
ProcessingSequenceBarrier 内部持有 Sequencer 的 cursor 引用,知道生产者生产到哪个位置了。BatchEventProcessor.sequence 是当前消费线程消费到的位置。sequence + 1 就是下一个打算消费的位置 nextSequence,sequenceBarrier.waitFor(nextSequence) 会去获取下一个可以消费的 availableSequence。
拿到的 availableSequence 可能比要求的 nextSequence 大,意味着生产者生产出了很多可以消费的 event。这时就会一个个去消费,并且更新 BatchEventProcessor 的 sequence 至 availableSequence。此时 Sequencer 上的 gatingSequences 因为是引用的关系也会被更新。
WaitStrategy
调用 sequenceBarrier.waitFor(nextSequence) 时可能不会立即有新的 event,这时的行为由 waitStrategy 决定,有多种实现,比如 BlockingWaitStrategy。
Sequencer 在构造的时候就会传入一个 waitStrategy,sequenceBarrier 是由 Sequencer 创建的,创建的时候把 Sequencer 的 waitStrategy 传递给 sequenceBarrier。Sequencer 和 SequenceBarrier 持有同样的 waitStrategy,相当于在两者间起到了 传递信息和回调 的作用。
消费者在没有可消费的 event 时会调用 waitStrategy.waitFor 陷入等待,生产者会在生产出新 event 后调用 waitStrategy.signalAllWhenBlocking 来唤醒消费者。
不同的 WaitStrategy 的实现会有不同的效率和性能。
BlockingWaitStrategy
该实现依赖 Lock 来设置等待和唤醒。 系统吞吐量和低延迟的表现比较差,好处是对 CPU 的消耗比较少。
SleepingWaitStrategy
该实现是在性能和 CPU 占用之间的一种折中。
该实现对负责调用唤醒方法的生产者比较友好,因为啥都不用做。相当于完全依赖消费者端的自旋 retry。
YieldingWaitStrategy
该实现和 SleepingWaitStrategy 很类似,只是它在等待的时候会吃掉 100%的 CPU。
和 SleepingWaitStrategy 一样,唤醒的时候啥都不用做。
BusySpinWaitStrategy
该实现的唤醒也是啥都不做。性能最好的实现,但对部署环境的要求也最高。消费者线程数应该要少于 CPU 的实际物理核心数。
上面介绍的是通过 EventProcessor,EventHandler<T> 来消费 event 的场景,每一组 EventProcessor + EventHandler 是一个独立的消费者,会消费全量的 event 数据。如果你通过
接口添加了 10 个 EventHandler,那么 event 就会被消费 10 遍。
Disruptor 还支持一种多个线程共同消费 event 的模式。相关的类就是 WorkerPool,WorkProcessor,WorkHandler。
WorkerPool
WorkerPool 内部维护了一个 workSequence,代表着整个 pool 分配出去的 event 位置。<=workSequence 的 event 已经被分配给某个 workProcessors 了,但是不是一定已经被消费完。这个设计和多生产者的情况下,先分配 sequence 到具体的某个生产者,然后再去填充,提交是一样的道理。
WorkProcessor
WorkProcessor 是基本的消费者线程,它保有 workSequence 的引用。
在它的 run loop 中,它会首先尝试 CAS 去抢 workSequence 的下一个位置,抢到了就会去消费。
如果没有可消费的 event 了,它就会调用 sequenceBarrier.waitFor(nextSequence) 陷入等待。但即使有了新的 event 被唤醒,它还是要靠 CAS 去抢下一个 event 的消费权。
一些思考
从 Disruptor 的实现可以看到因为采用环形数据结构,它会覆盖老数据,生产新数据必须等到老数据都被消费后,生产速度和消费速度是紧耦合的,一旦消费者们中有一个人慢了一点、卡顿、异常,会影响整个系统的生产以至消费。
唯一可以缓冲的就是 Buffer,即数组长度 bufferSize。
比如,官方文档就建议生产 event 的时候使用 EventTranslator API。因为外层有 try finally 兜底。
2021.6 更新
定义 Event model 的时候可以加个 long seq 字段,把当时 disruptor 分配的 sequence 记住,方便以后 debug
Exception Handle
Producer
如果可能在生产的时候出异常,必须在 publishEvent 方法外面包一层 try catch, 否则会导致程序崩坏
有一点需要注意的是 如果在生产的时候出了异常,导致 event 属性的填充出了问题,你又没有 clean 之前的属性的话, 因为是环形数据结构,消费者有可能消费上一次的属性。
所以最好在 event 上设置一个 flag(比如: produceCompleted),放在 translateTo 方法的最后一行执行。 这样消费者可以在消费的时候检查这个 flag,防止误把以前的 event 消费 2 次。
Consumer
在消费端,可通过
方法设定异常处理, 如果没调用过这个方法的话,默认实现是 FatalExceptionHandler, log 并抛出异常, 会导致整个 disruptor 崩坏。
框架提供了一个 IgnoreExceptionHandler 只会输出 log, 不会抛出。建议采用或自己实现。
转载请注明来源 https://github.com/lich0079/notes/blob/master/knowledge/disruptor.md
版权声明: 本文为 InfoQ 作者【lich0079】的原创文章。
原文链接:【http://xie.infoq.cn/article/3ed1885602c2ca022bd00ee3c】。文章转载请联系作者。
评论