写点什么

一张图进阶 RocketMQ - 消费者这个大冤种

作者:三此君
  • 2022 年 8 月 14 日
    广东
  • 本文字数:10137 字

    阅读完需:约 33 分钟

一张图进阶 RocketMQ - 消费者这个大冤种

前 言

三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。

本文是《一张图解析 RocketMQ》系列的第 7 篇,之前我们已经了解了:

  1. 一张图进阶 RocketMQ - 整体架构

  2. 一张图进阶 RocketMQ - NameServer

  3. 一张图进阶 RocketMQ - 消息发送

  4. 一张图进阶 RocketMQ - 通信机制

  5. 一张图进阶 RocketMQ - 消息存储

  6. 一张图进阶 RocketMQ - 消息刷盘&索引构建

生产者发送消息,到 Broker 存储消息,现在 RocketMQ 消费者终于可以消费了。消费者是上帝嘛,应该衣来伸手,饭来张口,等着 Broker 将消息投喂过来就行。可是现实狠狠的抽了我一个大嘴巴子,消息要消费者自己去取,还得干一堆脏活累活。直接降维打击,消从上帝变成纯纯的打工人。

都是打工人,打工人肯定能互相理解,所以大家肯定很快就能理解 RocketMQ 消费者。我们还是通过一个示例,并以其中的关键代码为切入点,深入分析 RocketMQ 消费者的设计与实现。

消费者示例

public class Consumer {  public static void main(String[] args) throws InterruptedException, MQClientException {      // 实例化消费者        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");      // 设置NameServer的地址        consumer.setNamesrvAddr("localhost:9876");      // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息        consumer.subscribe("sancijun", "*");      // 注册回调实现类来处理从broker拉取回来的消息        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);                // 标记该消息已经被成功消费                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 启动消费者实例        consumer.start();  }}
复制代码
  • 首先,实例化一个 DefaultMQPushConsumer 消费者 consumer,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。除了 DefaultMQPushConsumer,还有 DefaultMQPullConsumer。DefaultMQPullConsumer 消费行为主要由业务方自己控制,而 DefaultMQPushConsumer 主要由 RocketMQ 控制消费行为。也就是 DefaultMQPullConsumer 需要业务调用 API 去拉取消息,而 DefaultMQPushConsumer 其实是基于 Pull 拉的方式来实现 Push 推的效果。DefaultMQPushConsumer 会自动把消息拉取回来,然后回调业务实现的 MessageListener,把消息交回给业务方。在实践过程中也更常用,所以我们主要分析 DefaultMQPushConsumer 的原理。

  • 然后这个消费者需要知道自己可以消费哪些 Topic 的消息,也就是每个消费者需要订阅一个或多个 Topic,并且指定了 tag。其实在消费发送和存储的时候我们都有看到 tag,主要用于定义消息的业务属性。消费者可以只订阅 Topic 下某些 tag 的消息,也就是根据 tag 过滤消息。

  • 消费者也需要做一些初始化,Broker 并不会将消息推给消费者,而是需要消费者按需自取。业务本身并没有理会怎么从 Broker 拉取消息,这些都是 DefaultMQPushConsumer 默默无闻的奉献。所以,我们需要启动消费者,消费者会从 NameServer 拉取路由信息,并不断从 Broker 拉取消息。

  • 消息拉取回来后,消息需要怎么处理呢?每个消费者都不一样(业务本身决定),由我们业务定义的 MessageListener 处理。最后,消费者也需要确认收货,就是告诉 Broker 消费成功与否。

从上面的例子可以看到,我们只是启动了消费者,并没有调用拉取消息相关的 API,消息是怎么顺着网线爬过来的呢?既然只是启动了消费者,那我们就来看看消费者启动是怎么肥四。

消费者启动

这张图就像金陵十二钗的判词一样,从启动的时候就早早的注定了 RocketMQ 消费者的宿命。消费者注定只是打工人,不是上帝。我们来看看打工人第一步要干些什么:

  • 检查消费者的配置,比如消费者组名、消费类型、Queue 分配策略等参数是否符合规范;将订阅关系数据发给 Rebalance 服务对象。校验消费者实例名,如果是默认的名字,则更改为当前的程序进程 id。

    消费类型就是在 RocketMQ 概述中提到的 集群消费或广播消费,而 Queue 分配策略和 Rebalance 我们会在本文后面分析。

