写点什么

如何实现一个延时队列 ?

  • 2022 年 7 月 04 日
  • 本文字数:6592 字

    阅读完需:约 22 分钟

如何实现一个延时队列 ?

一、什么是延时队列

延时队列,顾名思义,就是元素在入队列时,会指定一个延时时间,期望在经过指定时间后再处理该元素。

实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对 qps 稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。 我在项目中遇到一个使用延时队列的场景,因为项目中使用了 redis,所以我用 redis 实现了一个延时队列来满足需求,本文介绍下如何用 redis 实现一个延时队列。


作者可爱儿子

二、延时队列适用的场景?

延时队列适用的场景有很多:

2.1 比如超时关单:

即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。

2.2 比如回调重试:

对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min  5 min  10 min  30 min  1hour 等间隔进行回调。

2.3 会议提醒:

比如我们用的 lark 会议,在会议开始前 10 分钟对参会人进行提醒,这个功能也可以采用延时队列来实现。

2.4 各种延时提醒:

比如用户下单未支付时,系统在关单前 10 分钟提醒用户去支付。比如我曾做过的二手车的一个需求就是提醒买手尽快出价等。

这些场景都用到了延时队列,其实上述场景采用定时任务也能实现,但是相比于定时任务,延时队的时间把控更精准,延时队列不用扫描库表,对系统消耗更少。

三、延时队列的实现方式?

•DelayQueue,这个是 jdk 自带的一种延时队列,位于 java.util.concurrent 包下,它是一个有界的阻塞队列,它内部封装了一个 PriorityQueue(优先队列)

PriorityQueue 内部使用完全二叉堆来实现队列元素排序,当向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。有兴趣的同学可以详细查看源码。

•Quartz 定时任务,对时间精准度要求不高,数据量较小的任务,可以采用定时任务替代延时队列。

•redis 过期回调,我们可以开启监听 key 是否过期的事件,一旦 key 过期会触发一个 callback 事件。这样我们就能通过设置 key 的过期时间,来实现延时队列的效果。

•redis  sorted set,主要是利用 zset 的 score 属性,我们将延时时间转成(当前时间+延时时间)(时间单位毫秒)作为 scroe 属性。然后开启一个消费线程轮训 redis 队列,当 score 属性的值小于当前时间时,证明延时消息到期,可以进行消费。

•Mq,通过 rabbitMq  或 rocketMq 可以实现延时队列,具体实现方式省略。

•时间轮,基于 kafka 或 netty 的时间轮算法来实现延时队列,实现方式省略,有兴趣的同学可以自己查看。


实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对 qps 稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。

我在项目中遇到一个使用延时队列的场景,因为项目中使用了 redis,所以我用 redis 实现了一个延时队列来满足需求,下面介绍下如何用 redis 实现一个延时队列。

四、如何用 redis 实现延时队列?

通过 redis 实现延时队列有两种方式,第一种是 redis 的过期回调,但是这种方式需要修改 redis 的配置文件并重启服务,这对于我们正在使用的 redis 服务来说比较困难。

所以我们用第二种方式来实现一个延时队列:

4.1 我们使用 redisson 来实现,首先引入 redisson 包
       <dependency>        <groupId>org.redisson</groupId>        <artifactId>redisson-spring-boot-starter</artifactId>        <version>3.16.3</version>      </dependency>
复制代码
4.2、配置 redisson
redisson:  redisModel: SINGLE  singleConfig:    address: redis://127.0.0.1:6379
复制代码


4.3 延时服务:添加延时任务,取消延时任务
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired private RedissonClient redissonClient;
/** * 新增一个延时任务,添加job元信息 * * @param job 元信息,job中包含 taskId 主键,topicId 队列id,retryNum 重试次数,delaytime 延时时间 等,此处不展开。 */ @Override public void addJob(Job job) { //添加分布式锁,防止重复添加任务 RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId()); try { boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo()); } String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. 将job添加到 JobPool中,jobpool 作为一个全局索引,所有未执行任务都存在jobPool 中。 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); if (jobPool.get(topicId) != null) { throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo()); }
jobPool.put(topicId, job);
// 2. 将job添加到 DelayBucket中,按延时时间进行排序 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(job.getDelay(), topicId); } catch (InterruptedException e) { log.error("addJob error", e); throw new BizException("add delay job error,reason:" + e.getMessage()); } finally { if (lock != null) { lock.unlock(); } } }

/** * 删除job信息,为什么要删除job 信息? * 当我们确信上一个延时任务没有必要执行时,我们可以提前取消延时任务的执行。 * * @param jobDie 元信息 */ @Override public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId()); try { boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo()); } String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId()); //从全局索引中删除任务。 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); jobPool.remove(topicId); //从zset 中删除任务 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.remove(topicId); } catch (InterruptedException e) { log.error("addJob error", e); throw new BizException("delete job error,reason:" + e.getMessage()); } finally { if (lock != null) { lock.unlock(); } } }}
复制代码
4.4 搬运线程:

