写点什么

八股 MQ005——聊聊 Broker

作者:Codyida
  • 2023-05-04
    广东
  • 本文字数:5160 字

    阅读完需:约 17 分钟

写在前面

近期看到有篇文章说,找到一件可以产生效益的事情,然后持之以恒,迭代的只有完成它的方法,而不需要变更执行它的决心。


我觉得说的很对。


很喜欢士兵突击。当时看时以为我是成才,傻子才是许三多。现在回头想想,才发现能当许三多是很难的事情。


人太聪明了,聪明到会考虑很多,结果忘了最原初的事情。


道理人人都懂,事情人人都能做。但许三多才会去做,才成了兵王。


所以,找到坚持有益的事情,然后就是坚持与提高它的效率。然后就是陪着自己和时间慢慢走。

基础知识

Broker 是什么

Broker 可以看做是一台 Kafka 服务器,但其实是运行在一台网络服务器上的进程。它有如下作用:


  1. 接收 Producer 写入消息的请求

  2. 接收 Consumer 拉取消息的请求

  3. 负责消息的持久化存储(内存->硬盘)

  4. 负责 Partition 的不同 Replicas 之间的数据同步与故障迁移

Broker 的生命周期

  1. 启动阶段:当 Broker 启动时,它会加载配置文件,初始化日志管理器,创建网络层和请求处理层,连接到 ZooKeeper,并注册自己到集群。如果集群没有 Controller,它也会尝试竞选成为 Controller。

  2. 运行阶段:当 Broker 运行时,它会处理来自生产者和消费者的请求,包括写入和读取数据,以及同步和复制数据。如果 Broker 是 Controller,它还会处理来自其他 Broker 的请求,包括管理集群的元数据,分配和转移分区,选举和切换 Leader 等。同时,Broker 也会与 ZooKeeper 保持心跳通信,检测集群的状态和变化。

  3. 重新选举阶段:当 Broker 失去与 ZooKeeper 的连接或者宕机时,它会退出 Controller 角色,并释放/controller 目录的锁。此时,ZooKeeper 会通知其他 broker 开始新一轮的选举。原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行阶段;如果失败,它会成为 Follower,并等待下一次选举。

  4. 关闭阶段:当 Broker 关闭时,它会先停止接收新的请求,并等待已有的请求完成。然后,它会关闭网络层和请求处理层,断开与 ZooKeeper 的连接,并从集群上注销自己。最后,它会关闭日志管理器,并释放资源。

Broker 有哪些配置项

通过配置项可以直接从参数的角度来了解 Broker。参数有很多,这里只覆盖了一部分。但能从这些参数来看到 Broker 有自己服务器维度、网络连接、与 ZooKeeper 交互、Log 存储、消费者与消息多维度的参数设置。

Broker Server 基础配置

  • broker.id:整个 Kafka 集群内标识唯一 Broker 的 ID。整数类型。这个值其实也是 ZooKeeper 在 /brokers/ids 路径下创建的节点 ID。若不指定,或者指定为-1,那么 ZooKeeper 就会从 reserved.broker.max.id 这个参数(默认 1000)加 1,用作 Broker 的 ID

  • delete.topic.enable:是否允许删除 Topic。

  • auto.create.topics.enable:是否允许在 Producer 在未指定 Topic 发送 Message 时自动创建 Topic。

Broker Socket 配置

  • listeners:Broker 之间,Client 与 Broker 之间通信建立连接时使用的信息。即 Broker 的监听者,可以以逗号分割配置多个。它的格式为[安全协议]://Hostname/IP:Port。Kafka 支持两种传输层协议(PLAINTEXT 或 SSL)(加密消息)与两种认证层安全协议(SSL 或 SASL)(认证客户端与服务器身份)。

  • advertised.listeners:将 Broker 建立通信的地址发布到 Zookeeper 中,便于 Client(Producer 和 Consumer)连接。它的格式和listener一致。

  • listener.security.protocol.map:以 Key/Value 的形式定义监听者的安全协议,在大多数情况下会将 Key 认为是监听者的别名。

  • inter.broker.listener.name:设置内部通信时使用哪个监听者。可以直接设置listener.security.protocol.map中设置的 Key。

  • num.network.threads:Broker Server 接收请求及发送响应时启用的线程数量。

  • num.io.threads:Broker Server 处理请求、对 Message 进行 I/O 操作时启用的线程数。

Broker Log 配置

