RocketMQ msgId 与 offsetMsgId 释疑 (实战篇),腾讯技术官发布的“神仙文档”火爆网络
System.out.println("-------------------分割线-----------------");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
执行效果如图所示:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20200308202103968.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shado Java 开源项目【ali1024.coding.net/public/P7/Java/git】 w_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70)
不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的 msgId 与直接输出 msgs 中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。
[](()2、消息 ID 释疑
从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?
msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。
offsetMsgId:消息偏移 ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在 commitlog 文件的物理偏移量。
[](()2.1 msgId 即全局唯一 ID 构建规则
从这张图可以看出,msgId 确实是客户端生成的,接下来我们详细分析一下其生成算法。
MessageClientIDSetter#createUniqID
public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING); // @1
sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2
return sb.toString();
}
一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。
[](()2.1.1 FIX_STRING
MessageClientIDSetter 静态代码块
static {
byte[] ip;
try {
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.position(0);
tempBuffer.put(ip);
tempBuffer.position(ip.length);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 2);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRIN 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 G = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
从这里可以看出 FIX_STRING 的主要由:客户端的 IP、进程 ID、加载 MessageClientIDSetter 的类加载器的 hashcode。
[](()2.1.2 唯一性算法
msgId 的唯一性算法由 MessageClientIDSetter 的 createUniqIDBuffer 方法实现。
private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int) (System.currentTimeMillis() - startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}
可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。
[](()2.2 offsetMsgId 构建规则
在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个 id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:
MessageDecoder#createMessageId
public static String createMessageId(final ByteBuffer input ,
final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:
ByteBuffer input
用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)
ByteBuffer addr
当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。
long offset
消息的物理偏移量。
即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。
温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可用不必知道该消息所属的 topic 信息即可查询该条消息的内容。
最后
经过日积月累, 以下是小编归纳整理的深入了解 Java 虚拟机文档,希望可以帮助大家过关斩将顺利通过面试。由于整个文档比较全面,内容比较多,篇幅不允许,下面以截图方式展示 。
由于篇幅限制,文档的详解资料太全面,细节内容太多,所以只把部分知识点截图出来粗略的介绍,每个小节点里面都有更细化的内容!
评论