🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」让我们一起探索一下 DefaultMQPushConsumer 的实现原理及源码分析
RocketMQ 的前提回顾
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
为什么使用 RocketMQ
强调集群无单点,可扩展,任意一点高可用、水平可扩展
海量消息堆积能力,消息堆积后写入低延迟
支持上万个队列
消息失败重试机制
消息可查询
开源社区活跃
成熟度已经经过淘宝双十一的考验
RocketMQ 的发展变化
RocketMQ 开源是使用文件作为持久化工具,阿里内部未开源的性能会更高,使用 oceanBase 作为持久化工具。在 RocketMQ1.x 和 2.x 使用 zookeeper 管理集群,3.x 开始使用 nameserver 代替 zk,更轻量级,此外 RocketMQ 的客户端拥有两种的操作方式:DefaultMQPushConsumer 和 DefaultMQPullConsumer。
DefaultMQPushConsumer 的 Maven 配置
DefaultMQPushConsumer 使用示例
CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在 broker 端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
CLUSTERING:默认模式,同一个 ConsumerGroup(groupName 相同)每个 consumer 只消费所订阅消息的一部分内容,同一个 ConsumerGroup 里所有的 Consumer 消息加起来才是所
订阅 topic 整体,从而达到负载均衡的目的
BROADCASTING:同一个 ConsumerGroup 每个 consumer 都消费到所订阅 topic 所有消息,也就是一个消费会被多次分发,被多个 consumer 消费。
ConsumeConcurrentlyStatus.RECONSUME_LATER boker 会根据设置的 messageDelayLevel 发起重试,默认 16 次。
DefaultMQPushConsumerImpl 中各个对象的主要功能如下:
RebalancePushImpl:主要负责决定,当前的 consumer 应该从哪些 Queue 中消费消息;
1)PullAPIWrapper:长连接,负责从 broker 处拉取消息,然后利用 ConsumeMessageService 回调用户的 Listener 执行消息消费逻辑;
2)ConsumeMessageService:实现所谓的"Push-被动"消费机制;从 Broker 拉取的消息后,封装成 ConsumeRequest 提交给 ConsumeMessageSerivce,此 service 负责回调用户的 Listener 消费消息;
3)OffsetStore:维护当前 consumer 的消费记录(offset);有两种实现,Local 和 Rmote,Local 存储在本地磁盘上,适用于 BROADCASTING 广播消费模式;而 Remote 则将消费进度存储在 Broker 上,适用于 CLUSTERING 集群消费模式;
4)MQClientFactory:负责管理 client(consumer、producer),并提供多中功能接口供各个 Service(Rebalance、PullMessage 等)调用;大部分逻辑均在这个类中完成;
consumer.registerMessageListener 执行过程:
通过源码可以看出主要实现过程在 DefaultMQPushConsumerImpl 类中 consumer.start 后调用 DefaultMQPushConsumerImpl 的同步 start 方法
通过 mQClientFactory.start();发我们发现他调用
在这个方法中有多个 start,我们主要看 pullMessageService.start();通过这里我们发现 RocketMQ 的 Push 模式底层其实也是通过 pull 实现的,下面我们来看下 pullMessageService 处理了哪些逻辑:
我们发现其实他还是通过 DefaultMQPushConsumerImpl 类的 pullMessage 方法来进行消息的逻辑处理.
pullRequest 拉取方式
PullRequest 这里说明一下,上面我们已经提了一下 rocketmq 的 push 模式其实是通过 pull 模式封装实现的,pullrequest 这里是通过长轮询的方式达到 push 效果。
长轮询方式既有 pull 的优点又有 push 模式的实时性有点。
push 方式是 server 端接收到消息后,主动把消息推送给 client 端,实时性高。弊端是 server 端工作量大,影响性能,其次是 client 端处理能力不同且 client 端的状态不受 server 端的控制,如果 client 端不能及时处理消息容易导致消息堆积已经影响正常业务等。
pull 方式是 client 循环从 server 端拉取消息,主动权在 client 端,自己处理完一个消息再去拉取下一个,缺点是循环的时间不好设定,时间太短容易忙等,浪费 CPU 资源,时间间隔太长 client 的处理能力会下降,有时候有些消息会处理不及时。
长轮询的方式可以结合两者优点
检查 PullRequest 对象中的 ProcessQueue 对象的 dropped 是否为 true(在 RebalanceService 线程中为 topic 下的 MessageQueue 创建拉取消息请求时要维护对应的 ProcessQueue 对象,若 Consumer 不再订阅该 topic 则会将该对象的 dropped 置为 true);若是则认为该请求是已经取消的,则直接跳出该方法;
更新 PullRequest 对象中的 ProcessQueue 对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳;
检查该 Consumer 是否运行中,即 DefaultMQPushConsumerImpl.serviceState 是否为 RUNNING;若不是运行状态或者是暂停状态(DefaultMQPushConsumerImpl.pause=true),则调用 PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延迟再拉取消息,其中 timeDelay=3000;该方法的目的是在 3 秒之后再次将该 PullRequest 对象放入 PullMessageService. pullRequestQueue 队列中;并跳出该方法;
进行流控。若 ProcessQueue 对象的 msgCount 大于了消费端的流控阈值(DefaultMQPushConsumer.pullThresholdForQueue,默认值为 1000),则调用 PullMessageService.executePullRequestLater 方法,在 50 毫秒之后重新该 PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中;并跳出该方法;
若不是顺序消费(即 DefaultMQPushConsumerImpl.consumeOrderly 等于 false),则检查 ProcessQueue 对象的 msgTreeMap:TreeMap<Long,MessageExt>变量的第一个 key 值与最后一个 key 值之间的差额,该 key 值表示查询的队列偏移量 queueoffset;若差额大于阈值(由 DefaultMQPushConsumer. consumeConcurrentlyMaxSpan 指定,默认是 2000),则调用 PullMessageService.executePullRequestLater 方法,在 50 毫秒之后重新将该 PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中;并跳出该方法;
以 PullRequest.messageQueue 对象的 topic 值为参数从 RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData>中获取对应的 SubscriptionData 对象,若该对象为 null,考虑到并发的关系,调用 executePullRequestLater 方法,稍后重试;并跳出该方法;
若消息模型为集群模式(RebalanceImpl.messageModel 等于 CLUSTERING),则以 PullRequest 对象的 MessageQueue 变量值、type =READ_FROM_MEMORY(从内存中获取消费进度 offset 值)为参数调用 DefaultMQPushConsumerImpl. offsetStore 对象(初始化为 RemoteBrokerOffsetStore 对象)的 readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度 offset 值。若该 offset 值大于 0 则置临时变量 commitOffsetEnable 等于 true 否则为 false;该 offset 值作为 pullKernelImpl 方法中的 commitOffset 参数,在 Broker 端拉取消息之后根据 commitOffsetEnable 参数值决定是否用该 offset 更新消息进度。该 readOffset 方法的逻辑是:以入参 MessageQueue 对象从 RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量中获取消费进度偏移量;若该偏移量不为 null 则返回该值,否则返回-1;
当每次拉取消息之后需要更新订阅关系(由 DefaultMQPushConsumer. postSubscriptionWhenPull 参数表示,默认为 false)并且以 topic 值参数从 RebalanceImpl.subscriptionInner 获取的 SubscriptionData 对象的 classFilterMode 等于 false(默认为 false),则将 sysFlag 标记的第 3 个字节置为 1,否则该字节置为 0;
该 sysFlag 标记的第 1 个字节置为 commitOffsetEnable 的值;第 2 个字节(suspend 标记)置为 1;第 4 个字节置为 classFilterMode 的值;
初始化匿名内部类 PullCallback,实现了 onSucess/onException 方法; 该方法只有在异步请求的情况下才会回调;
调用底层的拉取消息 API 接口:
PullAPIWrapper.pullKernelImpl
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法进行消息拉取操作。
将回调类 PullCallback 传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。
发送远程请求拉取消息
在 MQClientAPIImpl.pullMessage 方法中,根据入参 communicationMode 的值分为异步拉取和同步拉取方式两种。
无论是异步方式拉取还是同步方式拉取,在发送拉取请求之前都会构造一个 ResponseFuture 对象,以请求消息的序列号为 key 值,存入 NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>变量中,对该变量有几种情况会处理:
发送失败后直接删掉 responseTable 变量中的相应记录;
收到响应消息之后,会以响应消息中的序列号(由服务端根据请求消息的序列号原样返回)从 responseTable 中查找 ResponseFuture 对象,并设置该对象的 responseCommand 变量。若是同步发送会唤醒等待响应的 ResponseFuture.waitResponse 方法;若是异步发送会调用 ResponseFuture.executeInvokeCallback()方法完成回调逻辑处理;
在 NettyRemotingClient.start()启动时,也会初始化定时任务,该定时任务每隔 1 秒定期扫描 responseTable 列表,遍历该列表中的 ResponseFuture 对象,检查等待响应是否超时,若超时,则调用 ResponseFuture. executeInvokeCallback()方法,并将该对象从 responseTable 列表中删除;
同步拉取
对于同步发送方式,调用 MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法,大致步骤如下:
调用 RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:
获取 Broker 地址的 Channel 信息。根据 broker 地址从 RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>变量中获取 ChannelWrapper 对象并返回该对象的 Channel 变量;若没有 ChannelWrapper 对象则与 broker 地址建立新的连接并将连接信息存入 channelTables 变量中,便于下次使用;
若 NettyRemotingClient.rpcHook:RPCHook 变量不为空(该变量在应用层初始化 DefaultMQPushConsumer 或者 DefaultMQPullConsumer 对象传入该值),则调用 RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
调用 NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,该方法的逻辑如下:
A)使用请求的序列号(opaue)、超时时间初始化 ResponseFuture 对象;并将该 ResponseFuture 对象存入 NettyRemotingAbstract.responseTable: ConcurrentHashMap 变量中;
B)调用 Channel.writeAndFlush(Object msg)方法将请求对象 RemotingCommand 发送给 Broker;然后调用 addListener(GenericFutureListener<? extends Future<? super Void>> listener)方法添加内部匿名类:该内部匿名类实现了 ChannelFutureListener 接口的 operationComplete 方法,在发送完成之后回调该监听类的 operationComplete 方法,在该方法中,首先调用 ChannelFuture. isSuccess()方法检查是否发送成功,若成功则置 ResponseFuture 对象的 sendRequestOK 等于 true 并退出此回调方法等待响应结果;若不成功则置 ResponseFuture 对象的 sendRequestOK 等于 false,然后从 NettyRemotingAbstract.responseTable 中删除此请求序列号(opaue)的记录,置 ResponseFuture 对象的 responseCommand 等于 null,并唤醒 ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
C)调用 ResponseFuture.waitResponse(long timeoutMillis)方法等待响应结果;在发送失败或者收到响应消息(详见 5.10.3 小节)或者超时的情况下会唤醒该方法返回 ResponseFuture.responseCommand 变量值;
D)若上一步返回的 responseCommand 值为 null,则抛出异常:若 ResponseFuture.sendRequestOK 为 true,则抛出 RemotingTimeoutException 异常,否则抛出 RemotingSendRequestException 异常;
E)若上一步返回的 responseCommand 值不为 null,则返回 responseCommand 变量值;
若 NettyRemotingClient.rpcHook: RPCHook 变量不为空,则调用 RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;
以上一步的返回值 RemotingCommand 对象为参数调用 MQClientAPIImpl. processPullResponse (RemotingCommand response)方法将返回对象解析并封装成 PullResultExt 对象然后返回给调用者,响应消息的结果状态转换如下:
若 RemotingCommand 对象的 Code 等于 SUCCESS,则 PullResultExt.pullStatus=FOUND;
若 RemotingCommand 对象的 Code 等于 PULL_NOT_FOUND,则 PullResultExt.pullStatus= NO_NEW_MSG;
若 RemotingCommand 对象的 Code 等于 PULL_RETRY_IMMEDIATELY,则 PullResultExt.pullStatus= NO_MATCHED_MSG;
若 RemotingCommand 对象的 Code 等于 PULL_OFFSET_MOVED,则 PullResultExt.pullStatus= OFFSET_ILLEGAL;
getMQClientAPIImpl().pullMessage 最终通过 channel 写入并刷新队列中。然后在消息服务端大体的处理逻辑是服务端收到新消息请求后,如果队列中没有消息不急于返回,通过一个循环状态,每次 waitForRunning 一段时间默认 5 秒,然后再 check,如果 broker 一直没有新新消息,第三次 check 的时间等到时间超过 SuspendMaxTimeMills 就返回空,如果在等待过程中收到了新消息直接调用 notifyMessageArriving 函数返回请求结果。“长轮询”的核心是,Broker 端 HOLD 住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给 Consumer 。长轮询的主动权掌握在 consumer 中,即使 broker 有大量的消息堆积也不会主动推送给 consumer。
版权声明: 本文为 InfoQ 作者【浩宇天尚】的原创文章。
原文链接:【http://xie.infoq.cn/article/12e44096267fb3ada433db45b】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论