Disruptor 高效的秘密 -Sequencer
上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,但是 Disruptor 中最复杂的部分在于如何并发控制消息的增加和消费,而这部分由 Senquencer 来完成。
这篇文章基于 Disruptor 官方提供的示例代码。
Sequencer 简介
Sequencer 可以被认为是 Disruptor 的大脑,而 Sequence 则可以认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在创建 RingBuffer 时创建。
Sequencer 接口有两种实现,SingleProducerSequencer
和 MultiProducerSequencer
,分别来处理单个生产者和多个生产者的情况。
在 Sequencer 中有一个 next()
方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。
Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增长。这里需要注意的是在 Disruptor 中共享的并不是 Sequence 对象,而是 sequence 中的 value。
生产者中 Sequence 的 value 表示当前消息已经生产到哪个位置,消费者中 Sequence 的 value 表示消费者已经处理到哪个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?
RingBuffer 是消息的容器,为了让消息能够被正常传递,RingBuffer 需要满足两个要求,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。
Sequencer 的核心就是解决了这两个问题,通过 Gating
和 Barrier
两个工具。
Gating 通过 RingBuffer.addGatingSequences()
方法来获取,Barrier 通过 RingBuffer.newBarrier()
方法来获取。
上图中 C 代表消费者,P 代表生产者。
需要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每个消费者维护自己的 Sequence
生产者的 Sequence 在 RingBuffer 维护
Gating 实现
Gating 的设计其实很简单,其实就是将多个所有消费者的 Sequence 监控起来,然后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。
所有消费者的 Sequence 通过如下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。
Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()
Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:
如果没有足够的空间,那么 next() 方法就会被阻塞,新的消息无法加入到 RingBuffer 中。
上面是 gating 的示意图,c1 和 c2 处理的速度不一样,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经无法在向 RingBuffer 中添加新的消息,因此会被阻塞,直到 c1 将 消息处理完成之后才能继续插入消息。
SequencerBarrier 实现
同时对于消费者来说,必须等到 RingBuffer 中有消息才能进行处理。 通过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier
实例,按照如下的调用路径来初始化:
RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()
消费者从 RingBuffer 中获取消息时,需要通过 SequencerBarrier 来确定是否有可用的消息, 使用 SequencerBarrier 的调用路径如下:
BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()
BatchEventProcessor 是默认使用的消费者,上面我们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户自己实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码如下:
如果没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。
SenquenceBarrier 除了可以控制消费者从 RingBuffer 取数据之外,还可以控制多个消费者执行的顺序。如果要安排消费者执行的顺序,用如下的代码就可以。
上面的代码表示 AnotherLongEventHandler
需要等 LongEventHandler
处理完成之后,才能对消息进行处理。
消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。
其中 then 关系是通过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:
其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。
版权声明: 本文为 InfoQ 作者【Rayjun】的原创文章。
原文链接:【http://xie.infoq.cn/article/0e9e5868aefd9286d34d2fbeb】。文章转载请联系作者。
评论