  • 获取或创建 MQClientInstance,MQClientInstance 和 RocketMQ 生产者中 Producer 启动的 MQClientInstance 是一样的,用于管理本实例中全部生产者与消费者的生产和消费行为。同一个 clientId 是共用一个 MQClientInstance 的, clientId 是通过本机 IP 和 instanceName(默认值 default)拼起来的。

  • 设置 Rebalance 对象消费者组、消费类型、Queue 分配策略、MQClientInstance 等参数,后面用到了再聊。

  • 初始化 Broker API 的封装类 pullAPIWrapper,看名字就知道消息拉取流程中会派上用场,同时注册消息过滤器。

  • 初始化位点管理器,并加载位点信息,位点管理也就是消费进度管理啦。位点管理器分为本地管理和远程管理两种,集群消费时消费位点保存在 Broker 中,由远程管理器管理;广播消费时位点存储在本地,由本地管理器管理。

  • 本地注册消费者实例,如果注册成功,则表示消费者启动成功。

  • 启动 MQClientInstance 实例,启动过程和生产者启动一致。主要是启动了 NettyRemotingClient 和一些定时任务等。

  • 初始化消费服务并启动。之所以用户“感觉”消息是 Broker 主动推送给自己的,是因为 DefaultMQPushConsumer 通过 PullMessageService 将消息拉取到本地,再通过 Callback 的 形 式,将本地消息 Push 给用户的消费代码。DefaultMQPushConsumer 与 DefaultMQPullConsumer 获取消息的方式一样,本质上都是拉取。

  • 更新本地订阅关系和路由信息;通过 Broker 检查是否支持消费者的过滤类型;向集群中的所有 Broker 发送消费者组的心跳信息。

  • 立即执行一次 Rebalance,Rebalance 过程我们在后文中详细讲解。

大家是不是看的云里雾里的,反正我刚开始看的时候很懵逼,完全看不懂这是什么操作。各位看官不用惊慌,后面流程涉及的时候我们会详细分析。这里先混个眼熟,流程中遇到的时候我们就知道这个组件是启动的时候初始化过的。

消息消费流程

接下来我们就看看消费者是如何从上帝沦为打工人的。首先消费者需要自己从 Broker 拉取消息,然后传递给业务线程进行消费,消费者夹在中间,工具属性拉满。而消费者拉取消息的入口就在启动时初始化的 PullMessageService。

  • DefaultMQPushConsumer 启动初始化过程中会启动消息拉取服务 PullMessageService,该服务是一个循环线程服务, run() 方法不断执行从 PullRequestQueue 中获取一个消息拉取任务 PullRequest,然后根据该任务中的消费者组获取相应的 DefaultMQPushConsumer 实例,执行消息拉取任务。

  • PullRequest 是消息拉取任务,封装了哪个消费者组,待拉取的 MessageQueue,拉取之后消息存放在本地的 ProcessQueue,以及拉取偏移量等。PullRequestQueue 很明显就是存放 PullRequest 的队列,它是由 RebalanceService 维护,我们会本文后面详细分析 Rebalance 过程,这里先不展开了。

  • DefaultMQPushConsumer.pullMessage() 检查当前处理的队列是否被删除,服务器是否在运行中状态等。然后进行本地流控判断,如果本地缓存消息数量大于配置的最大拉取条数(默认为 1000,可以调整),或本地缓存消息字节数大于配置的最大缓存字节数,则延迟 50ms 再拉取。检查订阅关系是否为空,为空则延迟 50ms 再拉取。

  • 封装拉取回调函数 PullCallback,网络请求成功则回调 PullCallback.onSuccess,异常则会调用 PullCallback.onException。

  • 根据 brokerName 和 brokerId 查找 brokerAddr, 没找到则先从 NameServer 拉取路由信息,再重新获取 brokerAddr。构建拉取消息请求头 PullMessageRequestHeader(Topic、queueId、offset) 等,然后调用 pullMessageAsync() 将信息发送到服务器。

  • pullMessageAsync 将请求封装成 RemotintCommand,然后构建回调函数 InvokeCallback,远程请求返回会回调 InvokeCallback.operationComplete。

  • 然后就是我们已经很熟悉的基于 Netty 的网络请求过程,整个网络请求响应过程和生产者消息发送是一样的。Netty 的初始化同样是在 Consumer 启动流程中,在这里主要是获取或者创建一个 NettyChannel。先从 channelTables Map 本地缓存中,以 Broker Addr 为 key 获取 Channel,没有获取到则通过 Netty Bootstrap.connect( Broker Addr) 创建 Channel,并放入缓存。然后生成<opaque, ResponseFuture>的键值对放入 responseTable 缓存中。调用 channel.writeAndFlush() 将请求通过网络传输给指定 Broker。