Broker Log 存储也是很重要的内容,这里先按下不表。


  • log.dirs:日志、Message 保存的路径。

  • num.partitions:创建 Topic 时,如果没有指定 Partition 数量,则使用该配置项设置的 Partition 数量。

  • num.recovery.threads.per.data.dir:每个数据目录启用几个线程来处理,这里的线程数和数据目录数是乘积关系,并且只在 Broker 启动或关闭时使用。具体如下:

  • 当服务器正常启动时,用于打开每个分区的日志片段。

  • 当服务器发生崩溃并重启时,用于检查和截断每个分区的日志片段。

  • 当服务器正常关闭时,用于关闭日志片段。

  • min.insync.replicas:当acks=all时,至少有多少个 Replicas 需要确认已持久化数据,包括 Leader。

  • log.retention.hours:Kafka 保留 Message 的时间,默认是 168 小时,即 7 天。

  • log.retention.check.interval.ms:检测 Message 是否可以被删除的时间间隔。

  • log.retention.bytes:已保留的消息的字节总数来判断旧消息是否过期,这个值对应的是每一个分区,如果一个 topic 有 3 个分区,这个值设为 1GB, 那么该主题最多保留 3GB 数据。

  • log.segment.bytes:每个 Segment 文件的大小,默认是 1G。当日志大小超过该值时,下一次日志写入就会创建一个新的 segment。

Broker ZooKeeper 配置

2.7 版本还在使用,之后 3.x 版本已经不使用了。


  • zookeeper.connect:设置 Zookeeper 地址。可用逗号分割配置多个地址,既 Zookeeper 集群的地址。

  • zookeeper.connection.timeout.ms:等待连接 Zookeeper 的超时时间。

Consumer Group 配置

  • group.initial.rebalance.delay.ms:当 Consumer Group 新增或减少 Consumer 时,重新分配 Topic Partition 的延迟时间。

Message 配置

  • message.max.bytes:Broker 接收每条 Message 的最大值,默认是 1M。

  • fetch.message.max.bytes:Consumer 每次获取 Message 的大小。

Broker 集群原理是什么?

Kafka 可以设置多个 Broker 来实现分布式架构,提供良好的水平扩展性,可以支持大吞吐量的消息处理能力。而多个 Broker 通过构成一个集群,那么多个 Broker 之间是如何协调的?相互如何通信?

如何管理 Broker?

Kafka 通过 Controller 与 ZooKeeper(2.7 版本之前)来实现对 Broker 的管理。

Broker 的启动与下线

