写点什么

消息疯狂堆积!RocketMQ 出 Bug 了?,rabbitmq 分布式事务原理

用户头像
极客good
关注
发布于: 刚刚

肥壕心里“万只马崩腾~” 第一反应是:“怎么肥事?刚下班就来搞事情???”



于是乎赶回公司赶紧打开电脑,登上 RocketMQ 后台查看(公司自己搭建的开源版 RocketMQ)



[](


)握草 (?`?Д?′)!!! 竟然堆积了 3 亿多条消息了???


=================================================================================================


要知道出现消息堆积无在乎这个问题:


[](


)生产者的生产速度 >> 消费者的处理速度


=======================================================================================


  1. 生产者的生产速度骤增,比如生产者的流量突然骤增

  2. 消费速度变慢,比如消费者实例 IO 阻塞严重或者宕机


擦了一下头上的冷汗??…赶紧登上消费者服务器瞧瞧。


应用运行正常!服务器磁盘 IO 正常!网络正常!


再去上去生产者的服务器,咦…流量也很正常!


什么???佛了?? …生产者和消费者的应用都很正常,但是为什么消息会堆积怎么多呢?看着这堆积的数量越堆越多(要是这是我头发的数量那该多好啊),越发着急。


虽然说 RocketMQ 版能支持 10 亿级别的消息堆积,不会因为消息堆积导致性能明显下降,??但是这堆积量很明显就是一个异常情况。


[](


)RocketMQ 有 BUG,没错这肯定是 RocketMQ 的锅!


=====================================================================================================


本篇完…



哈哈言归正传,虽然拼爹不行,但至少不能坑爹??


进入消费者的工程查看一下日志,emmm…没有发现报错,没有错误日志…看起来好像一切都很正常。


咦…不过这个消费的速度是不是有点慢???这不科学啊,消费者可是配置了 3 个结点的消费集群啊,按业务的需求量来说消费能力可是杠杠的呀。我再点开这个 TOPIC 的消费者信息



[](


)咦,这三个消费者的 ClientId 怎么会是一样呀?


==============================================================================================


以多年采坑经验的直接告诉我 “难道是因为 ClientId 的相同的问题,导致 broker 在分发消息的时候出现混乱,从而导致消息不能正常推送给消费者?” 因为生产者和消费者都表现正常,所以我猜测问题可能在于 Broker 这一块上。


基于这个推测,那么我们就需要解决这几个问题:


  1. 部署在不同的服务器上的两个消费者,为什么 ClientId 是相同的呢?

  2. ClientId 相同,会导致 broker 消息分发错误吗?


[](


)问题分析


=======================================================================


为什么 ClientId 相同呢?我推测是因为 Docker 容器的问题。因为公司最近开始容器化阶段,而刚好消费者的项目也在第一批容器化阶段的列表上。


有了解过 Docker 的小伙伴都知道,当 Docker 进程启动时,会在主机上创建一个名为 docker0 的虚拟网桥。宿主机上的 Docker 容器会连接到这个虚拟网桥上。虚拟网桥的工作方式和物理交换机类似,这样主机上的所有容器就通过交换机连在了一个二层网络中。而 Docker 的网络模式一般有四种:


  • Host 模式

  • Container 模式

  • None 模式

  • Bridge 模式


对这几个模式不清楚的同学自行找度娘


[](


)我们容器都是采用 Host 模式,所以容器的网络跟宿主机是完全一致的。


======================================================================================================



可以看到,这里第一个就是 docker0 网卡,?默认的 ip 都是 172.17.0.1?。所以显而易见,ClientId 应该读取的都是 docker0 网卡的 IP,这就是能解释为什么多个消费端的 ClientId 都一致的问题了。


那么接下来就是 clientId 的究竟是在哪里设置呢?机智的我在 Github 的 Issues 搜索关键词 “Docker”,啪啦啪啦一搜,果然!还是有不少踩过此坑的志同道合之士,筛选了一番,找到一个比较靠谱的?open issue



可以看到,这个兄弟跟我的遇到的情况是一毛一样的,而他的结论跟我上面的推测也是大致相同(此时内心洋洋得意一番),他这里还提到 clientId 是在 ClientConfig 类中 buildMQClientId 方法中定义的。


[](


)源码探索


=======================================================================


进入 ClientConfig 类,定位到 buildMQClientId 方法


public String buildMQClientId() {


StringBuilder sb = new StringBuilder();


sb.append(this.getClientIP());


sb.append("@");


sb.append(this.getInstanceName());


if (!UtilAll.isBlank(this.unitName)) {


sb.append("@");


sb.append(this.unitName);


}


return sb.toString();


}


复制代码


通过这个相信大家都可以看出 clientId 的生成规则吧,就是 消费者客户端的 IP + “@”+ 实例名称,很明显问题就出在获取客户端 IP 上。


我们再继续看一下它究竟是如何获取客户端 IP 的


public class ClientConfig {


...


private String clientIP = RemotingUtil.getLocalAddress();


...


}


public static String getLocalAddress() {


try {


// Traversal Network interface to get the first non-loopback and non-private address


Enumeration<NetworkInterface>


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


enumeration = NetworkInterface.getNetworkInterfaces();


ArrayList<String> ipv4Result = new ArrayList<String>();


ArrayList<String> ipv6Result = new ArrayList<String>();


while (enumeration.hasMoreElements()) {


final NetworkInterface networkInterface = enumeration.nextElement();


final Enumeration<InetAddress> en = networkInterface.getInetAddresses();


while (en.hasMoreElements()) {


final InetAddress address = en.nextElement();


if (!address.isLoopbackAddress()) {


if (address instanceof Inet6Address) {


ipv6Result.add(normalizeHostAddress(address));


} else {


ipv4Result.add(normalizeHostAddress(address));


}


}


}


}


// prefer ipv4


if (!ipv4Result.isEmpty()) {


for (String ip : ipv4Result) {


if (ip.startsWith("127.0") || ip.startsWith("192.168")) {


continue;


}


return ip;


}


return ipv4Result.get(ipv4Result.size() - 1);


} else if (!ipv6Result.isEmpty()) {


return ipv6Result.get(0);


}


//If failed to find,fall back to localhost


final InetAddress localHost = InetAddress.getLocalHost();


return normalizeHostAddress(localHost);


} catch (Exception e) {


log.error("Failed to obtain local address", e);


}


return null;


}


复制代码


如果有操作过获取当前机器的 IP 的小伙伴,应该对 RemotingUtil.getLocalAddress() 这个工具方法并不陌生~


简单说就是获取当前机器网卡 IP,但是由于容器的网络模式采用的是 host 模式,也就意味着各个容器和宿主机都是处于同一个网络下,所以容器中我们也可以看到 Docker - Server 所创建的 docker 0 网卡,所以它读取的也就是 docker 0 网卡所默认的 IP 地址 172.17.0.1


(跟运维同学沟通了一下,目前由于是容器化的第一阶段,所以先采用简单模式部署,后面会慢慢替换成 k8s,每个 pod 都有自己的独立 IP ,到时网络会与宿主机和其他 pod 的相互隔离。emmm…k8s !听起来牛逼哄哄,恰好最近也在看这方面的书)


**这时候聪明的你可能会问 “不是还有一个实例名称的参数呢,这个又怎么会相同呢?” ** 别着急,我们继续往下看??


private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");


public String getInstanceName() {


return instanceName;


}


public void setInstanceName(String instanceName) {


this.instanceName = instanceName;


}


public void changeInstanceNameToPID() {


if (this.instanceName.equals("DEFAULT")) {


this.instanceName = String.valueOf(UtilAll.getPid());


}


}


复制代码


getInstanceName() 方法其实直接获取 instanceName 这个参数值,但是这个参数值是什么时候赋值进去的呢?没错就是通过 changeInstanceNameToPID() 这个方法赋值的,在 consumer 在 start 的时候会调用此方法。


这个参数的逻辑很简单,在初始化的时候首先会获取环境变量 rocketmq.client.name 是否有值,如果没有就是用默认值 DEFAULT 。


然后 consumer 启动的时候会判断这参数值是否为 DEFAULT ,如果是的话就调用 UtilAll.getPid() 。


public static int getPid() {


RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();


String name = runtime.getName(); // format: "pid@hostname"


try {


return Integer.parseInt(name.substring(0, name.indexOf('@')));


} catch (Exception e) {


return -1;


}


}


复制代码


通过方法名字我们就可以很清楚知道,这个方法其实获取进程号的。那…为什么获取的进程号都是一致的呢?



聪明的你可能已经知道答案了对吧 !这里就不得不提?Docker 的 三大特性


  • cgroup

  • namespace

  • unionFS


没错,这里用的就是 namespace 技术啦。


Linux Namespace 是 Linux 内核提供的一个功能,可以实现系统资源的隔离,如:PID、User ID、Network 等。


由于都是使用相同的基础镜像,在最外层都是运行同样的 JAVA 工程,所以我们可以进去容器里面看,他们的进程号都是为 9


经过肥壕的一系列巧妙的推理和论证, 在 Docker 容器 HOST 网络模式下, 会生成相同的 clientId !


到这里为止,我们算是解决了上文推测的第一个问题!


紧跟柯南的步伐,我们继续推理第二个问题:?clientId 相同导致 Broker 分发消息错误?


Consumer 在负载均衡的时候应该是根据 clientId 作为客户端消费者的唯一标识,在消息下发的时候由于 clientId 的一致,导致负载分发错误。


那么我们下面就要去探究一下 Consumer 的负载均衡究竟是如何实现的。一开始我以为消费端的负载均衡都是在 Broker 处理的,由 Broker 根据注册地 Consumer 把不同的 Queue 分配给不同的 Consumer。但是去看了一下源码上的 doc 描述文档和对源码进行一番的研究后,结果发现自己见识还是太少了(哈哈哈,应该有小伙伴跟我开始的想法是一样的吧)


先来补充一下?RocketMQ 的整体架构



由于篇幅问题,这里我只讲解一下 Broker 和 consumer 之间的关系,其他的角色如果有不懂的可以看一下我之前写的 RocketMQ 介绍篇的文章


  1. Consumer 与 NameServer 集群中的其中一个节点(随机选择)?建立长连接,定期从 NameServer 获取 Topic 路由信息。

  2. 根据获取 Topic 路由信息 与 Broker 建立长连接,且?定时向 Broker 发送心跳



Broker 接收心跳消息的时候,会把 Consumer 的信息保存到本地缓存变量?consumerTable?。上图大致讲解了一下 consumerTable 的存储结构和内容,最主要的是它缓存了每个 consumer 的 clientId。


关于 Consumer 的消费模式,我直接引用源码的解释


在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 Pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。


在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 其中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。


[](


)所以简单来说,不管是 Push 还是 Pull 模式,消息消费的控制权在 Consumer 上,所以 Consumer 的负载均衡实现是在 Consumer 的 Client 端上。


==============================================================================================================================================================


通过查看源码可以发现, RebalanceService 会完成负载均衡服务线程(每隔 20s 执行一次),RebalanceService 线程的 run() 方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic() 方法,该方法是实现 Consumer 端负载均衡的核心。这里, rebalanceByTopic() 方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:


private void rebalanceByTopic(final String topic, final boolean isOrder) {


switch (messageModel) {


case BROADCASTING: {


..... // 省略

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
消息疯狂堆积!RocketMQ出Bug了?,rabbitmq分布式事务原理