  • 当客户端发送请求的时,NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler.channelRead0() 处理数据。接着调用链到 processRequestCommand 方法,这个方法主要是根据请求中的 RequestCode,从本地缓存 processorTable 中获取相应的 Processor 来执行后续逻辑。当前是拉取消息,故获取到的是 PullMessageProcessor。PullMessageProcessor 的具体处理过程我们稍后在分析,现在只需要知道它会调用 MessageStore.getMessage() 获取消息并返回给 Consumer。

  • 消息查询:结合 RocketMQ 消息存储 中介绍的存储结构,我们都知道了消息实际存储在 CommitLog 中,为了加速消息查询,维护了 ConsumeQueue 这个索引文件接下来我们就看看如何从这两个文件中找到想要的消息。Broker 会根据请求中的 Topic、queueId、offset 等信息找到待返回消息在 ConsumeQueue 中的记录,然后读取这些记录的物理偏移量,再根据物理偏移量从 CommitLog 总获取实际的消息,经过序列化等处理后返回给 Consumer。查询消息的过程可以分为以下几个步骤。

    拉取前校验,校验 DefaultMessageStore 服务是否已经关闭(正常关闭进程时会被关闭),校验 DefaultMessageStore 服务是否可读。

    findConsumeQueue 方法根据 Topic、queueId 查找 ConsumeQueue 索引映射文件。判断根据查找到的 ConsumeQueue 索引文件校验传入的待查询的位点值是否合理,只有待查询的消息 offset 大于当前 ConsumeQueue 文件 minOffset,且小于 maxOffset 才合理,否则重新计算下一次可以拉取的位点值。

    循环读取满足 maxMsgNums=32 条数的消息。循环从 ConsumeQueue 中依次读取消息物理位点、消息大小和 taghashCode。先做 Hash 过滤,再使用过滤后的消息物理偏移量消息大小 到 CommitLog 中查找消息体,并放入结果列表中。

    监控指标统计,返回拉取的消息结果。

到这里,消费者取得了阶段性的胜利,消费者已经将消息从 Broker 那儿拉回来了。真的是可喜可贺,举国欢庆,锣鼓喧天,鞭炮齐鸣……高兴的还是太早了,上有 Broker,下有业务线程,消费者要开启崎岖坎坷的回调之路了。(图还是上面那张,多了怕大伙记不住,咋们接着看)

  • 请求回调:回调流程也和生产者异步消息回调一样。NettyRemotingClient 处理可读事件,NettyClientHandler 处理返回结果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete。返回成功则 pullCallback.onSuccess() ,异常则调用 pullCallback.onException(),我们假设返回成功。

    网络请求返回的是二进制数据,需要解码成消息列表填充 msgFoundList,并对消息进行消息过滤(TAG)模式。前面提过,消费者可以只订阅 Topic 下某些 Tag,所以在这里进行过滤。

    如果拉取到的消息列表为空,则将 pullRequest 重新放回 pullRequestQueue 队列中,那么 pullMessageService 就可以从队列中继续获取该 pullRequest 执行下一次拉取任务。

    将拉取到的消息存入 ProcessQueue。ProcessQueue 可以理解为 MessageQueue 在消费者端的本地缓存,拉取的消息会先缓存到 ProcessQueue。

    拉取到的消息提交到 ConsumeMessageService 中供业务方消费,并由 ConsumeMessageService 提交给业务线程进行业务消费逻辑处理。

    pullCallback.onSuccess 提交给 ConsumeMessageService,并把 pullRequest 重新放回 pullRequestQueue,就可以反返回了,pullMessageService 会从队列中继续获取该 pullRequest 执行下一次拉取任务。

  • 接力棒交给了 ConsumeMessageService,ConsumeMessageService 又是如何将消息提交给业务方消费的呢?ConsumeMessageService.submitConsumeRequest 先将拉取回来的消息封装到一个可执行对象 ConsumeRequest 中,然后将这个可执行对象提交到线程池。

    ConsumeMessageService 有两种:并行消费服务 ConsumeMessageConcurrentlyService 和 顺序消费服务 ConsumeMessageOrderlyService。ConsumeMessageConcurrentlyService 或者 ConsumeMessageOrderlyService 区别在于 ConsumeMessageOrderlyService 会将本地缓存的消息按照 MessageId 排序后返回给消费者线程。

