优化技术专题 - 线程间的高性能消息框架 - 深入浅出 Disruptor 的使用和原理
基于事件驱动模型,不用消费者主动拉取消息
比 JDK 的 ArrayBlockingQueue 性能高一个数量级
为什么这么快
无锁序号栅栏
缓存行填充,消除伪共享
内存预分配
环形队列 RingBuffer
Disruptor 核心概念
RingBuffer(环形队列):基于数组的内存级别缓存,是创建 sequencer(序号)与定义 WaitStrategy(拒绝策略)的入口。
Disruptor(总体执行入口):对 RingBuffer 的封装,持有 RingBuffer、消费者线程池 Executor、消费之集合 ConsumerRepository 等引用。
Sequence(序号分配器):对 RingBuffer 中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个 Sequence 可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
Sequencer(数据传输器):Sequencer 里面包含了 Sequence,是 Disruptor 的核心,Sequencer 有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer 主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
SequenceBarrier(消费者屏障):用于控制 RingBuffer 的 Producer 和 Consumer 之间的平衡关系,并且决定了 Consumer 是否还有可处理的事件的逻辑。
WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将 Event 生产进 Disruptor,WaitStrategy 有多种实现策略,分别是:
BlockingWaitStrategy(最稳定的策略):阻塞方式,效率较低,但对 cpu 消耗小,内部使用的是典型的锁和条件变量机制(java 的 ReentrantLock),来处理线程的唤醒,这个策略是 disruptor 等待策略中最慢的一种,但是是最保守使用消耗 cpu 的一种用法,并且在不同的部署环境下最能保持性能一致。 但是,随着我们可以根据部署的服务环境优化出额外的性能。
BusySpinWaitStrategy(性能最好的策略):自旋方式,无锁,BusySpinWaitStrategy 是性能最高的等待策略,但是受部署环境的制约依赖也越强。 仅当 event 处理线程数少于物理核心数的时候才应该采用这种等待策略。 例如,超线程不可开启。
LiteBlockingWaitStrategy(几乎不用,最接近原生的策略机制):BlockingWaitStrategy 的变体版本,目前感觉不建议使用
LiteTimeoutBlockingWaitStrategy:LiteBlockingWaitStrategy 的超时版本
PhasedBackoffWaitStrategy(最低 CPU 配置的策略):自旋 + yield + 自定义策略,当吞吐量和低延迟不如 CPU 资源重要,CPU 资源紧缺,可以使用此策略。
SleepingWaitStrategy:自旋休眠方式(无锁),性能和 BlockingWaitStrategy 差不多,但是这个对生产者线程影响最小,它使用一个简单的 loop 繁忙等待循环,但是在循环体中间它调用了 LockSupport.parkNanos(1)。
一般情况在 linux 系统这样会使得线程停顿大约 60 微秒。不过这样做的好处是,生产者线程不需要额外的累加计数器,也不需要产生条件变量信号量开销。
负面影响是,在生产者线程与消费者线程之间传递 event 数据的延迟变高。所以 SleepingWaitStrategy 适合在不需要低延迟, 但需要很低的生产者线程影响的情形。一个典型的案例是异步日志记录功能。
TimeoutBlockingWaitStrategy:BlockingWaitStrategy 的超时阻塞方式
YieldingWaitStrategy(充分进行实现 CPU 吞吐性能策略):自旋线程切换竞争方式(Thread.yield()),最快的方式,适用于低延时的系统,在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中推荐使用此策略,它会充分使用压榨 cpu 来达到降低延迟的目标。
通过不断的循环等待 sequence 去递增到合适的值。 在循环体内,调用 Thread.yield()来允许其他的排队线程执行。 这是一种在需要极高性能并且 event handler 线程数少于 cpu 逻辑内核数的时候推荐使用的策略。
这里说一下 YieldingWaitStrategy 使用要小心,不是特别要求性能的情况下,要谨慎使用,否则会引起服务起 cpu 飙升的情况,因为他的内部实现是在线程做 100 次递减然后 Thread.yield(),可能会压榨 cpu 性能来换取速度。
注意:超线程是 intel 研发的一种 cpu 技术,可以使得一个核心提供两个逻辑线程,比如 4 核心超线程后有 8 个线程。
Event:从生产者到消费者过程中所处理的数据单元,Event 由使用者自定义。
EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了 Disruptor 中的一个消费者的接口。
EventProcessor:这是个事件处理器接口,实现了 Runnable,处理主要事件循环,处理 Event,拥有消费者的 Sequence,这个接口有 2 个重要实现:
WorkProcessor:多线程处理实现,在多生产者多消费者模式下,确保每个 sequence 只被一个 processor 消费,在同一个 WorkPool 中,确保多个 WorkProcessor 不会消费同样的 sequence
BatchEventProcessor:单线程批量处理实现,包含了 Event loop 有效的实现,并回调到了一个 EventHandler 接口的实现对象,这接口实际上是通过重写 run 方法,轮询获取数据对象,并把数据经过等待策略交给消费者去处理。
Disruptor 整体架构
接下来我们来看一下 Disruptor 是如何做到无阻塞、多生产、多消费的。
[?](
)
构建 Disruptor 的各个参数以及 ringBuffer 的构造:
EventFactory:创建事件(任务)的工厂类。
ringBufferSize:容器的长度。
Executor:消费者线程池,执行任务的线程。
ProductType:生产者类型:单生产者、多生产者。
WaitStrategy:等待策略。
RingBuffer:存放数据的容器。
EventHandler:事件处理器。
Disruptor 使用方式
maven 依赖:
xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
生产单消费简单模式案例:
Event 数据模型
java
import lombok.Data;
@Data
public class SampleEventModel {
private String data;
}
Event 事件模型 Factory 工厂类
java
import com.lmax.disruptor.EventFactory;
/**
* 消息对象生产工厂
*/
public class SampleEventModelFactory implements EventFactory<SampleEventModel> {
@Override
public SampleEventModel newInstance() {
//返回空的消息对象数据 Event
return new SampleEventModel();
}
}
EventHandler 处理器操作
java
import com.lmax.disruptor.EventHandler;
/**
* 消息事件处理器
*/
public class SampleEventHandler implements EventHandler<SampleEventModel> {
/**
* 事件驱动模式
*/
@Override
public void onEvent(SampleEventModel event, long sequence, boolean endOfBatch) throws Exception {
// do ...
System.out.println("消费者消费处理数据:" + event.getData());
}
}
EventProducer 工厂生产者服务处理器操作
java
import com.lmax.disruptor.RingBuffer;
/**
* 消息发送
*/
public class SampleEventProducer {
private RingBuffer<SampleEventModel> ringBuffer;
public SampleEventProducer(RingBuffer<SampleEventModel> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 发布数据信息
* @param data
*/
public void publish(String data){
//从 ringBuffer 获取可用 sequence 序号
long sequence = ringBuffer.next();
try {
//根据 sequence 获取 sequence 对应的 Event
//这个 Event 是一个没有赋值具体数据的对象
TestEvent testEvent = ringBuffer.get(seque
nce);
testEvent.setData(data);
} finally {
//提交发布
ringBuffer.publish(sequence);
}
}
}
EventProducer 工厂生产者服务处理器操作
java
public class TestMain {
public static void main(String[] args) {
SampleEventModelFactory eventFactory = new SampleEventModelFactory();
int ringBufferSize = 1024 * 1024;
//这个线程池最好自定义
ExecutorService executor = Executors.newCachedThreadPool();
//实例化 disruptor
Disruptor<SampleEventModel> disruptor = new Disruptor<SampleEventModel>(
eventFactory, //消息工厂
ringBufferSize, //ringBuffer 容器最大容量长度
executor, //线程池,最好自定义一个
ProducerType.SINGLE, //单生产者模式
new BlockingWaitStrategy() //等待策略
);
//添加消费者监听 把 TestEventHandler 绑定到 disruptor
disruptor.handleEventsWith(new SampleEventHandler());
//启动 disruptor
disruptor.start();
//获取实际存储数据的容器 RingBuffer
RingBuffer<SampleEventModel> ringBuffer = disruptor.getRingBuffer();
//生产发送数据
SampleEventProducer producer = new SampleEventProducer(ringBuffer);
for(long i = 0; i < 100; i ++){
producer.publish(i);
}
disruptor.shutdown();
executor.shutdown();
}
}
Disruptor 的原理分析
使用循环数组代替队列生产者消费者模型自然是离不开队列的,使用预先填充数据的方式来避免 GC;
使用 CPU 缓存行填充的方式来避免极端情况下的数据争用导致的性能下降;
多线程编程中尽量避免锁争用的编码技巧。
Disruptor 为我们提供了一个思路和实践,基本的循环数组实现,定义一个数组,长度为 2 的次方幂。
[正在上传…重新上传取消?](
)
循环数组
设定一个数字标志表示当前的可用的位置(可以从 0 开始)。
头标志位表示下一个可以插入的位置。
头标志位不能大于尾标志位一个数组长度(因为这样就插入的位置和读取的位置就重叠了会导致数据丢失)。
尾标志位表示下一个可以读取的位置。
评论