八股 MQ005——聊聊 Broker
写在前面
近期看到有篇文章说,找到一件可以产生效益的事情,然后持之以恒,迭代的只有完成它的方法,而不需要变更执行它的决心。
我觉得说的很对。
很喜欢士兵突击。当时看时以为我是成才,傻子才是许三多。现在回头想想,才发现能当许三多是很难的事情。
人太聪明了,聪明到会考虑很多,结果忘了最原初的事情。
道理人人都懂,事情人人都能做。但许三多才会去做,才成了兵王。
所以,找到坚持有益的事情,然后就是坚持与提高它的效率。然后就是陪着自己和时间慢慢走。
基础知识
Broker 是什么
Broker 可以看做是一台 Kafka 服务器,但其实是运行在一台网络服务器上的进程。它有如下作用:
接收 Producer 写入消息的请求
接收 Consumer 拉取消息的请求
负责消息的持久化存储(内存->硬盘)
负责 Partition 的不同 Replicas 之间的数据同步与故障迁移
Broker 的生命周期
启动阶段:当 Broker 启动时,它会加载配置文件,初始化日志管理器,创建网络层和请求处理层,连接到 ZooKeeper,并注册自己到集群。如果集群没有 Controller,它也会尝试竞选成为 Controller。
运行阶段:当 Broker 运行时,它会处理来自生产者和消费者的请求,包括写入和读取数据,以及同步和复制数据。如果 Broker 是 Controller,它还会处理来自其他 Broker 的请求,包括管理集群的元数据,分配和转移分区,选举和切换 Leader 等。同时,Broker 也会与 ZooKeeper 保持心跳通信,检测集群的状态和变化。
重新选举阶段:当 Broker 失去与 ZooKeeper 的连接或者宕机时,它会退出 Controller 角色,并释放/controller 目录的锁。此时,ZooKeeper 会通知其他 broker 开始新一轮的选举。原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行阶段;如果失败,它会成为 Follower,并等待下一次选举。
关闭阶段:当 Broker 关闭时,它会先停止接收新的请求,并等待已有的请求完成。然后,它会关闭网络层和请求处理层,断开与 ZooKeeper 的连接,并从集群上注销自己。最后,它会关闭日志管理器,并释放资源。
Broker 有哪些配置项
通过配置项可以直接从参数的角度来了解 Broker。参数有很多,这里只覆盖了一部分。但能从这些参数来看到 Broker 有自己服务器维度、网络连接、与 ZooKeeper 交互、Log 存储、消费者与消息多维度的参数设置。
Broker Server 基础配置
broker.id
:整个 Kafka 集群内标识唯一 Broker 的 ID。整数类型。这个值其实也是 ZooKeeper 在/brokers/ids
路径下创建的节点 ID。若不指定,或者指定为-1,那么 ZooKeeper 就会从reserved.broker.max.id
这个参数(默认 1000)加 1,用作 Broker 的 IDdelete.topic.enable
:是否允许删除 Topic。auto.create.topics.enabl
e:是否允许在 Producer 在未指定 Topic 发送 Message 时自动创建 Topic。
Broker Socket 配置
listeners
:Broker 之间,Client 与 Broker 之间通信建立连接时使用的信息。即 Broker 的监听者,可以以逗号分割配置多个。它的格式为[安全协议]://Hostname/IP:Port
。Kafka 支持两种传输层协议(PLAINTEXT 或 SSL)(加密消息)与两种认证层安全协议(SSL 或 SASL)(认证客户端与服务器身份)。advertised.listeners
:将 Broker 建立通信的地址发布到 Zookeeper 中,便于 Client(Producer 和 Consumer)连接。它的格式和listener
一致。listener.security.protocol.map
:以 Key/Value 的形式定义监听者的安全协议,在大多数情况下会将 Key 认为是监听者的别名。inter.broker.listener.name
:设置内部通信时使用哪个监听者。可以直接设置listener.security.protocol.map
中设置的 Key。num.network.threads
:Broker Server 接收请求及发送响应时启用的线程数量。num.io.threads
:Broker Server 处理请求、对 Message 进行 I/O 操作时启用的线程数。
Broker Log 配置
Broker Log 存储也是很重要的内容,这里先按下不表。
log.dirs
:日志、Message 保存的路径。num.partitions
:创建 Topic 时,如果没有指定 Partition 数量,则使用该配置项设置的 Partition 数量。num.recovery.threads.per.data.dir
:每个数据目录启用几个线程来处理,这里的线程数和数据目录数是乘积关系,并且只在 Broker 启动或关闭时使用。具体如下:当服务器正常启动时,用于打开每个分区的日志片段。
当服务器发生崩溃并重启时,用于检查和截断每个分区的日志片段。
当服务器正常关闭时,用于关闭日志片段。
min.insync.replicas
:当acks=all
时,至少有多少个 Replicas 需要确认已持久化数据,包括 Leader。log.retention.hours
:Kafka 保留 Message 的时间,默认是 168 小时,即 7 天。log.retention.check.interval.ms
:检测 Message 是否可以被删除的时间间隔。log.retention.bytes
:已保留的消息的字节总数来判断旧消息是否过期,这个值对应的是每一个分区,如果一个 topic 有 3 个分区,这个值设为 1GB, 那么该主题最多保留 3GB 数据。log.segment.bytes
:每个 Segment 文件的大小,默认是 1G。当日志大小超过该值时,下一次日志写入就会创建一个新的 segment。
Broker ZooKeeper 配置
2.7 版本还在使用,之后 3.x 版本已经不使用了。
zookeeper.connect
:设置 Zookeeper 地址。可用逗号分割配置多个地址,既 Zookeeper 集群的地址。zookeeper.connection.timeout.ms
:等待连接 Zookeeper 的超时时间。
Consumer Group 配置
group.initial.rebalance.delay.ms
:当 Consumer Group 新增或减少 Consumer 时,重新分配 Topic Partition 的延迟时间。
Message 配置
message.max.bytes
:Broker 接收每条 Message 的最大值,默认是 1M。fetch.message.max.bytes
:Consumer 每次获取 Message 的大小。
Broker 集群原理是什么?
Kafka 可以设置多个 Broker 来实现分布式架构,提供良好的水平扩展性,可以支持大吞吐量的消息处理能力。而多个 Broker 通过构成一个集群,那么多个 Broker 之间是如何协调的?相互如何通信?
如何管理 Broker?
Kafka 通过 Controller 与 ZooKeeper(2.7 版本之前)来实现对 Broker 的管理。
Broker 的启动与下线
当一个 Broker 启动时,它会做以下这些事情:
初始化:加载配置文件、初始化日志管理器等初始化动作;
注册:连接到 ZooKeeper,并在 ZooKeeper 的
/brokers/ids
路径下创建一个临时节点来注册自己(填入配置的broker.id
)监听:注册自己之后,还会监听
/brokers/ids
这个路径节点,当这个节点下增加或删除子节点时,ZooKeeper 会通知监听了的 Broker。竞选:如果当前集群没有 Controller,那么 Broker 还会尝试竞选 Controller。
当一个 Broker 下线时,它会做以下这些事情:
停止:停止接受新的请求,并等待处理中的请求完成;
关闭:关闭与网络和请求处理层的连接,断开与 ZooKeeper 的连接,删除之前注册的节点。关闭日志管理器,释放连接资源。
Controller
Controller 是 Broker 中的 Leader,非 Controller 的 Broker 就是 Follower。他有如下用途:
管理和协调 Kafka 集群的元数据,分区的分配和状态转移,以及处理 Broker 的故障
管理主题的创建、删除、增加分区等操作
管理分区的重分配、领导者选举、负载均衡等操作
监控集群成员的变化,包括 broker 的增加或减少、宕机或恢复等
向其他 broker 提供数据服务,包括同步集群元数据、分区信息、副本信息等
Controller 的生命周期
初始化阶段:当 Broker 启动时,它会尝试在 ZooKeeper 的
/controller
目录下创建一个临时节点,并写入自己的broker.id
,从而竞选成为 Controller。如果成功,它会初始化 Controller 的上下文,包括集群的元数据、分区状态机、副本状态机、事件管理器等。如果失败,它会成为 Follower,并监听/controller
目录的变化。运行阶段:当 Broker 成为 Controller 后,它会在运行阶段处理各种事件,例如 Broker 的增加或减少、Topic 的创建或删除、Partition 的分配或转移、Leader 的选举或切换等。这些事件会由 Controller 事件管理器来调度和执行,根据不同的事件类型,触发不同的控制器动作。同时,Controller 也会与其他 Broker 保持心跳通信,检测 Broker 的可用性和健康状况。
重新选举阶段:当 Broker 失去与 ZooKeeper 的连接或者宕机时,它会退出 Controller 角色,并释放/controller 目录的锁。此时,ZooKeeper 会通知其他 Broker 开始新一轮的选举。原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行阶段;如果失败,它会成为 Follower,并等待下一次选举。
销毁阶段:当 Broker 关闭时,它会销毁 Controller 相关的资源,包括上下文、状态机、事件管理器等,并从 ZooKeeper 上注销自己。
Controller 的选举流程
答案就隐藏在上面的生命周期中。具体来说有如下情况:
当集群启动时,每个 Broker 都会尝试在 ZooKeeper 的
/controller
目录下创建一个临时节点,并写入自己的broker.id
;ZooKeeper 保证只有一个 Broker 能够成功创建
/controller
节点,该 Broker 就成为 Controller,负责管理集群的元数据和状态。其他 Broker 会监听
/controller
目录的变化,如果发现/controller
节点被删除,就会重新开始竞选。当 Controller 宕机或失去与 ZooKeeper 的连接时,会删除
/controller
下的临时节点,并释放/controller
节点的锁。此时,ZooKeeper 会通知其他 Broker 开始新一轮的竞选。原来的 Controller 会尝试重新竞选,如果成功,它会恢复到运行状态;如果失败,它会成为 Follower,并等待下一次竞选。
Controller 要做哪些事
监听 ZooKeeper 上不同的目录。
/admin
:这个目录用于存储集群中的管理操作,包括删除 Topic、增加分区、重新分配分区等。当有管理操作被触发时,这个目录会更新。监听这个目录来实现 Controller 处理 Topic 创建、删除、增加分区等操作;/brokers/ids
:这个目录用于存储集群中所有活跃的 broker 信息,包括 broker.id、host、port、rack 等。当 broker 加入或离开集群时,这个目录会更新。监听这个目录来实现 Controller 监控集群成员与处理 Partition 的分配与转义等操作;/brokers/topics
:这个目录用于存储集群中所有的 topic 信息,包括 topic 名称、分区数量、副本分配等。当 topic 被创建或删除时,这个目录会更新;监听这个目录来实现 Controller 响应 Topic 创建删除等操作;/controller
:这个目录用于存储当前的 controller 信息,包括 broker.id 和 controller epoch。当 controller 发生变化时,这个目录会更新。监听这个目录用于触发 Controller 的重选举流程;/isr_change_notification
:这个目录用于存储集群中的 ISR(同步副本集合)变化通知,包括分区名称和新的 ISR 列表。当 ISR 发生变化时,这个目录会更新;监听这个目录可以用于提供可靠的消息生产 ACK 机制。维护元数据信息
调用 ZooKeeper 的 API 来创建、更新、删除、获取、监听,包括 Broker、Topic、Partition、Replica 在内的集群元数据信息;
Controller 也会在自己内存中维护这些信息的缓存,用于处理其他 Broker 获取这些数据的请求
Controller 也会使用 Kafka 内部特殊的 Topic 来生产、消费与同步这些元数据信息
维护 Partition 分配与状态
Partition 分配:由 kafka 内部的分配策略来决定的,例如 DefaultReplicaPlacementStrategy、RackAwareReplicaPlacementStrategy 等。用户也可以自定义自己的分配策略,通过实现 BrokerRackAwareness 接口来定义;
维护分区状态机
维护每个 Partition 的状态:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。
Controller 来协调和控制各个状态的转移。
维护副本状态机。
维护每个 Partition Replica 的状态,包括 NewReplica、OnlineReplica、OfflineReplica 等。
Controller 来协调和控制各个状态的转移。
(这里有很多内容是关于 Kafka 如何管理 Partition 的,包括内部的多个 Replica,这里先按下不表,在独立介绍几个模块角色之后,希望能通过具体的操作流程来串联整体)
综合
Broker 在一次消息写入时起到了哪些作用?
接收消息:Broker 接收生产者发送的消息,并将其存储在本地磁盘的分区分段文件中。Broker 还会为每个分段文件创建索引文件,以便快速查找消息
同步消息:如果 Broker 是某个分区的 Leader 副本,它还会将收到的消息同步给其他的 Follower 副本,以实现数据的备份和高可用性。Broker 可以配置同步方式(同步或异步)和同步策略(ISR 或 Quorum)
响应确认:Broker 在完成消息的接收和同步后,会根据生产者的 acks 设置返回相应的确认信息,以通知生产者消息是否写入成功
Broker 在消息消费时充当了哪些角色?
提供消息:Broker 根据 Consumer 的请求,从本地磁盘的分区分段文件中读取相应的消息,并返回给 Consumer]。
维护消费位移:Broker 会为每个 Consumer Group 维护一个消费位移(offset),用来记录该 Group 已经消费到哪个位置。Consumer 可以根据自己的消费情况,定期提交消费位移给 Broker,也可以查询 Broker 的消费位移来确定消费的起始位置]。
处理 Rebalance:Broker 会监测 Consumer Group 内部的变化,比如有新的 Consumer 加入或者有 Consumer 退出,然后触发重平衡(rebalance)操作,重新分配每个分区的消费者。Broker 会通知 Group Leader 进行分区分配,并将结果同步给其他的 Consumer]。
Broker 是如何完成消息的存储的?
这个内容就太多了,放在下一篇吧。
参考资料
https://juejin.cn/post/6844904000622428173
https://www.modb.pro/db/615537
版权声明: 本文为 InfoQ 作者【Codyida】的原创文章。
原文链接:【http://xie.infoq.cn/article/fc0352c96fd7024863807a3a0】。文章转载请联系作者。
评论