写点什么

RocketMQ 解析

用户头像
石刻掌纹
关注
发布于: 2021 年 01 月 16 日



基本架构

  1. nameserver

  2. 负责管理集群里所有的 broker,让使用 mq 的系统通过 nameserver 感知到都有哪些 broker,每个 namerserver 上都有所有 broker 的信息。

  3. nameserver 每隔 10s 检查各个 broker 的最近心跳时间,超过 120s 还没有续约心跳就认为 broker 挂掉

  4. nameserver 之间采用的是 peer-to-peer 模式

  5. broker

  6. 存储消息,多主多从部署。broker 与 nameserver 通过 TCP 长连接通信,每隔 30 秒会发送心跳给 nameserver,并且心跳中包含 broker 当前的数据情况,比如有哪些 topic 的哪些数据在自己这里,这些信息都属于路由信息的一部分

  7. slave 节点会不停发送请求到 master 节点拉取消息

  8. 在 RocketMQ 4.5 版本之后支持 Dledger 机制:基于 raft 算法主节点宕机后自动完成主备切换。要求是至少一个 master 配两个 slave,三个 broker 组成一个 group,分组运行。

  9. 每个 topic 的数据分布式存储在多个 broker 上

  10. broker 持久化消息过程

  11. 当 broker 接收到消息后首先会将消息顺序写入 Commitlog 的日志文件

  12. commitlog 是很多个磁盘文件,每个文件最多限定 1G,broker 接收到消息后直接追加写入该文件末尾。commitlog 文件写满达到 1G 后,会创建新的 commitlog 文件

  13. 每个 MessageQueue 都有一个与之对应的 ConsumeQueue 文件,该文件中记录着这条消息在 commitlog 中的 offset 偏移量

  14. broker 中的磁盘存储有这么两个文件:$HOME/store/consumequeue/{topic}/MessageQueue0/ConsumeQueue0 磁盘文件;$HOME/store/consumequeue/{topic}/MessageQueue1/ConsumeQueue1 磁盘文件

  15. 实际上 ConsumeQueue 文件中存储的不只是 offset,还包含了消息长度,以及 tag 的 hashcode,一条数据是 20 个字节,每个 ConsumeQueue 文件保存 30 万条数据,大概每个文件是 5.8M

  16. 所以 topic 中的每个 MessageQueue,都对应着 broker 机器上的 ConsumeQueue 文件,其中保存着这个 MessageQueue 的所有消息在 commitlog 中的物理位置,也就是 offset 偏移量

  17. Broker 是基于 OS 操作系统的 PageCache 顺序写两个机制,来提升写入 CommitLog 文件的性能的。

  18. 首先 Broker 是以顺序的方式将消息写入 CommitLog 磁盘文件的,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比对文件随机写的性能提升很多

  19. 另外,数据写入 CommitLog 文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入 OS 的 PageCache 内存缓存中,然后后续由 OS 的后台线程定时、异步化的将 OS PageCache 内存缓冲中的数据刷入底层的磁盘文件。

  20. 在这样的优化之下,采用磁盘文件顺序写+OS PageCache 写入+OS 异步刷盘的策略,基本上可以让消息写入 CommitLog 的性能跟你直接写入内存里是差不多的,所以正是如此,才可以让 Broker 高吞吐的处理每秒大量的消息写入。

  21. 在异步刷盘模式下,生产者将消息发送给 broker,broker 将消息写入 os cache 中就返回生产者 ack 了。所以异步刷盘模式下,吞吐量会很高,但如果 broker 宕机有可能会丢失数据

  22. 另外一种模式叫做同步刷盘,如果你使用同步刷盘模式的话,那么生产者发送一条消息出去,broker 收到了消息,必须直接强制把这个消息刷入底层的物理磁盘文件中,然后才会返回 ack 给 producer,此时你才知道消息写入成功了。但是如果强制每次消息写入都要直接进入磁盘中,必然导致每条消息写入性能急剧下降,导致消息写入吞吐量急剧下降,但是可以保证数据不会丢失。

  23. 生产者

  24. 跟 nameserver 建立 TCP 长连接,定时从 nameserver 拉取到最新的路由信息,包括集群有哪些 broker,哪些 topic,每个 topic 存储在哪些 broker 上

  25. 生产者找到要发送的 topic 对应的 broker 节点,采取轮询或 hash 的方式发送给 broker

  26. 生产者与 broker 通信的方式也是 TCP 长连接

  27. 如果有个一组的 master broker 节点挂掉,会导致主从切换时间内发送到这个 master 节点的消息都失败。producer 有一个开关:sendLatencyFaultEnable。一旦打开了这个开关,那么他会有一个自动容错机制,比如如果某次访问一个 Broker 发现网络延迟有 500ms,然后还无法访问,那么就会自动回避访问这个 Broker 一段时间,比如接下来 3000ms 内,就不会访问这个 Broker 了。这样的话,就可以避免一个 Broker 故障之后,短时间内生产者频繁的发送消息到这个故障的 Broker 上去,出现较多次数的异常。而是在一个 Broker 故障之后,自动回避一段时间不要访问这个 Broker,过段时间再去访问他。那么这样过一段时间之后,可能这个 Master Broker 就已经恢复好了,比如他的 Slave Broker 切换为了 Master 可以让别人访问了。

  28. 消费者

  29. 跟生产者类似,也是与 nameserver 和 broker 建立长连接,拉取路由信息,找到需要消费的 topic 在哪些 broker 节点上,从中拉取消息

  30. 消费者从 broker 获取消息时有可能从 slave 节点拉取,也有可能从 master 节点拉取。消费者首先会从 master 节点拉取消息,master 返回消息时,会根据自身负载情况以及和 slave 同步情况,向消费者建议下一次是从 master 还是 slave 节点拉取消息。比如 master 节点目前每秒 10w 写请求,本身负载已经很重,再从他这里拉取消息就不合适了;另一种情况,master 节点本身写入了 100w 条数据,但是 slave 节点同步缓慢只同步了 96w,落后了 4w 条数据,那么下次还是只能去 master 节点拉取消息

