《我想进大厂》之 MQ 夺命连环 11 问
继之前的 mysql 夺命连环之后,我发现我这个标题被好多套用的,什么夺命 zookeeper,夺命多线程一大堆,这一次,开始面试题系列 MQ 专题,消息队列作为日常常见的使用中间件,面试也是必问的点之一,一起来看看 MQ 的面试题。
你们为什么使用 mq?具体的使用场景是什么?
mq 的作用很简单,削峰填谷。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是 100ms,那么理论上整个下单的链路就需要耗费 400ms,这个时间显然是太长了。
如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过 mq 异步的方式去处理了。同时,考虑到异步带来的不一致的问题,我们可以通过 job 去重试保证接口调用成功,而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。
使用 mq 之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。
那你们使用什么 mq?基于什么做的选型?
我们主要调研了几个主流的 mq,kafka、rabbitmq、rocketmq、activemq,选型我们主要基于以下几个点去考虑:
由于我们系统的 qps 压力比较大,所以性能是首要考虑的要素。
开发语言,由于我们的开发语言是 java,主要是为了方便二次开发。
对于高并发的业务场景是必须的,所以需要支持分布式架构的设计。
功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。
基于以上几个考虑,我们最终选择了 RocketMQ。
你上面提到异步发送,那消息可靠性怎么保证?
消息丢失可能发生在生产者发送消息、MQ 本身丢失消息、消费者丢失消息 3 个方面。
生产者丢失
生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断 MQ 没收到,消息就丢失了。
由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。
异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。
下单后先保存本地数据和 MQ 消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
下单成功,直接返回客户端成功,异步发送 MQ 消息
MQ 回调通知消息发送结果,对应更新数据库 MQ 发送状态
JOB 轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
在监控平台配置或者 JOB 程序处理超过一定次数一直发送不成功的消息,告警,人工介入。
一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。
MQ 丢失
如果生产者保证消息发送到 MQ,而 MQ 收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。
比如 RocketMQ:
RocketMQ 分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使 MQ 挂了,恢复的时候也可以从磁盘中去恢复消息。
比如 Kafka 也可以通过配置做到:
虽然我们可以通过配置的方式来达到 MQ 本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。
消费者丢失
消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ 认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ 默认是需要消费者回复 ack 确认,而 kafka 需要手动开启配置关闭自动 offset。
消费方不返回 ack 确认,重发的机制根据 MQ 类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka 没有这些)
你说到消费者消费失败的问题,那么如果一直消费失败导致消息积压怎么处理?
因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:
消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让 consumer 恢复正常消费
如果时间来不及处理很麻烦,做转发处理,写一个临时的 consumer 消费方案,先把消息消费,然后再转发到一个新的 topic 和 MQ 资源,这个新的 topic 的机器资源单独申请,要能承载住当前积压的消息
处理完积压数据后,修复 consumer,去消费新的 MQ 和现有的 MQ 数据,新 MQ 消费完成后恢复原状
那如果消息积压达到磁盘上限,消息被删除了怎么办?
这。。。他妈都删除了我有啥办法啊。。。冷静,再想想。。有了。
最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。
说了这么多,那你说说 RocketMQ 实现原理吧?
RocketMQ 由 NameServer 注册中心集群、Producer 生产者集群、Consumer 消费者集群和若干 Broker(RocketMQ 进程)组成,它的架构原理是这样的:
Broker 在启动的时候去向所有的 NameServer 注册,并保持长连接,每 30s 发送一次心跳
Producer 在发送消息的时候从 NameServer 获取 Broker 服务器地址,根据负载均衡算法选择一台服务器来发送消息
Conusmer 消费消息的时候同样从 NameServer 获取 Broker 地址,然后主动拉取消息来消费
为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?
我认为有以下几个点是不使用 zookeeper 的原因:
根据 CAP 理论,同时最多只能满足两个点,而 zookeeper 满足的是 CP,也就是说 zookeeper 并不能保证服务的可用性,zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
基于性能的考虑,NameServer 本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而 zookeeper 的写是不可扩展的,而 zookeeper 要解决这个问题只能通过划分领域,划分多个 zookeeper 集群来解决,首先操作起来太复杂,其次这样还是又违反了 CAP 中的 A 的设计,导致服务之间是不连通的。
持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
消息发送应该弱依赖注册中心,而 RocketMQ 的设计理念也正是基于此,生产者在第一次发送消息的时候从 NameServer 获取到 Broker 地址后缓存到本地,如果 NameServer 整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。
那 Broker 是怎么保存数据的呢?
RocketMQ 主要的存储文件包括 commitlog 文件、consumequeue 文件、indexfile 文件。
Broker 在收到消息之后,会把消息保存到 commitlog 的文件当中,而同时在分布式的存储当中,每个 broker 都会保存一部分 topic 的数据,同时,每个 topic 对应的 messagequeue 下都会生成 consumequeue 文件用于保存 commitlog 的物理位置偏移量 offset,indexfile 中会保存 key 和 offset 的对应关系。
CommitLog 文件保存于 ${Rocket_Home}/store/commitlog 目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认 1G,写满后自动生成一个新的文件。
由于同一个 topic 的消息并不是连续的存储在 commitlog 中,消费者如果直接从 commitlog 获取消息效率非常低,所以通过 consumequeue 保存 commitlog 中消息的偏移量的物理地址,这样消费者在消费的时候先从 consumequeue 中根据偏移量定位到具体的 commitlog 物理文件,然后根据一定的规则(offset 和文件大小取模)在 commitlog 中快速定位。
Master 和 Slave 之间是怎么同步数据的呢?
而消息在 master 和 slave 之间的同步是根据 raft 协议来进行的:
在 broker 收到消息后,会被标记为 uncommitted 状态
然后会把消息发送给所有的 slave
slave 在收到消息之后返回 ack 响应给 master
master 在收到超过半数的 ack 之后,把消息标记为 committed
发送 committed 消息给所有 slave,slave 也修改状态为 committed
你知道 RocketMQ 为什么速度快吗?
是因为使用了顺序存储、Page Cache 和异步刷盘。
我们在写入 commitlog 的时候是顺序写入的,这样比随机写入的性能就会提高很多
写入 commitlog 的时候并不是直接写入磁盘,而是先写入操作系统的 PageCache
最后由操作系统异步将缓存中的数据刷到磁盘
什么是事务、半事务消息?怎么实现的?
事务消息就是 MQ 提供的类似 XA 的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。
半事务消息就是 MQ 收到了生产者的消息,但是没有收到二次确认,不能投递的消息。
实现原理如下:
生产者先发送一条半事务消息到 MQ
MQ 收到消息后返回 ack 确认
生产者开始执行本地事务
如果事务执行成功发送 commit 到 MQ,失败发送 rollback
如果 MQ 长时间未收到生产者的二次确认 commit 或者 rollback,MQ 对生产者发起消息回查
生产者查询事务执行最终状态
根据查询事务状态再次提交二次确认
最终,如果 MQ 收到二次确认 commit,就可以把消息投递给消费者,反之如果是 rollback,消息会保存下来并且在 3 天后被删除。
版权声明: 本文为 InfoQ 作者【艾小仙】的原创文章。
原文链接:【http://xie.infoq.cn/article/e083734ba23a287728cd958a6】。文章转载请联系作者。
评论