写点什么

RocketMQ 事务消息导致罗盘次数激增 BUG 排查

作者:陈皮
  • 2023-07-20
    北京
  • 本文字数:3593 字

    阅读完需:约 12 分钟

问题

  • RocketMQ 4.7.1 版本,事务消息存在 bug。(新版本仍未解决 - 2023/07)

BUG 分析

client 端和 broker 端逻辑不一致

  • RocketMQ 对消息大小有限制,支持用户配置,并且在消息体超过一定大小(默认 4KB)时进行压缩发送

  • client 端对消息大小做检查

  • client 判断消息体大小进行检查,见 org.apache.rocketmq.client.Validators#checkMessage

  • client 发送消息体压缩(消息体大小校验之后)org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

  • 客户端大小校验默认阈值 4MB

  • broker 端对消息大小做检查

  • broker 对消息大小进行检查,包括 topic、properties、ip、消息体等信息,见 org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize

  • 其中消息体为客户端发送的数据(有可能是压缩之后的)

  • 事务消息处理时,boker 额外添加了 properties。见 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

  • 现有配置

  • client 端未添加阈值

  • broker 端运维添加了 64KB 的限制

  • 导致可以发送超过 64KB 的消息到 broker


org.apache.rocketmq.client.Validators#checkMessage 客户端消息体校验



if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); }
复制代码


org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl 客户端消息压缩



if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; }

复制代码


org.apache.rocketmq.store.CommitLog#calMsgLength broker 消息体长度计算


 protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;        final int msgLen = 4 //TOTALSIZE            + 4 //MAGICCODE            + 4 //BODYCRC            + 4 //QUEUEID            + 4 //FLAG            + 8 //QUEUEOFFSET            + 8 //PHYSICALOFFSET            + 4 //SYSFLAG            + 8 //BORNTIMESTAMP            + bornhostLength //BORNHOST            + 8 //STORETIMESTAMP            + storehostAddressLength //STOREHOSTADDRESS            + 4 //RECONSUMETIMES            + 8 //Prepared Transaction Offset            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY            + 1 + topicLength //TOPIC            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength            + 0;        return msgLen;    }
复制代码


org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize broker 消息大小检查



final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message if (msgLen > this.maxMessageSize) { DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key); }
复制代码

broker 对事务消息的处理

  • org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage 方法接收 client 消息发送请求

  • 消息落盘时对消息大小进行校验,大小通过了限制,落盘成功

  • 事务消息回查时调用 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check

  • 调用 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult

  • 其中调用 org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner 方法对消息做了处理,额外添加了 msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));

  • 调用公共落盘方法,校验消息大小时未通过

  • 消息回查方法通过 putBackToHalfQueueReturnResult 结果进行判断一直为 false 造成了死循环



if (isNeedCheck) { if (!putBackHalfMsgQueue(msgExt, i)) { continue; } listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; }
复制代码


org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult 方法异常未处理,导致 putMessageResult 为 null


  private PutMessageResult putBackToHalfQueueReturnResult(MessageExt messageExt) {        PutMessageResult putMessageResult = null;        try {            MessageExtBrokerInner msgInner = transactionalMessageBridge.renewHalfMessageInner(messageExt);            putMessageResult = transactionalMessageBridge.putMessageReturnResult(msgInner);        } catch (Exception e) {            log.warn("PutBackToHalfQueueReturnResult error", e);        }        return putMessageResult;    }
复制代码


org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner



public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(msgExt.getTopic()); msgInner.setBody(msgExt.getBody()); msgInner.setQueueId(msgExt.getQueueId()); msgInner.setMsgId(msgExt.getMsgId()); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setTags(msgExt.getTags()); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags())); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setWaitStoreMsgOK(false); return msgInner; }
复制代码


追踪日志


"PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}",

方案

  • 只有当消息体接近 64KB 时,有可能触发此 BUG。

  • 业务使用时尽量控制单消息大小

  • 精简自定义 MQ header 内容

  • 需要调用 WMS 公共类库封装方法

方案一

发送消息时,检查消息体大小,当大于 60KB(待确认) 时不允许发送,从而避免消息服务端触发 bug。

优点

  • 大概率避免 server 端触发 bug

  • 调整较小,引入新问题概率小

缺点

  • 触发时会导致当前业务无法正常执行

  • 60KB 大小并不能完全保证不会触发 BUG,具体大小需要调整确认

  • 当服务端调整了消息体大小限制时,需要同步调整 60KB 的大小

方案二

发送消息时,检查消息体大小,当大于 60KB(待确认) 时


  • 保存消息体到 MySQL 数据库

  • RocketMQ 消息只发送 header 相关内容

  • 消息消费时,对该类消息,从 MySQL 中读取消息体内容,消费完成后清除 MySQL 中对应的数据

优点

  • 大概率避免 server 端触发 bug,并能保证业务功能正常

  • 同时保证数据大于 64KB 时功能正常

缺点

  • 有一定的复杂度

  • 需要使用 MySQL TEXT、BLOG 等大字段

  • 需要控制 MySQL 全局事务

  • 当服务端调整了消息体大小限制时,需要同步调整 60KB 的大小

  • 保存其他 nosql 库更合适

用户头像

陈皮

关注

还未添加个人签名 2018-04-26 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ 事务消息导致罗盘次数激增 BUG 排查_RocketMQ_陈皮_InfoQ写作社区