写点什么

优化技术专题 - 线程间的高性能消息框架 - 深入浅出 Disruptor 的使用和原理

用户头像
极客good
关注
发布于: 刚刚
  • 基于事件驱动模型,不用消费者主动拉取消息

  • 比 JDK 的 ArrayBlockingQueue 性能高一个数量级


为什么这么快


  • 无锁序号栅栏

  • 缓存行填充,消除伪共享

  • 内存预分配

  • 环形队列 RingBuffer


Disruptor 核心概念


  • RingBuffer(环形队列):基于数组的内存级别缓存,是创建 sequencer(序号)与定义 WaitStrategy(拒绝策略)的入口。

  • Disruptor(总体执行入口):对 RingBuffer 的封装,持有 RingBuffer、消费者线程池 Executor、消费之集合 ConsumerRepository 等引用。

  • Sequence(序号分配器):对 RingBuffer 中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个 Sequence 可以跟踪标识某个事件的处理进度,同时还能消除伪共享。

  • Sequencer(数据传输器):Sequencer 里面包含了 Sequence,是 Disruptor 的核心,Sequencer 有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer 主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法

  • SequenceBarrier(消费者屏障):用于控制 RingBuffer 的 Producer 和 Consumer 之间的平衡关系,并且决定了 Consumer 是否还有可处理的事件的逻辑。

  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将 Event 生产进 Disruptor,WaitStrategy 有多种实现策略,分别是:


  1. BlockingWaitStrategy(最稳定的策略):阻塞方式,效率较低,但对 cpu 消耗小,内部使用的是典型的锁和条件变量机制(java 的 ReentrantLock),来处理线程的唤醒,这个策略是 disruptor 等待策略中最慢的一种,但是是最保守使用消耗 cpu 的一种用法,并且在不同的部署环境下最能保持性能一致。 但是,随着我们可以根据部署的服务环境优化出额外的性能。

  2. BusySpinWaitStrategy(性能最好的策略):自旋方式,无锁,BusySpinWaitStrategy 是性能最高的等待策略,但是受部署环境的制约依赖也越强。 仅当 event 处理线程数少于物理核心数的时候才应该采用这种等待策略。 例如,超线程不可开启。

  3. LiteBlockingWaitStrategy(几乎不用,最接近原生的策略机制):BlockingWaitStrategy 的变体版本,目前感觉不建议使用

  4. LiteTimeoutBlockingWaitStrategy:LiteBlockingWaitStrategy 的超时版本

  5. PhasedBackoffWaitStrategy(最低 CPU 配置的策略):自旋 + yield + 自定义策略,当吞吐量和低延迟不如 CPU 资源重要,CPU 资源紧缺,可以使用此策略。

  6. SleepingWaitStrategy:自旋休眠方式(无锁),性能和 BlockingWaitStrategy 差不多,但是这个对生产者线程影响最小,它使用一个简单的 loop 繁忙等待循环,但是在循环体中间它调用了 LockSupport.parkNanos(1)


  • 一般情况在 linux 系统这样会使得线程停顿大约 60 微秒。不过这样做的好处是,生产者线程不需要额外的累加计数器,也不需要产生条件变量信号量开销。

  • 负面影响是,在生产者线程与消费者线程之间传递 event 数据的延迟变高。所以 SleepingWaitStrategy 适合在不需要低延迟, 但需要很低的生产者线程影响的情形。一个典型的案例是异步日志记录功能。


  1. TimeoutBlockingWaitStrategy:BlockingWaitStrategy 的超时阻塞方式

  2. YieldingWaitStrategy(充分进行实现 CPU 吞吐性能策略):自旋线程切换竞争方式(Thread.yield()),最快的方式,适用于低延时的系统,在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中推荐使用此策略,它会充分使用压榨 cpu 来达到降低延迟的目标。


  • 通过不断的循环等待 sequence 去递增到合适的值。 在循环体内,调用 Thread.yield()来允许其他的排队线程执行。 这是一种在需要极高性能并且 event handler 线程数少于 cpu 逻辑内核数的时候推荐使用的策略。

  • 这里说一下 YieldingWaitStrategy 使用要小心,不是特别要求性能的情况下,要谨慎使用,否则会引起服务起 cpu 飙升的情况,因为他的内部实现是在线程做 100 次递减然后 Thread.yield(),可能会压榨 cpu 性能来换取速度。


注意:超线程是 intel 研发的一种 cpu 技术,可以使得一个核心提供两个逻辑线程,比如 4 核心超线程后有 8 个线程。




  • Event:从生产者到消费者过程中所处理的数据单元,Event 由使用者自定义。

  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了 Disruptor 中的一个消费者的接口。

  • EventProcessor:这是个事件处理器接口,实现了 Runnable,处理主要事件循环,处理 Event,拥有消费者的 Sequence,这个接口有 2 个重要实现:

  • WorkProcessor:多线程处理实现,在多生产者多消费者模式下,确保每个 sequence 只被一个 processor 消费,在同一个 WorkPool 中,确保多个 WorkProcessor 不会消费同样的 sequence

  • BatchEventProcessor:单线程批量处理实现,包含了 Event loop 有效的实现,并回调到了一个 EventHandler 接口的实现对象,这接口实际上是通过重写 run 方法,轮询获取数据对象,并把数据经过等待策略交给消费者去处理。


