写点什么

如何让 Kafka 在保证高性能、高吞吐的同时通过各种机制来保证高可用性?

  • 2021 年 11 月 11 日
  • 本文字数:4583 字

    阅读完需:约 15 分钟

二、事务

1.场景

幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,**事务可以保证对多个分区写入操作的原子性。**操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。


为了实现事务,应用程序必须提供唯一的transactionalId,这个参数通过客户端程序来进行设定。


见代码库:com.heima.kafka.chapter7.ProducerTransactionSend


properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

2.前期准备

事务要求生产者开启幂等性特性,因此通过将 transactional.id 参数设置为非空从而开启事务特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为 true(默认值为 true),如果显示设置为 false,则会抛出异常。


KafkaProducer提供了 5 个与事务相关的方法,详细如下:


//初始化事务,前提是配置了 transactionalIdpublic void initTransactions()//开启事务 public void beginTransaction()//为消费者提供事务内的位移提交操作 public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)//提交事务 public void commitTransaction()//终止事务,类似于回滚 public void abortTransaction()

3.案例解析

见代码库:com.heima.kafka.chapter7.ProducerTransactionSend


消息发送端


/**


  • Kafka Producer 事务的使用*/public class ProducerTransactionSend {public static final String topic = "topic-transaction";public static final String brokerList = "localhost:9092";public static final String transactionId = "transactionId";


public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(ProducerConfig.TRANSACTIONAL_ID_C


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


ONFIG, transactionId);


KafkaProducer<String, String> producer = new KafkaProducer<> (properties);


producer.initTransactions();producer.beginTransaction();


try {//处理业务逻辑并创建 ProducerRecordProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");producer.send(record1);ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");producer.send(record2);ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");producer.send(record3);//处理一些其它逻辑 producer.commitTransaction();} catch (ProducerFencedException e) {producer.abortTransaction();}producer.close();}}


模拟事务回滚案例


try {//处理业务逻辑并创建 ProducerRecordProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");producer.send(record1);


//模拟事务回滚案例 System.out.println(1/0);


ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");producer.send(record2);ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");producer.send(record3);//处理一些其它逻辑 producer.commitTransaction();} catch (ProducerFencedException e) {producer.abortTransaction();}


从上面案例中,msg1 发送成功之后,出现了异常事务进行了回滚,则 msg1 消费端也收不到消息。

三、控制器

在 Kafka 集群中会有一个或者多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。


Kafka 中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

1.ZooInspector 管理

  • 使用zookeeper图形化的客户端工具(ZooInspector)提供的 jar 来进行管理,启动如下:


  1. 定位到 jar 所在目录

  2. 运行 jar 文件 java -jar zookeeper-dev-ZooInspector.jar

  3. 连接Zookeeper



{"version":1,"brokerid":0,"timestamp":"1529210278988"}



其中 version 在目前版本中固定为 1,brokerid表示称为控制器的 broker 的 id 编号timestamp表示竞选称为控制器时的时间戳


在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试去读取**/controller 节点**的 brokerid 的值,如果读取到 brokerid 的值不为-1,则表示已经有其它 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller 这个节点,当前 broker 去创建节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为activeControllerId


Zookeeper 中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为**“控制器的纪元”**。



controller_epoch的初始值为 1,即集群中第一个控制器的纪元为 1,当控制器发生变更时,没选出一个新的控制器就将该字段值加 1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka 通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。


具备控制器身份的 broker 需要比其他普通的 broker 多一份职责,具体细节如下:


  1. 监听 partition 相关的变化。

  2. 监听 topic 相关的变化。

  3. 监听 broker 相关的变化。

  4. 从 Zookeeper 中读取获取当前所有与 topic、partition 以及 broker 有关的信息并进行相应的管理。

四、可靠性保证

  1. 可靠性保证:确保系统在各种不同的环境下能够发生一致的行为

  2. Kafka 的保证


  • [ ] 保证分区消息的顺序

  • [ ] 如果使用同一个生产者同一个分区写入消息,而且消息 B 在消息 A 之后写入

  • [ ] 那么 Kafka 可以保证消息 B 的偏移量比消息 A 的偏移量大,而且消费者会先读取消息 A 再读取消息 B

  • [ ] 只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交

  • [ ] 生产者可以选择接收不同类型的确认,控制参数 acks

  • [ ] 只要还有一个副本是活跃的,那么已提交的消息就不会丢失

  • [ ] 消费者只能读取已经提交的消息

1. 失效副本

怎么样判定一个分区是否有副本是处于同步失效状态的呢?从 Kafka 0.9.x 版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为 10,000)来控制,当 ISR 中的一个 follower 副本滞后 leader 副本的时间超过参数replica.lag.time.max.ms指定的值时即判定为副本失效,需要将此 follower 副本剔出除 ISR 之外。具体实现原理很简单,当 follower 副本将 leader 副本的 LEO(Log End Offset,每个分区最后一条消息的位置)之前的日志全部同步时,则认为该 follower 副本已经追赶上 leader 副本,此时更新该副本的lastCaughtUpTimeMs标识。Kafka 的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值。千万不要错误的认为 follower 副本只要拉取 leader 副本的数据就会更新lastCaughtUpTimeMs,试想当 leader 副本的消息流入速度大于 follower 副本的拉取速度时,follower 副本一直不断的拉取 leader 副本的消息也不能与 leader 副本同步,如果还将此 follower 副本置于 ISR 中,那么当 leader 副本失效,而选取此 follower 副本为新的 leader 副本,那么就会有严重的消息丢失。

2.副本复制

Kafka 中的每个主题分区都被复制了 n 次,其中的 n 是主题的复制因子(replication factor)。这允许 Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。Kafka 的复制是以分区为粒度的,分区的预写日志被复制到 n 个服务器。 在 n 个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer 只能往 leader 分区上写数据(读也只能从 leader 分区上进行),followers 只按顺序从 leader 上复制日志。


一个副本可以不同步 Leader 有如下几个原因 慢副本:在一定周期时间内 follower 不能追赶上 leader。最常见的原因之一是I / O瓶颈导致 follower 追加复制消息速度慢于从 leader 拉取速度。 卡住副本:在一定周期时间内 follower 停止从 leader 拉取请求。follower replica卡住了是由于 GC 暂停follower 失效或死亡


新启动副本:当用户给主题增加副本因子时,新的 follower 不在同步副本列表中,直到他们完全赶上了 leader 日志。


如何确定副本是滞后的


replica.lag.max.messages=4



在服务端现在只有一个参数需要配置replica.lag.time.max.ms。这个参数解释replicas响应partition leader的最长等待时间。检测卡住或失败副本的探测——如果一个 replica 失败导致发送拉取请求时间间隔超过replica.lag.time.max.ms。Kafka 会认为此 replica 已经死亡会从同步副本列表从移除。检测慢副本机制发生了变化——如果一个 replica 开始落后 leader 超过replica.lag.time.max.ms。Kafka 会认为太缓慢并且会从同步副本列表中移除。除非 replica 请求 leader 时间间隔大于replica.lag.time.max.ms,因此即使 leader 使流量激增和大批量写消息。Kafka 也不会从同步副本列表从移除该副本。

1.Leader Epoch 引用

数据丢失场景



数据出现不一致场景


2.Kafka 0.11.0.0.版本解决方案

造成上述两个问题的根本原因在于 HW 值被用于衡量副本备份的成功与否以及在出现 failture 时作为日志截断的依据,但 HW 值的更新是异步延迟的,特别是需要额外的 FETCH 请求处理流程才能更新,故这中间发生的任何崩溃都可能导致 HW 值的过期。鉴于这些原因,Kafka 0.11 引入了leader epoch来取代 HW 值。Leader 端多开辟一段内存区域专门保存 leader 的 epoch 信息,这样即使出现上面的两个场景也能很好地规避这些问题。


所谓 leader epoch 实际上是一对值:(epochoffset)。epoch 表示 leader 的版本号,从 0 开始,当 leader 变更过 1 次时 epoch 就会+1,而 offset 则对应于该 epoch 版本的 leader 写入第一条消息的位移。因此假设有两对值:


  1. (0, 0)

  2. (1, 120)


则表示第一个 leader 从位移 0 开始写入消息;共写了 120 条[0, 119];而第二个 leader 版本号是 1,从位移 120 处开始写入消息。


leader broker中会保存这样的一个缓存,并定期地写入到一个 checkpoint 文件中。


避免数据丢失:



避免数据不一致



评论

发布
暂无评论
如何让Kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性?