数据模型

  1. topic:数据集合

  2. 通过 MessageQueue 将一个 topic 的数据拆分成多个数据分片,每个 broker 上存储多个 MessageQueue,一次实现 topic 数据的分布式存储

参数调整

  1. OS 内核参数

  2. vm.overcommit_memory:

  3. 这个参数有三个值可以选择,0、1、2。

  4. 如果值是 0 的话,在你的中间件系统申请内存的时候,os 内核会检查可用内存是否足够,如果足够的话就分配内存给你,如果感觉剩余内存不是太够了,干脆就拒绝你的申请,导致你申请内存失败,进而导致中间件系统异常出错。

  5. 因此一般需要将这个参数的值调整为 1,意思是把所有可用的物理内存都允许分配给你,只要有内存就给你来用,这样可以避免申请内存失败的问题。

  6. 比如我们曾经线上环境部署的 Redis 就因为这个参数是 0,导致在 save 数据快照到磁盘文件的时候,需要申请大内存的时候被拒绝了,进而导致了异常报错。

  7. 可以用如下命令修改:echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf

  8. vm.swappiness

  9. 这个参数是用来控制进程的 swap 行为的,这个简单来说就是 os 会把一部分磁盘空间作为 swap 区域,然后如果有的进程现在可能不是太活跃,就会被操作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的 swap 区域,然后让这个进程把原来占用的内存空间腾出来,交给其他活跃运行的进程来使用。

  10. 如果这个参数的值设置为 0,意思就是尽量别把任何一个进程放到磁盘 swap 区域去,尽量大家都用物理内存。

  11. 如果这个参数的值是 100,那么意思就是尽量把一些进程给放到磁盘 swap 区域去,内存腾出来给活跃的进程使用。

  12. 默认这个参数的值是 60,有点偏高了,可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘 swap 区域去。

  13. 因此通常在生产环境建议把这个参数调整小一些,比如设置为 10,尽量用物理内存,别放磁盘 swap 区域去。因为当进程爆发增长导致内存爆掉之后,如果大量往 swap 区域放数据,会因为 swap 导致 IO 跑死,整个系统都卡住,无法处理。这时候我们就希望不要 swap,即使出现 oom-killer 也造成不了太大影响,但是不能允许服务器因为 IO 卡死像多米诺骨牌一样全部死机。对于 CPU 密集型的程序尤其如此

  14. 可以用如下命令修改:echo 'vm.swappiness=10' >> /etc/sysctl.conf。

  15. ulimit

  16. 这个是用来控制 linux 上的最大文件链接数的,默认值可能是 1024,一般肯定是不够的,因为你在大量频繁的读写磁盘文件的时候,或者是进行网络通信的时候,都会跟这个参数有关系

  17. 对于一个中间件系统而言肯定是不能使用默认值的,如果你采用默认值,很可能在线上会出现如下错误:error: too many open files。

  18. 因此通常建议用如下命令修改这个值:echo 'ulimit -n 1000000' >> /etc/profile。

  19. OS 参数总结:其实综合思考这几个参数,会发现到最后要调整的东西,无非都是跟磁盘文件 IO、网络通信、内存管理、线程数量有关系的,因为我们的中间件系统在运行的时候无非就是跟这些打交道。

  20. 中间件系统肯定要开启大量的线程(跟 vm.max_map_count 有关)

  21. 而且要进行大量的网络通信和磁盘 IO(跟 ulimit 有关)

  22. 然后大量的使用内存(跟 vm.swappiness 和 vm.overcommit_memory 有关)

  23. JVM 参数

  24. -Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是 8g 内存,新生代是 4g 内存,但是部署中间件服务器应该是高配物理机如 48g 内存的,所以这里完全可以给他们翻几倍,比如给堆内存 20g,其中新生代给 10g,甚至可以更多一些,当然要留一些内存给操作系统来用

  25. -XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的,这是选用了 G1 垃圾回收器来做分代回收,对新生代和老年代都是用 G1 来回收。这里把 G1 的 region 大小设置为了 16m,这个因为机器内存比较多,所以 region 大小可以调大一些给到 16m,不然用 2m 的 region,会导致 region 数量过多的。其次因为内存太大如果使用 CMS 会导致新生代回收的时候 STW 过于严重,G1 可以有效解决这一点,其实 G1 本身就是为了大内存而生

  26. -XX:G1ReservePercent=25:这个参数是说,在 G1 管理的老年代里预留 25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了,默认值是 10%,略微偏少,这里 RocketMQ 给调大了一些

  27. -XX:InitiatingHeapOccupancyPercent=30:这个参数是说,当堆内存的使用率达到 30%之后就会自动启动 G1 的并发垃圾回收,开始尝试回收一些垃圾对象,默认值是 45%,这里调低了一些,也就是提高了 GC 的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题

  28. -XX:SoftRefLRUPolicyMSPerMB=1000:建议这个参数不要设置为 0,避免频繁回收一些软引用的 Class 对象,这里可以调整为比如 1000

  29. -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m:这一堆参数都是控制 GC 日志打印输出的,确定了 gc 日志文件的地址,要打印哪些详细信息,然后控制每个 gc 日志文件的大小是 30m,最多保留 5 个 gc 日志文件。

  30. -XX:-OmitStackTraceInFastThrow:这个参数是说,有时候 JVM 会抛弃一些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来

  31. -XX:+AlwaysPreTouch:这个参数的意思是我们刚开始指定 JVM 用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给他,所以使用这个参数之后,就是强制让 JVM 启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配

  32. -XX:MaxDirectMemorySize=15g:这是说 RocketMQ 里大量用了 NIO 中的 direct buffer,这里限定了 direct buffer 最多申请多少,如果你机器内存比较大,可以适当调大这个值

  33. -XX:-UseLargePages -XX:-UseBiasedLocking:这两个参数的意思是禁用大内存页和偏向锁,这两个参数对应的概念每个要说清楚都得一篇文章,所以这里大家直接知道人家禁用了两个特性即可。

  34. JVM 参数总结:RocketMQ 默认的 JVM 参数是采用了 G1 垃圾回收器,默认堆内存大小是 8G。这个其实完全可以根据我们的机器内存来调整,可以增大一些也是没有问题的,然后就是一些 G1 的垃圾回收的行为参数做了调整,这个一般我们不用去动,然后就是对 GC 日志打印做了设置,这个一般也不用动。其余的就是禁用一些特性,开启一些特性,这些都直接维持 RocketMQ 的默认值即可。

  35. 自身参数调整

  36. sendMessageThreadPoolNums=16 这个参数的意思就是 RocketMQ 内部用来发送消息的线程池的线程数量,默认是 16。其实这个参数可以根据你的机器的 CPU 核数进行适当增加,比如机器 CPU 是 24 核的,可以增加这个线程数量到 24 或者 30,都是可以的。

  37. 参数调优整体总结

  38. 中间件系统在压测或者上生产之前,需要对三大块参数进行调整:OS 内核参数、JVM 参数以及中间件核心参数

  39. OS 内核参数主要调整的地方都是跟磁盘 IO、网络通信、内存管理以及线程管理有关的,需要适当调节大小

  40. JVM 参数需要我们去中间件系统的启动脚本中寻找他的默认 JVM 参数,然后根据机器的情况,对 JVM 的堆内存大小,新生代大小,Direct Buffer 大小,等等,做出一些调整,发挥机器的资源

  41. 中间件核心参数主要也是关注其中跟网络通信、磁盘 IO、线程数量、内存 管理相关的,根据机器资源,适当可以增加网络通信线程,控制同步刷磁盘或者异步刷磁盘,线程数量有多少,内存中一些队列的大小

