Kafka:可靠!可靠!还是 xx 的可靠!
前言
Kafka 作为目前主流的消息中间件来说,内部有许多保证消息可靠的机制。消息丢失在线上生产环境来说,造成的影响可大可小,本篇将介绍 Kafka 保障可靠性的机制。
读完本篇你将会收获:
复制功能
复制功能是 Kafka 架构的核心,复制功能使得 Kafka 即便是在个别节点失效之后,仍然能保证对外提供正常的服务。
复制保证了 Kafka 的可用性和持久性。
复习一下:每个主题有多个分区,每个分区有多个副本
。副本是被保存到 broker 上面。
分区副本种类
首先要明确 2 个概念,首领副本和跟随着副本,其实就是跟 Zookeeper 里面讲的 Leader 和 Follower 一样的概念。
首领Leader副本
:每个分区都有一个首领副本。所有的生产者请求和消费者请求都会经过这个副本。主要是为了保证一致性。跟 zookeeper 中的 leader 是一个道理。跟随者Follower副本
:跟随者副本不会处理请求,他会从首领副本里面获取数据同步到自己这边。他的作用是当首领崩溃的时候,其中的一个跟随者副本会提升为新首领。
Kafka 根据副本同步的情况,分成了 3 个集合:
AR(Assigned Replicas)
:包括 ISR 和 OSRISR(In-sync Replicas)
:ISR 是指和首领副本 Leader 保持同步的副本集合
,可以被认为是可靠的数据。OSR(Out-Sync Replicas)
:就是失去了和 Leader 副本同步的副本集合。
可以把这些集合理解为一个池子,只有与首领同步的副本,才可以放进池子里面。
同步流程
首领会去检查哪个跟随者的状态是和自己是一致的。与其说是首领去询问跟随者,其实更像是让跟随者向首领汇报自己的状态同步信息。( poll
),跟随者副本会向首领副本发送同步请求来进行同步。
跟随者会在有新消息到达时尝试从首领那里复制消息。
请求信息里面包括了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。跟随者和首领通过偏移量来检查数据的一致性。
请求类型
上面讲了同步流程是通过 Fllower 向 Leader 发送同步请求来进行的,下面讲一下 Kakfa 的其他请求类型:
元数据请求
生产请求
同步请求
元数据请求
解决客户端去找分区首领所在哪个 broker 的问题
元数据请求里面包含了:
客户端感兴趣的主题列表。
服务器端的响应消息里指明了这些主题所包含的分区,
每个分区都有哪些副本,
以及哪个副本是首领。
有了这些元数据,客户端就知道该向哪个 broker 发送生产请求或者获取请求。
一般客户端都会把这些信息缓存起来。时不时会通过发送元数据请求来刷新这些信息。
当发生分区变化,比如新的 broker 加入,那么部分分区就会被分配到新的 broker 里,可能会有首领。
此时如果客户端还拿着旧的元数据信息,那么就会发生请求失败,返回一个非首领错误。
客户端在重试之前,会先发送元数据请求,去同步一下自己的元数据信息。然后再进行重试。
总结:其实就是 Redis Clusters 里面的全局 key 映射关系
生产请求
生产者发送的请求,可以理解为 Kafka 写入消息请求
如果之前有配置那个 acks,那么首先会进行一个写入消息检查。
包含首领副本的 broker 收到生产请求后,会对请求做一些验证:
对方是否具有主题的写入权限。
请求里面包含的 acks 值是否有效。
如果是 acks=all,是否有足够多的同步副本保证消息已经被写入。
之后,消息会被写入本地磁盘。
在消息被写入分区的首领之后,broker 开始检查 acks 配置参数,如果是 0,或者是 1,就立客返回响应。如果是 all。那么会先把请求保存到一个叫做炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会返回给客户端。(ZAB 里面是过半就行,这里要全部)
获取请求
获取请求解决的是跟随者跟首领数据一致性的问题
跟随者向首领发出请求,请求同步数据一致性。
跟随者会指定某主题下的某分区的偏移量一个区间的消息,让首领发送给他。
首领在收到请求后,会先检查请求是否有效,比如偏移量是否在分区上存在等等。
客户端可以指定 broker 最多可以从一个分区里面返回多少数据,因为服务器返回数据给我客户端,我客户端需要预分配一些内存空间来存储这些消息的。
如果是合法的偏移量,那么 broker 就会按照获取请求上面的要求,从分区里读取消息,然后返回给客户端。
Kafka 使用零复制技术向客户端发送消息。 其实就是直接把消息从文件里发送到网络通道,不需要经过任何中间缓冲区。
一般来说,其他的数据库会在把数据发送给客户端之前,会把他们保存在本地缓存里。
同步副本的条件
分区首领是肯定是同步副本,因为只有同步副本才能被选举为首领。
对于跟随者副本来说,需要满足一下几个条件,才能被认为是同步的副本
副本与 Zookeeper 之间有一个活跃的会话 Session,也就是说他的心跳机制是正常的。
过去 10。s(可配置)内从首领那里获取过消息
过去 10s(可配置)内从首领那里获取过最新的消息。确保消息的一致性。
失败重试
跟随者副本先请求同步消息 1,接着消息 2、消息 3……
如果没有收到这 3 个请求消息的响应,那么他不会发送第四个请求消息。(类似TCP中的ACK机制
)
首领是通过查看每个跟随者请求的最新偏移量来确定每个跟随者的复制进度。
如果跟随者在 10s 内没有请求任何信息,或者是 10s 内没有请求最新的数据,那么就会被认为不同步。
当首领宕机时,这些不同步的副本并不会被选举为新首领。
这个 10s 是一个参数来设置的,replica.lag.time.max.ms
首选首领
在 Kafka 中,首领,我们分为当前首领和首选首领。字面意思,当前首领就是整个集群中当前状态下的首领,而首选首领他的目的是为了在选举的时候把首选首领选举为当前首领。
在创建主题的时候选定的首领就是分区的首选首领。
auto.leader.rebalance.enable
被设置为true
。表示他会开启自动的首领选举。
unclean.leader.election.enable
设置为true
表示它会检查首选首领是不是当前首领,如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领称为当前首领。
在后面 broker 配置中会讲解开启不完全的首领选举是如何保障 Kafka 的高可用性。
broker 配置
这里讲的都是关于 Kafka 消息存储的可靠性,主要是由 3 个配置参数
复制系数
默认复制系数是3
。
假设复制系数是,那么即使是在个 broker 失效的情况下,任然能对外提供正常服务。
复制系数意味着对分区进行备份多少份数据,假设分区副本为 6 个,复制系数是 3,那么就会有 18 份副本。通过备份来获得高可用性。
不完全的首领选举
完全和不完全,指的是选举这个过程,而不是首领。
数据安全性和高可用性的一种取舍方案。
当一个分区首领不可用的时候,其中的一个同步副本就会被选举为新的分区首领。如果在选举的过程中,写入的数据能够被同步到所有的同步副本中,那么就称这个选举是完全的。
那为什么会出现不完全的选举呢?
分区有 3 个副本,其中的两个跟随者副本不可用了(比如两个 broker 发生崩溃),此时生产者还在对首领副本写入消息,并且写多少,他就确认多少,因为他是当前唯一一个可用的同步副本。此时首领副本崩溃了,并且那两个跟随副本其中一个启动了,它就会称为分区里的唯一不同步副本。
分区有 3 个副本,其中 2 个跟随者副本因为网络的原因,尽管已经很努力地同步了,还是未能跟首领副本的数据保持同步,此时首领副本裂开了。那么此时 2 个分区副本都是不同步的副本。
核心就是在于,分区首领在崩溃了之后,其他的副本是不同步的。
这种不同步的副本可不可以被选举为首领副本呢?
如果不可以!
那么可能就会有一段时间,导致 Kafka 无法对外提供服务,因为他要等不同步副本一直转化为同步副本为止。
他只有一个办法,就是等之前的那个首领副本恢复。所以他这种不可用状态可能会持续很久,他的高可用性是非常低的。
如果可以!
意味着我们会面临丢失消息的风险,因为我们选举的是不同步的副本,有一些消息存在于首领副本的,我们就无法恢复了。
主要的还是看场景吧,如果是用户点击网页这些不重要的消息,丢失一点倒是没关系。
unclean.leader.election.enable
设置为True
,就是允许不同步的副本称为首领,也就是不完全的选举。
对于银行这种系统来说,宁愿一个小时,甚至一天都不对外提供服务,也不可能冒着丢失消息的风险。
最少同步副本
最少同步副本,跟他很像的有可用的同步副本
一般我们来说,只有消息被写入到所有同步副本之后,才认为是已提交的。这样的消息才能够被消费者读取到。
假设我们的复制系数是 3,正常情况下,我们一个主题会有 3 个副本。但是如果其他 2 个 broker 都宕机了,那么此时可用的同步副本就只有 1 个。如果这个副本也宕机了,那么消息就丢失了。
我们想要确保数据最少写入到多少个副本,就需要设置min.insync.replicas
这个参数。如果当前集群中可用的同步副本小于我们这个最少同步副本,那么就会直接抛异常,拒绝生产者的请求。但是处理读请求。
可靠的生产者
配置的话主要是的是配置 acks 参数
即便我们禁止不完全选举、调高复制系数。也有可能会面临消息丢失的情况。
假设有 3 个副本,akcs=1,代表只需要在首领分区提交,就认为是消息被提交。此时有一个新的消息提交到首领分区中,并且处理完了。但是其他 2 个跟随者副本还没有来得及同步,首领分区宕机了。但是这 2 个跟随者会人为自己是同步副本,因为同步副本到非同步副本状态的转换是需要时间的。当新的分区首领选上之后,从首领角度来说是没有丢失消息,但是从生产者的角度来说是丢失了消息的。
还有一种情况就是,学聪明了,把 acks=all,让所有的同步副本都同步才确认。生产者发送消息,首领副本裂开了。生产者会得到一个“首领不可用”的响应。如果生产者没有正确地处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。这问题的关键在于,生产者有没有对写入失败的消息,进行正确的处理。这个是在代码层面可以解决的问题。
生产者发送确认机制
其实就是 acks 的设置
这里就不过多重复了,可选配置那里有讲。详细的可以看Kafka:生产者全解 - 掘金 (juejin.cn) 往期文章,里面有非常详细的介绍。
生产者重试
有的错误,生产者是可以自动处理,有的错误需要开发者在代码层进行处理。
错误也分为重试可解决的错误和重试不可解决的错误。
如果我们追求的是不丢失任何消息这个级别,那么我们尽可能让生产者遇到可重试错误情况下,尽量保持重试。像首领选举和网络分区这类问题,可能很快就会得到解决,让生产者保持重试,就不需要额外地区解决这些问题。
重试次数这个就要我们自己定义,而且我们捕获到了异常之后,是选择直接重试,还是说把异常记录下来,一系列操作,都是可以在代码层解决地。
但是要注意的是,重试发送同一个失败过的消息是具有一定风险的,比如我正在处理你前面那个消息,我可能因为一些原因导致处理有点慢了,生产者那边认为超时了,再重试发送一个过来。此时相当于对 Kafka 写入了 2 个相同的消息,会出现消息重复消费的问题。
怎么解决呢?一般我们可以在消费接口那里设计为具有幂等性的,既然我们无法从问题的根源解决,那么就在下游进行解决问题。
具体怎么做就是在消息体里面携带消息的唯一标识符,具有单调递增性,通过 Mysql 主键锁去实现接口的幂等性。
额外的错误处理
有的错误,生产者是可以自动处理,有的错误需要开发者在代码层进行处理。
错误也分为重试可解决的错误和重试不可解决的错误。
如果我们追求的是不丢失任何消息这个级别,那么我们尽可能让生产者遇到可重试错误情况下,尽量保持重试。像首领选举和网络分区这类问题,可能很快就会得到解决,让生产者保持重试,就不需要额外地区解决这些问题。
重试次数这个就要我们自己定义,而且我们捕获到了异常之后,是选择直接重试,还是说把异常记录下来,一系列操作,都是可以在代码层解决地。
但是要注意的是,重试发送同一个失败过的消息是具有一定风险的,比如我正在处理你前面那个消息,我可能因为一些原因导致处理有点慢了,生产者那边认为超时了,再重试发送一个过来。此时相当于对 Kafka 写入了 2 个相同的消息,会出现消息重复消费的问题。
怎么解决呢?一般我们可以在消费接口那里设计为具有幂等性的,既然我们无法从问题的根源解决,那么就在下游进行解决问题。
具体怎么做就是在消息体里面携带消息的唯一标识符,具有单调递增性,可以通过 Mysql 主键锁去实现接口的幂等性。
可靠的消费者
消费者能够读取到的消息,都是被写入到全部可用同步副本的。消息已经是具有一定的可靠性的。而消费者就是要解决如何消费消息是可靠的问题。
消费者可靠性配置
group.id
,这个是跟消费者群组有关的,同一个 group.id 的消费者,相当于在同一个消费者群组。注意的是,同一个群组下的消费者,每个消费者只能读到同一个主题下的部分数据,如果想要每个都读取到全部消息,则需要设置不同的 group.id。auto.offset.reset
,当发生没有偏移量可提交的时候,比如消费者第一次启动时,或者请求的偏移量在 broker 上不存在。就会采取latest
或者是earliest
。earliest 代表从头开始读,latest 代表从分区末尾的开始读。earliest 可能会出现消息重复的问题,但是这个还是比较好解决的,通过接口幂等性等,最差也就是耽误一段时间。但是采取 latest 就会面临丢失消息的风险,显然后者引发的问题严重性更大。enable.auto.commit
,手动还是自动提交偏移量。自动的话,就全部交给 kafka 中轮询来操作。手动的话就更加细,消息更可靠。
消费者重试
除了生产者对消息有重试机制,消费者要有重试机制,来确保消息的可靠性。
对于消费者来说,重试的是对偏移量的提交。
一次轮询,代表处理一个批次的消息,在这个批次里面,并不是所有的消息都会被完全处理。比如中途有一个写入 DB 的消息,此时 DB 处于不可用的状态。现在我想要稍后再去重试这个消息。但是我不能去提交比他后的消息偏移量,如果我提交了,那么就代表该偏移量之前的消息,我都人为处理成功了。这个跟 TCP 中 ACK 确认机制非常像。
怎么解决呢?这里我们假设都是遇到可重试的错误,不可重试的错误就没办法通过重试机制来解决。
第一种方式,我们还是提交最后一个处理成功的偏移量。但是我们会把刚刚处理不成功的消息,保存到一个缓冲区里面。然后调用消费者的 wude
pause
方法来确保其他的轮询不会返回数据。第二种方式,把处理不成功的消息,丢到一个特殊的主题,然后用一个消费者去单独处理这些主题的消息。其实说白了就是一个死信队列。
评论