写点什么

【消息队列最佳实践】消息恰好被消费一次

  • 2021 年 11 月 11 日
  • 本文字数:2721 字

    阅读完需:约 9 分钟

消息在 Kafka 是存在本地磁盘的,而为了减少消息存储时对磁盘的随机 I/O,一般会将消息先写到 os 的 Page Cache,然后再找合适时机刷盘。


比如 Kafka 可以配置异步刷盘时机:


  • 当达到某一时间间隔

  • 或累积一定消息数量


假如你经营一个图书馆,读者每还一本书你都要去把图书归位,不仅工作量大而且效率低下,但是如果你可以选择每隔 3 小时或者图书达到一定数量的时候再把图书归位,这样可以把同一类型的书一起归位,节省了查找图书位置的时间,可以提高效率。


不过如果发生掉电或异常重启,Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?


你可能会:


  • 把刷盘的间隔设置很短

  • 或设置累积一条消息


就刷盘,但频繁刷盘会对很影响性能,而且宕机或掉电几率也不高,不推荐。


如果你的系统对消息丢失容忍度很低,可考虑集群部署 Kafka,通过部署多个副本备份数据,保证消息尽量不丢失。


Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。


由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。


为解决这个问题,Kafka 为生产者提供“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了消息才会丢失。


当设置“acks=all”时,需要同步执行 1、3、4 三个步骤,对于消息生产的性能来说也是有比较大的影响


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


的,所以你在实际应用中需要仔细地权衡考量。我给你的建议是:


1.如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。


2.如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。


3.我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。


在消费的过程中存在消息丢失的可能




一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。


这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,这条失败的消息就永远不会被处理了,也可以认为是丢失了。


所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后还会重复地消费这条消息。


如何保证消息只被消费一次


从上面的分析中你能发现,为了避免消息丢失我们需要付出两方面的代价:一方面是性能的损耗,一方面可能造成消息重复消费。


性能的损耗我们还可以接受,因为一般业务系统只有在写请求时才会有发送消息队列的操作,而一般系统的写请求的量级并不高,但是消息一旦被重复消费就会造成业务逻辑处理的错误。那么我们要如何避免消息的重复呢?


想要完全的避免消息重复的发生是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难以避免的,在工业上并没有成熟的方法,因此我们会把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好了,也就是保证在消息的生产和消费的过程是“幂等”的。


幂等


=================================================================


多次执行同一个操作和执行一次操作,最终得到的结果是相同的。


如果消费一条消息,要将库存数减 1,那么如消费两条相同消息,库存数减 2,这就非幂等。


而如果消费一条消息后处理逻辑是将库存的数置 0,


或如果当前库存数是 10,则减 1,这样在消费多条消息时所得到的结果就是相同的,这就是幂等。


一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。


生产、消费过程增加消息幂等




消息在生产和消费的过程中都可能重复,所以要在生产、消费过程增加消息幂等性保证,这样就可认为从“最终结果上来看”消息实际上是只被消费一次


消息生产过程中,在 Kafka0.11 和 Pulsar 都支持“producer idempotency”,即生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但最终在 MQ 存储时只会存一份。


这是怎么做到的呢?


给每个生产者一个唯一 ID,并为生产的每条消息赋予一个唯一 ID,MQ 服务端会存储<生产者ID,最后一条消息ID>映射。


当某生产者产生新消息,MQ 服务端比对消息 ID 是否与存储的最后一条 ID 一致,若一致,就认为是重复消息,服务端自动丢弃。


在消费端,幂等可从如下两方面考虑:


  • 通用层


可在消息被生产时,使用发号器给它生成一个全局唯一消息 ID,消息被处理后,把这个 ID 存储在 DB,在处理下一条消息前,先从 DB 查询该全局 ID 是否被消费过,若被消费过就放弃消费。


无论是生产端的幂等保证还是消费端通用的幂等性保证,它们的共同特点都是为每个消息生成唯一 ID,然后在使用这个消息时,先比对 ID 是否已存在,存在则认为消息已被使用。


所以这种方式是一种标准的实现幂等的方式,实战中可直接使用,伪代码如 下:


// 判断 ID 是否存在


boolean isIDExisted = selectByID(ID);


if(isIDExisted) {


// 存在则直接返回


return;


} else {


// 不存在,则处理消息


process(message);


// 存储 ID


saveID(ID);


}


不过这样会有个问题:如果消息在处理之后,还没有来得及写入 DB,消费者宕机了,重启后发现 DB 并无这条消息,还是会重复执行两次消费逻辑,这时就需要引入事务,保证消息处理和写入 DB 必须同时成功或失败,但这样消息处理成本更高,所以如果对消息重复没有特别严格要求,可直接使用这种通用方案,而不考虑引入事务。


  • 业务层


有很多种处理方式,有一种是增加乐观锁。比如你的消息处理程序需要给一个人的账号加钱。


具体操作:


给每个人的账号数据加个版本号,在生产消息时先查询该账户的版本号,并将版本号连同消息一起发给 MQ。消费端拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号:


update user


set amount = amount + 20,


version=version+1


where userId=1


and version=1;

评论

发布
暂无评论
【消息队列最佳实践】消息恰好被消费一次