写点什么

一种异步延迟队列的实现方式

  • 2023-03-21
    四川
  • 本文字数:3617 字

    阅读完需:约 12 分钟

一种异步延迟队列的实现方式

作者:京东零售 张路瑶

1.应用场景

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token 刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。


目前大部分功能通过定时任务完成,定时任务还分使用 quartz 及 xxljob 两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有 1 秒的误差。轮询时间久,如 30 分钟一次,03:01 插入一条数据,正常 3:31 执行过期,但是 3:30 执行轮询时,扫描 3:00-3:30 的数据,是扫描不到 3:31 的数据的,需要 4:00 的时候才能扫描到,相当于多延迟了 29 分钟!

2.延时处理方式调研

1.DelayQueue

1.实现方式:


jvm 提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过 condition 进行阻塞、睡眠 dealy 时间 获取延迟任务。


当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。


2.存在的问题:


1.单机运行,系统宕机后,无法进行有效的重试


2.没有执行记录和备份


3.没有重试机制


4.系统重启时,会将任务清空!


5.不能分片消费


3.优势:实现简单,无任务时阻塞,节省资源,执行时间准确

2.延迟队列 mq

实现方式:依赖 mq,通过设置延迟消费时间,达到延迟消费功能。像 rabbitMq、jmq 都可以设置延迟消费时间。RabbitMq 通过将消息设置过期时间,放入死信队列进行消费实现。


存在的问题:


1.时间设置不灵活,每个 queue 是固定的到期时间,每次新创建延时队列,需要创建新的消息队列


优点:依靠 jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据


缺点:


1.必须要读业务数据库,对数据库造成一定的压力,


2.存在延时


3.一次扫描数据量过大时,占用过多的系统资源。


4. 无法分片消费


优点:


1.消费失败后,下次还能继续消费,具备重试能力,


2.消费能力稳定

4.redis

任务存储在 redis 中,使用 redis 的 zset 队列根据 score 进行排序,程序通过线程不断获取队列数据消费,实现延时队列


优点:


1、查询 redis 相比较数据库快,set 队列长度过大,会根据跳表结构进行查询,效率高


2、redis 可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可


3、无惧机器重启


4、分布式消费


缺点:


1.受限于 redis 性能,并发 10W


2.多个命令无法保证原子性,使用 lua 脚本会要求所有数据都在一个 redis 分片上。

5. 时间轮

通过时间轮实现的延迟任务执行,也是基于 jvm 单机运行,如 kafka、netty 都有实现时间轮,redisson 的看门狗也是通过 netty 的时间轮实现的。


缺点:不适合分布式服务的使用,宕机后,会丢失任务。


3.实现目标

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。


•消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。


•Client 支持丰富:支持多重语言。


•高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。


•实时性:允许存在一定的时间误差。


•支持消息删除:业务使用方,可以随时删除指定消息。


•支持消费查询


•支持手动重试


•对当前异步事件的执行增加监控

4.架构设计

5.延迟组件实现方式

1.实现原理

目前选择使用 jimdb 通过 zset 实现延时功能,将任务 id 和对应的执行时间作为 score 存在在 zset 队列中,默认会按照 score 排序,每次取 0-当前时间内的 score 的任务 id,


发送延迟任务时,会根据时间戳+机器 ip+queueName+sequence 生成唯一的 id,构造消息体,加密后放入 zset 队列中。


通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。


监控方通过集成 ump


消费记录通过 redis 备份+数据库持久化完成。


通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过 spi 自由扩展。

2.消息结构

每个 Job 必须包含一下几个属性:


•Topic:Job 类型,即 QueueName


•Id:Job 的唯一标识。用来检索和删除指定的 Job 信息。


•Delay:Job 需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)


•Body:Job 的内容,供消费者做具体的业务处理,以 json 格式存储。


•traceId:发送线程的 traceId,待后续 pfinder 支持设置 traceId 后,可与发送线程公用同一个 traceiD,便于日志追踪


具体结构如下图表示:



TTR 的设计目的是为了保证消息传输的可靠性。

3.数据流转及流程图


基于 redis-disruptor 方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的 disruptor 无锁队列消费,不同应用、不同 queue 之间无锁


1.支持应用只发布,不消费,达到消息队列的功能。


2:支持分桶,针对大 key 问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大 key 造成的 redis 阻塞问题。


3: 通过 ducc 配置,进行性能的扩展,目前只支持开启消费和关闭消费。


4: 支持设置超时时间配置,防止消费线程执行过久


瓶颈: 消费速度慢,生产速度过快,会导致 ringbuffer 队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控 redis 队列的长度,若不断增长,可考虑增加消费者,直接提高性能。


可能出现的情况: 因一个应用公用一个 disruptor,拥有 64 个消费者线程,如果某一个事件消费过慢,导致 64 个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。


后期观察是否有这个性能瓶颈,可给每一个 queue 一个消费者线程池。

6.demo 示例

增加配置文件

判断是否开启 jd.event.enable:true


<dependency> <groupId>com.jd.car</groupId> <artifactId>senna-event</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
复制代码

配置

jd:senna:event:enable: truequeue:retryEventQueue:bucketNum: 1handleBean: retryHandle
复制代码

消费代码:

package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;import com.jd.car.senna.event.annotation.SennaEvent;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;
/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@Slf4j@Component("retryHandle")public class RetryQueueEvent extends EventHandler {
@Overrideprotected void onHandle(String key, String eventType) {log.info("Handler开始消费:{}", key);}
@Overrideprotected void onDelayHandle(String key, String eventType) {log.info("delayHandler开始消费:{}", key);}}
复制代码

注解形式:

package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;import com.jd.car.senna.event.annotation.SennaEvent;import lombok.extern.slf4j.Slf4j;
/*** @author zhangluyao* @description* @create 2022-02-21-9:54 下午*/@Slf4j@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)public class TestQueueEvent extends EventHandler {
@Overrideprotected void onHandle(String key, String eventType) {log.info("Handler开始消费:{}", key);}
@Overrideprotected void onDelayHandle(String key, String eventType) {log.info("delayHandler开始消费:{}", key);}}
复制代码

发送代码


package com.jd.car.senna.admin.controller;
import com.jd.car.senna.event.queue.IEventQueue;import lombok.extern.slf4j.Slf4j;import org.springframework.context.annotation.Lazy;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.ResponseBody;import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;import java.util.concurrent.CompletableFuture;

/*** @author zly*/@RestController@Slf4jpublic class DemoController {
@Lazy@Resource(name = "testQueue")private IEventQueue eventQueue;
@ResponseBody@GetMapping("/api/v1/demo")public String demo() {log.info("发送无延迟消息");eventQueue.push("no delay 5000 millseconds message 3");return "ok";}
@ResponseBody@GetMapping("/api/v1/demo1")public String demo1() {log.info("发送延迟5秒消息");eventQueue.push(" delay 5000 millseconds message,name",1000*5L);return "ok";}
@ResponseBody@GetMapping("/api/v1/demo2")public String demo2() {log.info("发送延迟到2022-04-02 00:00:00执行的消息");eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));return "ok";}
}
复制代码


参考有赞设计:https://tech.youzan.com/queuing_delay/

7.目前应用:

1.云修到店排队 24 小时后自动取消


2..美团请求 token 定时刷新。


3.质保卡延期 24 小时生成


5. 结算单延期生成


6.短信延迟发送

发布于: 刚刚阅读数: 3
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
一种异步延迟队列的实现方式_架构_京东科技开发者_InfoQ写作社区