【年后跳槽必看篇 - 非广告】Kafka 核心知识点 - 第二章
书接上文:
继续聊一聊 Kafka 相关的核心概念
Kafka 如何保证消息的幂等性
所谓的消息幂等性就是如何保证消息只消费一次不重复消费。这需要从 Kafka 的多个角度去回答该问题一是要包含 Kafka 自身的机制,还需要考虑客户端自己的重复处理。
Consumer 导致消息重复消费
Kafka 的 Consumer(消费者)offset 还没来得及提交导致重复消费。所以,消费者可以手动提交 offet 来控制消息的消费情况。通过手动提交 offset,消费者可以跟踪自己已经消费的消息,确保不会重复消费。
另外,消费者如何保证不重复消费消息的关键在于消费者做控制,因为 MQ 有可能无法保证不重复发送消息,所以在消费者端也应该控制:即使 MQ 重复发送了消息,消费者拿到消息之后,也要判断是否已经消费过该条消息。所以根据实际业务场景,有以下几种实现方式:
如果 MQ(Kafka)拿到的数据是要存储到 DB 中,那么可以根据数据库创建唯一约束,这样的话,同样的数据从 Kafak(MQ)发送过来之后,当插入到 DB 的时候,会违反唯一约束而不会插入成功。(再者也可以先查询一次,判断是否在 DB 中已经存在,从而决定是否让消息丢弃)
让 Consumer(生产者)发送消息时,每条消息加一个全局唯一的 ID,然后消费时,将该 ID 保存到 Redis 中。消费时先去 Redis 里面查一下有没有,没有再去消费。(原理和上面那条差不多)
如果拿到的数据直接放到 Redis 的 set 中的话,那就不用考虑了,因为其 Set 本身就是去重的
同时,在 Kafka 中每个消费者都必须加入至少一个消费者组(Consumer Group),同一个消费者组内的消费者可以共享消费者的负载。因此,如果一个消息被消费者组内的其中一个消费者消费了,那么其它消费者就不用在接收到这个消息了。
另外,客户端还可以自己做一些**幂等机制**,防止消息的重复消费。
Producer(生产者重复发送消息导致消息重复消费)
在 Kafka 中内部可以为每条消息生成一个全局唯一、与业务无关的消息 ID,当 MQ 接收到消息时,会先根据 ID 判断消息是否重复发送,Kafka 再决定是否接收该消息。
Kafka 的 Exactly-once 来避免消息重复消费
Kafka 内部提供了 Exactly-once 消费语义。简单理解其实就是引入事务,消费者使用事务来保证消息的消费和 offset 提交是原子的,而生产者可以使用事务来保证消息的生产和 offset 提交是原子的。Exactly-once 消费语义则解决了重复问题。但是需要更复杂的设置和配置
Kafka 的三种消息传递语义
在 Kafka 中,有三种比较常见的消息传递语义:
at-least-once:至少一次
at-most-once:至多一次
exactly-once:仅一次
at-least-once
如何配置:
设置 enable.auto.commit 为 false,禁用自动提交 offset
消息处理完之后手动调用
consumer.commitSync()
提交 offset
这种语义有可能会对数据重复处理,因为该消费语义要保证消费者至少消费一次。在 at-least-once 语义中,在消费数据之后,手动调用函数 consumer.commitSync()异步提交 offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是 offset 还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次
这种语义比较适用于实时数据处理或消费者不能容忍数据丢失的场景,比如金融交易或者电信指令。
at-most-noce(Kafka 的默认实现)
如何配置:
设置
enable.auto.commit
为 true
auto.commit.interval.ms
设置一个较低的时间范围
由于上面的配置,此时的 Kafka 会有一个独立线程负责按照只当间隔提交 offset。
消费者的 offset 已经提交,但是消息还在处理冲(没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的 offset 出处理,导致上次没有被成功处理的消息丢失了。
exactly-once
如何配置:
将
enable.auto.commit
设置为 false,禁用自动提交使用
consumer.seek(topicParttion, offset)
来指定 offset在处理消息的时候,要同时保存住每个消息的 offset
这种语义可以保证数据只被消费处理一次。同时保证消息的顺序性。是以原子事务的方式保存 offset 和处理的消息结果。数据真正处理成功的时候才会保存 offset 信息。
在 Kafka 0.11 版本之前,实现 exactly-once 语义需要一些特殊的配置和设置。但是在 Kafka 0.11 版本之后,Kafka 提供了原生的 exactly-once 支持,使得实现 exactly-once 语义变得更加简单和可靠
Kafka 如何保证消息的顺序性
我们都知道 Kafka 的消息是存储在指定的 Topic 中的某个 Partition 中的,并且一个 Topic 是可以有多个 Partition 的,同一个 Partition 的消息是有序的,但是如果是不同的 Partition 或者不同的 Topic 的消息那就是无需的了。
为什么需要保证 Kafka 消息的顺序性呢?
假设需要做一个 MySQL binlog 的同步系统,在 MySQL 中有一个针对某条数据增删改的操作,对应出来的增删改三条 binlog,接着这三条 binlog 需要发送到 Kafka(MQ)中,消费者消费出来一次执行,此时就需要保证消息的一致性,否则数据就会出现问题。
为什么同一个 Partition 消息是有序的呢?
因为当生产者向某个 Partition 发送消息时,消息会被追加到该 Partition 中的日志文件中(log),并且会被分配一个唯一的 offset,文件的读写是有顺序的。而消费者在该 Partition 消费消息时,会从该 Partition 的最早 offset 开始逐个读取消息 ,从而保证了消息的顺序性。
如何保证数据写入一个 partition 中去:
那么想要实现消息的顺序性消费,可以从一下角度参考:
因为 Kafka 中的 Partition 是可以保证消息的顺序性,如果消息只写入到一个 Partition 中,那么消息一定是有顺序性的。为了只写入一个 Partition 可以只在 Topic 中只创建一个 Partition
如果确实需要存在多个 Partition。发送消息的时候可以指定一个 Partition。这样即使一个 Topic 中存在多个 Partition,我们可以把需要保证顺序性的消息都发送到一个 Partition 中,这样就可以保证顺序消费了
比如:生产者可以在生产写入数据的时候可以指定一个 Key,比如指定某个订单 id 作为 Key,这个订单相关的操作就会被分发到一个 Partition 中去。
什么情况下 kafka 会出现消息顺序不一致?
消费者内部利用了多个线程并发处理,则可能会出现顺序不一致的问题。
如图所示:
那么应该如何解决消费者端多线程并发处理消息导致消息顺序不一致的情况呢?
大致的思路可以按照hash
算法进行 hash 分发。因为相同的订单 Key 的数据会分发到一个内存queue
里面去。
如图:
Kafka 将消息分发到同一个 Partition 中的具体实现方式
在 Kafka 中,当我们向其发送消息的时候,如果 Key 为 null,那么 Kafka 会采用默认的Round-robin
策论,也就是轮转。具体实现类是:DefaultPartitioner
。所以如果想要指定发送消息到某个 Partition 中,可以参考下面的方式:
指定 Partition
发送消息的时候执行 Partition,具体 可以在 ProducerRecord 中指定 Partition
指定 Key
在没有指定 Partition(null)时,如果有 Key,Kafka 会根据 Key 做 Hash 计算出一个 Partition 编号来,如果 Key 相同,那么也是可以分到一个 Partition 中的。
自定义 Partition
除了指定 Partition 和 Key 以外,还可以自定义实现自己的 Partitioner(分区器)来指定消息发送到指定的 Partition(分区)
创建一个自定义类并实现 Partitioner 接口,重写partition()
方法
如上述代码所示,在partition()
方法中,利用了一简单的实现逻辑,根据键的 Hash 值将消息发送到相应的分区。为了在 Kafka 生产者中国使用自定义的 Partitioner(分区器),需要在生产者的配置中指定 Partitioner 类(分区器类)
Kafka 如何解决消息积压问题
消息积压问题大多数由于消费者故障导致,或者消费能力不足导致
解决思路如下:
如果 Consumer 有问题,先修复 Consumer 的问题,确保其能恢复正常消费速度。然后将现有 Consumer 都停掉
临时建立好原先 10 倍或者 20 倍 queue 的数量。(Kafka 也新建一个 Topic,Partition 是原来的 10 倍)
写一个临时分发数据的 Consumer 程序,这个程序部署上去消费积压好的数据,消费之后不做耗时处理,直接均匀轮询写入已经建立好 10 倍/20 倍数量的 queue 上
接着临时征用原来 10 倍的机器来部署 Consumer,每一批 Consumer 来消费一个临时 queue 的数据
等快速消费完积压完的数据之后,得恢复原来的部署架构,重新用原来的 Consumer 机器来消费
Kafka 可以保证消息 100%不丢失吗?(为什么 Kafka 不能 100%保证消息不丢失)
https://www.yuque.com/paidaxing-wskgg/axlvdy/zcf9sg3s3wzevd1v#XcWoQ
上面有提到过 Kafka 提供的 Producer 和 Consumer 之间的消息传递保证语义有三种。
at-least-once:至少一次
at-most-noce:至多一次
exactly-once:每条消息保证精确传递一次。不多也不会少
目前,Kafka 默认提供的交付可靠性保障时第二种,即 at-least-once。但是其实 Kafka 如果仅靠自身是没办法保证消息时 100%可靠的。
原因可以从一下角度考虑:
Producer(生产者)
Kafka 时允许生产者以异步方式发送消息,这意味着 Producer 在发送消息后不会等待确认。淡然我们可以注册一个回调等待消息的成功回调。
但是,如果 Producer 在发送消息之后,Kafka 的集群发生故障或崩溃,而消息尚未被完全写入 Kafka 的日志中,那么这些消息可能会丢失。虽然后续可能会有重试,但是如果重试也失败了呢?如果这个过程中刚好生产者也崩溃呢?那就可能会导致没人知道这条消息失败了。就会导致消息不再重试了。
Consumer(消费者)
消费者来说比较简单,只要保证再消息成功被消费时,再去提交 offset,这样就不会导致消息丢失了。
Broker(集群)
Kafka 使用日志来做消息的持久化,日志文件事存储在磁盘上的,但是如果 Broker 在消息尚未完全写入日志之前就崩溃,那么消息就有可能丢失了。
并且,操作系统在写磁盘之前,会先把数据写入到
Page Cache
中,然后再由操作系统自己决定什么时候同步到磁盘当中,而在这个过程中,如果还没来得及同步到磁盘,就直接宕机了。那么这个消息也是丢失了。不过,也可以通过配置
log.flush.interval.message=1
,来实现类似于同步刷盘的功能,但是这样又回到了之前的情况,就是还没来得及持久化就宕机了。即使 Kafka 中引入了副本机制来提高消息的可靠性,但是如果发生同步延迟,还没来得及同步,主副本就挂掉了,那么消息还是可能发生丢失。
这几种情况是从 Broker 角度来分析,Broker 自身是没办法保证消息不丢失的,但是如果配合 Producer,在配置request.required.acks = -1
这种 ACK 策略,可以确保消息持久化成功之后,才会 ACK 给 Producer,那么,如果我们的 Producer 再一定时间内,没有收到 ACK 是可以重新发送消息。
但是这种重新发送,就又回到了我们前面介绍 Producer 的时候的问题。生产者也有可能会挂掉,重新发送也有可能没有发送依据,导致消息最终丢失
归根到底,如果只靠 Kafka 自己,其实是没有办法保证极端情况下的消息 100%不丢失的。
但是我们可以引入一些机制来解决保证这个问题,比如:引入分布式事务,或者引入本地消息表,保证 Kafka Broker 没有保存消息成功返回时,可以重新投递消息,这样才可以。
后续有机会讲讲分布式事务相关概念
如有问题,欢迎加微信交流:w714771310,备注- 技术交流 。或关注微信公众号【码上遇见你】。
好了,本章节到此告一段落。希望对你有所帮助,祝学习顺利。
版权声明: 本文为 InfoQ 作者【派大星】的原创文章。
原文链接:【http://xie.infoq.cn/article/4a3e563c3d77de1f1542fe49e】。
本文遵守【CC BY-NC-SA】协议,转载请保留原文出处及本版权声明。
评论