写点什么

Disruptor 在流程编排中的应用与探索

作者:ZA技术社区
  • 2023-10-18
    上海
  • 本文字数:4096 字

    阅读完需:约 13 分钟

Disruptor在流程编排中的应用与探索

作者姓名:邓泽波

 

文章简介:Disruptor 是一种高性能的异步事件处理框架,它通过无锁的方式实现了高效的并发处理,通过本文为大家介绍将优秀的框架引入流程编排中并落地


文章内容:

金融事业部同学为大家介绍将 Disruptor 这一框架引入流程编排中的应用探索

 

1、流程编排引入 Disruptor 背景

在业务开发中,经常会遇到需要将同步接收的请求转换为异步流程的场景,例如归集还款接收合作方的同步请求后,需要异步处理文件,并逐笔调用核心还款冲销。类似的场景很多,在流程编排中提供了异步链功能,可以通过配置来指定链的异步或同步方式。对于异步链,它会被加入到线程池中进行异步处理,这样在业务开发中无需关注异步线程池的具体实现,只需进行简单的配置即可完成异步流程的编排。这种优化方案能够简化异步流程的开发过程,提高效率和可维护性。

那为何选 Disruptor 的作为异步线程池实现方案,Disruptor 是一种高性能的异步事件处理框架,最初由 LMAX Exchange 公司开发。它通过无锁(lock-free)的方式实现了高效的并发处理,可以在多核 CPU 环境下处理百万级别的事件。Disruptor 通过将事件数据和处理逻辑分离,实现了高效的数据流转和处理,同时也提供了丰富的可配置参数,方便用户根据具体的应用场景进行定制化配置。

应用 Disruptor 的知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。

 

2、Disruptor 结构

Disruptor 是一个开源的框架,可以在无锁的情况下对队列进行高并发操作,那么这个队列的设计就是 Disruptor 的核心所在;

环形数组 RingBuffer



在 Disruptor 中,采用了 RingBuffer 来作为队列的数据结构,RingBuffer 就是一个环形的数组,既然是数组,我们便可对其设置大小;

在这个 ringBuffer 中,除了数组之外,还有一个序列号,用来指向数组中的下一个可用元素,供生产者使用或者消费者使用,也就是生产者可以生产的地方,或者消费者可以消费的地方(序列号和数组索引是两个概念);

