写点什么

分布式基础概念 - 消息中间件 [RocketMQ]

作者:派大星
  • 2023-12-19
    辽宁
  • 本文字数:2137 字

    阅读完需:约 7 分钟

RocketMQ 架构设计

如图所示:


RocketMQ 事务消息原理

依赖于 TransactionListener 接口


  • executeLocalTransaction 方法会在发送消息后调用,用于执行本地事务,如果本地事务执行成功,rocketmq 再提交消息

  • checkLocalTransaction 用于对本地事务做检查,rocketmq 依赖此方法做补偿


通过两个内部的 topic 来实现对消息的两阶段支持,


prepare:将消息(消息上带有事务标识)投递到一个名为 RMS_SYS_TRANS_HALF_TOPIC 的 topic 中,而不是投递到真正的 topic 中。


commit/rollback:producer 再通过 TransactionListener 的 executeLocalTransaction 方法执行本地事务,当 producer 的 localTransaction 处理成功或者失败后,producer 会向 broker 发送 commit 或 rollback 命令,如果是 commit,则 broker 会将投递到 RMQ_SYS_TRANS_HALF_TOPIC 中的消息投递到真实的 topic 中,然后再投递一表示删除的消息到 RMQ_SYS_TRANS_OP_HALF_TOPIC 中,表示当前事务已完成;如果是 rollback,则没有投递到真实 topic 的过程,只需要投递表示删除的消息到 RMQ_SYS_TRANS_OP_HALF_TOPIC。最后,消费者和消费普通的消息一样消费事务消息


  • 第一阶段(prepare)失败:给应用返回发送消息失败

  • 事务失败:发送回滚命令给 broker,由 broker 执行消息的回滚

  • Commit 或 rollback 失败:由 broker 定时向 producer 发起事务检查,如果本地事务成功,则提交消息事务,否则回滚消息事务


事务状态的检查有两种情况:


  • commit/rollback:broker 会执行相应的 commit/rollback 操作

  • 如果是 TRANSACTION_NOT_TYPE,则一段时间后会再次检查,当检查的次数超过上限(默认 15 次)则丢弃消息

RocketMQ 顺序消息原理

默认是不能保证的,需要程序保证发送和消费的是同一个 queue,多线程消费也无法保证


发送顺序:发送端自己业务逻辑保证先后,发往一个固定的 queue,生产者可以在消息体上设置消息的顺序


发送者实现 MessageQueueSelector 接口,选择一个 queue 进行发送,也可使用 rocketmq 提供的默认实


  • 现 SelectMessageQueueByHash:按参数的 hashcode 与可选队列进行求余选择

  • SelectMessageQueueByRandom:随机选择


mq:queue 本身就是顺序追加写,只需保证一个队列统一时间只有一个 consumer 消费,通过加锁实现,consumer 上的顺序消费有一个定时任务、每隔一定时间向 broker 发送请求延长锁定


消费端


  • pull 模式:消费者需要自己维护需要拉取的 queue,一次拉取的消息都是顺序的,需要消费端自己保证顺序消费

  • push 模式:消费实例实现自 MQPushConsumer 接口,提供注册监听的方法消费消息,registerMessageListener、重载方法

  • MessageListenerConcurrently:并行消费

  • MessageListenerOrderly:串行消费,consumer 会把消息放入本地队列并加锁,定时任务保证锁的同步

简述 RocketMQ 持久化机制

  • commitLog:日志数据文件,被所有的 queue 共享,大小为 1G,写满之后重新生成,顺序写

  • consumeQueue:逻辑 queue,消息先到达 commitLog、然后异步转发到 consumeQueue,包含 queue 在 CommitLog 中的物理位置偏移量 Offset,消息实体内容的大小和 Message Tag 的 hash 值。大小约为 600W 个字节,写满之后重新生成,顺序写

  • indexFile:通过 key 或者时间区间来查找 CommitLog 中的消息,文件名以创建的时间戳命名,固定的单个 IndexFile 大小为 400M,可以保存 2000W 个索引


所有队列共用一个日志数据文件,避免了 kafka 的分区数过多、日志文件过多导致磁盘 IO 读写压力较大造成性能瓶颈,rocketmq 的 queue 只存储少量数据、更加轻量化,对于磁盘的访问是串行化避免磁盘竞争,缺点在于:写入是顺序写,但读是随机的,先读 ConsumeQueue,再读 CommitLog,会降低消息读的效率


消息发送到 broker 后,会被写入 commitLog,写之前加锁,保证顺序写入。然后转发到 consumeQueue 息消费时先从 consumeQueue 读取消息在 CommitLog 中的起始物理偏移量 Offset,消息大小、和消息 Tag 的 HashCode 值。在从 CommitLog 读取消息内容


  • 同步刷盘,消息持久化到磁盘才会给生产者返回 ack,可以保证消息可靠、但是会影响性能

  • 异步刷盘:消息写入 pageCache 就返回 ack 给生产者,刷盘采用异步线程,降低读写延迟提高性能和吞吐

RocketMQ 如何保证不丢消息

生产者


  • 同步阻塞的方式发送消息,加上失败重试机制,可能 broker 存储失败,可以通过查询确认

  • 异步发送需要重写回调方法,检查发送结果

  • ack 机制,可能存储 CommitLog,存储 ConsumerQueue 失败,此时对消费者不可见


broker:同步刷盘、集群模式下采用同步复制、会等待 slave 复制完成才会返回确认


消费者


  • offset 手动提交,消息消费保证幂等

定时任务实现原理

  • 优先队列:基于小顶堆实现,每次新增任务需要进行堆化,取任务时取堆顶元素、调整堆架构,时间复杂度是 O(logN)

  • 时间轮算法:是一个环形队列,按照时间的单位区分,每个时间单位里面是一个链表、用来存储定时任务,像时钟一样轮询环形队列,取出链表中的任务执行,如果超出了环形队列的时间粒度、可以使用多级时间轮,即使用不同维度的时间单位,就跟时钟或者水表一样,这一层的走了一圈,下一层的才走了一格,时间复杂度为 O(1)


往期相关精彩内容推荐:



历史文章导航:



如有问题,欢迎加微信交流:w714771310,备注- 技术交流  。或关注微信公众号【码上遇见你】。




发布于: 刚刚阅读数: 5
用户头像

派大星

关注

微信搜索【码上遇见你】,获取更多精彩内容 2021-12-13 加入

微信搜索【码上遇见你】,获取更多精彩内容

评论

发布
暂无评论
分布式基础概念-消息中间件[RocketMQ]_RocketMQ_派大星_InfoQ写作社区