写点什么

RocketMQ 顺序消息之消息消费

作者:周杰伦本人
  • 2022 年 8 月 12 日
    贵州
  • 本文字数:5032 字

    阅读完需:约 17 分钟

RocketMQ 顺序消息之消息消费

RocketMQ 按照消费模式来说可以分为集群消费和广播消费,广播消费指的是将每一条消息会被 ConsumerGroup 的每个 Consumer 消费,类似于计算机中的广播地址,广播给这个网段中的计算机,而集群消费是这一条消息只会被 ConsumerGroup 中的一个消费者进行消费


顺序消费的原理是同一消息队列只允许 Consumer 中的一个消费线程拉取消费。Consumer 中有个消费线程池,多个线程同时消费消息。在顺序消费的场景下消费线程请求到 Broker 时会先申请独占锁,获得锁请求允许消费


ConsumeMessageOrderlyService 就是对应的顺序消息的处理类,它使用定时线程池来消费消息,线程池中真正工作的线程是 ConsumeRequest:

消费工作线程 ConsumeRequest

class ConsumeRequest implements Runnable {    private final ProcessQueue processQueue;    private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; }
public ProcessQueue getProcessQueue() { return processQueue; }
public MessageQueue getMessageQueue() { return messageQueue; }
@Override public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; }
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; }
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; }
long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; }
final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // init the consume context type consumeMessageContext.setProps(new HashMap<String, String>()); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); }
long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; }
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); }
if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); }
long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; }
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); }
if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; }
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); }
ConsumeMessageOrderlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
}
复制代码


我们看一下它的 run()方法做了什么:


  1. 判断消费模式是广播模式或者加锁了并且锁没有过期

  2. 这时候判断处理的队列的状态是否为 dropped,如果是就打印消息队列不能被消费日志,直接 return

  3. 当消息模式是集群模式时再次判断是否加锁,并且锁是否过期

  4. 如果可以消费了就讲消息放入线程池中

  5. 获取批量消息,遍历,判断是否有钩子函数,如果有执行钩子函数

  6. 然后加锁,调用 messageListener.consumeMessage()方法消费消息,最后解锁

  7. 根据返回的状态设置返回类型,执行处理结果 processConsumeResult()的方法

总结

这就是 RocketMQ 顺序消息之消息消费的内容,主要讲了消息队列中的工作线程 ConsumeRequest 的 run()的分析,大体逻辑就是进行批量处理消息,有钩子函数先执行钩子函数,然后消费前进行加锁,接着消费消息返回消息处理状态,最后解锁,根据返回的消息状态返回消费结果信息,顺序消息如果抛出异常的话会导致消息的阻塞,而相对应的并发消费由多个消费线程从消息队列中进行消费消息,消费速度比顺序消息要快,当前消费线程产生阻塞也不影响其他消费线程的消费。

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ顺序消息之消息消费_8月月更_周杰伦本人_InfoQ写作社区