Disruptor 整体架构


接下来我们来看一下 Disruptor 是如何做到无阻塞、多生产、多消费的。


[?](


)


  • 构建 Disruptor 的各个参数以及 ringBuffer 的构造:

  • EventFactory:创建事件(任务)的工厂类。

  • ringBufferSize:容器的长度。

  • Executor:消费者线程池,执行任务的线程。

  • ProductType:生产者类型:单生产者、多生产者。

  • WaitStrategy:等待策略。

  • RingBuffer:存放数据的容器。

  • EventHandler:事件处理器。


Disruptor 使用方式


maven 依赖:


xml


<dependency>


<groupId>com.lmax</groupId>


<artifactId>disruptor</artifactId>


<version>3.4.2</version>


</dependency>


生产单消费简单模式案例:


Event 数据模型


java


import lombok.Data;


@Data


public class SampleEventModel {


private String data;


}


Event 事件模型 Factory 工厂类


java


import com.lmax.disruptor.EventFactory;


/**


* 消息对象生产工厂


*/


public class SampleEventModelFactory implements EventFactory<SampleEventModel> {


@Override


public SampleEventModel newInstance() {


//返回空的消息对象数据 Event


return new SampleEventModel();


}


}


EventHandler 处理器操作


java


import com.lmax.disruptor.EventHandler;


/**


* 消息事件处理器


*/


public class SampleEventHandler implements EventHandler<SampleEventModel> {


/**


* 事件驱动模式


*/


@Override


public void onEvent(SampleEventModel event, long sequence, boolean endOfBatch) throws Exception {


// do ...


System.out.println("消费者消费处理数据:" + event.getData());


}


}


EventProducer 工厂生产者服务处理器操作


java


import com.lmax.disruptor.RingBuffer;


/**


* 消息发送


*/


public class SampleEventProducer {


private RingBuffer<SampleEventModel> ringBuffer;


public SampleEventProducer(RingBuffer<SampleEventModel> ringBuffer) {


this.ringBuffer = ringBuffer;


}


/**


* 发布数据信息


* @param data


*/


public void publish(String data){


//从 ringBuffer 获取可用 sequence 序号


long sequence = ringBuffer.next();


try {


//根据 sequence 获取 sequence 对应的 Event


//这个 Event 是一个没有赋值具体数据的对象


TestEvent testEvent = ringBuffer.get(seque


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


nce);


testEvent.setData(data);


} finally {


//提交发布


ringBuffer.publish(sequence);


}


}


}


EventProducer 工厂生产者服务处理器操作


java


public class TestMain {


public static void main(String[] args) {


SampleEventModelFactory eventFactory = new SampleEventModelFactory();


int ringBufferSize = 1024 * 1024;


//这个线程池最好自定义


ExecutorService executor = Executors.newCachedThreadPool();


//实例化 disruptor


Disruptor<SampleEventModel> disruptor = new Disruptor<SampleEventModel>(


eventFactory, //消息工厂


ringBufferSize, //ringBuffer 容器最大容量长度


executor, //线程池,最好自定义一个


ProducerType.SINGLE, //单生产者模式


new BlockingWaitStrategy() //等待策略


);


//添加消费者监听 把 TestEventHandler 绑定到 disruptor


disruptor.handleEventsWith(new SampleEventHandler());


//启动 disruptor


disruptor.start();


//获取实际存储数据的容器 RingBuffer


RingBuffer<SampleEventModel> ringBuffer = disruptor.getRingBuffer();


//生产发送数据


SampleEventProducer producer = new SampleEventProducer(ringBuffer);


for(long i = 0; i < 100; i ++){


producer.publish(i);


}


disruptor.shutdown();


executor.shutdown();


}


}

Disruptor 的原理分析

  • 使用循环数组代替队列生产者消费者模型自然是离不开队列的,使用预先填充数据的方式来避免 GC;

  • 使用 CPU 缓存行填充的方式来避免极端情况下的数据争用导致的性能下降;

  • 多线程编程中尽量避免锁争用的编码技巧。


Disruptor 为我们提供了一个思路和实践,基本的循环数组实现,定义一个数组,长度为 2 的次方幂。


[正在上传…重新上传取消?](


)


循环数组


  • 设定一个数字标志表示当前的可用的位置(可以从 0 开始)。

  • 头标志位表示下一个可以插入的位置。

  • 头标志位不能大于尾标志位一个数组长度(因为这样就插入的位置和读取的位置就重叠了会导致数据丢失)。

  • 尾标志位表示下一个可以读取的位置。

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
优化技术专题-线程间的高性能消息框架-深入浅出Disruptor的使用和原理