Problem
Bug analysis
Client-side and broker-side logic are inconsistent
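RocketMQ limits message size, lets users configure that limit, and compresses the message body before sending when it exceeds a threshold (4KB by default)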
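Client-side message size check
The client checks the size of the message body; see org.apache.rocketmq.client.Validators#checkMessage
The client compresses the message body after the size check has passed; see org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
The default threshold for the client-side size check is 4MB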
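The order of the two client-side steps matters for what follows, so here is a minimal sketch of it. This is not RocketMQ code: the class ClientSendSketch and its helpers are hypothetical, the constants are the defaults quoted above, and the exact comparison operators are an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical stand-in for the client send path; not a RocketMQ class.
final class ClientSendSketch {
    static final int MAX_MESSAGE_SIZE = 4 * 1024 * 1024; // client default from the text: 4MB
    static final int COMPRESS_THRESHOLD = 4 * 1024;      // compression threshold from the text: 4KB

    /** Returns the bytes that would be sent to the broker. */
    static byte[] prepareBody(byte[] body) {
        // Step 1: the size check runs on the uncompressed body only.
        if (body.length > MAX_MESSAGE_SIZE) {
            throw new IllegalArgumentException(
                "the message body size over max value, MAX: " + MAX_MESSAGE_SIZE);
        }
        // Step 2: compression is attempted only after the check has passed.
        return body.length > COMPRESS_THRESHOLD ? compress(body) : body;
    }

    /** Plain DEFLATE compression using the JDK. */
    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION);
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(data.length);
        byte[] buf = new byte[8192];
        try {
            while (!deflater.finished()) {
                int n = deflater.deflate(buf);
                out.write(buf, 0, n);
            }
        } finally {
            deflater.end(); // release native resources
        }
        return out.toByteArray();
    }
}
```

The client therefore validates one quantity (the raw body length) while the broker later sees another (the possibly compressed body plus everything else in the stored record).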
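Broker-side message size check
The broker checks the size of the whole message, including the topic, properties, IP addresses, message body, and other fields; see org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize
The message body here is the data sent by the client (possibly already compressed)
When handling transactional messages, the broker adds extra properties; see org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner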
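Current configuration
No threshold has been configured on the client side, so the 4MB default applies
On the broker side, operations added a 64KB limit
As a result, the client can send messages larger than 64KB to the broker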
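A minimal sketch of this mismatch, using the numbers above. The class and method names are hypothetical, and the 200-byte overhead used in main is an illustrative figure, not a measured one.

```java
// Hypothetical model of the two independent checks; not RocketMQ code.
final class LimitMismatchSketch {
    static final int CLIENT_MAX_BODY = 4 * 1024 * 1024; // client default: 4MB, applies to the body only
    static final int BROKER_MAX_MESSAGE = 64 * 1024;    // broker override: 64KB, applies to the whole record

    static boolean passesClientCheck(int bodyLength) {
        return bodyLength <= CLIENT_MAX_BODY;
    }

    // totalLength stands for body + topic + properties + fixed header fields.
    static boolean passesBrokerCheck(int totalLength) {
        return totalLength <= BROKER_MAX_MESSAGE;
    }

    public static void main(String[] args) {
        int body = 100 * 1024; // a 100KB body that does not compress well
        int overhead = 200;    // illustrative figure for topic, properties and header fields
        System.out.println(passesClientCheck(body));            // true
        System.out.println(passesBrokerCheck(body + overhead)); // false
    }
}
```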
org.apache.rocketmq.client.Validators#checkMessage: client-side message body check
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
        "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl: client-side message compression
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
org.apache.rocketmq.store.CommitLog#calMsgLength: broker-side message length calculation
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}
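To make the arithmetic in calMsgLength concrete, the sketch below adds up the fixed-width fields for the IPv4 case (both host fields are 8 bytes) and derives the largest body that fits under a given broker limit. The class and method names are hypothetical, and the topic and properties lengths used in the example are illustrative.

```java
// Worked arithmetic for calMsgLength, IPv4 case only; names are hypothetical.
final class MsgLengthSketch {
    // Fixed-width fields in the same order as calMsgLength, including the
    // length prefixes for body (4), topic (1) and properties (2). Total: 91 bytes.
    static final int FIXED_OVERHEAD_IPV4 =
        4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 8 + 8 + 4 + 8 + 4 + 1 + 2;

    /** Stored record length for the given variable-length parts. */
    static int totalLength(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_OVERHEAD_IPV4 + bodyLength + topicLength + propertiesLength;
    }

    /** Largest body that still satisfies msgLen <= maxMessageSize. */
    static int maxBodyLength(int maxMessageSize, int topicLength, int propertiesLength) {
        return maxMessageSize - FIXED_OVERHEAD_IPV4 - topicLength - propertiesLength;
    }

    public static void main(String[] args) {
        // Illustrative input: 64KB limit, 20-byte topic, 200 bytes of properties.
        System.out.println(maxBodyLength(64 * 1024, 20, 200)); // 65225
    }
}
```

Because the properties length is part of the total, any property the broker adds after the first write eats into the remaining headroom for the same body.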
org.apache.rocketmq.store.dledger.DLedgerCommitLog.MessageSerializer#serialize: broker-side message size check
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
    DLedgerCommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
        + ", maxMessageSize: " + this.maxMessageSize);
    return new EncodeResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, null, key);
}
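Broker handling of transactional messages
org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage receives the client's send request
The message size is checked when the message is written to disk; the size is within the limit, so the write succeeds
The transaction check-back calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
That method calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult
which calls org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner to rebuild the message, additionally setting msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
The common write-to-disk method is then called, and this time the message size check fails
The check-back method branches on the outcome of putBackToHalfQueueReturnResult, which is false every time, so the loop keeps hitting continue and never terminates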
if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}
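The effect of that continue can be shown with a simplified model of the loop. This sketch is not the broker code: putBackOk stands in for the return value of putBackHalfMsgQueue, and maxIterations is a guard added only so that the demonstration terminates.

```java
// Simplified, hypothetical model of the check-back loop.
final class CheckLoopSketch {
    /** Returns how many iterations the loop runs before it finishes or hits the guard. */
    static int iterationsUntilDone(boolean putBackOk, int messages, int maxIterations) {
        int iterations = 0;
        int i = 0; // current half-queue offset
        while (i < messages && iterations < maxIterations) {
            iterations++;
            if (!putBackOk) {
                continue; // offset i is not advanced, so the same message is retried
            }
            i++; // only a successful put-back moves on to the next offset
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(iterationsUntilDone(true, 3, 1000));  // 3
        System.out.println(iterationsUntilDone(false, 3, 1000)); // 1000, stopped only by the guard
    }
}
```

When the put-back can never succeed, for example because the rebuilt message is always over the size limit, nothing inside the loop changes between iterations, so only an external bound can stop it.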
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#putBackToHalfQueueReturnResult only logs the exception and does not handle it, so putMessageResult can be null
private PutMessageResult putBackToHalfQueueReturnResult(MessageExt messageExt) {
    PutMessageResult putMessageResult = null;
    try {
        MessageExtBrokerInner msgInner = transactionalMessageBridge.renewHalfMessageInner(messageExt);
        putMessageResult = transactionalMessageBridge.putMessageReturnResult(msgInner);
    } catch (Exception e) {
        log.warn("PutBackToHalfQueueReturnResult error", e);
    }
    return putMessageResult;
}
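A caller of this method has to treat null as a failure. The sketch below shows one null-safe way to do that; Status and Result are hypothetical stand-ins for the broker's result types, not the real classes.

```java
// Hypothetical stand-ins for the broker's put result types.
final class PutBackResultSketch {
    enum Status { PUT_OK, MESSAGE_ILLEGAL }

    static final class Result {
        final Status status;

        Result(Status status) {
            this.status = status;
        }
    }

    /** A null result (the swallowed-exception path) counts as a failed put-back. */
    static boolean isSuccess(Result result) {
        return result != null && result.status == Status.PUT_OK;
    }
}
```

With this shape, both a swallowed exception and a rejected write collapse into the same false value, which is why the check-back loop cannot tell a transient error from a message that will never fit.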
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#renewHalfMessageInner
public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(msgExt.getTopic());
    msgInner.setBody(msgExt.getBody());
    msgInner.setQueueId(msgExt.getQueueId());
    msgInner.setMsgId(msgExt.getMsgId());
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setTags(msgExt.getTags());
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags()));
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setWaitStoreMsgOK(false);
    return msgInner;
}
Log line to trace
"PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + "msgId: {}",
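Solutions
Option 1
When sending a message, check the message body size and refuse to send when it exceeds 60KB (to be confirmed), so that the bug on the message server is not triggered.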
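A minimal sketch of such a guard, assuming it runs in application code before the message is handed to the producer. The class name, method name, and exception type are hypothetical, and 60KB is the unconfirmed figure from the text.

```java
// Hypothetical pre-send guard; not part of the RocketMQ client.
final class SendGuardSketch {
    static final int SAFE_BODY_LIMIT = 60 * 1024; // 60KB, still to be confirmed

    /** Rejects bodies that could exceed the broker limit once topic and properties are added. */
    static void checkBodySize(byte[] body) {
        if (body == null || body.length == 0) {
            throw new IllegalArgumentException("the message body is empty");
        }
        if (body.length > SAFE_BODY_LIMIT) {
            throw new IllegalArgumentException(
                "the message body size " + body.length + " exceeds the safe limit " + SAFE_BODY_LIMIT);
        }
    }
}
```

The gap between 60KB and the broker's 64KB is the headroom left for the topic, the properties, and the fixed fields; whether it is large enough depends on how many properties the messages carry.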
Pros
Very likely to keep the server from triggering the bug
The change is small, so the risk of introducing new problems is low
Cons
Option 2
When sending a message, check the message body size; when it exceeds 60KB (to be confirmed)
Pros
Cons