写点什么

RabbitMQ 详解——RabbitMQ 服务端执行逻辑(三)

发布于: 20 小时前
RabbitMQ详解——RabbitMQ服务端执行逻辑(三)

消息发送和消费时序

RabbitMQ 启动后会针对每一个 vhost 启动两个进程:msg_store_persistent:负责将持久化消息写入文件与从文件中读取消息 msg_store_transient 负责非持久化消息写入文件与从文件中读取消息(当内存不足,启用磁盘存储)msg_store_gc 负责对文件做文件的合并及删除,整理。减少磁盘的占用




涉及表

  • flying_ets:用于消息 write、remove 的引用计数


  • cur_file_cache_ets:用于当前正在写的文件的消息缓存


  • msg_store_ets_index:消息在文件中的索引信息


  • file_summary_ets:文件的描述信息


消息写入



消息删除


消息读取


队列

首先需要明确一点,不管是持久化消息还是非持久化消息都可以被写入到磁盘。持久化消息在到达队列时就被写入到磁盘,并且如果内存允许,持久化到消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存不够用时会从内存中删除。非持久化的消息一般只保存在内存时,在内存不够用时会被换入到磁盘,以节省内存消息存储是在持久层(逻辑概念,并不真实存在)上完成的,包含两个部分:队列索引和消息存储


  • 队列索引(rabbit_queue_index):负责维护队列中落盘消息的的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者 ack 等,每个队列对应一个 rabbit_queue_index。

  • 消息存储(rabbit_msg_store):以键值对的形式存储消息,一个节点只有一个,被所有队列共享,具体还分为 msg_store_persistent 和 msg_store_transient,前者负责持久化消息的持久化,重启后消息不会丢失,后者负责非持久化消息的持久化,重启后会丢失。

  • 消息内容(包括消息体、属性、header,下面我们提到的消息内容默认都是包含这三部分)存储时可以直接存储在 rabbit_queue_index 中,也可以被保存在 rabbit_msg_store 中,RabbitMQ 的策略是,较小的消息直接存储在 rabbit_queue_index,较大的消息存储在 rabbit_msg_store 中。大小通过 queue_index_embed_msgs_below 配置项决定,默认为 4096,单位为 B

队列结构

RabbitMQ 中的队列消息可以是以下四种状态:

  • alpha:消息内容和消息索引都存储在内存中

  • beta:消息内容保存在磁盘中,消息索引保存在内存中

  • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有

  • delta:消息内容和索引都在磁盘中其中 alpha 状态最耗内存,但很少消耗 CPU,delta 状态基本不消耗内存,但需要消耗更多的 CPU 和磁盘 IO 操作。


delta 状态读取消息需要两次 IO 操作,第一次是读取消息索引 rabbit_queue_index,第二次是读取消息内容 rabbit_msg_store;beta 和 gamma 状态都只需要一次 IO 操作去读取消息内容即可。


RabbitMQ 运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量 target_ram_count,如果 alpha 状态的消息数大于这个数量,就会引起消息状态的转换,多余的消息可能会转到其它 3 个状态中的一个。



上面就是每个队列的结构,Q1 和 Q4 只包含 alpha 状态的消息,Q2 和 Q3 包含 beta 和 gamma 状态的消息,delta 状态只包含了 delta 状态的消息。一般情况下,消息按照 Q1->Q2->Delta->Q3->Q4 这样的顺序进行流动,但并不是每一条消息都会经历所有状态,这取决于当前系统的负载情况(比如非持久化的消息在内存负载不高时,就不会经历 delta)。基于此结构,在内存负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,而在内存负载降低时,又可以将这部分消息渐渐流回内存中,使得整个队列具有很好的弹性。


  • 消息获取



用户头像

还未添加个人签名 2018.05.03 加入

十年开发老兵,多年搜索及推荐领域技术和业务积累目前主要研究低代码平台设计及开发模式,关注三高架构设计方案及系统实现,微服务治理实践及DDD领域驱动设计方案

评论

发布
暂无评论
RabbitMQ详解——RabbitMQ服务端执行逻辑(三)