消息疯狂堆积!RocketMQ 出 Bug 了?,rabbitmq 分布式事务原理
肥壕心里“万只马崩腾~” 第一反应是:“怎么肥事?刚下班就来搞事情???”
于是乎赶回公司赶紧打开电脑,登上 RocketMQ 后台查看(公司自己搭建的开源版 RocketMQ)
[](
)握草 (?`?Д?′)!!! 竟然堆积了 3 亿多条消息了???
=================================================================================================
要知道出现消息堆积无在乎这个问题:
[](
)生产者的生产速度 >> 消费者的处理速度
=======================================================================================
生产者的生产速度骤增,比如生产者的流量突然骤增
消费速度变慢,比如消费者实例 IO 阻塞严重或者宕机
擦了一下头上的冷汗??…赶紧登上消费者服务器瞧瞧。
应用运行正常!服务器磁盘 IO 正常!网络正常!
再去上去生产者的服务器,咦…流量也很正常!
什么???佛了?? …生产者和消费者的应用都很正常,但是为什么消息会堆积怎么多呢?看着这堆积的数量越堆越多(要是这是我头发的数量那该多好啊),越发着急。
虽然说 RocketMQ 版能支持 10 亿级别的消息堆积,不会因为消息堆积导致性能明显下降,??但是这堆积量很明显就是一个异常情况。
[](
)RocketMQ 有 BUG,没错这肯定是 RocketMQ 的锅!
=====================================================================================================
本篇完…
哈哈言归正传,虽然拼爹不行,但至少不能坑爹??
进入消费者的工程查看一下日志,emmm…没有发现报错,没有错误日志…看起来好像一切都很正常。
咦…不过这个消费的速度是不是有点慢???这不科学啊,消费者可是配置了 3 个结点的消费集群啊,按业务的需求量来说消费能力可是杠杠的呀。我再点开这个 TOPIC 的消费者信息
[](
)咦,这三个消费者的 ClientId 怎么会是一样呀?
==============================================================================================
以多年采坑经验的直接告诉我 “难道是因为 ClientId 的相同的问题,导致 broker 在分发消息的时候出现混乱,从而导致消息不能正常推送给消费者?” 因为生产者和消费者都表现正常,所以我猜测问题可能在于 Broker 这一块上。
基于这个推测,那么我们就需要解决这几个问题:
部署在不同的服务器上的两个消费者,为什么 ClientId 是相同的呢?
ClientId 相同,会导致 broker 消息分发错误吗?
[](
)问题分析
=======================================================================
为什么 ClientId 相同呢?我推测是因为 Docker 容器的问题。因为公司最近开始容器化阶段,而刚好消费者的项目也在第一批容器化阶段的列表上。
有了解过 Docker 的小伙伴都知道,当 Docker 进程启动时,会在主机上创建一个名为 docker0 的虚拟网桥。宿主机上的 Docker 容器会连接到这个虚拟网桥上。虚拟网桥的工作方式和物理交换机类似,这样主机上的所有容器就通过交换机连在了一个二层网络中。而 Docker 的网络模式一般有四种:
Host 模式
Container 模式
None 模式
Bridge 模式
对这几个模式不清楚的同学自行找度娘
[](
)我们容器都是采用 Host 模式,所以容器的网络跟宿主机是完全一致的。
======================================================================================================
可以看到,这里第一个就是 docker0 网卡,?默认的 ip 都是 172.17.0.1?。所以显而易见,ClientId 应该读取的都是 docker0 网卡的 IP,这就是能解释为什么多个消费端的 ClientId 都一致的问题了。
那么接下来就是 clientId 的究竟是在哪里设置呢?机智的我在 Github 的 Issues 搜索关键词 “Docker”,啪啦啪啦一搜,果然!还是有不少踩过此坑的志同道合之士,筛选了一番,找到一个比较靠谱的?open issue
可以看到,这个兄弟跟我的遇到的情况是一毛一样的,而他的结论跟我上面的推测也是大致相同(此时内心洋洋得意一番),他这里还提到 clientId 是在 ClientConfig 类中 buildMQClientId 方法中定义的。
[](
)源码探索
=======================================================================
进入 ClientConfig 类,定位到 buildMQClientId 方法
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
复制代码
通过这个相信大家都可以看出 clientId 的生成规则吧,就是 消费者客户端的 IP + “@”+ 实例名称,很明显问题就出在获取客户端 IP 上。
我们再继续看一下它究竟是如何获取客户端 IP 的
public class ClientConfig {
...
private String clientIP = RemotingUtil.getLocalAddress();
...
}
public static String getLocalAddress() {
try {
// Traversal Network interface to get the first non-loopback and non-private address
Enumeration<NetworkInterface>
enumeration = NetworkInterface.getNetworkInterfaces();
ArrayList<String> ipv4Result = new ArrayList<String>();
ArrayList<String> ipv6Result = new ArrayList<String>();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
if (!address.isLoopbackAddress()) {
if (address instanceof Inet6Address) {
ipv6Result.add(normalizeHostAddress(address));
} else {
ipv4Result.add(normalizeHostAddress(address));
}
}
}
}
// prefer ipv4
if (!ipv4Result.isEmpty()) {
for (String ip : ipv4Result) {
if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
continue;
}
return ip;
}
return ipv4Result.get(ipv4Result.size() - 1);
} else if (!ipv6Result.isEmpty()) {
return ipv6Result.get(0);
}
//If failed to find,fall back to localhost
final InetAddress localHost = InetAddress.getLocalHost();
return normalizeHostAddress(localHost);
} catch (Exception e) {
log.error("Failed to obtain local address", e);
}
return null;
}
复制代码
如果有操作过获取当前机器的 IP 的小伙伴,应该对 RemotingUtil.getLocalAddress() 这个工具方法并不陌生~
简单说就是获取当前机器网卡 IP,但是由于容器的网络模式采用的是 host 模式,也就意味着各个容器和宿主机都是处于同一个网络下,所以容器中我们也可以看到 Docker - Server 所创建的 docker 0 网卡,所以它读取的也就是 docker 0 网卡所默认的 IP 地址 172.17.0.1
(跟运维同学沟通了一下,目前由于是容器化的第一阶段,所以先采用简单模式部署,后面会慢慢替换成 k8s,每个 pod 都有自己的独立 IP ,到时网络会与宿主机和其他 pod 的相互隔离。emmm…k8s !听起来牛逼哄哄,恰好最近也在看这方面的书)
**这时候聪明的你可能会问 “不是还有一个实例名称的参数呢,这个又怎么会相同呢?” ** 别着急,我们继续往下看??
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
复制代码
getInstanceName() 方法其实直接获取 instanceName 这个参数值,但是这个参数值是什么时候赋值进去的呢?没错就是通过 changeInstanceNameToPID() 这个方法赋值的,在 consumer 在 start 的时候会调用此方法。
这个参数的逻辑很简单,在初始化的时候首先会获取环境变量 rocketmq.client.name 是否有值,如果没有就是用默认值 DEFAULT 。
然后 consumer 启动的时候会判断这参数值是否为 DEFAULT ,如果是的话就调用 UtilAll.getPid() 。
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
} catch (Exception e) {
return -1;
}
}
复制代码
通过方法名字我们就可以很清楚知道,这个方法其实获取进程号的。那…为什么获取的进程号都是一致的呢?
聪明的你可能已经知道答案了对吧 !这里就不得不提?Docker 的 三大特性
cgroup
namespace
unionFS
没错,这里用的就是 namespace 技术啦。
Linux Namespace 是 Linux 内核提供的一个功能,可以实现系统资源的隔离,如:PID、User ID、Network 等。
由于都是使用相同的基础镜像,在最外层都是运行同样的 JAVA 工程,所以我们可以进去容器里面看,他们的进程号都是为 9
经过肥壕的一系列巧妙的推理和论证, 在 Docker 容器 HOST 网络模式下, 会生成相同的 clientId !
到这里为止,我们算是解决了上文推测的第一个问题!
紧跟柯南的步伐,我们继续推理第二个问题:?clientId 相同导致 Broker 分发消息错误?
Consumer 在负载均衡的时候应该是根据 clientId 作为客户端消费者的唯一标识,在消息下发的时候由于 clientId 的一致,导致负载分发错误。
那么我们下面就要去探究一下 Consumer 的负载均衡究竟是如何实现的。一开始我以为消费端的负载均衡都是在 Broker 处理的,由 Broker 根据注册地 Consumer 把不同的 Queue 分配给不同的 Consumer。但是去看了一下源码上的 doc 描述文档和对源码进行一番的研究后,结果发现自己见识还是太少了(哈哈哈,应该有小伙伴跟我开始的想法是一样的吧)
先来补充一下?RocketMQ 的整体架构
由于篇幅问题,这里我只讲解一下 Broker 和 consumer 之间的关系,其他的角色如果有不懂的可以看一下我之前写的 RocketMQ 介绍篇的文章
Consumer 与 NameServer 集群中的其中一个节点(随机选择)?建立长连接,定期从 NameServer 获取 Topic 路由信息。
根据获取 Topic 路由信息 与 Broker 建立长连接,且?定时向 Broker 发送心跳。
Broker 接收心跳消息的时候,会把 Consumer 的信息保存到本地缓存变量?consumerTable?。上图大致讲解了一下 consumerTable 的存储结构和内容,最主要的是它缓存了每个 consumer 的 clientId。
关于 Consumer 的消费模式,我直接引用源码的解释
在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 Pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 其中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。
[](
)所以简单来说,不管是 Push 还是 Pull 模式,消息消费的控制权在 Consumer 上,所以 Consumer 的负载均衡实现是在 Consumer 的 Client 端上。
==============================================================================================================================================================
通过查看源码可以发现, RebalanceService 会完成负载均衡服务线程(每隔 20s 执行一次),RebalanceService 线程的 run() 方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic() 方法,该方法是实现 Consumer 端负载均衡的核心。这里, rebalanceByTopic() 方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
..... // 省略
评论