  • 然后 ConsumeRequest 被线程池调度并执行其中的 run 方法,最关键的是在 run 方法中调用了业务注册的 MessageListener.consumeMessage 方法,这就是业务自己实现的消费逻辑。

  • 业务消费完之后需要进行消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。我们先大致了解下,后面会详细分析具体的结果处理流程。

    统计消费成功和失败的 TPS

    消费重试处理:消费失败的发回 Broker。发回失败的将消费重试次数加 1,并重新提交给消费者;

    消费位点处理:根据消费结果更新消费位点记录。

上面我们已经了解了整个消息消费的主要流程,其实包含三个关键步骤:

  • pullMessageService 这个循环线程不断从 pullRequestQueue 获取消息拉取任务,构建拉取请求,通过 Netty 将消息拉取请求发送给 Broker。

  • 然后是远程请求返回,回调消费者 PullCallback,将消息缓存到本地 ProcessQueue,并提交给消费者服务。

  • 消费者服务将消息封装成可执行对象提交到线程池,线程池调度执行 ConsumeRequest.run 方法,并调用业务实现的 MessageListener.cunsumeMessage 方法,处理业务消费逻辑。最后,进行消费结果处理。

虽然我们知道了消息拉取及消费的主体流程,但是还有很多脏活累活要做,不然怎么是打工人呢,打工不易呀。维护 pullRequestQueue 的 RebalanceService 是怎么工作的?结果处理的时候消息重试是怎么重试的?消费进度怎么管理的?并发和顺序消息区别是什么?你这个年龄,你这个阶段,这些不搞清楚怎么睡得着觉?咱们不慌,其实主要内容都在上面了,再大的坑也只是坑。

负载均衡

在消息消费流程中,PullMessageService 需要从 pullRequestQueue 队列中获取消息拉取任务 pullRequest,而这个 pullRequest 是由消费者负载均衡服务 RebalanceService 创建的。消费者还要管负载均衡?消费者大呼:臣妾做不到呀!可打工人的命运就是如此呀,三此君不得不跟大家一起看看 RebalanceService 是如何工作的。

  • RebalanceService 也是一个循环线程,每 20s 执行一次。查找当前 clientId 对应的全部的消费者组,全部执行一次 Rebalance。每个消费者执行 Rebalance 先获取订阅的所有 Topic,在 Topic 维度进行 Rebalance,即调用 rebalanceByTopic。

  • rebalanceByTopic 是消费者重平衡实现的核心方法,如上图:

    首先从 rebalanceImpl 实例的本地缓存变量 topicSubscribeInfoTable 中获取该 Topic 主题下的消息消费队列集合 mqSet;

    根据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList 方法获取该消费组下所有消费者 Id 列表 cidAll。

    先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。

  • 然后,调用 updateProcessQueueTableInRebalance() 方法,具体的做法是,

  • 将分配到的消息队列集合 mqSet 与 当前消费者正在处理的消息队列 processQueueTable 比对。例如,消费者 cid1 正在处理的是队列是 [q1,q5,q6],重新分配后 cid1 处理[q1,q2,q3],则 cid 需要新处理两个队列。

  • 如果有 MessageQueue 不再分配给当前的消费者消费,则设置 ProcessQueue.setDropped(true),表示放弃当前 MessageQueue 的 Pull 消息,

  • 如果有新增的 mq 分配给该消费者则创建对应的 ProcessQueue,创建对应的 pullRequest 加入到 pullRequestQueue 中。

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

延迟重试

现在,消息也从 Broker 拉取回来给到业务线程啦,而且还做了负载均衡。应该放打工人回去补个觉了吧。可现实又是一记响亮的耳光。消费者还得“值班”!业务线程消费完之后,还要处理消费结果,如果消费失败需要进行重试。消费者将消费失败的消息发送回 Broker,延迟一段时间进行重新消费。

具体来说,RocketMQ 会创建重试主题:%RETRY%+消费组名称,并将原有的 Topic 修改为重试 Topic。这样还不行,因为我们需要延迟一段时间重试,刚刚失败就立即重试没有意义。所以在发回重试消息的时候会设置一个延迟级别 delayLevel,Broker 就会把该重试消息放到延迟队列中。如果重试次数超过 15 次,消息进入死信队列,需要人工干预。

所以,消息重试机制其实涉及 4 种队列,消息原 Topic 的队列、消息重试队列以、延迟队列及 死信队列 DLQ。他们之间的关系是怎么样的呢?工作机制又是怎样的呢?

我们还是回到具体的流程中:

