写点什么

RocketMQ 消息回溯实践与解析

作者:Geek_e3e86e
  • 2025-03-07
    湖南
  • 本文字数:13186 字

    阅读完需:约 43 分钟

1 问题背景

前段时间,小 A 公司的短信服务出现了问题,导致一段时间内的短信没有发出去,等服务修复后,需要重新补发这批数据。

由于短信服务是直接通过 RocketMQ 触发,因此在修复这些数据的时候,小 A 犯了难,于是就有了以下对话

领导:小 A 呀,这数据这么多,你准备怎么修呀?

小A:头大呀领导,一般业务我们都有一个本地消息表来做幂等,我只需要把数据库表的状态重置,然后把数据捞出来重新循环执行就可以啦,但是短信服务我们没有本地表呀!

领导:那你有什么想法吗?

小A:简单的话,那就让上游重发吧,我们再消费一遍就好了。

领导:这样问题就更严重了呀,你想,上游重发一遍,那是不是所有的消费者组都要重新消费一遍,到时候其他业务同学就要来找你了。

小A:那就不好办了。。。

领导:其实 RocketMQ 有专门的消息回溯的能力,你可以试试

小A:这么神奇?我研究研究。。。

2 验证

2.1 生产者启动

准备一个新的 topic,并发送 1W 条消息

public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");        producer.setNamesrvAddr("127.0.0.1:9876");        producer.start();        for (int i = 0; i < 10000; i++) {            try {                Message msg = new Message("TopicTest" /* Topic */,                    "TagA" /* Tag */,                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */                );                SendResult sendResult = producer.send(msg);                System.out.printf("%s%n", sendResult);            } catch (Exception e) {                e.printStackTrace();                Thread.sleep(1000);            }        }        producer.shutdown();    }
复制代码

2.2 消费者启动

准备一个新的消费者组,消费 topic 下数据并记录总条数

public static void main(String[] args) throws InterruptedException, MQClientException {    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");    consumer.setNamesrvAddr("127.0.0.1:9876");    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);    consumer.subscribe("TopicTest", "*");        final AtomicInteger count = new AtomicInteger();        consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,            ConsumeConcurrentlyContext context) {            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);            count.incrementAndGet();            System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);            System.out.println(count.get());            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    });
    consumer.start();}
复制代码


消费者消息记录

2.3 执行回溯

命令行执行

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000
复制代码

以下为 mqadmin.cmd 的内容,因此也可以直接通过调用 MQAdminStartup 的 main 方法执行

MQAdminStartup 手动执行

代码执行

public static void main(String[] args) {    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};    MQAdminStartup.main(params);}
复制代码

2.4 结果验证

客户端重置成功记录

消费者重新消费记录

2.5 验证小结

从结果上来看,消费者 offset 被重置到了指定的时间戳位置,由于指定时间戳早于最早消息的创建时间,因此重新消费了所有未被删除的消息。

那 rocketmq 究竟做了什么呢?

2.5.1 分析参数

动作标识:resetOffsetByTime

额外参数:

-n nameserver 的地址

-t 指定 topic 名称

-g 指定消费者组名称

-s 指定回溯时间

2.5.2 思考

消息回溯思考

3 分析

以下源码部分均出自 4.2.0 版本,展示代码有所精简。

3.1 策略模式,解析命令行

org.apache.rocketmq.tools.command.MQAdminStartup#main

/*根据动作标识解析除对应的处理类,我们本次请求实际处理策略类:ResetOffsetByTimeCommand*/SubCommand cmd = findSubCommand(args[0]);/*解析命令行*/Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),                            new PosixParser());                            /*提交请求执行*/cmd.execute(commandLine, options, rpcHook);
复制代码

3.2 创建客户端,与服务端交互

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);        String group = commandLine.getOptionValue("g").trim();//消费者组    String topic = commandLine.getOptionValue("t").trim();//主题    String timeStampStr = commandLine.getOptionValue("s").trim();//重置时间戳    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);//重置时间戳    boolean isC = false;//是否C客户端    boolean force = true;//是否强制重置,这里提前解释一下,有可能时间戳对应的offset比当前消费进度要大,强制的话会出现部分消息消费不到    if (commandLine.hasOption('f')) {        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());    }
    /*与nameserver以及broker交互的客户端启动*/    defaultMQAdminExt.start();    /*正式执行命令*/    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);}
复制代码

3.3 获取 topic 对应的 broker 地址,提交重置请求

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,    boolean isC)    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {    /*从nameserver处获取broker地址*/    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);    /*由于消息数据分区分片,topic下的messagequeue可能存在多个broker上,因此这是个列表*/    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();    if (brokerDatas != null) {        for (BrokerData brokerData : brokerDatas) {            String addr = brokerData.selectBrokerAddr();            if (addr != null) {                /*循环与各个broker交互,执行重置操作*/                Map<MessageQueue, Long> offsetTable =                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,                        timeoutMillis, isC);                if (offsetTable != null) {                    allOffsetTable.putAll(offsetTable);                }            }        }    }    return allOffsetTable;}
复制代码