搬运线程的目的是将 zset 中已经到期的任务搬运到消费队列中,消费队列中的任务会被消费线程消费。之所以会增加一个消费队列,是考虑到我们的消费能力和数据安全,如果消费能力比较弱,可能会造成消费线程阻塞,或者数据丢失。我们把到期任务放到一个阻塞队列中,可以让消费线程顺序消费。


这个地方还能继续优化,比如可以落库,可以建立多个阻塞队列,每个阻塞队列可以指定一个线程池进行消费等。

@Slf4j@Componentpublic class CarryJobScheduled {
@Autowired private RedissonClient redissonClient;
/** * 启动定时开启搬运JOB信息 */ @Scheduled(cron = "*/1 * * * * *") public void carryJobToQueue() { //System.out.println("carryJobToQueue --->"); RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK); try { boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo()); } // 将到期的任务取出来 RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); long now = System.currentTimeMillis(); Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true); if (CollectionUtils.isEmpty(jobCollection)) { return; } // 将到期的任务搬运到消费队列中,zset 中的任务删除 List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList()); RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
if (CollectionUtils.isEmpty(jobList)) { return; } if (readyQueue.addAll(jobList)) { bucketSet.removeAllAsync(jobList); }
} catch (InterruptedException e) { log.error("carryJobToQueue error", e); } finally { if (lock != null) { lock.unlock(); } } }}
复制代码
4.5 消费线程:

开启一个消费线程,消费线程会消费阻塞队列中的到期任务,其中 ConsumerService 可以采用策略模式,根据不同的 topic 进行不同的业务处理。

@Slf4j@Componentpublic class ReadyQueueContext {
@Autowired private RedissonClient redissonClient;
@Autowired private ConsumerService consumerService;
/** * TOPIC消费 */ @PostConstruct public void startTopicConsumer() { TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程"); }
/** * 开启TOPIC消费线程 * 将所有可能出现的异常全部catch住,确保While(true)能够不中断 */ @SuppressWarnings("InfiniteLoopStatement") private void runTopicThreads() { while (true) { RLock lock = null; try { lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK); } catch (Exception e) { log.error("runTopicThreads getLock error", e); } try { if (lock == null) { continue; } // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错 boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { continue; }
// 1. 获取ReadyQueue中待消费的数据 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE); String topicId = queue.poll(60, TimeUnit.SECONDS); if (StringUtils.isEmpty(topicId)) { continue; }
// 2. 获取job元信息内容 RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); Job job = jobPoolMap.get(topicId);
// 3. 消费 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId()); if (taskResult.get()) { // 3.1 消费成功,删除JobPool和DelayBucket的job信息 jobPoolMap.remove(topicId); } else { /** * 重试次数为0就直接返回 */ if (job.getRetry() == 0) { return; } int retrySum = job.getRetry() + 1; // 3.2 消费失败,则根据策略重新加入Bucket
// 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) { jobPoolMap.remove(topicId); continue; } job.setRetry(retrySum); long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000; // log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime)); RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(nextTime, topicId); // 3.3 更新元信息失败次数 jobPoolMap.put(topicId, job); } } catch (Exception e) { log.error("runTopicThreads error", e); } finally { if (lock != null) { try { lock.unlock(); } catch (Exception e) { log.error("runTopicThreads unlock error", e); } } } } }
复制代码


关于领创集团(Advance Intelligence Group)

领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。


往期回顾 BREAK AWAY

Spring data JPA 实践和原理浅析

如何解决海量数据更新场景下的 Mysql 死锁问题

企业级 APIs 安全实践指南 (建议初中级工程师收藏)

Cypress UI 自动化测试框架

serverless让我们的运维更轻松


▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:

1、点赞、关注领创集团,获取最新技术分享和公司动态。

2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问官方网站,了解更多企业动态。


发布于: 刚刚阅读数: 4
用户头像

智慧领创美好生活 2021.08.12 加入

AI技术驱动的科技集团,致力于以技术赋能为核心,通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈,带来个性化、陪伴式的产品服务和优质体验。

评论

发布
暂无评论
如何实现一个延时队列 ?_延时队列_领创集团Advance Intelligence Group_InfoQ写作社区