精华推荐 | 【深入浅出 RocketMQ 原理及实战】「底层源码挖掘系列」透彻剖析贯穿 RocketMQ 的消费者端的运行核心的流程
RocketMQ 的消息模型
RocketMQ 的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。
消息通过生产者发送到某一个 Topic,如果需要订阅该 Topic 并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。
本文主旨
本文主要会针对于 RocketMQ 的消费者 Consumer 的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。
RocketMQ 提供 Push 模式也提供了 Pull 模式
MQ 的消费模式可以大致分为两种,一种是推 Push,一种是拉 Pull。
Push 模式处理消费消费
Push 是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
DefaultMQPushConsumer 的使用和初始化
Push 模式主要通过初始化 DefaultMQPushConsumer 对象进行消费数据信息,案例代码如下所示。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
复制代码
消费的位点配置
消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
复制代码
CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。
注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在 broker 端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始。
消费的模式的配置
消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。
consumer.setMessageModel(MessageModel.BROADCASTING);
复制代码
CLUSTERING:默认模式,同一个 ConsumerGroup,每个 consumer 只消费所订阅消息的一部分内容,同一个 ConsumerGroup 里所有的 Consumer 消息加起来才是所订阅 topic 整体,从而达到负载均衡的目的。
BROADCASTING:同一个 ConsumerGroup 每个 consumer 都消费到所订阅 topic 所有消息,也就是一个消费会被多次分发,被多个 consumer 消费。
DefaultMQPushConsumer 的运行原理和流程
DefaultMQPushConsumerImpl 中各个对象的主要功能如下:
平衡和分配队列组件实现类-RebalancePushImpl
RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的 topic 中的那个 Queue 中进行消费消息;此外当消费者宕机或者下线的时候,还会执行 rebalance 再次平衡和分配给其他消费者对应的队列控制。
长连接进行拉去消息组件实现类-PullAPIWrapper
PullAPIWrapper:主要与 broker 服务端建立长连接,一直进行定时从 broker 服务端处拉取消息数据,默认为:32 条消息,之后还会调用 ConsumeMessageService 实现类,进行用户注册的 Listener 执行消息消费逻辑。
看一下 consumer.registerMessageListener 的源码,如下所示。
/**
* Register a callback to execute on message arrival for concurrent consuming.
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
复制代码
回调用户的注册的 MessageListener 组件实现类-ConsumeMessageService
ConsumeMessageService:实现所谓的"Push-被动"消费机制;从 Broker 拉取的消息后,封装成 ConsumeRequest 提交给 ConsumeMessageSerivce,此 service 负责回调用户的 Listener 消费消息。
存储 Offset 的消费记录的位移组件实现类--OffsetStore
OffsetStore:维护当前 consumer 的消费记录(offset);有两种实现,Local 和 Rmote,Local 存储在本地磁盘上,适用于 BROADCASTING 广播消费模式;而 Remote 则将消费进度存储在 Broker 上,适用于 CLUSTERING 集群消费模式;
综合门面功能接口供各个 Service 组件实现类--MQClientFactory
MQClientFactory:负责管理 client(consumer、producer),并提供多中功能接口供各个 Service(Rebalance、PullMessage 等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。
DefaultMQPushConsumerImpl 的 start 方法的源码
我们先来看一下对应的 DefaultMQPushConsumerImpl 类的 start 方法源码,源码可以看出主要实现过程在 consumer.start 后调用 DefaultMQPushConsumerImpl 的同步 start 方法,如下所示。
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
复制代码
DefaultMQPushConsumerImpl 的 start 方法的源码
主要我们能关注重点的代码和组件的 start,通过 mQClientFactory.start();发我们发现他调用了很多组件的 start 方法:
this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
this.startScheduledTask():主要开启多个定时任务的功能
this.pullMessageService.start():主要开启拉取数据的业务组件。
this.rebalanceService.start():主要开启 rebalance 业务服务组件。
this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启 push 服务的对象组件作为门面。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
复制代码
重点我们先来主要看 pullMessageService.start(),通过这里我们发现 RocketMQ 的 Push 模式底层其实也是通过 pull 实现的,接下来我们先来分析一下 pullMessageService 中的 pullMessage 方法的源码。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
复制代码
DefaultMQPushConsumerImpl 的 pullMessage 方法的源码
源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件 DefaultMQPushConsumerImpl,如下图所示。
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
复制代码
最后还是通过 DefaultMQPushConsumerImpl 类的 pullMessage 方法来进行消息的逻辑处理,
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
复制代码
DefaultMQPushConsumerImpl 的逻辑限流控制流程
总结一下针对于限流的总体流程控制:
首先拉去消息数据的时候会先去判断对应的 ProcessQueue 的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
复制代码
上面的逻辑是先会判断和校验 PullRequest 对象中的 ProcessQueue 对象的 dropped 是否为 true(在 RebalanceService 线程中为 topic 下的 MessageQueue 创建拉取消息请求时要维护对应的 ProcessQueue 对象,若 Consumer 不再订阅该 topic 则会将该对象的 dropped 置为 true);若是则认为该请求是已经取消的,则直接跳出该方法。
更新 PullRequest 对象中的 ProcessQueue 对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的 Consumer 消费者组件是否运行中,主要是通过 DefaultMQPushConsumerImpl.serviceState 是否为 RUNNING。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
复制代码
如果运行状态或者是暂停状态 this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)
方法延迟再拉取消息,其中 timeDelay=3000;
该方法的目的是在 3 秒之后再次将该 PullRequest 对象放入 PullMessageService. pullRequestQueue 队列中;并跳出该方法
主要进行消费者端进行速度和控制消费速度的流控。若 ProcessQueue 对象的 msgCount 大于了消费端的流控阈值,默认值为 1000,主要通过DefaultMQPushConsumer.pullThresholdForQueue
的执行进行判断。当调用的processQueue.getMsgCount().get()
的数值大于DefaultMQPushConsumer.pullThresholdForQueue
的值时候会进行 PullMessageService.executePullRequestLater 方法。
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
复制代码
主要进行消费者端进行速度和控制消费速度的流控。主要会通过 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
与进行计算消息的内存空间的总大小进行对比,单位是 M,当大于系统定义的 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
的阈值大小的时候,则会进行限流处理。
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
复制代码
以上 3 和 4 步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50 毫秒之后,才会将该 PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中。从而实现看限流的能力。
当上面的直接限流之后,还会有跨度限流的控制,首先系统还会判断当前的消费方式是否顺序消费(即 DefaultMQPushConsumerImpl.consumeOrderly 等于 false)。
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
复制代码
则检查 ProcessQueue 对象的msgTreeMap:TreeMap<Long,MessageExt>
变量的第一个 key 值与最后一个 key 值之间的的差值,该 key 值表示查询的队列偏移量 queueoffset;
若 queueoffset 差值大于阈值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan 指定,默认是 2000),则调用 PullMessageService.executePullRequestLater 方法,在 50 毫秒之后再该 PullRequest 请求放入 PullMessageService.pullRequestQueue 队列中。
PullRequest.messageQueue 对象的 topic 值为参数 RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中获取对应的 SubscriptionData 对象,若该对象为 null,考虑到并发的关系,调用 executePullRequestLater 方法。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
复制代码
DefaultMQPushConsumerImpl 的消费模式分类
如果当前的消费者属于集群模式(RebalanceImpl.messageModel 等于 CLUSTERING)。
PullRequest 对象的 MessageQueue 变量值,type =READ_FROM_MEMORY(从内存中获取消费进度 offset 值)为参数调用 DefaultMQPushConsumerImpl 的 offsetStore 对象。
实际代表着 RemoteBrokerOffsetStore 对象的 readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度 offset 值。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
复制代码
若该 offset 值大于 0,则置临时变量 commitOffsetEnable 等于 true 否则为 false;该 offset 值作为 pullKernelImpl 方法中的 commitOffset 参数,在 Broker 端拉取消息之后根据 commitOffsetEnable 参数值决定是否用该 offset 更新消息进度。
该 readOffset 方法的逻辑是:以入参 MessageQueue 对象从 RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量中获取消费进度偏移量;若该偏移量不为 null 则返回该值,否则返回-1;
DefaultMQPushConsumerImpl 的订阅模型
当每次拉取消息之后需要更新订阅关系(由 DefaultMQPushConsumer. postSubscriptionWhenPull 参数表示,默认为 false)并且以 topic 值参数从 RebalanceImpl.subscriptionInner 获取的 SubscriptionData 对象的 classFilterMode 等于 false(默认为 false),则将 sysFlag 标记的第 3 个字节置为 1,否则该字节置为 0;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
复制代码
该 sysFlag 标记的第 1 个字节置为 commitOffsetEnable 的值;第 2 个字节(suspend 标记)置为 1;第 4 个字节置为 classFilterMode 的值。
DefaultMQPushConsumerImpl 的底层客户端如何拉取消息的通信方法
调用底层的拉取消息 API 接口,方法进行消息拉取操作。
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)
复制代码
内部会存在将回调类 PullCallback 传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
// 消息的通信方式为异步
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
复制代码
初始化匿名内部类 PullCallback,实现了 onSuccess/onException 方法; 该方法只有在异步请求的情况下才会回调;
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
复制代码
主要针对于拉去底层消息的状态进行分状态处理,针对于 this.pullAPIWrapper.pullKernelImpl 的方法,我会在【下篇】进行介绍,此处不做讲述分析。
此外针对于 PullStatus 状态的分析-FOUND 状态的处理,主要更新本地的 offset 值,以及流程控制等。如下所示。
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
复制代码
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
复制代码
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
复制代码
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
复制代码
评论