  • 消费者重试逻辑入口是在 ConsumeMessageConcurrentlyService#processConsumeResult 消费结果处理中,如果消费结果是 RECONSUME_LATER,会从上下文中获取并设置延迟级别 delayLevel,然后将消息发送回 Broker。

  • 通信层就是调用 RemoteClientNetty.invokeSync 方法 (RequestCode=CONSUMER_SEND_MSG_BACK) 将请求发送给 Broker,Broker 根据 RequestCode 找到 SendMessageProcessor,并调用 consumerSendMsgBack 方法进行后续处理。

  • 消息重试的关键点之一就在 consumerSendMsgBack 中:

    创建重试主题:%RETRY%+原消费组名称,消费者默认会订阅原主题及对应的重试主题,故消费者会消费对应的重试消息。

    根据物理偏移量从 commitlog 文件中获取消息,并将消息的原主题存入属性中,消费者就可以从消息属性中恢复原主题。

    重试次数超过 maxReconsumeTimes (default=15),再次改变主题为 DLQ(死信队列)。死信队列没什么可说的,就是一个特殊的队列,这个队列的消息不会被消费,需要人工干预。

    将重试消息存入到 CommitLog 及索引文件

  • 直到这里还只能实现重试,但是无法实现延迟重试,延迟需要借助延迟队列。延迟队列并不仅在消费重试的时候使用,我们也可以通过 Producer API 发送延迟消息。我们这里就通过消费重试的场景来了解延迟队列。接着刚刚的流程,重试消息构建好之后需要将消息存入 CommitLog。还记得我们在发回重试消息的时候设置了一个 delayLevel,这里就派上用场了。

  • 在存入 CommitLog 的时候会检查 delayLevel,如果 delayLevel>0 会再次改变消息主题为延迟主题 SCHEDULE_TOPIC_XXXX。

  • 只是改一个主题就能延迟了?当然还需要 Broker 提供一些机制为延迟队列保驾护航。

    默认创建 15 个定时任务,分别处理 15 个延迟队列,对应 15 个延迟级别,延迟队列的 queueId = delayLevel -1。默认延迟 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。所以,知道了 delayLevel,我们就知道了这条消息在延迟主题中所属的队列,以及延迟的时间。

    根据 queueId 与延迟主题 SCHEDULE_TOPIC 查找 ConsumeQueue,获取每条消息的偏移量,大小及 TaghashCode

    根据消息物理偏移量与消息大小从 commitlog 文件中查找消息。清除消息的 delayLevel,并恢复消息原先的 Topic 及 queueId

    将消息再次存入到 Commitlog 及索引文件

  • 回到消息延迟重试逻辑中,消息主题从 Topic1-> %RETRY%+${group} -> SCHEDULE_TOPIC_XXXX,现在延迟队列恢复原主题,也就是从 SCHEDULE_TOPIC_XXXX 恢复到 %RETRY%+${group}。前面我们也说了,消费者会默认订阅对应的重试主题。

  • 那么接下来,消费者就会消费对应的重试主题。回到消息正常的消费拉取流程,在预处理重试消息队列步骤中:如果拉取的消息来自重试队列,则将 Topic 名重置为原来的 Topic 名。

位移管理

值班也值班完了,这回该结束了吧?No, No, No!不得发个值班报告给领导确认下?是的,消费者还得把他的消费进度告诉 Broker,也就是消费位移的管理。

消费结果处理中还需要重点关注 RocketMQ 消费进度的管理。我们每次之拉取一批消息,如果不知道消费进度,那我们怎么知道下一次从什么地方开始消费?如果消费者宕机了,重新启动应该从哪里开始消费?这些问题都依赖 RocketMQ 消费进度管理。

RocketMQ 的消费进度管理分为本地位移管理 LocalOffsetStore 和 远程位移管理 RemoteOffsetStore 两种方式。LocalOffsetStore 用于广播消费,RemoteOffsetStore 用于集群消费位移管理。这里我们主要分析 RemoteOffsetStore。

