前言
介绍高性能队列 Disruptor 原理以及使用例子。
Disruptor 是什么?
Disruptor 是外汇和加密货币交易所运营商 LMAX group 建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标 JDK 中的 ArrayBlockingQueue。是目前单机且基于内存存储的最高性能的队列实现。见 与 ArrayBlockingQueue 性能对比。
Disruptor 高性能秘诀
使用 CAS 代替锁
锁非常昂贵,因为它们在竞争时需要仲裁。这种仲裁是通过到操作系统内核的上下文切换来实现的,该内核将挂起等待锁的线程,直到它被释放。系统提供的原子操作 CAS(Compare And Swap/Set)是很好的锁替代方案,Disruptor 中同步就是使用的这种。
比如多生产者模式中 com.lmax.disruptor.MultiProducerSequencer 就是用了 Java 里 sun.misc.Unsafe 类基于 CAS 实现的 API。
等待策略 com.lmax.disruptor.BlockingWaitStrategy 使用了基于 CAS 实现的 ReentrantLock。
独占缓存行
为了提高效率 CPU 硬件不会以字节或字为单位移动内存,而是以缓存行,通常大小为 32-256 字节的缓存行,最常见的缓存行是 64 字节。这意味着,如果两个变量在同一个缓存行中,并且由不同的线程写入,那么它们会出现与单个变量相同的写入争用问题。为了获得高性能,如果要最小化争用,那么确保独立但同时写入的变量不共享相同的缓存行是很重要的。
比如 com.lmax.disruptor.RingBuffer 中属性前后都用未赋值的 long 来独占。com.lmax.disruptor.SingleProducerSequencerPad 也有相同处理方式。
环形队列
队列相比链表在访问速度上占据优势,而有界队列相比可动态扩容的无界队列则避免扩容产生的同步问题效率更高。Disruptor 和 JDK 中的 ArrayBlockingQueue 一样使用有界队列。队列长度要设为 2 的 n 次幂,有利于二进制计算。
Disruptor 在逻辑上将数组的的头尾看成是相连的,即一个环形数组(RingBuffer)。
生产和消费都需要维护自增序列值(Sequence),从 0 开始。
生产方只维护一个代表生产的最后一个元素的序号。代表生产的最后一个元素的序号。每次向 Disruptor 发布一个元素都调用 Sequenced.next()来获取下个位置的写入权。
在单生产者模式(SINGLE)由于不存在并发写入,则不需要解决同步问题。在多生产者模式(MULTI)就需要借助 JDK 中基于 CAS(Compare And Swap/Set)实现的 API 来保证线程安全。
多个消费者各自维护自己的消费序列值(Sequence)保存数组中。
而环形通过与运算(sequence & indexMask)实现的,indexMask 就是环形队列的长度-1。以环形队列长度 8 为例,第 9 个元素 Sequence 为 8,8 & 7 = 0,刚好又回到了数组第 1 个位置。
见 com.lmax.disruptor.RingBuffer.elementAt(long sequence)
预分配内存
环形队列存放的是 Event 对象,而且是在 Disruptor 创建的时候调用 EventFactory 创建并一次将队列填满。Event 保存生产者生产的数据,消费也是通过 Event 获取,后续生产则只需要替换掉 Event 中的属性值。这种方式避免了重复创建对象,降低 JVM 的 GC 产频率。
见 com.lmax.disruptor.RingBuffer.fill(EventFactory eventFactory)
消费者 8 种等待策略
当消费速度大于生产速度情况下,消费者执行的等待策略。
消费者序列
所有消费者的消费序列(Sequence)都放在一个数组中,见 com.lmax.disruptor.AbstractSequencer,通过 SEQUENCE_UPDATER 来更新对应的序列值。
调用更新的地方在 com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence... gatingSequences)。
消费太慢队列满了怎么办?
生产者线程被阻塞。生产者调用 Sequenced.next()争夺写入权的时候需要判断最小的消费序列值进行比较。如果写入的位置还未消费则会进入循环不断获取最小消费序列值进行比较。
见包 com.lmax.disruptor 下 SingleProducerSequencer 或 MultiProducerSequencer 中 next(int n)方法。
Disruptor 开发步骤
Event 是环形队列(RingBuffer)中的元素,是生产者数据的载体;EventFactory 是定义 Event 创建方式的工厂类;EventHandler 则是 Event 的处理器,定义如何消费 Event 中的数据。
另外有必要定义一个消费异常处理器 ExceptionHandler,它是和 EventHandler 绑定的。当 EventHandler.onEvent()执行抛出异常时会执行对应的异常回调方法。
创建 Disruptor 需要指定 5 个参数 eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。
EventFactory 是上面定义的 Event 工厂类;
ringBufferSize 是环形队列的长度,这个值要是 2 的 N 次方;
threadFactory 是定义消费者线程创建方式的工厂类;
producerType 是指明生产者是一个(SINGLE)还是多个(MULTI)。默认是 MULTI,会使用 CAS(Compare And Swap/Set)保证线程安全。如果指定为 SINGLE,则不使用没必要的 CAS,使单线程处理更高效。
waitStrategy 指明消费者等待生产时的策略。
指明 EventHandler 并绑定 ExceptionHandler。指定多个 EventHandler 时,会为每个 EventHandler 分配一个线程,一个 Event 会被多个并行 EventHandler 处理。
也可以指明多个 WorkHandler,每个 WorkHandler 分配一个线程并行消费队列中的 Event,一个 Event 只会被一个 WorkHandler 处理。
EventTranslator 定义生产者数据转换为 Event 的方式,不同数量参数有不同的接口用来实现。
例子程序
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
复制代码
/**
* 事件
*
* @param <T>发布的数据类型
*/
public class MyEvent<T> {
private T data;
public T getData() {
return data;
}
public MyEvent<T> setData(T data) {
this.data = data;
return this;
}
}
复制代码
import com.lmax.disruptor.EventFactory;
/**
* 创建事件的工厂
*
* @param <T>发布的数据类型
*/
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {
@Override
public MyEvent<T> newInstance() {
return new MyEvent<>();
}
}
复制代码
import com.lmax.disruptor.EventHandler;
/**
* 事件消费方法
*
* @param <T>发布的数据类型
*/
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {
@Override
public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + tMyEvent.getData());
}
}
复制代码
import com.lmax.disruptor.ExceptionHandler;
/**
* 消费者异常处理器
*
* @param <T>发布的数据类型
*/
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {
@Override
public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
System.out.println("handleEventException");
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException");
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleOnShutdownException");
}
}
复制代码
单消费者
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 单消费者
*/
public class SingleConsumerSample {
public static void main(String[] args) {
// 环形数组长度,必须是2的n次幂
int ringBufferSize = 1024;
// 创建事件(Event)对象的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创建消费者线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一个处理器
MyEventHandler<String> eventHandler = new MyEventHandler<>();
disruptor.handleEventsWith(eventHandler);
// 处理器异常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
// 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
// 和多个(EventTranslatorVararg)参数的转换器可以使用,参数类型可以不一样
EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
new EventTranslatorOneArg<MyEvent<String>, String>() {
@Override
public void translateTo(MyEvent<String> event, long sequence, String arg0) {
event.setData(arg0);
}
};
// 发布
for (int i = 0; i < 10; i++) {
disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
}
disruptor.shutdown();
}
}
复制代码
单消费者 Lambda 写法
这种只是迎合 Java8 Lambda 语法特性,代码更简洁。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class LambdaSample {
public static void main(String[] args) {
// 环形数组长度,必须是2的n次幂
int ringBufferSize = 1024;
// 创建消费者线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一个处理器
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
disruptor.handleEventsWith(eventHandler);
// 处理器异常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
// 一个参数的转换器
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
// 两个参数的转换器
disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
// 三个参数的转换器
disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
, "Three arg ", 1, false);
// 多个参数的转换器
disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
event.setData("Var arg " + String.join(",", paramList));
}, "param1", "param2", "param3");
disruptor.shutdown();
}
}
复制代码
多消费者重复消费元素
关键只在于指定多个 EventHandler,并且 EventHandler 还可以分别绑定不同的 ExceptionHandler。
每个 EventHandler 分配一个线程,一个 Event 会被每个 EventHandler 处理,适合两个不同的业务都需要处理同一个元素的情况,类似广播模式。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 一个元素多个消费者重复消费
*/
public class RepetitionConsumerSample {
public static void main(String[] args) {
// 环形数组长度,必须是2的n次幂
int ringBufferSize = 1024;
// 创建事件(Event)对象的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创建消费者线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 这里指定了2个消费者,那就会产生2个消费线程,一个事件会被消费2次
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消费:" + event.getData());
EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消费:" + event.getData());
disruptor.handleEventsWith(eventHandler, eventHandler2);
// 分别指定异常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
复制代码
多消费者
关键只在于定义 WorkHandler,然后实例化多个来消费。
每个 WorkHandler 分配一个线程,一个元素只会被一个 WorkHandler 处理。
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class MultiConsumerSample {
public static void main(String[] args) {
// 环形数组长度,必须是2的n次幂
int ringBufferSize = 1024;
// 创建事件(Event)对象的工厂
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 创建消费者线程工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 处理器异常处理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
// 设置2个消费者,2个线程,一个Event只被一个消费者消费
WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
System.out.println(Thread.currentThread().getName() + "WorkHandler消费:" + tMyEvent.getData());
disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
复制代码
评论