写点什么

🏆【分布式技术专题】【分布式技术专题】RocketMQ 延迟消息实现原理和源码分析

发布于: 3 小时前
🏆【分布式技术专题】【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景

业务场景

假设有这么一个需求,用户下单后如果 30 分钟未支付,则该订单需要被关闭。你会怎么做?

之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30 分钟,则关闭订单。

方案评估
  • 优点:是实现简单,缺点呢?

  • 缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。

延时队列出现
  • 能够在指定时间间隔后触发某个业务操作

  • 能够应对业务数据量特别大的特殊场景


RocketMQ 延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了 MQ 中,也不需要手写定时器,降低了业务复杂度,同时 MQ 自带削峰功能,能够很好的应对业务高峰。

功能特点

  • RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

  • broker 在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的 topic 的 queue 里面。

Broker 处理延迟消息

延时队列生产者端:

延时消息的关键点在于 Producer 生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。


public class Producer {  private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";    public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        //设置namesrv地址        producer.setNamesrvAddr("111.231.110.149:9876");        //启动生产者        producer.start();        //发送10条消息        for (int i = 0; i < 10; i++) {            try {                Message msg = new Message("TopicTest" /* Topic */,                    "TagA" /* Tag */,                    ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */                );                //设置消息延时级别  3对应10秒后发送                //延时级别1对应延时1秒后发送消息                //延时级别2对应延时5秒后发送消息                //延时级别3对应延时10秒后发送消息                //以此类推。                msg.setDelayTimeLevel(3);                SendResult sendResult = producer.send(msg);                System.out.printf("%s%n", sendResult);            } catch (Exception e) {                e.printStackTrace();                Thread.sleep(1000);            }        }        /*         * Shut down once the producer instance is not longer in use.         */        producer.shutdown();    }}
复制代码


初始化


DefaultMessageStore 在启动时,会调用 ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应 map,然后调用 ScheduleMessageService#start()方法来启动类


load 方法


public boolean load() {        boolean result = super.load();        result = result && this.parseDelayLevel();        return result;}
复制代码


ScheduleMessageService 继承自 ConfigManager 类,super.load()方法对应


public boolean load() {        String fileName = null;        try {            fileName = this.configFilePath();            String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0) { return this.loadBak(); } else { this.decode(jsonString); log.info("load " + fileName + " OK"); return true; } } catch (Exception e) { log.error("load " + fileName + " failed, and try to load backup file", e); return this.loadBak(); }}
复制代码

延时队列源码分析:

先从延时消息延迟级别设置与 broker 端消息持久化入手。

具体实现

RocketMQ 发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq 把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定 topic 的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

启动延迟消息定时任务

如果想要深入了解的可以看一下 ScheduleMessageService 这个类

内部变量含义

延时消息定时投递相关具体实现代码在 ScheduleMessageService 中,先看下变量定义



  • delayLevelTable 定义了延迟级别和延迟时间的对应关系

  • offsetTable 存放延延迟级别对应的队列消费的 offset


ScheduleMessageService.start()
复制代码


延迟消息投递


其中根据,delayLevel 获取消费队列 id 的方法如下,即 queueId = delayLevel-1


public static int delayLevel2QueueId(final int delayLevel) {        return delayLevel - 1;}
复制代码



核心逻辑就是取出 tagCode(延时消息持久化时,tagsCode 存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在 if (countdown <= 0)中,看下代码



每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的 topic 下,并把延迟队列中的消息删除

重新投递实现

重新构建投递消息的关键点在于 messageTimeup 中,其构建了一个新的消息,并从延时消息属性中恢复出了原有的 topic,queueId,再调用 putMessage 重新进行投递。

总结

  • 优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性

  • 缺点:定时器采用了 timer,timer 是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况

  • 改进点:可以在每个延迟队列上各采用一个 timer,或者使用 timer 进行扫描,加一个线程池对消息进行处理,这样可以提供效率

基本思路已经介绍完,梳理下延时消息实现思路

  • producer 端设置消息 delayLevel 延迟级别,消息属性 DELAY 中存储了对应了延时级别

  • broker 端收到消息后,判断延时消息延迟级别,如果大于 0,则备份消息原始 topic,queueId,并将消息 topic 改为延时消息队列特定 topic(SCHEDULE_TOPIC),queueId 改为延时级别-1

  • mq 服务端 ScheduleMessageService 中,为每一个延迟级别单独设置一个定时器,定时(每隔 1 秒)拉取对应延迟级别的消费队列

  • 根据消费偏移量 offset 从 commitLog 中解析出对应消息

  • 从消息 tagsCode 中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

  • 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的 topic,queueId,并清除消息延迟属性,从新进行消息投递

发布于: 3 小时前阅读数: 6
用户头像

🏆2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 我们始于迷惘,终于更高水平的迷惘

评论

发布
暂无评论
🏆【分布式技术专题】【分布式技术专题】RocketMQ延迟消息实现原理和源码分析