🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」帮你梳理 RocketMQ 或 Kafka 的选择理由以及二者 PK
前提背景
大家都知道,市面上有许多开源的 MQ,例如,RocketMQ、Kafka、RabbitMQ 等等,现在 Pulsar 也开始发光,今天我们谈谈笔者最常用的 RocketMQ 和 Kafka,想必大家早就知道二者之间的特点以及区别,但是在实际场景中,二者的选取有可能会范迷惑,那么今天笔者就带领大家分析一下二者之间的区别,以及选取标准吧!
架构对比
RocketMQ 的架构
RocketMQ 由 NameServer、Broker、Consumer、Producer 组成,NameServer 之间互不通信,Broker 会向所有的 nameServer 注册,通过心跳判断 broker 是否存活,producer 和 consumer 通过 nameserver 就知道 broker 上有哪些 topic。
Kafka 的架构
Kafka 的元数据信息都是保存在 Zookeeper,新版本部分已经存放到了 Kafka 内部了,由 Broker、Zookeeper、Producer、Consumer 组成。
Broker 对比
主从架构模型差异:
维度不同
Kafka 的 master/slave 是基于 partition(分区)维度的,而 RocketMQ 是基于 Broker 维度的;
Kafka 的 master/slave 是可以切换的(主要依靠于 Zookeeper 的主备切换机制)
RocketMQ 无法实现自动切换,当 RocketMQ 的 Master 宕机时,读能被路由到 slave 上,但写会被路由到此 topic 的其他 Broker 上。
刷盘机制
RocketMQ 支持同步刷盘,也就是每次消息都等刷入磁盘后再返回,保证消息不丢失,但对吞吐量稍有影响。一般在主从结构下,选择异步双写策略是比较可靠的选择。
消息查询
RocketMQ 支持消息查询,除了 queue 的 offset 外,还支持自定义 key。RocketMQ 对 offset 和 key 都做了索引,均是独立的索引文件。
消费失败重试与延迟消费
RocketMQ 针对每个 topic 都定义了延迟队列,当消息消费失败时,会发回给 Broker 存入延迟队列中,每个消费者在启动时默认订阅延迟队列,这样消费失败的消息在一段时候后又能够重新消费。
延迟时间与延迟级别一一对应,延迟时间是随失败次数逐渐增加的,最后一次间隔 2 小时。
当然发送消息是也可以指定延迟级别,这样就能主动设置延迟消费,在一些特定场景下还是有作用的。
数据读写速度
Kafka 每个 partition 独占一个目录,每个 partition 均有各自的数据文件.log,相当于一个 topic 有多个 log 文件。
RocketMQ 是每个 topic 共享一个数据文件 commitlog,
Kafka 的 topic 一般有多个 partition,所以 Kafka 的数据写入速度比 RocketMQ 高出一个量级。
但 Kafka 的分区数超过一定数量的文件同时写入,会导致原先的顺序写转为随机写,性能急剧下降,所以 kafka 的分区数量是有限制的。
随机和顺序读写的对比
连续 / 随机 I/O(在底层硬盘维度)
连续 I/O :指的是本次 I/O 给出的初始扇区地址和上一次 I/O 的结束扇区地址是完全连续或者相隔不多的。反之,如果相差很大,则算作一次随机 I/O。
发生随机 I/O 可能是因为磁盘碎片导致磁盘空间不连续,或者当前 block 空间小于文件大小导致的。
连续 I/O 比随机 I/O 效率高的原因是
连续 I/O,磁头几乎不用换道,或者换道的时间很短;
随机 I/O,如果这个 I/O 很多的话,会导致磁头不停地换道,造成效率的极大降低。
随机和顺序速度比较
IOPS 和吞吐量:为何随机是关注 IOPS,顺序关注吞吐量?
随机在每次 IO 操作的寻址时间和旋转延时都不能忽略不计,而这两个时间的存在也就限制了 IOPS 的大小;
顺序读写可以忽略不计寻址时间和旋转延时,主要花费在数据传输的时间上。
IOPS 来衡量一个 IO 系统性能的时候,要说明读写的方式以及单次 IO 的大小,因为读写方式会受到旋转时间和寻道时间影响,而单次 IO 会受到数据传输时间影响。
服务治理
Kafka 用 Zookeeper 来做服务发现和治理,broker 和 consumer 都会向其注册自身信息,同时订阅相应的 znode,这样当有 broker 或者 consumer 宕机时能立刻感知,做相应的调整;
RocketMQ 用自定义的 nameServer 做服务发现和治理,其实时性差点,比如如果 broker 宕机,producer 和 consumer 不会实时感知到,需要等到下次更新 broker 集群时(最长 30S)才能做相应调整,服务有个不可用的窗口期,但数据不会丢失,且能保证一致性。
但是某个 consumer 宕机,broker 会实时反馈给其他 consumer,立即触发负载均衡,这样能一定程度上保证消息消费的实时性。
Producer 差异
发送方式
kafka 默认使用异步发送的形式,有一个 memory buffer 暂存消息,同时会将多个消息整合成一个数据包发送,这样能提高吞吐量,但对消息的实效有些影响;
RocketMQ 可选择使用同步或者异步发送。
发送响应
Kafka 的发送 ack 支持三种设置:
消息存进 memory buffer 就返回(0);
等到 leader 收到消息返回(1)
等到 leader 和 isr 的 follower 都收到消息返回(-1)
上面也介绍了,Kafka 都是异步刷盘
RocketMQ 都需要等 broker 的响应确认,有同步刷盘,异步刷盘,同步双写,异步双写等策略,相比于 Kafka 多了一个同步刷盘。
Consumer 差异
消息过滤
RocketMQ 的 queue 和 kafka 的 partition 对应,但 RocketMQ 的 topic 还能更加细分,可对消息加 tag,同时订阅时也可指定特定的 tag 来对消息做更进一步的过滤。
有序消息
RocketMQ 支持全局有序和局部有序
Kafka 也支持有序消息,但是如果某个 broker 宕机了,就不能在保证有序了。
消费确认
RocketMQ 仅支持手动确认,也就是消费完一条消息 ack+1,会定期向 broker 同步消费进度,或者在下一次 pull 时附带上 offset。
Kafka 支持定时确认,拉取到消息自动确认和手动确认,offset 存在 zookeeper 上。
消费并行度
Kafka 的消费者默认是单线程的,一个 Consumer 可以订阅一个或者多个 Partition,一个 Partition 同一时间只能被一个消费者消费,也就是有多少个 Partition 就最多有多少个线程同时消费。
如分区数为 10,那么最多 10 台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10 个线程并行消费)。即消费并行度和分区数一致。
RocketMQ 消费并行度分两种情况:有序消费模式和并发消费模式,
有序模式下,一个消费者也只存在一个线程消费,并行度同 Kafka 完全一致。
并发模式下,每次拉取的消息按 consumeMessageBatchMaxSize(默认 1)拆分后分配给消费者线程池,消费者线程池 min=20,max=64。也就是每个 queue 的并发度在 20-64 之间,一个 topic 有多个 queue 就相乘。所以 rocketmq 的并发度比 Kafka 高出一个量级。
并发消费方式并行度取决于 Consumer 的线程数,如 Topic 配置 10 个队列,10 台机器消费,每台机器 100 个线程,那么并行度为 1000。
事务消息
RocketMQ 指定一定程度上的事务消息,当前开源版本删除了事务消息回查功能,事务机制稍微变得没有这么可靠了,不过阿里云的 rocketmq 支持可靠的事务消息;kafka 不支持分布式事务消息。
Topic 和 Tag 的区别?
业务是否相关联
无直接关联的消息:淘宝交易消息,京东物流消息使用不同的 Topic 进行区分。
交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
Tag 和 Topic 的选用
针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。
不同的 Topic 之间的消息没有必然的联系。
Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
Tag 怎么实现消息过滤
RocketMQ 分布式消息队列的消息过滤方式有别于其它 MQ 中间件,是在 Consumer 端订阅消息时再做消息过滤的。
RocketMQ 这么做是在于其 Producer 端写入消息和 Consumer 端订阅消息采用分离存储的机制来实现的,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
ConsumeQueue 的存储结构:可以看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤是基于这个字段值的。
Tag 过滤方式
Consumer 端在订阅消息时除了指定 Topic 还可以指定 Tag,如果一个消息有多个 Tag,可以用||分隔。
Consumer 端会将这个订阅请求构建成一个 SubscriptionData,发送一个 Pull 消息的请求给 Broker 端。
Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给 Store。
Store 从 ConsumeQueue 读取到一条记录后,会用它记录的消息 tag hash 值去做过滤,由于在服务端只是根据 hashcode 进行判断。
无法精确对 tag 原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始 tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
Message Body 过滤方式
向服务器上传一段 Java 代码,可以对消息做任意形式的过滤,甚至可以做 Message Body 的过滤拆分
数据消息的堆积能力
理论上 Kafka 要比 RocketMQ 的堆积能力更强,不过 RocketMQ 单机也可以支持亿级的消息堆积能力,我们认为这个堆积能力已经完全可以满足业务需求。
消息数据回溯
Kafka 理论上可以按照 Offset 来回溯消息
RocketMQ 支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息,典型业务场景如 consumer 做订单分析,但是由于程序逻辑或者依赖的系统发生故障等原因,导致今天消费的消息全部无效,需要重新从昨天零点开始消费,那么以时间为起点的消息重放功能对于业务非常有帮助。
性能对比
Kafka 单机写入 TPS 约在百万条/秒,消息大小 10 个字节
RocketMQ 单机写入 TPS 单实例约 7 万条/秒,单机部署 3 个 Broker,可以跑到最高 12 万条/秒,消息大小 10 个字节。
数据一致性和实时性
消息投递实时性
Kafka 使用短轮询方式,实时性取决于轮询间隔时间
RocketMQ 使用长轮询,同 Push 方式实时性一致,消息的投递延时通常在几个毫秒。
消费失败重试
Kafka 消费失败不支持重试
RocketMQ 消费失败支持定时重试,每次重试间隔时间顺延
消息顺序
Kafka 支持消息顺序,但是一台 Broker 宕机后,就会产生消息乱序
RocketMQ 支持严格的消息顺序,在顺序消息场景下,一台 Broker 宕机后,发送消息会失败,但是不会乱序
Mysql Binlog 分发需要严格的消息顺序
(题外话)Kafka 没有的,RocketMQ 独有的 tag 机制
普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
总结
RocketMQ 定位于非日志的可靠消息传输(日志场景也 OK),目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
RocketMQ 的同步刷盘在单机可靠性上比 Kafka 更高,不会因为操作系统 Crash,导致数据丢失。
同时同步 Replication 也比 Kafka 异步 Replication 更可靠,数据完全无单点。
另外 Kafka 的 Replication 以 topic 为单位,支持主机宕机,备机自动切换,但是这里有个问题,由于是异步 Replication,那么切换后会有数据丢失,同时 Leader 如果重启后,会与已经存在的 Leader 产生数据冲突。
例如充值类应用,当前时刻调用运营商网关,充值失败,可能是对方压力过多,稍后在调用就会成功,如支付宝到银行扣款也是类似需求。这里的重试需要可靠的重试,即失败重试的消息不因为 Consumer 宕机导致丢失。
版权声明: 本文为 InfoQ 作者【浩宇天尚】的原创文章。
原文链接:【http://xie.infoq.cn/article/c1b54c5904914bb14abd8fb90】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论