当一个 Broker 启动时,它会做以下这些事情:


  1. 初始化:加载配置文件、初始化日志管理器等初始化动作;

  2. 注册:连接到 ZooKeeper,并在 ZooKeeper 的/brokers/ids路径下创建一个临时节点来注册自己(填入配置的broker.id

  3. 监听:注册自己之后,还会监听 /brokers/ids这个路径节点,当这个节点下增加或删除子节点时,ZooKeeper 会通知监听了的 Broker。

  4. 竞选:如果当前集群没有 Controller,那么 Broker 还会尝试竞选 Controller。


当一个 Broker 下线时,它会做以下这些事情:


  1. 停止:停止接受新的请求,并等待处理中的请求完成;

  2. 关闭:关闭与网络和请求处理层的连接,断开与 ZooKeeper 的连接,删除之前注册的节点。关闭日志管理器,释放连接资源。

Controller

Controller 是 Broker 中的 Leader,非 Controller 的 Broker 就是 Follower。他有如下用途:


  1. 管理和协调 Kafka 集群的元数据,分区的分配和状态转移,以及处理 Broker 的故障

  2. 管理主题的创建、删除、增加分区等操作

  3. 管理分区的重分配、领导者选举、负载均衡等操作

  4. 监控集群成员的变化,包括 broker 的增加或减少、宕机或恢复等

  5. 向其他 broker 提供数据服务,包括同步集群元数据、分区信息、副本信息等

Controller 的生命周期

  1. 初始化阶段:当 Broker 启动时,它会尝试在 ZooKeeper 的/controller目录下创建一个临时节点,并写入自己的broker.id,从而竞选成为 Controller。如果成功,它会初始化 Controller 的上下文,包括集群的元数据、分区状态机、副本状态机、事件管理器等。如果失败,它会成为 Follower,并监听/controller目录的变化。

  2. 运行阶段:当 Broker 成为 Controller 后,它会在运行阶段处理各种事件,例如 Broker 的增加或减少、Topic 的创建或删除、Partition 的分配或转移、Leader 的选举或切换等。这些事件会由 Controller 事件管理器来调度和执行,根据不同的事件类型,触发不同的控制器动作。同时,Controller 也会与其他 Broker 保持心跳通信,检测 Broker 的可用性和健康状况。

  3. 重新选举阶段:当 Broker 失去与 ZooKeeper 的连接或者宕机时,它会退出 Controller 角色,并释放/controller 目录的锁。此时,ZooKeeper 会通知其他 Broker 开始新一轮的选举。原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行阶段;如果失败,它会成为 Follower,并等待下一次选举。

  4. 销毁阶段:当 Broker 关闭时,它会销毁 Controller 相关的资源,包括上下文、状态机、事件管理器等,并从 ZooKeeper 上注销自己。

Controller 的选举流程

答案就隐藏在上面的生命周期中。具体来说有如下情况:


  1. 当集群启动时,每个 Broker 都会尝试在 ZooKeeper 的/controller目录下创建一个临时节点,并写入自己的broker.id;

  2. ZooKeeper 保证只有一个 Broker 能够成功创建/controller节点,该 Broker 就成为 Controller,负责管理集群的元数据和状态。

  3. 其他 Broker 会监听/controller目录的变化,如果发现/controller节点被删除,就会重新开始竞选。

  4. 当 Controller 宕机或失去与 ZooKeeper 的连接时,会删除/controller下的临时节点,并释放/controller节点的锁。此时,ZooKeeper 会通知其他 Broker 开始新一轮的竞选。

  5. 原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行状态;如果失败,它会成为 Follower,并等待下一次竞选。

Controller 要做哪些事

  • 监听 ZooKeeper 上不同的目录。

  • /admin:这个目录用于存储集群中的管理操作,包括删除 Topic、增加分区、重新分配分区等。当有管理操作被触发时,这个目录会更新。监听这个目录来实现 Controller 处理 Topic 创建、删除、增加分区等操作;

  • /brokers/ids:这个目录用于存储集群中所有活跃的 broker 信息,包括 broker.id、host、port、rack 等。当 broker 加入或离开集群时,这个目录会更新。监听这个目录来实现 Controller 监控集群成员与处理 Partition 的分配与转义等操作;

  • /brokers/topics:这个目录用于存储集群中所有的 topic 信息,包括 topic 名称、分区数量、副本分配等。当 topic 被创建或删除时,这个目录会更新;监听这个目录来实现 Controller 响应 Topic 创建删除等操作;

  • /controller:这个目录用于存储当前的 controller 信息,包括 broker.id 和 controller epoch。当 controller 发生变化时,这个目录会更新。监听这个目录用于触发 Controller 的重选举流程;

  • /isr_change_notification:这个目录用于存储集群中的 ISR(同步副本集合)变化通知,包括分区名称和新的 ISR 列表。当 ISR 发生变化时,这个目录会更新;监听这个目录可以用于提供可靠的消息生产 ACK 机制。

  • 维护元数据信息

  • 调用 ZooKeeper 的 API 来创建、更新、删除、获取、监听,包括 Broker、Topic、Partition、Replica 在内的集群元数据信息;

  • Controller 也会在自己内存中维护这些信息的缓存,用于处理其他 Broker 获取这些数据的请求

  • Controller 也会使用 Kafka 内部特殊的 Topic 来生产、消费与同步这些元数据信息

  • 维护 Partition 分配与状态

  • Partition 分配:由 kafka 内部的分配策略来决定的,例如 DefaultReplicaPlacementStrategy、RackAwareReplicaPlacementStrategy 等。用户也可以自定义自己的分配策略,通过实现 BrokerRackAwareness 接口来定义;

  • 维护分区状态机

  • 维护每个 Partition 的状态:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。

  • Controller 来协调和控制各个状态的转移。

  • 维护副本状态机。

  • 维护每个 Partition Replica 的状态,包括 NewReplica、OnlineReplica、OfflineReplica 等。

  • Controller 来协调和控制各个状态的转移。


(这里有很多内容是关于 Kafka 如何管理 Partition 的,包括内部的多个 Replica,这里先按下不表,在独立介绍几个模块角色之后,希望能通过具体的操作流程来串联整体)

综合

Broker 在一次消息写入时起到了哪些作用?

  1. 接收消息:Broker 接收生产者发送的消息,并将其存储在本地磁盘的分区分段文件中。Broker 还会为每个分段文件创建索引文件,以便快速查找消息

  2. 同步消息:如果 Broker 是某个分区的 Leader 副本,它还会将收到的消息同步给其他的 Follower 副本,以实现数据的备份和高可用性。Broker 可以配置同步方式(同步或异步)和同步策略(ISR 或 Quorum)

  3. 响应确认:Broker 在完成消息的接收和同步后,会根据生产者的 acks 设置返回相应的确认信息,以通知生产者消息是否写入成功

Broker 在消息消费时充当了哪些角色?

  1. 提供消息:Broker 根据 Consumer 的请求,从本地磁盘的分区分段文件中读取相应的消息,并返回给 Consumer]。

  2. 维护消费位移:Broker 会为每个 Consumer Group 维护一个消费位移(offset),用来记录该 Group 已经消费到哪个位置。Consumer 可以根据自己的消费情况,定期提交消费位移给 Broker,也可以查询 Broker 的消费位移来确定消费的起始位置]。

  3. 处理 Rebalance:Broker 会监测 Consumer Group 内部的变化,比如有新的 Consumer 加入或者有 Consumer 退出,然后触发重平衡(rebalance)操作,重新分配每个分区的消费者。Broker 会通知 Group Leader 进行分区分配,并将结果同步给其他的 Consumer]。

Broker 是如何完成消息的存储的?

这个内容就太多了,放在下一篇吧。

参考资料

  1. https://juejin.cn/post/6844904000622428173

  2. https://www.modb.pro/db/615537

发布于: 刚刚阅读数: 2
用户头像

Codyida

关注

还未添加个人签名 2017-12-21 加入

还未添加个人简介

评论

发布
暂无评论
八股MQ005——聊聊Broker_后端_Codyida_InfoQ写作社区