3.4 与 nameserver 交互获取 broker 地址

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();    requestHeader.setTopic(topic); /*同样的组装参数,请求码:105*/    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    /*创建请求与nameserver交互*/    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);    byte[] body = response.getBody();    if (body != null) {        return TopicRouteData.decode(body, TopicRouteData.class);    }}
复制代码

3.4.1 nameserver 收到请求,获取路由信息并返回

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,    RemotingCommand request) throws RemotingCommandException {    final RemotingCommand response = RemotingCommand.createResponseCommand(null);    final GetRouteInfoRequestHeader requestHeader =        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    /*nameserver内部存储topic的路由信息*/    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); byte[] content = topicRouteData.encode();    response.setBody(content);    response.setCode(ResponseCode.SUCCESS);    response.setRemark(null);    return response;}
复制代码

3.4.2 RouteInfoManager 的核心属性

//topic路由信息,根据这个做负载均衡,QueueData里面记录brokerNameprivate final HashMap<String/* topic */, List<QueueData>> topicQueueTable;//broke基本信息 名称  所在集群信息   主备broke地址  brokerId=0表示master   >0表示slaveprivate final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//集群信息,包含集群所有的broke信息private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//存活的broke信息,以及对应的channelprivate final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;//broke的过滤类信息private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
复制代码

3.5 与 broker 交互,执行重置操作

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)    throws RemotingException, MQClientException, InterruptedException {        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();    requestHeader.setTopic(topic);    requestHeader.setGroup(group);    requestHeader.setTimestamp(timestamp);    requestHeader.setForce(isForce);
    /*同样的组装参数,请求码:222*/    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);    if (isC) {        request.setLanguage(LanguageCode.CPP);    } /*创建请求与broker交互,注意这里是同步invokeSync*/    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);    if (response.getBody() != null) {        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);        return body.getOffsetTable();    }}
复制代码

