Disruptor 高效的秘密 -Sequencer

用户头像
Rayjun
关注
发布于: 2020 年 05 月 01 日
Disruptor 高效的秘密-Sequencer

上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,但是 Disruptor 中最复杂的部分在于如何并发控制消息的增加和消费,而这部分由 Senquencer 来完成。



这篇文章基于 Disruptor 官方提供的示例代码。

Sequencer 简介



Sequencer 可以被认为是 Disruptor 的大脑,而 Sequence 则可以认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在创建 RingBuffer 时创建。



// 多个生产者的情况
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 单个生产者的情况
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}



Sequencer 接口有两种实现,SingleProducerSequencerMultiProducerSequencer,分别来处理单个生产者和多个生产者的情况。



在 Sequencer 中有一个 next() 方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。



Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增长。这里需要注意的是在 Disruptor 中共享的并不是 Sequence 对象,而是 sequence 中的 value。



生产者中 Sequence 的 value 表示当前消息已经生产到哪个位置,消费者中 Sequence 的 value 表示消费者已经处理到哪个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?



RingBuffer 是消息的容器,为了让消息能够被正常传递,RingBuffer 需要满足两个要求,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。



Sequencer 的核心就是解决了这两个问题,通过 GatingBarrier 两个工具。



Gating 通过 RingBuffer.addGatingSequences() 方法来获取,Barrier 通过 RingBuffer.newBarrier() 方法来获取。



图1:Sequencer 示意图



上图中 C 代表消费者,P 代表生产者。



需要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每个消费者维护自己的 Sequence

生产者的 Sequence 在 RingBuffer 维护



Gating 实现



Gating 的设计其实很简单,其实就是将多个所有消费者的 Sequence 监控起来,然后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。



所有消费者的 Sequence 通过如下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。



Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()



Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:



// MultiProducerSequencer.next() 方法
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize; // 获取一圈之前的值
long cachedGatingSequence = gatingSequenceCache.get(); // 获取缓存的 gatingSequences 的最小值
// 如果大于缓存的值,则进行进一步的判断
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// 获取当前实际最小的 sequence
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 如果比实际的最小 sequqnce 还大,说明已经没有位置了,则继续进行自旋(无限循环)
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next)) // 如果比缓存的值小,说明目前还有空闲位置,可以继续存入消息
{
break;
}
}
while (true); // 这是一个无限循环,直到有新的空间可以存入消息



如果没有足够的空间,那么 next() 方法就会被阻塞,新的消息无法加入到 RingBuffer 中。

图2:Gating 示意图



上面是 gating 的示意图,c1 和 c2 处理的速度不一样,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经无法在向 RingBuffer 中添加新的消息,因此会被阻塞,直到 c1 将 消息处理完成之后才能继续插入消息。



SequencerBarrier 实现



同时对于消费者来说,必须等到 RingBuffer 中有消息才能进行处理。 通过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier 实例,按照如下的调用路径来初始化:



RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()



消费者从 RingBuffer 中获取消息时,需要通过 SequencerBarrier 来确定是否有可用的消息, 使用 SequencerBarrier 的调用路径如下:



BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()



BatchEventProcessor 是默认使用的消费者,上面我们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户自己实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码如下:



// BatchEventProcessor.processEvents() 方法,删除了部分代码
while (true)
{
try
{
// 获取可用消息的最大值
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 如果当前的位置小于可用的位置,说明有消息可以处理,进行消息处理
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 调用实际的 Handler 处理消息
nextSequence++;
}
sequence.set(availableSequence); // 将自己的 sequence 设置处理完成的位置
}
}



如果没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。

图3:Barrier 示意图



SenquenceBarrier 除了可以控制消费者从 RingBuffer 取数据之外,还可以控制多个消费者执行的顺序。如果要安排消费者执行的顺序,用如下的代码就可以。

disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());



上面的代码表示 AnotherLongEventHandler 需要等 LongEventHandler 处理完成之后,才能对消息进行处理。



消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。



其中 then 关系是通过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:



private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}



其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。



发布于: 2020 年 05 月 01 日 阅读数: 58
用户头像

Rayjun

关注

程序员,王小波死忠粉 2017.10.17 加入

非著名程序员,还在学习如何写代码,公众号同名

评论

发布
暂无评论
Disruptor 高效的秘密-Sequencer