RabbitMQ、RocketMQ、Kafka 性能为何差距如此之大?
MQ 的作用解耦、异步、削峰填谷。
未使用 MQ 的情况
mysql 并发写大部分情况下维持在 600-800 之间,并发读 1200-1500 之间,所以消费端在消费消息的时候需控制在并发小于 1000,从而达到限流的效果。
使用 MQ 的情况
mq 做个缓冲,消息放到磁盘,几个 G 或上 t 都可以存储,消息丢失的可能性比较小。
使用 MQ 需要面临的问题
可用性降低
多了 MQ,对外部依赖增加,但通过 try-catch 兜底,mq 消息发送失败,则插入数据库。
提高复杂度
需要搭建高可用的 Kafka 集群或 RocketMQ 集群。
消息重复
通过消费端的幂等性实现。
中间生产消息的时候,有可能会发生网络的波动,业务程序认为消息没有发送成功,其实消息已经写入了一条,应用端超时 timeout,此时会进行消息的重发即 2 个 id 为 1 的都会写入到 mq 中,后端应用消费的时候,就会消费到 2 条消息。
消息顺序
比如下单、支付、发送物流通知,这 3 个业务消息并发的产生且后端多线程消费的情况下,需要考虑消息消费顺序的问题。
解决的方式是单个消费者、单个生产者、单个队列可以保证消息有序的消费。
一个主题,多个队列的情况下需要通过负载均衡的方式路由到不同的队列中来。
有多个消费者不能确保消息消费的顺序。
一致性问题
A、B、C 三个系统,A 和 B 两个写入数据库成功了,C 系统写库失败,这种情况可以用分布式事务解决,可以使用 RocketMQ 提供的分布式事务或阿里开源的 Seta。
对比下常用的 MQ
RabbitMQ
支持并发 1.2W。
RabbitMQ 集群很弱,主要确保高可用,不能拓展性能。
想性能更高,得搭建多主多从,比如 3 主 3 从、4 主 4 从,第一个可以确保高可用,第二个可以提高整个的性能,但 RabbitMQ 集群不可以这样拓展性能。
Kafka
支持并发 100W。
RocketMQ
支持并发 10W。
Kafka、RocketMQ 天生支持分布式,支持动态扩展、动态扩缩容。
RocketMQ 相对来说功能也比较丰富,支持死信消息、延迟(基于死信消息可以实现延迟消息)消息、消息的回溯、消息的过滤。
Kafka 不支持死信消息。
消息端消费成功,发起 ACK 确认,作为 RabbitMQ 来说,可以直接把这个消息删掉。
Kafka 或 RocketMQ 会记录消息者的偏移量,保证下一次消费的时候不会消费同一条消息。
死信消息
如果消费很多次还没有成功,比如 10 次、20 次都不能消费成功,mq 中的这个消息就不能被确认 ,这个时候就引入了死信消息,进入一个单独的死信队列进行保存,后续进行手工处理或额外处理,比如用消息补偿机制,实在消费不了的则异步通知生产者。
RabbitMQ 在 ack 确认很多次都没有成功返回的时候,则会设置一个标识,就会认为这个消息是死信消息,就会把这个消息写入 DCL 队列中。
RocketMQ 也有这样的死信消息,如果消息重试的次数超过 16 次,作为 RocketMQ 也会把这个消息写入专门的死信队列中去。
补偿机制要根据业务来,比如微信冲电话费,在微信应用里面,通过异步的方式来通知成功或失败,如果说失败了,失败的补偿机制就是退费;如果这条消息反正也消费不了,不知道出于什么原因,也有可能加入了失信名单 或超过了消费的额度,这个时候就消费不了,多次尝试之后,在微信的后端就认为是死信消息,而退费就是一种补偿机制。
延迟消息
一般情况下,消息只要发到 mq,消费者就会里立马消费掉,但是有的业务场景需要在这个消息上加一个延迟的时间,比如延迟 10 分钟再被消费。
应用场景比如买电影票-线上电影票的购票流程:
1、选座位,对这个座位进行锁定,防止再被其他人锁定
2、必须在 10 分钟之内支付
异常情况:选了座位,不支付。
对于后端系统来说,只要锁定过期且没有支付,就需要把座位释放掉。
这种情况可以采用定时任务来处理,不断的去轮循数据库,但会出现新的问题,1 要查询数据库,2 每个人选定的时间不一样,若定时 10 分钟跑一次,就会出现释放座位不及时的情况,若定时 1 秒跑一次,系统性能开销比较大。
最优的方案是采用延时消息,每一次选座位的时候,就写一个延时 10 分钟的消息,在消费的时候,必须等 10 分钟之后,消费者再处理,不需要轮询数据库。
不同MQ为什么性能差别这么大?
主要依赖于 Rabbitmq、Kafka 持久化的底层机制:将消息写入磁盘的零拷贝技术。
Netty、Nginx 都有用到该技术。
零拷贝包括 MMAP 的零拷贝、Sendfile 的零拷贝。
RabbitMQ 传统方式的拷贝
作为消费者要拉取消息进行消费,站在 IO 的角度去看,为了确保消息的高可用,往往把消息放到磁盘,一旦数据没有写入磁盘就会有丢失数据的可能性,所以消息会先写入磁盘。
把数据从磁盘读出,再通过网络发送给消费者。
应用发送数据要先发送给操作系统的网卡,最终通过网卡发送数据给消费者。
站在磁盘的角度来看,数据首先要经过第一个拷贝,这里叫 DMA 拷贝到文件读取缓冲区,伪代码为 buffer=file.read ,写完之后,发给消费者,创建一个 socket 即建立一个 TCP 网络通信,通过 socket 调用 send 方法,把读到的 buffer 进行发送。
站在 io 的角度来看,经过了几次拷贝?
第一次:数据从磁盘拷贝到内核的文件读取缓冲区,这个过程称为 DMA 拷贝,
然后数据经过第二次拷贝:CPU 拷贝,拷贝的数据放入应用缓冲区即就是刚才定义的 buffer 字节流。
应用程序并不能直接操作网卡,底层调用 socket,通过 socket 调用操作系统的网卡,但是操作系统网卡会有一个问题 :不能直接读到应用的内存,所以又需要经过一次 CPU 拷贝到套接字发送缓冲区,最后再经过一次 DMA 拷贝(直接内存读取 Direct Memory Access)。
内核或操作系统的驱动允许不同速度的硬件进行沟通的时候才会有 DMA 拷贝。
如果没有 DMA,就需要通过 CPU 的大量中断来进行负载。
什么叫中断?
在计算机里面,启动一个线程,让 CPU 来跑,CPU 在跑的时候,你给我发了一个消息,我的电脑怎么知道我的网卡里面进来一条消息呢?这个就需要网卡在硬件级别叫下 CPU: cpu 等一等,现在我要打断你一下。
如果通过 CPU 负载的话,效率很低,因为 CPU 干很多事情,CPU 做大量中断负载的话,比如 200M 的数据,如果通过 CPU 拷贝,大概需要 200ms,而通过 DMA 拷贝,速度只需要 2 毫秒。
计算机里面,越底层的东西就越快,通过 CPU 拷贝到话,效率往往很低,因为这个时候还需要向 CPU 请求负载, 这里会涉及到很多的中断负载的切换。
在不考虑 MQ 应用程序运转多少时间的情况下,传统的拷贝大概需要 404 毫秒。
RocketMQ MMAP零拷贝技术
在 RocketMQ 中采用一种 MMAP 的零拷贝技术,本身是做内存映射,当内存的应用缓冲区调用操作系统的 mmap 函数,可以做一个内存映射。
拿到能够操作文件的通道到一个高级类 FileChannel,这个高级类实际上是对文件进行操作。
底层会调用操作系统的 mmap 函数来完成映射,映射的意思是内存即磁盘,磁盘即内存,如果完成映射之后,这个文件和内存的这个 buffer(ByteBuffer)就一致了。
mmap 是内存文件通过 FileChannel 调用 map 方法间接调用的,设置读写模式,文件映射到底可以读还是可以写,内存映射的位置即从哪里开始,0 表示从头开始,内存映射大小为 1024 即这个文件可以映射 1kb 左右,拿到这个 buffer 之后,就可以进行写入,这个 ByteBuffer 和 Hashmap 是一样的方式,直接 put 把字符串转换成 byte 数组进行写入,写入完之后,再去调用 flip 方法进行刷盘,这个数据就可以同步到磁盘了,当然刷完盘之后,还可以拿出来,通过 mmap.get 把里面的前 5 个数据读取出来,读取之后还可以打印,
文件中这么多 NULL,刚好长度是 1024。
通过 mmap 创建的,因为它进行内存映射,所以这个文件必须要有空格,通过 NULL 值进行表示,读的时候,通过偏移量+长度,指定了 5 个长度,就可以读取到 lijin 这个字符串数据。
传统的方式
Server 端(服务端)启动,模拟一个消费者即专门启动一个 Server Socket 监听,接受到数据,把数据读出来就可以了。
这个是传统的客户端读一个文件发送到网络的过程,这段代码跟
这个的业务场景是一样的。
创建一个 socket,因为要发给对应的消费者,先建立一个网络连接。
inpuStream.read()会进行 2 次拷贝,一个是 DMA 拷贝,一次是 CPU 拷贝。
而这种方式只是一次拷贝,因为是内存映射。
map 方法在系统启动的时候就被调用了。
传统的方式,每次都要 new 一个 FileInputStream,这里涉及到了 2 次拷贝(每一次读取出来,读到 buffer 中,涉及到 2 次拷贝:一次 DMA 拷贝、一次 CPU 拷贝),耗时 202 毫秒,因为要发送网络,通过连接本机的 8081 端口,发送给它,还要创建一个对应的输出流拿取结果。
传统的方式本质上和文件读取是一样的,这是通过流的方式读取,while true 不断的读并且累加,读完之后,拿到了 buffer,再写网络,网络就通过 socket 创建的 getOutputStream(文件的输出流、socket 的输出流)转到 DataOutputStream。
创建的 socket 就是一个连接,应用要跟消费者建立一个 TCP 的连接,这个 TCP 的连接在底层表示都是 socket,不单单只是数据连接,还包含了数据通道,这里 new 一个 socket 就相当于跟另外一个消费者 8081 这样的 socket 通道建立了链接,通过 socket 通道里面的 dataOutputStream.write 方法输出数据,这里又会涉及到一次 DMA 拷贝,一次 CPU 拷贝。
首先做一次 CPU 拷贝,相当于把 buffer 的数据首先要发到套接字缓冲区(socket 里面的缓冲区),这个 socket 要通过网卡发给消费者最终要把应用内存发送给网卡里面的内存,网卡是一个外设,网卡通过一个 USB 都可以去接,所以就需要做一个 DMA 拷贝。
这种方式共有 4 次拷贝,耗时为 422 毫秒,这是 RabbitMQ 的情况,而 RocketMQ 的 mmap 发送只有 204 毫秒,DMA 拷贝速度一般是 CPU 的百倍。
Kafka的sendfile零拷贝技术
Kafka 不会涉及到 cpu 拷贝,只是进行文件描述符的传递,这点消耗的时间可以忽略。
文件描述符类似一个指针,在 linux 上面所有东西都是文件描述符。
把数据放到文件数据读取缓冲区,这里就会有一个文件描述符,类似于网盘的地址,比如百度云网盘的分享链接,而真实的数据放百度网盘,这种开销可以忽略,既然数据已经放到了文件缓冲区,只要拿到文件缓冲区的指针,指针在应用程序里面内存的大小就可以忽略不计了。
在现代新的操作系统里面,既然都属于内核操作系统的进程,文件读取缓冲区的内存和套接字的内存可以共享。
文件描述符(offset=1024,size=9721823),比如要读取的文件,偏移量是 1024,读取 9721823 这个大小的数据。
把文件描述符传给应用,这个速度和时间可以忽略不计,调用 socket,相当于告诉 socket 你要去文件读取缓冲区内存找我要发送的数据,因为我已经告诉你偏移量和大小了。
通过 sendfile 的方式,只剩下 2 次 DMA 拷贝了,数据的传输基本上在内核就完成了。
第一步 new 出一个 SocketChannel,使用 8081 的服务器地址,SocketChannel 是套接字发送缓冲区的一个通道,FileChannel 是针对磁盘文件的通道,2 个通道通过 transferTo 进行共享,
共享的位置是从 0 开始,长度是文件大小,这里没有使用多文件,只读了一个文件。
fileName 可以通过 FileChannel 传过去,传过去就已经网络传输了,只要调用 transferTo 方法就会完成网络发送,这种方式的耗时只需要 16 毫秒。
传统的传输要转换成 InputStream、FileInputStream、DataOutputStream,所以开销会大些,同样的一张图片,转换出来的字节流会多一点。
通过 NIO 转换 transferTo 直接就这么转了,如果把中间的 CPU 拷贝的时间忽略,相当于 2+2+12,传输文件描述符的话,还是会占据一点点时间。
不同的序列化方式即转换的流不一样,传输的字节数大小也不一样。
为什么Kafka不用mmap
既然 sendfile 零拷贝技术效率更高,RocketMQ 早期版本也是基于 Kakfa java 版本重写改进的,那 RocketMQ 为什么不用 sendfile 技术?
因为它们的设计理念不一样。
作为文件描述符等同于网盘地址。
RocketMQ 有很多功能的延伸点是不一样的,比如延迟消息、死信消息需要数据流转到 MQ 应用。
RocketMQ 要支持延迟消息,数据最好要进入应用,不能单纯拿一个文件描述符做延迟消息,这也是为什么 Kafka 没有延迟消息的原因。
数据是通过这样的方式发送的,数据不会直接经过 Kafka。
Kafka 的设计比较简单,没有延迟消息、死信消息等。
比如 1 万条消息中有一个消息发送不成功,这种情况一定要放到 mq 的应用内存才能处理,
而通过 sendfile 方式,很多的消息数据都是文件读取缓冲区的文件描述符。
类似网盘资料中的数据很多,是一个代码压缩包,单独把其中的一段代码拿出来是非常麻烦的。
Kafka 做死信消息,要写一个定时器,不断的轮询,如果消息失败了,把这个消息写入到 Kafka 的一个文件或一个队列中,可以这样变相的实现,但自身原生是不支持死信消息的。
评论