broker 收到请求,开始处理

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,    boolean isC) {    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    /*记录下该消费者组消费topic下的队列要重置到哪条offset*/    Map<MessageQueue/*队列*/, Long/*offser*/> offsetTable = new HashMap<MessageQueue, Long>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {        MessageQueue mq = new MessageQueue();        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());        mq.setTopic(topic);        mq.setQueueId(i);
        /*broker可以获取该topic下的consumergroup下的某个队列的offset*/        long consumerOffset =            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);//消费者组当前已经消费的offset        if (-1 == consumerOffset) {            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark(String.format("THe consumer group <%s> not exist", group));            return response;        }
        long timeStampOffset;        if (timeStamp == -1) {   //没有指定表示当前队列最大的offset            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);        } else {            //根据时间戳查到队列下对应的offset            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);        }
        if (timeStampOffset < 0) {            //<0表示消息已经被删掉了            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);            timeStampOffset = 0;        }
        /*如果isForce=false,则要重置的offset<当前正在消费的offset才会重置。也过来,也就是说重置不仅会回溯,消费进度过慢也可以往后拨,加快消费进度*/        if (isForce || timeStampOffset < consumerOffset) {            offsetTable.put(mq, timeStampOffset);        } else {            offsetTable.put(mq, consumerOffset);        }    }
    /*确定了要先重置的offset之后开始与客户端交互,准备客户端重置,请求码220*/    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();    requestHeader.setTopic(topic);    requestHeader.setGroup(group);    requestHeader.setTimestamp(timeStamp);    RemotingCommand request =        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);    if (isC) {        // c++ language        ResetOffsetBodyForC body = new ResetOffsetBodyForC();        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);        body.setOffsetTable(offsetList);        request.setBody(body.encode());    } else {        // other language        ResetOffsetBody body = new ResetOffsetBody();        body.setOffsetTable(offsetTable);        request.setBody(body.encode());    }
    /*拿到与当前broker建立连接的消费者组客户端信息*/    ConsumerGroupInfo consumerGroupInfo =        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {        //获取长连接channel        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =            consumerGroupInfo.getChannelInfoTable();        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {            int version = entry.getValue().getVersion();            /*这里版本可以判断,只有客户端版本>3.0.7才支持重置*/            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {                try {                    /*注意这里是只管发不管收,可以简单理解为异步了invokeOneway*/                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",                        topic, group, entry.getValue().getClientId());                } catch (Exception e) {                    log.error("[reset-offset] reset offset exception. topic={}, group={}",                        new Object[] {topic, group}, e);                }            } else {                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("the client does not support this feature. version="                    + MQVersion.getVersionDesc(version));                log.warn("[reset-offset] the client does not support this feature. version={}",                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));                return response;            }        }    } else {        String errorInfo =            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",                requestHeader.getGroup(),                requestHeader.getTopic(),                requestHeader.getTimestamp());        log.error(errorInfo);        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);        response.setRemark(errorInfo);        return response;    }    response.setCode(ResponseCode.SUCCESS);    ResetOffsetBody resBody = new ResetOffsetBody();    resBody.setOffsetTable(offsetTable);    response.setBody(resBody.encode());    return response;}
复制代码

3.6 消费客户端收到请求,开始处理

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {    DefaultMQPushConsumerImpl consumer = null;    try {        /*根据消费者组找到对应的消费实现,即我们熟悉的DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl*/        MQConsumerInner impl = this.consumerTable.get(group);        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {            consumer = (DefaultMQPushConsumerImpl) impl;        } else {            //由于PullConsumer消费进度自己控制,因此直接返回            log.info("[reset-offset] consumer dose not exist. group={}", group);            return;        }                consumer.suspend();//暂停消费
        /*暂停消息拉取,以及待处理的消息缓存都清掉*/        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {            MessageQueue mq = entry.getKey();            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {                ProcessQueue pq = entry.getValue();                pq.setDropped(true);                pq.clear();            }        }          /*这里的等待实现比较简单,与broker交互是同步,broker与consumer交互是异步,因此这里阻塞10秒是为了保证所有的consumer都在这里存储offset并触发reblance*/        try {            TimeUnit.SECONDS.sleep(10);        } catch (InterruptedException e) {        }
        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();        while (iterator.hasNext()) {            MessageQueue mq = iterator.next();            //获取messagequeue应该被重置的offset            Long offset = offsetTable.get(mq);            if (topic.equals(mq.getTopic()) && offset != null) {                try {                    /*更新更新本地offset,这里注意集群模式是先修改本地,然后定时任务每五秒上报broker,而广播模式offset在本地存储,因此只需要修改消费者本地的offset即可*/                    consumer.updateConsumeOffset(mq, offset);                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));                    iterator.remove();                } catch (Exception e) {                    log.warn("reset offset failed. group={}, {}", group, mq, e);                }            }        }    } finally {        if (consumer != null) {            /*重新触发reblance,由于broker已经重置的该消费者组的offset,因此重分配后以broker为准*/            consumer.resume();        }    }}
复制代码

4 核心流程

消息回溯全流程

5 总结

消息回溯功能是 RocketMQ 提供给业务方的定心丸,业务在出现任何无法恢复的问题后,都可以及时通过消息回溯来恢复业务或者订正数据。特别是在流或者批计算的场景,重跑数据往往是常态。

RocketMQ 能实现消息回溯功能得益于其简单的位点管理机制,可以很容易通过 mqadmin 工具重置位点。但要注意,由于 topic 的消息实际都是存储在 broker 上,且有一定的删除机制,因此首先要确认需要消息回溯的集群 broker 不能下线节点或者回溯数据被删除之前的时间点,确保消息不会丢失。

6 延申

通过消息回溯的功能,我们可以任意向前或者向后拨动 offset,那当我们想要指定一个区间进行消费,这个时候怎么办呢。比如当消费进度过慢,我们选择向后拨动 offset,那就会有一部分未消费的消息出现,针对这部分消息,我们应该在空余时间把他消费完成,就需要指定区间来消费了。

其实通过上面代码 org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset 中我们可以看到,对于 DefaultMQPullConsumerImpl 类型的消费者,消息重置是不生效的,这是因为 DefaultMQPullConsumerImpl 的消费进度完全由消费者来控制,那我们就可以采用拉模式来进行消费。

示例代码:

public class PullConsumerLocalTest {    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();    private static final Map<MessageQueue, Pair<Long/*最小offset*/,Long/*最大offset*/>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();    private static final Long MIN_TIMESTAMP = 1722240069000L;//最小时间戳    private static final Long MAX_TIMESTAMP = 1722240160000L;//最大时间戳
    public static void main(String[] args) throws MQClientException {        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");        consumer.setNamesrvAddr("127.0.0.1:9876");        consumer.start();
        /*初始化待处理的offset*/        String topic = "TopicTest";        init(consumer, topic);
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);        for (MessageQueue mq : mqs) {            System.out.printf("Consume from the queue: %s%n", mq);            SINGLE_MQ:            while (true) {                try {                    PullResult pullResult =                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);                    System.out.printf("%s%n", pullResult);                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());                    switch (pullResult.getPullStatus()) {                        case FOUND:                            //check max offset and dosomething...                            break;                        case NO_MATCHED_MSG:                            break;                        case NO_NEW_MSG:                            break SINGLE_MQ;                        case OFFSET_ILLEGAL:                            break;                        default:                            break;                    }                } catch (Exception e) {                    e.printStackTrace();                }            }        }
        consumer.shutdown();    }
    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);        for (MessageQueue mq : mqs) {            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);            //记录区间内范围内最小以及最大的offset            QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));            //将最小offset写为下次消费的初始offset            OFFSE_TABLE.put(mq, minOffset);        }    }
    private static long getMessageQueueOffset(MessageQueue mq) {        Long offset = OFFSE_TABLE.get(mq);        if (offset != null)            return offset;
        return 0;    }
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {        OFFSE_TABLE.put(mq, offset);    }
}
复制代码

7 对比



用户头像

Geek_e3e86e

关注

还未添加个人签名 2025-03-07 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ消息回溯实践与解析_Java_Geek_e3e86e_InfoQ写作社区