序列号= 2 的 63 次方-1. (30 万年才能用完

RingBuffer 的优点

1. 高效的内存管理:RingBuffer 是基于内存的实现,可以有效地管理内存,减少内存分配和回收的开销,提高系统性能。

2. 无锁设计:RingBuffer 采用无锁设计,可以避免线程间的锁竞争,提高系统的并发性能。

3. 高效的数据结构:RingBuffer 是一个环形缓冲区,可以高效地存储和访问数据,同时支持多生产者和多消费者模式。

4. 支持批量读写:RingBuffer 可以支持批量读写操作,可以减少系统的上下文切换开销,提高系统的性能。

5. 易于扩展:RingBuffer 可以很容易地扩展到多个消费者和生产者,支持动态增加和删除消费者和生产者,具有很好的可扩展性。

RingBuffer 具有高效、无锁、高效的数据结构、支持批量读写和易于扩展等优点,可以帮助开发者构建高性能、可扩展的系统

 

3、Disruptor 主要实现类

Disruptor

Disruptor 的入口,主要封装了环形队列 RingBuffer、消费者集合 ConsumerRepository 的引用,主要提供了获取环形队列、添加消费者、生产者向 RingBuffer 中添加事件(即生产者生产数据的操作)

RingBuffer

Disruptor 中队列具体的实现,底层封装了 Object[]数组,在初始化时,会使用 Event 事件对数组进行填充,填充的大小就是 bufferSize 设置的值,此外该对象内部还维护了 Sequencer(序列生成器)具体的实现

Sequencer

序列生成器,分别有 MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。在 Sequencer 中,维护了消费者的 Sequence(序列对象)和生产者自己的 Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略 WaitStrategy

Sequence

序列对象,内部维护了一个 long 型的 value,这个序列指向了 RingBuffer 中 Object[]数组具体的角标,生产者和消费者各自维护自己的 Sequence,但都是指向 RingBuffer 的 Object[]数组;

WaitStrategy

等待策略,当没有可消费的事件时,消费者根据特定的策略进行等待,当没有可生产的地方时,生产者根据特定的策略进行等待

Event

事件对象,就是我们 Ringbuffer 中存在的数据,在 Disruptor 中用 Event 来定义数据,并不存在 Event 类,它只是一个定义,是一个概念,表示要传递的数据;

EventProcessor

事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用自定义的事件处理器,也就是是否可以进行消费

EventHandler

事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现 EventHandler 接口

Producer

事件生产者,我们定义的发送事件的对象

4、Disruptor 的生产和消费

(1)当 Disruptor 框架启动:



(2)此时,还没有数据进行写入


 

(3)准备写入数据前的准备,获取可以写入数据的最大序列;


 

(4)写入数据完成,更新生产者序列对象的值;


 

以上就是单生产者写入数据的过程,要注意的是,无论是生产者还是消费者,序列的初始值都是-1

当引入消费者后,生产者在获取可写入的序列之前,都会判断消费者所处的序列。

 

5、Disruptor 常用等待策略

com.lmax.disruptor.WaitStrategy

决定一个消费者如何等待生产者将 Event 置入 Disruptor;

其所有实现都是针对消费者线程的;

主要策略有

com.lmax.disruptor.BlockingWaitStrategy

最低效的策略,但其对 CPU 的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;

内部维护了一个重入锁 ReentrantLock 和 Condition;

 

com.lmax.disruptor.SleepingWaitStrategy

性能表现和 com.lmax.disruptor.BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;

是一种无锁的方式,比如 log4j2 使用了 Disruptor 框架;

 

com.lmax.disruptor.YieldingWaitStrategy

性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于 CPU 逻辑核心数的场景中,推荐使用此策略,例如 CPU 开启超线程的特性;

6、Disruptor 为什么这么快

Disruptor 通过以下设计来解决队列速度慢的问题:

环形数组结构

为了避免垃圾回收,采用数组而非链表,同时,数组对处理器的缓存机制更加友好。

元素位置定位

数组长度 2^n,通过位运算,加快定位的速度,下标采取递增的形式,不用担心 index 溢出的问题,index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。

无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

 

7、 性能测试及内存分析

为了直观地感受 Disruptor 有多快,设计了一个性能对比测试:Producer 发布 1 亿次事件,从发布第一个事件开始计时,捕捉 Consumer 处理完所有事件的耗时。

测试用例在 Producer 如何将事件通知到 Consumer 的实现方式上,设计了两种不同的实现:

1. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 ArrayBlockingQueue 传递给 Consumer 进行处理;

2. Producer 的事件发布和 Consumer 的事件处理在不同的线程,通过 Disruptor 传递给 Consumer 进行处理;

 

关键测试代码

a. 抽象类

进行一亿次 CAS 运算计算耗时



b. ArrayBlockingQueue 性能测试代码


 

c. Disruptor 性能测试代码


 

耗时测试对比结果

 

堆内存对比结果

指定-Xms1024m -Xmx1024m 运行环境

ArrayBlockingQueueTest 测试结果



DisruptorTest 测试结果



gc 对比:ArrayBlockingQueueTest 触发了 13 次 gc,而 DisruptorTest 使用 ringBuffer 避免了每次需要分配和释放,因此未触发 gc;

新生代内存:ArrayBlockingQueueTest 占用了 177176K,而 DisruptorTest 占用 82740k,节省 40%的内存占用

老年的内存:ArrayBlockingQueueTest 占用 5%,而 DisruptorTest 占用 0%

经过对比可以看出 Disruptor 对应内存的使用优化更优

 

8、实际场景应用

流程编排如何支持异步链,将同步链路转为异步链路,传统方式开启异步线程执行,线程数量超过核心线程数时,线程将放入 java 队列中等待唤起,java 队列的缺点上面已经阐述,我们可以通过 Disruptor 的特性自定义实现线程池



流程编排增加 Disruptor 异步线程池扩展,丰富组件的功能

 


以上为根据 Disruptor 组件封装的自定义线程池,收发消息的公共模块

1. Disruptor 中队列具体的实现,底层封装了 Object[]数组

2. WorkPoolManager 提供 Disruptor 自定义线程组件初始化,缓存线程池

3. DisruptorWorkPool 实现自定义线程池的封装,线程池的一级缓存队列、二级缓存队列,

ringbuffer 虽然号称无界队列,但本质是一个数组(有界),只是消息在数组上可以反复覆盖。

当消息没有被覆盖的情况下,已经消费的消息一直被引用不会 GC,所以建议 ringbuffer 的 size 不要太大。

但是异步任务很可能是一个耗时的长任务,所以在此引进了二级缓存的概念

4. WorkHandler 为消费者接口,DefaultWorkerHandler 封装消费 handler 执行的父类

针对组件的封装,使用者只需关注业务逻辑 handler 实现即可

 

以流程编排执行一个异步流程的 handler 为例,画如下时序图



1. 服务启动时,RouterConfigrator 加载链配置,循环 handler 调用 WorkPoolManager.init()初始化线程池,并设置事件业务处理器对应消费者处理



2. 用户发起请求后,业务系统接收到请求,RouterService 解析请求参数的 routerName,在 spring 容器中获取该 bean

3. RouterPipleLine 处理类循环遍历 Router 中的 handler,判断 async 是否为异步标识,如上图配置所示。当 async 为 false 同步时,直接执行 handler;当 async 为 true 异步时,向 DisruptorWorkPool 自定义线程池中加入该 handler。



4. DefaultWorkerHandler 消费监听到消息,解析消息信息,并执行对应 handler,完成异步链路。

9、总结

       Disruptor 和传统的线程池相比,具有更高的并发性能和更低的延迟。这是因为 Disruptor 使用了无锁算法和基于序列的技术来实现数据共享和通信,避免了线程间的互斥和同步操作,从而提高了并发性能,并且由于没有线程切换的开销,也可以降低延迟。

     Disruptor 适用于需要高性能、低延迟、大规模并发、对数据顺序有要求等场景,例如高频交易系统、大规模数据处理系统、实时消息系统等。Disruptor 是一种本地内存消息传递机制,不适用于分布式系统。如果需要在分布式环境中使用 Disruptor,可以考虑使用类似于 Kafka 的分布式消息队列来代替。

 

用户头像

还未添加个人签名 2023-09-13 加入

还未添加个人简介

评论

发布
暂无评论
Disruptor在流程编排中的应用与探索_金融科技_ZA技术社区_InfoQ写作社区