《重学 Java 高并发》Disruptor 使用实战
disruptor 的常用类体系如下图所示:
其职责说明如下:
RingBuffer
环形队列,disruptor 中的核心存储类
Sequencer
序号实现器,维护发送者发送的序号生成逻辑、消费方获取可消费的序号,是无锁化访问的核心实现类,共有两个实现类,MultiProducerSequencer 为多生产者实现类、SingleProducerSequencer 单生产者实现类。
WaitStrategy
等待策略,消息发送者在容器已写满时、消费者在无消费数据时的等待策略,disruptor 提供了 N 种实现类:
BlockingWaitStrategy
基于 juc 的阻塞等待。
LiteTimeoutBlockingWaitStrategy
带超时时间的阻塞等待。
YieldingWaitStrategy
先自旋 100 次,如果还不满足条件,则每次调用 yield 方法,让出 CPU,重新参与调度。
BusySpinWaitStrategy
自旋,直到满足条件,生产环境慎用。
TimeoutBlockingWaitStrategy
带超时时间的阻塞等待,与 LiteTimeoutBlockingWaitStrategy 的区别是 TimeoutBlockingWaitStrategy 等待时间必须严格低于设定的值。
SleepingWaitStrategy
前 100 次自旋每一次都调用 yield,然后阻塞 1ms ,继续重试。
PhasedBackoffWaitStrategy
组合策略,可以指定上述策略,然后退化为 yield 自旋。
WaitStrategy 在创建 RingBuffer 时指定,默认为 BlockingWaitStrategy。
SequenceBarrier
序号栈栏。在流水线上有多个步骤,后一个步骤必须依赖前一个步骤的完成,栈栏的作用就在于此。
EventFactory
事件生成器工厂类,RingBuffer 的设计为力避免频繁的垃圾回收,在 RingBuffer 中存储的值会预先创建,生产者获取一个 Event 对象,并填充具体的值,故通常事件对象通常创建的事一个包装类。
EventProcessor
事件处理器,disruptor 中提供了两类事件处理器 WorkProcessor、BatchEventProcessor(批处理),它的职责是从 RingBuffer 中获取可消费的事件,然后调用 EventHandler 的 onEvent 方法。
EventHandler
事件处理器在获取一个可处理的事件后调用 EventHandler 的 onEvent 方法,这也是用户自定处理程序的入口,即编写用户业务代码的扩展点。
ExceptionHandler
异常处理策略。
首先
以笔者在工作中遇到一个经典使用场景来和大家观摩一下 disruptor 的基本使用。
在互联网行业中有一种经典的读写分离架构:数据异构,以物流下单为示例,通常关系型数据库只负责订单的创建业务,而关于订单查询、订单轨迹查询等查询类业务,通常会去查询 es,依此来降低数据库压力,但接踵而来的问题是如何将数据库的数据准实时同步到 Es 呢?canal 闪亮登场,其核心理念就是订阅并解析 binlog,其基本的流程如下:
在示例中解析 binlog 的目的是提取数据的变化,即 DML 语句(插入、更新、删除),将这些数据变更在目标端进行重放,为了提高性能,采用 disruptor 框架提高性能,该如何实现呢?
将解析动作分解为两步,第一步判断事件是否是 dml 事件,即是否需要解析。
解析 dml
为什么要这样拆分呢?一是将粒度降低,解耦,灵活提供不一样的并发度。
接下来我们看一下 canal 中是如何使用 disruptor 来解决该问题的。
2.1 创建 EventFactory
首先需要创建一个 EventFactory,用于填充 RingBuffer 中的对象,避免过多垃圾回收。
2.2 创建 RingBuffer
根据 bingo dump 协议,mysql 的解析线程创建一个,故该场景下的事件发送者只有一个,创建一个单生产者的 RingBuffer,其代码如下:
评论