  • 先看上图中左下角 RemoteBrokerOffsetStore.updateOffset,是消费流程结果处理的时候调用 OffsetStore 来更新本地消费进度缓存,从图中可以看出,本地消费进度缓存 offsetTable 中存储的是<MessageQueue, minOffset>。为什么是最小的 offset?你想想 TCP 确认的时候是不是也是确认最小的 offset?

  • 消费结果处理更新完本地消费进度缓存后就返回了,接下来由消费者启动时创建的定时任务,每 10s 执行一次,将所有队列的消费进度同步给 Broker。Broker 收到请求之后,通过 RequestCode.UPDATE_CONSUMER_OFFSET 找到 ConsumerOffsetManager。执行 ConsumerOffsetManager.commitOffset 更新 Broker 消费位移。Broker 存储的消费进度表 offsetTable 是<topic@group, <queueId, minOffset>>,也就是每个消费者组每个队列的最小位移。

  • 以上就是远程位移管理的两要点,还需要补充的是除了定时任务同步所有消费队列位移,在消费者 shutdown 及 Broker 返回拉取位移非法时都会进行位移同步。

顺序消息

最后,三此君还是不得不跟大家加个餐呀,我们一直都在说并发消息,但是面试官要是问题顺序消息的原理,三此君要是没跟大家讲的话,没发跟大家交代呀。看在三此君那么负责的份上,是不是应该关注,是不是应该点赞,是不是应该转发,是不是应该收藏?

是不是听到这里心都凉了一半,怎么还有……大家不慌,顺序消息和并发消息大同小异,我们主要了解下怎么保证消息的顺序消费的。顺序消息,其实只能够保证一个消费队列的消息顺序,如果要保证全局有序,那么需要保证只有一个消费队列。

总结

本以为仅仅是 Broker 把消息推给消费者,消费者消费了就完事了。可是从消费者启动、消息拉取、消息消费、负载均衡、到结果处理中的消息重试和消费位移管理。消费者大冤种,纯纯打工人是石锤了。我们最后来总结一下:

  • 消费方式分为 Push 和 Pull,Pull 需要业务方自己调用 API 进行拉取,Push 是基于 Pull,由 RocketMQ 线程将消息拉回本地,再调用业务方实现的 MessageListener,将消息传递非业务方进行消费。

  • 消费者启动完成各种参数校验,和生产者一样的是都会实例化 MQClientInstance,并且启动消息拉取服务,消息重平衡服务。

  • 消息重平衡服务每 20s 执行一次,获取对应 Topic 所有的 MessageQueue 和 clientId 进行排序,然后按照分配策略(默认平均分配)进行重新分配,如果有新的分配的队列就生成 PullRequest 放入 pullRequestQueue 中。

  • 消息消费的入口就在消费者初始化是启动了消息拉取服务 PullMessageService,该服务会从 pullRequestQueue 获取消息拉取任务 pullRequest,从 Broker 拉取消息,并提交给 ConsumeMessageService。

  • ConsumeMessageService 将回调业务方实现的 MessageListener 进行消费,消费完成后执行消费结果处理。

  • 消费结果处理的重点在消息重试和位点管理。

  • 消息重试是在消费失败时,消息发送会 Broker,并设置延迟级别(随着重试次数层架,延迟级别也会增加),并将消息放入重试队列。因为设置了延迟级别,存入 CommitLog 的时候会替换原 Topic 替换为延迟主题 SCHEDULE_TOPIC。Broker 的有定时任务处理延迟队列,将到时间的延迟消息恢复主题,并放入原队列中。消费者订阅了重试队列,消费到时间被重投的队列。

  • 广播消费使用的本地位移管理,集群消费使用远程位移管理。ConsumeMessageService 消费完成后会用当前最小的偏移量更新本地消费位移缓存,offsetTable。消费者初始化的时候启动了定时任务,每 10s 将本地消费位移缓存同步给 Broker。

最后的最后,如果觉得本文对你有用的话,记得点赞、关注、转发、收藏,这将是对我最大的帮助,我们下期再见。

参考文献

  • RocketMQ 官方文档

  • RocketMQ 源码

  • 丁威, 周继锋. RocketMQ 技术内幕:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.

  • 李伟. RocketMQ 分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.

  • 杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.

发布于: 2022 年 08 月 14 日阅读数: 74
用户头像

三此君

关注

还未添加个人签名 2018.11.15 加入

程序员的自我救赎:编程知识、职场经验、程序人生、个人管理等。

评论

发布
暂无评论
一张图进阶 RocketMQ - 消费者这个大冤种_RocketMQ_三此君_InfoQ写作社区