压测指标

  1. 压测目的:实际上我们平时做压测,主要关注的还是要压测出来一个最合适的最高负载。什么叫最合适的最高负载呢?意思就是在 RocketMQ 的 TPS 和机器的资源使用率和负载之间取得一个平衡。比如 RocketMQ 集群在机器资源使用率极高的极端情况下可以扛到 10 万 TPS,但是当他仅仅抗下 8 万 TPS 的时候,你会发现 cpu 负载、内存使用率、IO 负载和网卡流量,都负载较高,但是可以接受,机器比较安全,不至于宕机。那么这个 8 万 TPS 实际上就是最合适的一个最高负载,也就是说,哪怕生产环境中极端情况下,RocketMQ 的 TPS 飙升到 8 万 TPS,你知道机器资源也是大致可以抗下来的,不至于出现机器宕机的情况。所以我们做压测,其实最主要的是综合 TPS 以及机器负载,尽量找到一个最高的 TPS 同时机器的各项负载在可承受范围之内,这才是压测的目的。

  2. 具体指标

  3. cpu 负载情况

  4. 检查 Broker 机器上的 CPU 负载,可以通过 top、uptime 等命令来查看。比如执行 top 命令就可以看到 cpu load 和 cpu 使用率,这就代表了 cpu 的负载情况。

  5. 在执行了 top 命令之后,往往可以看到如下一行信息:load average:12.03,12.05,12.08。类似上面那行信息代表的是 cpu 在 1 分钟、5 分钟和 15 分钟内的 cpu 负载情况。比如我们一台机器是 24 核的,那么上面的 12 意思就是有 12 个核在使用中。换言之就是还有 12 个核其实还没使用,cpu 还是有很大余力的。这个 cpu 负载其实是比较好的,因为并没有让 cpu 负载达到极限。

  6. 内存使用率:使用 free 命令就可以查看到内存的使用率

  7. JVM GC:使用 jstat 命令就可以查看 RocketMQ 的 JVM 的 GC 频率,基本上新生代每隔几十秒会垃圾回收一次,每次回收过后存活的对象很少,几乎不进入老年代

  8. 磁盘 IO 负载

  9. 首先可以用 top 命令查看一下 IO 等待占用 CPU 时间的百分比,你执行 top 命令之后,会看到一行类似下面的东西:Cpu(s):  0.3% us,  0.3% sy,  0.0% ni, 76.7% id, 13.2% wa,  0.0% hi,  0.0% si。在这里的 13.2% wa,说的就是磁盘 IO 等待在 CPU 执行时间中的百分比。

  10. 如果这个比例太高,说明 CPU 执行的时候大部分时间都在等待执行 IO,也就说明 IO 负载很高,导致大量的 IO 等待。

  11. 这个当时我们压测的时候,是在 40%左右,说明 IO 等待时间占用 CPU 执行时间的比例在 40%左右,这是相对高一些,但还是可以接受的,只不过如果继续让这个比例提高上去,就很不靠谱了,因为说明磁盘 IO 负载可能过高了。

  12. 网卡流量

  13. 使用如下命令可以查看服务器的网卡流量:sar -n DEV 1 2,通过这个命令就可以看到每秒钟网卡读写数据量了。当时我们的服务器使用的是千兆网卡,千兆网卡的理论上限是每秒传输 128M 数据,但是一般实际最大值是每秒传输 100M 数据。

  14. 因此当时我们发现的一个问题就是,在 RocketMQ 处理到每秒 7 万消息的时候,每条消息 500 字节左右的大小的情况下,每秒网卡传输数据量已经达到 100M 了,就是已经达到了网卡的一个极限值了。因为一个 Master Broker 服务器,每秒不光是通过网络接收你写入的数据,还要把数据同步给两个 Slave Broker,还有别的一些网络通信开销。

  15. 因此实际压测发现,每条消息 500 字节,每秒 7 万消息的时候,服务器的网卡就几乎打满了,无法承载更多的消息了。

  16. 压测总结

  17. 最后针对本次压测做一点小的总结,实际上经过压测,最终发现我们的服务器的性能瓶颈在网卡上,因为网卡每秒能传输的数据是有限的

  18. 因此当我们使用平均大小为 500 字节的消息时,最多就是做到 RocketMQ 单台服务器每秒 7 万的 TPS,而且这个时候 cpu 负载、内存负载、jvm gc 负载、磁盘 io 负载,基本都还在正常范围内。

  19. 只不过这个时候网卡流量基本已经打满了,无法再提升 TPS 了。因此在这样的一个机器配置下,RocketMQ 一个比较靠谱的 TPS 就是 7 万左右。

  20. 到底应该如何压测:应该在 TPS 和机器的 cpu 负载、内存使用率、jvm gc 频率、磁盘 io 负载、网络流量负载之间取得一个平衡,尽量让 TPS 尽可能的提高,同时让机器的各项资源负载不要太高。

  21. 实际压测过程:采用几台机器开启大量线程并发读写消息,然后观察 TPS、cpu load(使用 top 命令)、内存使用率(使用 free 命令)、jvm gc 频率(使用 jstat 命令)、磁盘 io 负载(使用 top 命令)、网卡流量负载(使用 sar 命令),不断增加机器和线程,让 TPS 不断提升上去,同时观察各项资源负载是否过高。

  22. 生产集群规划:根据公司的后台整体 QPS 来定,稍微多冗余部署一些机器即可,实际部署生产环境的集群时,使用高配置物理机,同时合理调整 os 内核参数、jvm 参数、中间件核心参数

