Problem
Bug analysis
The client and broker logic is inconsistent
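RocketMQ limits message size, and the limit is user-configurable. When the message body exceeds a threshold (4 KB by default), it is compressed before sending.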
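Client-side message size check
The client checks the size of the message body; see org.apache.rocketmq.client.Validators#checkMessage
The client compresses the message body (after the body size check) in org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
The client-side size check threshold defaults to 4 MB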
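Broker-side message size check
The broker checks the size of the whole message, including the topic, properties, host addresses, and message body; see org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize
The message body here is the data sent by the client (possibly already compressed)
When handling transactional messages, the broker adds extra properties. See org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner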
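To illustrate why broker-added properties matter for the size check, here is a minimal sketch that measures how an encoded properties string grows when entries are appended. The class name, the separator characters (\u0001 and \u0002), and the property names are assumptions for illustration only; they are not taken from the text above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class PropertyGrowth {
    // Assumed encoding: key, one separator char, value, one separator char.
    static int encodedLength(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append(e.getKey()).append('\u0001').append(e.getValue()).append('\u0002');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("TAGS", "order"); // what the client sent (illustrative)
        int before = encodedLength(props);

        // Entries a broker might add for a half message (names are illustrative).
        props.put("REAL_TOPIC", "demo-topic");
        props.put("REAL_QID", "3");
        int after = encodedLength(props);

        System.out.println("properties grew by " + (after - before) + " bytes");
    }
}
```

A message whose total length sits just below the broker limit can therefore pass the first write and exceed the limit once more properties are attached.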
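Current configuration
No threshold is configured on the client side
Ops configured a 64 KB limit on the broker side
As a result, messages larger than 64 KB can be sent to the broker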
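The mismatch can be sketched as follows: the client checks the raw body against its own limit, while the broker checks whatever arrives (possibly compressed) plus storage overhead against a different limit. This is a simplified model, not RocketMQ code; the class, the method names, and the 200-byte overhead are hypothetical, and java.util.zip.Deflater stands in for the client's compression.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

final class SizeCheckMismatch {
    static final int CLIENT_MAX_BODY = 4 * 1024 * 1024; // client default from the text: 4 MB
    static final int BROKER_MAX_MESSAGE = 64 * 1024;    // broker limit set by ops: 64 KB
    static final int COMPRESS_OVER = 4 * 1024;          // compress bodies larger than 4 KB

    // Client side: only the raw body length is compared with the client limit.
    static boolean clientAccepts(byte[] rawBody) {
        return rawBody.length <= CLIENT_MAX_BODY;
    }

    // Stand-in for the client's body compression.
    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Broker side: stored body plus topic, properties, and fixed fields.
    static boolean brokerAccepts(int storedBodyLength, int overheadBytes) {
        return storedBodyLength + overheadBytes <= BROKER_MAX_MESSAGE;
    }

    public static void main(String[] args) {
        byte[] raw = new byte[100 * 1024]; // 100 KB of zeros, highly compressible
        byte[] sent = raw.length > COMPRESS_OVER ? compress(raw) : raw;
        System.out.println("client accepts raw body: " + clientAccepts(raw));
        System.out.println("bytes sent to broker: " + sent.length);
        System.out.println("broker accepts: " + brokerAccepts(sent.length, 200));
    }
}
```

Because the two sides measure different quantities against different limits, a body well above 64 KB can be accepted by both.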
org.apache.rocketmq.client.Validators#checkMessage: client-side message body check
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
        "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl: client-side message compression
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
org.apache.rocketmq.store.CommitLog#calMsgLength: broker-side message length calculation
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}
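As a worked example of the calculation above, the fixed fields add up to 91 bytes when both host addresses are IPv4 (8 bytes each), so the room left for the body depends on the topic and properties lengths. The sketch below only redoes that arithmetic; the class and method names, and the sample topic and properties lengths, are hypothetical.

```java
final class MsgLengthBudget {
    // Sum of the fixed-size fields listed in calMsgLength, including the
    // length prefixes for body (4), topic (1), and properties (2).
    static int fixedOverhead(boolean bornHostV6, boolean storeHostV6) {
        int bornHost = bornHostV6 ? 20 : 8;
        int storeHost = storeHostV6 ? 20 : 8;
        return 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8
            + bornHost + 8 + storeHost
            + 4 + 8 + 4 + 1 + 2;
    }

    // Largest body that still fits under maxMessageSize for IPv4 hosts.
    static int maxBodyLength(int maxMessageSize, int topicLength, int propertiesLength) {
        return maxMessageSize - fixedOverhead(false, false) - topicLength - propertiesLength;
    }

    public static void main(String[] args) {
        int limit = 64 * 1024; // the broker limit from the text
        System.out.println("fixed overhead (IPv4): " + fixedOverhead(false, false));
        System.out.println("max body for a 20-byte topic and 300 bytes of properties: "
            + maxBodyLength(limit, 20, 300));
    }
}
```

Any growth in the properties string reduces the body budget byte for byte, which is why a message that fit once may not fit when it is stored again with extra properties.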
org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize: broker-side message size check
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
    DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen
        + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize);
    return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key);
}
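How the broker handles transactional messages
The org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage method receives the client's send request
When the message is written to disk, its size is checked; it is within the limit, so the write succeeds
The transaction check (back-check) calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
That method calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult
Which in turn calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner to rebuild the message, additionally setting msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
The common store method is then called, and this time the message size check fails
The check method branches on the result of putBackToHalfQueueReturnResult, which is always false, so it loops forever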
if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}
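The retry pattern in the snippet above can be modeled in isolation: when the put-back step fails, `continue` re-enters the loop at the same offset, so a failure that can never succeed is retried indefinitely. The sketch below reproduces that shape with a bounded attempt count so that it terminates. The class name, the method name, and the `maxAttempts` guard are hypothetical additions for illustration, not RocketMQ behavior.

```java
import java.util.function.IntPredicate;

final class HalfMessageCheckLoop {
    // Retries putBack on the same offset until it succeeds or the
    // (hypothetical) attempt budget is used up. Returns the attempts made.
    static int attemptsUntilGiveUp(IntPredicate putBack, int offset, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (!putBack.test(offset)) {
                continue; // same offset again, as in the original loop
            }
            return attempts; // put-back succeeded, the loop can move on
        }
        return attempts; // budget exhausted without success
    }

    public static void main(String[] args) {
        // A put-back that always fails, like an oversized half message.
        IntPredicate alwaysFails = offset -> false;
        System.out.println("attempts before giving up: "
            + attemptsUntilGiveUp(alwaysFails, 42, 5));
    }
}
```

Without the attempt budget, the always-failing case never leaves the loop, which matches the behavior described above.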
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult catches the exception without handling it, which leaves putMessageResult as null
private PutMessageResult putBackToHalfQueueReturnResult(MessageExt messageExt) {
    PutMessageResult putMessageResult = null;
    try {
        MessageExtBrokerInner msgInner = transactionalMessageBridge.renewHalfMessageInner(messageExt);
        putMessageResult = transactionalMessageBridge.putMessageReturnResult(msgInner);
    } catch (Exception e) {
        log.warn("PutBackToHalfQueueReturnResult error", e);
    }
    return putMessageResult;
}
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner
public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(msgExt.getTopic());
    msgInner.setBody(msgExt.getBody());
    msgInner.setQueueId(msgExt.getQueueId());
    msgInner.setMsgId(msgExt.getMsgId());
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setTags(msgExt.getTags());
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setWaitStoreMsgOK(false);
    return msgInner;
}
Log line to trace
"PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}",
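Solutions
Option 1
When sending a message, check the body size and reject the message when it exceeds 60 KB (to be confirmed), so that the bug on the message server side is not triggered.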
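A minimal sketch of such a pre-send guard is shown below. It is independent of the RocketMQ client API; the class name, the method name, and the exception type are hypothetical, and the 60 KB value is the unconfirmed threshold mentioned above.

```java
final class BodySizeGuard {
    // 60 KB, still to be confirmed according to the proposal.
    static final int MAX_BODY_BYTES = 60 * 1024;

    // Throws if the raw (uncompressed) body is larger than the threshold.
    static void checkBodySize(byte[] body) {
        if (body.length > MAX_BODY_BYTES) {
            throw new IllegalArgumentException(
                "message body is " + body.length + " bytes, limit is " + MAX_BODY_BYTES);
        }
    }

    public static void main(String[] args) {
        checkBodySize(new byte[1024]); // small body passes silently
        try {
            checkBodySize(new byte[61 * 1024]);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected before send: " + e.getMessage());
        }
    }
}
```

The guard would be called before the message is handed to the producer. Checking the raw body rather than the compressed one is the conservative choice here, since it leaves the gap below 64 KB for the topic, the properties, and the fixed fields.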
Pros
Very likely avoids triggering the bug on the server side
Small change, with a low risk of introducing new problems
Cons
Option 2
When sending a message, check the body size; when it exceeds 60 KB (to be confirmed)
Pros
Cons