核心机制

  1. DLedger 高可用

  2. 基于 DLedger 技术来实现 Broker 高可用架构,实际上就是每个 broker 上有一个 DLedger 组件,用 DLedger 先替换掉原来 Broker 自己管理的 CommitLog,由 DLedger 来管理 CommitLog,然后 Broker 基于 DLedger 管理的 CommitLog 去构建出来机器上的各个 ConsumeQueue 磁盘文件。

  3. DLedger 基于 raft 协议进行 leader broker 选举过程

  4. 三台 Broker 机器启动的时候,他们都会投票自己作为 Leader,然后把这个投票发送给其他 Broker。

  5. 我们举一个例子,Broker01 是投票给自己的,Broker02 是投票给自己的,Broker03 是投票给自己的,他们都把自己的投票发送给了别人。

  6. 此时在第一轮选举中,Broker01 会收到别人的投票,他发现自己是投票给自己,但是 Broker02 投票给 Broker02 自己,Broker03 投票给 Broker03 自己,似乎每个人都很自私,都在投票给自己,所以第一轮选举是失败的。因为大家都投票给自己,怎么选举出来一个 Leader 呢?

  7. 接着每个人会进入一个随机时间的休眠,比如说 Broker01 休眠 3 秒,Broker02 休眠 5 秒,Broker03 休眠 4 秒。

  8. 此时 Broker01 必然是先苏醒过来的,他苏醒过来之后,直接会继续尝试投票给自己,并且发送自己的选票给别人。接着 Broker03 休眠 4 秒后苏醒过来,他发现 Broker01 已经发送来了一个选票是投给 Broker01 自己的,此时他自己因为没投票,所以会尊重别人的选择,就直接把票投给 Broker01 了,同时把自己的投票发送给别人。接着 Broker02 苏醒了,他收到了 Broker01 投票给 Broker01 自己,收到了 Broker03 也投票给了 Broker01,那么他此时自己是没投票的,直接就会尊重别人的选择,直接就投票给 Broker01,并且把自己的投票发送给别人。

  9. 此时所有人都会收到三张投票,都是投给 Broker01 的,那么 Broker01 就会当选为 Leader。

  10. 其实只要有(3 台机器 / 2) + 1 个人投票给某个人,就会选举他当 Leader,这个(机器数量 / 2) + 1 就是大多数的意思。

  11. 总结:他确保有人可以成为 Leader 的核心机制就是一轮选举不出来 Leader 的话,就让大家随机休眠一下,先苏醒过来的人会投票给自己,其他人苏醒过后发现自己收到选票了,就会直接投票给那个人。

  12. DLedger 是如何基于 Raft 协议进行多副本同步的?

  13. DLedger 在进行同步的时候是采用 Raft 协议进行多副本同步的,我们接下来聊一下 Raft 协议中的多副本同步机制。

  14. 数据同步会分为两个阶段,一个是 uncommitted 阶段,一个是 commited 阶段

  15. 首先 Leader Broker 上的 DLedger 收到一条数据之后,会标记为 uncommitted 状态,然后他会通过自己的 DLedgerServer 组件把这个 uncommitted 数据发送给 Follower Broker 的 DLedgerServer。

  16. 接着 Follower Broker 的 DLedgerServer 收到 uncommitted 消息之后,必须返回一个 ack 给 Leader Broker 的 DLedgerServer,然后如果 Leader Broker 收到超过半数的 Follower Broker 返回 ack 之后,就会将消息标记为 committed 状态。

  17. 然后 Leader Broker 上的 DLedgerServer 就会发送 commited 消息给 Follower Broker 机器的 DLedgerServer,让他们也把消息标记为 comitted 状态。

  18. 消费模式

  19. 集群模式(默认)不同的系统应该设置不同的消费组,如果不同的消费组订阅了同一个 Topic,每个消费组都只有一台机器会收到这条消息

  20. 广播模式:如果不同的消费组订阅了同一个 Topic,每个消费组都每台机器会收到这条消息

  21. Push 模式 vs Pull 模式

  22. 实际上,这两个消费模式本质是一样的,都是消费者机器主动发送请求到 Broker 机器去拉取一批消息下来。

  23. Push 消费模式本质底层也是基于这种消费者主动拉取的模式来实现的,只不过他的名字叫做 Push 而已,意思是 Broker 会尽可能实时的把新消息交给消费者机器来进行处理,他的消息时效性会更好。

  24. 一般我们使用 RocketMQ 的时候,消费模式通常都是基于他的 Push 模式来做的,因为 Pull 模式的代码写起来更加的复杂和繁琐,而且 Push 模式底层本身就是基于消息拉取的方式来做的,只不过时效性更好而已。

  25. Push 模式的实现思路:当消费者发送请求到 Broker 去拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到 Broker 机器去拉取下一批消息。

  26. 所以消费机器在 Push 模式下会处理完一批消息,立马发起请求拉取下一批消息,消息处理的时效性非常好,看起来就跟 Broker 一直不停的推送消息到消费机器一样。

  27. 另外 Push 模式下有一个请求挂起和长轮询的机制:当请求发送到 Broker,结果他发现没有新的消息给你处理的时候,就会让请求线程挂起,默认是挂起 15 秒,然后这个期间他会有后台线程每隔一会儿就去检查一下是否有的新的消息给你,另外如果在这个挂起过程中,如果有新的消息到达了会主动唤醒挂起的线程,然后把消息返回给你。

  28. Broker 是如何将消息读取出来返回给消费机器的?

  29. 其实这里要涉及到两个概念,分别是 ConsumeQueue 和 CommitLog。

  30. 假设一个消费者机器发送了拉取请求到 Broker 了,他说我这次要拉取 MessageQueue0 中的消息,然后我之前都没拉取过消息,所以就从这个 MessageQueue0 中的第一条消息开始拉取好了。

  31. 于是,Broker 就会找到 MessageQueue0 对应的 ConsumeQueue0,从里面找到第一条消息的 offset,接着 Broker 就需要去 CommitLog 中根据这个 offset 地址去读取出来这条消息的数据,然后把这条消息的数据返回给消费者机器

  32. 所以其实消费消息的时候,本质就是根据你要消费的 MessageQueue 以及开始消费的位置,去找到对应的 ConsumeQueue 读取里面对应位置的消息在 CommitLog 中的物理 offset 偏移量,然后到 CommitLog 中根据 offset 读取消息数据,返回给消费者机器。

  33. 消费者机器如何处理消息、进行 ACK 以及提交消费进度?

  34. 当我们处理完这批消息之后,消费者机器就会回复 ack 以及我们目前的一个消费进度到 Broker 上去,然后 Broker 就会存储我们的消费进度。比如我们现在对 ConsumeQueue0 的消费进度假设就是在 offset=1 的位置,那么他会记录下来一个 ConsumeOffset 的东西去标记我们的消费进度

  35. 那么下次这个消费组只要再次拉取这个 ConsumeQueue 的消息,就可以从 Broker 记录的消费位置开始继续拉取,不用重头开始拉取了

  36. 如果消费组中出现机器宕机或者扩容加机器,会怎么处理?

  37. 这个时候其实会进入一个 rabalance 的环节,也就是说重新给各个消费机器分配他们要处理的 MessageQueue。

  38. 比如现在机器 01 负责 MessageQueue0 和 Message1,机器 02 负责 MessageQueue2 和 MessageQueue3,现在机器 02 宕机了,那么机器 01 就会接管机器 02 之前负责的 MessageQueue2 和 MessageQueue3。

  39. 或者如果此时消费组加入了一台机器 03,此时就可以把机器 02 之前负责的 MessageQueue3 转移给机器 03,然后机器 01 就仅仅负责一个 MessageQueue2 的消费了,这就是负载重平衡的概念。

  40. ConsumeQueue 文件也是基于 os cache 的

  41. ConsumeQueue 会被大量的消费者发送的请求给高并发的读取,所以 ConsumeQueue 文件的读操作是非常频繁的,而且同时会极大的影响到消费者进行消息拉取的性能和消费吞吐量。所以实际上 broker 对 ConsumeQueue 文件同样也是基于 os cache 来进行优化的。也就是说,对于 Broker 机器的磁盘上的大量的 ConsumeQueue 文件,在写入的时候也都是优先进入 os cache 中的

  42. os 自己有一个优化机制,就是读取一个磁盘文件的时候,他会自动把磁盘文件的一些数据缓存到 os cache 中。而且大家之前知道 ConsumeQueue 文件主要是存放消息的 offset,所以每个文件很小,30 万条消息的 offset 就只有 5.72MB 而已。所以实际上 ConsumeQueue 文件们是不占用多少磁盘空间的,他们整体数据量很小,几乎可以完全被 os 缓存在内存 cache 里。

  43. CommitLog 是基于 os cache+磁盘一起读取的

  44. 因为 CommitLog 是用来存放消息的完整数据的,所以内容量是很大的,毕竟他一个文件就要 1GB,所以整体完全有可能多达几个 TB。所以这么多的数据,可能都放在 os cache 里吗?

  45. 明显是不可能的,因为 os cache 用的也是机器的内存,一般多也就几十个 GB 而已,何况 Broker 自身的 JVM 也要用一些内存,留个 os cache 的内存只是一部分罢了,比如 10GB~20GB 的内存,所以 os cache 对于 CommitLog 而言,是无法把他全部数据都放在里面给你读取的!

  46. 也就是说,os cache 对于 CommitLog 而言,主要是提升文件写入性能,当你不停的写入的时候,很多最新写入的数据都会先停留在 os cache 里,比如这可能有 10GB~20GB 的数据。

  47. 之后 os 会自动把 cache 里的比较旧的一些数据刷入磁盘里,腾出来空间给更新写入的数据放在 os cache 里,所以大部分数据可能多达几个 TB 都是在磁盘上的

  48. 所以最终结论:当你拉取消息的时候,可以轻松从 os cache 里读取少量的 ConsumeQueue 文件里的 offset,这个性能是极高的,但是当你去 CommitLog 文件里读取完整消息数据的时候,会有两种可能。

  49. 第一种可能,如果你读取的是那种刚刚写入 CommitLog 的数据,那么大概率他们还停留在 os cache 中,此时你可以顺利的直接从 os cache 里读取 CommitLog 中的数据,这个就是内存读取,性能是很高的。

  50. 第二种可能,你也许读取的是比较早之前写入 CommitLog 的数据,那些数据早就被刷入磁盘了,已经不在 os cache 里了,那么此时你就只能从磁盘上的文件里读取了,这个性能是比较差一些的。

  51. 什么时候会从 os cache 读?什么时候会从磁盘读?

  52. 如果消费者机器一直快速的在拉取和消费处理,紧紧的跟上了生产者写入 broker 的消息速率,那么每次拉取几乎都是在拉取最近人家刚写入 CommitLog 的数据,那几乎都在 os cache 里。

  53. 但是如果 broker 的负载很高,导致你拉取消息的速度很慢,或者是你自己的消费者机器拉取到一批消息之后处理的时候性能很低,处理的速度很慢,这都会导致你跟不上生产者写入的速率。

  54. 比如人家都写入 10 万条数据了,结果消费者才拉取了 2 万条数据,此时有 5 万条最新的数据是在 os cache 里,有 3 万条你还没拉取的数据是在磁盘里,那么当后续你再拉取的时候,必然很大概率是从磁盘里读取早就刷入磁盘的 3 万条数据。

  55. Master Broker 什么时候会让你从 Slave Broker 拉取数据?

  56. 假设此时你的 broker 里已经写入了 10 万条数据,但是你仅仅拉取了 2 万条数据,下次你拉取的时候,是从第 2 万零 1 条数据开始继续往后拉取的,是不是?也就是说,此时你有 8 万条数据是没有拉取的!

  57. 然后 broker 自己是知道机器上当前的整体物理内存有多大的,而且他也知道自己可用的最大空间占里面的比例,他是知道自己的消息最多可以在内存里放多少的!因为他知道,他最多只能利用 10GB 的 os cache 去放消息,这么多内存最多也就放 5 万左右的消息。

  58. 然后这个时候你过来拉取消息,他发现你还有 8 万条消息没有拉取,这个 8 万条消息他发现是大于 10GB 内存最多存放的 5 万条消息的,那么此时就说明,肯定有 3 万条消息目前是在磁盘上的,不在 os cache 内存里!

  59. 所以他经过上述判断,会发现此时你很大概率会从磁盘里加载 3 万条消息出来!他会认为,出现这种情况,很可能是因为自己作为 master broker 负载太高了,导致没法及时的把消息给你,所以你落后的进度比较多。这个时候,他就会告诉你,我这次给你从磁盘里读取 3 万条消息,但是下次你还是从 slave broker 去拉取吧!

  60. 以上就是这个关键问题的解答,本质是对比你当前没有拉取消息的数量和大小,以及最多可以存放在 os cache 内存里的消息的大小,如果你没拉取的消息超过了最大能使用的内存的量,那么说明你后续会频繁从磁盘加载数据,此时就让你从 slave broker 去加载数据了!

  61. RocketMQ 是如何基于 mmap 技术+page cache 技术优化的?

  62. 普通 IO:必须先把数据写入到用户进程私有空间里去,然后将数据从用户空间拷贝到内核 IO 缓冲区,最后再写入磁盘文件,会有两次拷贝过程

  63. mmap:将磁盘文件地址和用户进程虚拟内存地址进行映射,之后的数据读写也就可以从虚拟内存直接到磁盘文件,无论读写都只有一次拷贝。

  64. RocketMQ 底层对 CommitLog、ConsumeQueue 之类的磁盘文件的读写操作,基本上都会采用 mmap 技术来实现。如果具体到代码层面,就是基于 JDK NIO 包下的 MappedByteBuffer 的 map()函数,来先将一个磁盘文件(比如一个 CommitLog 文件,或者是一个 ConsumeQueue 文件)映射到内存里来

  65. 内存映射:因为刚开始你建立映射的时候,并没有任何的数据拷贝操作,其实磁盘文件还是停留在那里,只不过他把物理上的磁盘文件的一些地址和用户进程私有空间的一些虚拟内存地址进行了一个映射

  66. 这个地址映射的过程,就是 JDK NIO 包下的 MappedByteBuffer.map()函数干的事情,底层就是基于 mmap 技术实现的

  67. 另外 mmap 技术在进行文件映射的时候,一般有大小限制,在 1.5GB~2GB 之间,所以 RocketMQ 才让 CommitLog 单个文件在 1GB,ConsumeQueue 文件在 5.72MB,不会太大。这样限制了 RocketMQ 底层文件的大小,就可以在进行文件读写的时候,很方便的进行内存映射了。

  68. 基于 mmap 技术+pagecache 技术实现高性能的文件读写

  69. 接下来就可以对这个已经映射到内存里的磁盘文件进行读写操作了,比如要写入消息到 CommitLog 文件,你先把一个 CommitLog 文件通过 MappedByteBuffer 的 map()函数映射其地址到你的虚拟内存地址。

  70. 接着就可以对这个 MappedByteBuffer 执行写入操作了,写入的时候他会直接进入 PageCache 中,然后过一段时间之后,由 os 的线程异步刷入磁盘中

  71. 预映射机制 + 文件预热机制

  72. Broker 针对上述的磁盘文件高性能读写机制做的一些优化:

  73. 内存预映射机制:Broker 会针对磁盘上的各种 CommitLog、ConsumeQueue 文件预先分配好 MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用 MappedByteBuffer 执行 map()函数完成映射,这样后续读写文件的时候,就可以直接执行了。

  74. 文件预热:在提前对一些文件完成映射之后,因为映射不会直接将数据加载到内存里来,那么后续在读取尤其是 CommitLog、ConsumeQueue 的时候,其实有可能会频繁的从磁盘里加载数据到内存中去。所以其实在执行完 map()函数之后,会进行 madvise 系统调用,而且 PageCache 技术在加载数据的时候还会将你加载的数据块的临近的其他数据块也一起加载到 PageCache 里去,就是提前尽可能多的把磁盘文件加载到内存里去。

  75. 通过上述优化,才真正能实现一个效果,就是写磁盘文件的时候都是进入 PageCache 的,保证写入高性能;同时尽可能多的通过 map + madvise 的映射后预热机制,把磁盘文件里的数据尽可能多的加载到 PageCache 里来,后续对 CosumeQueue、CommitLog 进行读取的时候,才能尽可能从内存里读取数据。

业务场景

  1. 消息丢失原因

  2. 生产者因素:网络发生了抖动,导致这次网络通信失败,或者线程资源耗尽,队列已满等原因,消息没有投递到 MQ Server 端。

  3. MQ 因素:消息写入 MQ 之后,其实 MQ 可能仅仅是把这个消息给写入到 page cache 里,也就是操作系统自己管理的一个缓冲区,这本质也是内存,然后这个时候,假如要是出现了 Broker 机器的崩溃或者重启也会导致消息丢失

  4. 消费者因素:消费者拿到消息后,处理异常,并且采用的是自动 ack

  5. 采用事务消息防止消息丢失

  6. 事务消息流程:首先生产者向 mq server 端发送一个 half 消息,生产者正常执行业务流程完毕则 commit,异常则执行 rollback。消费者只有在生产者执行 commit 之后才能看到这条消息

  7. 先发送 half 消息,再执行业务逻辑的原因是,如果业务流程执行完毕,却发现 mq server 端不通或者宕机,此时无法发送 half 消息出去,那么就会产生数据不一致的问题。所以先发送消息,目的就是为了确认 mq server 端响应正常,如果 half 消息写入失败,说明之后的流程就无法执行下去,此时事务应该直接停止

  8. half 消息发送成功,但是生产者本地方法执行异常,此时需要 rollback,并且可以做些高可用的降级措施,如数据库写入异常时,直接写入本地文件,或者手动触发重试等等

  9. half 消息发送成功,但是生产者没有 commit,或者 commit 消息丢失,此时 mq server 端有补偿机制,会等待一定时间,回调生产者的接口,查询这次业务是否成功,比如生成订单是否成功,确认 half 消息最终状态是 commit 还是 rollback

  10. 这套流程的意义

  11. 如果 MQ 有问题或者网络有问题,half 消息根本都发不出去,此时 half 消息肯定是失败的,那么订单系统就不会执行后续流程了!

  12. 如果要是 half 消息发送出去了,但是 half 消息的响应都没收到,然后执行了退款流程,那 MQ 会有补偿机制来回调找你询问要 commit 还是 rollback,此时你选择 rollback 删除消息就可以了,不会执行后续流程!

  13. 如果要是订单系统收到 half 消息了,结果订单系统自己更新数据库失败了,那么他也会进行回滚,不会执行后续流程了!

  14. 如果要是订单系统收到 half 消息了,然后还更新自己数据库成功了,订单状态是“已完成”了,此时就必然会发送 commit 请求给 MQ,一旦消息 commit 了,那么必然保证红包系统可以收到这个消息!

  15. 而且即使你 commit 请求发送失败了,MQ 也会有补偿机制,回调你接口让你判断是否重新发送 commit 请求

  16. 总之,生产者系统只要成功了,那么必然要保证 MQ 里的消息是 commit 了可以让消费者系统看到他!

  17. half 消息是如何对消费者不可见的?

  18. 其实写入一个 Topic,最终是定位到这个 Topic 的某个 MessageQueue,然后定位到一台 Broker 机器上去,然后写入的是 Broker 上的 CommitLog 文件,同时将消费索引写入 MessageQueue 对应的 ConsumeQueue 文件

  19. 如果写入一条 half 消息到 OrderPaySuccessTopic 里去,会定位到这个 Topic 的一个 MessageQueue,然后定位到 RocketMQ 的一台机器上去,接着按理说,消息会写入 CommitLog。同时消息的 offset 会写入 MessageQueue 对应的 ConsumeQueue,这个 ConsumeQueue 是属于 OrderPaySuccuessTopic 的,然后消费者按理说会从这个 ConsumeQueue 里获取到写入的这个 half 消息。

  20. 但是实际上消费者却没法看到这条消息,其本质原因就是 RocketMQ 一旦发现你发送的是一个 half 消息,他不会把这个 half 消息的 offset 写入 OrderPaySuccessTopic 的 ConsumeQueue 里去。他会把这条 half 消息写入到自己内部的“RMQ_SYS_TRANS_HALF_TOPIC”这个 Topic 对应的一个 ConsumeQueue 里去

  21. 如果执行 rollback 操作的话,如何标记消息回滚?

  22. 假设我们的订单系统执行了 rollback 请求,那么此时就需要对消息进行回滚。因为 RocketMQ 都是顺序把消息写入磁盘文件的,所以在这里如果你执行 rollback,他的本质就是用一个 OP 操作来标记 half 消息的状态

  23. RocketMQ 内部有一个 OP_TOPIC,此时可以写一条 rollback OP 记录到这个 Topic 里,标记某个 half 消息是 rollback 了。另外,假设生产者一直没有执行 commit/rollback,RocketMQ 会有定时线程回调订单系统的接口去判断 half 消息的状态,但是他最多就是回调 15 次,如果 15 次之后你都没法告知他 half 消息的状态,就自动把消息标记为 rollback。

  24. 如果执行 commit 操作,如何让消息对红包系统可见?

  25. 生产者执行 commit 操作之后,RocketMQ 就会在 OP_TOPIC 里写入一条记录,标记 half 消息已经是 commit 状态了。

  26. 接着需要把放在 RMQ_SYS_TRANS_HALF_TOPIC 中的 half 消息给写入到 OrderPaySuccessTopic 的 ConsumeQueue 里去,然后我们的消费者系统可以就可以看到这条消息进行消费了。

  27. MQ 重复投递原因

  28. 生产者接口超时,上游接口进行重试导致重复投递

  29. 消费者处理完消息,还没来得及手动 ack 就被重启或者宕机,之后 mq server 端重新投递消息

发布于: 2021 年 01 月 16 日阅读数: 29
用户头像

石刻掌纹

关注

还未添加个人签名 2018.11.22 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ解析