写点什么

Kafka-Broker 的基本模块

作者:编程江湖
  • 2021 年 12 月 16 日
  • 本文字数:3161 字

    阅读完需:约 10 分钟

1.SocketServer

SocketServer 作为 Broker 对外提供 Socket 服务的模块,主要用于接收 socket 连接的请求,然后产生相应为之服务的 SocketChannel 对象。

内部主要包括三个模块:

Acceptor 主要用于监听 Socket 连接;

Processor 主要用于转发 Socket 的请求和响应。

RequestChannel 主要用于缓存 Socket 的请求和响应。

1.1Acceptor 对象主要功能

(1)开启 socket 服务

(2)注册 Accept 事件

(3)监听此 ServerChannel 上的 ACCEPT 事件,当其发生时,将其以轮询的方式把对应的 SocketChannel 转交给 Processor 处理线程。

1.2Processor 对象主要功能

(1)当有新的 SocketChannel 对象进来的时候,注册其上的 OP_READ 事件以便接收客户端的请求。

(2)从 RequestChannel 中的响应队列获取对应客户端的请求,然后产生 OP_WRITE 事件。

(3)监听 selector 上的事件。如果是读事件,说明有新的 request 到来,需要转移给 RequestChannel 的请求队列;如果是写事件,说明之前的 request 已经处理完毕,需要从 RequestChannel 的响应队列获取响应并发送回客户端;如果是关闭事件,说明客户端已经关闭了 该 Socket 连接,此时服务端也应该释放相关资源。

1.3RequestChannel

本质上就是为了解耦 SocketServer 和 KafkaApis 两个模块,内部包含 Request 的阻塞队列和 Response 的阻塞队列。

注:SocketServer 为了防止空闲连接大量存在,采用了 LRU 算法,即最近最少使用算法,会将长时间没有交互的 SocketChannel 对象关闭,及时释放资源。因此 Processor 仅仅是起到了接收 Request,发送 Response 的作用,其处理 Request 的具体业务逻辑是由 KafkaApis 层负责的,并且两者之间是通过 RequestChannel 相互联系起来的。


总结可得,SocketServer 负责下面三个方面:

(1)建立 Socket,保持和客户端的通信;

(2)转发客户端的 Request;

(3)返回 Response 给客户端。最后通过 RequestChannel 与其他模块解耦。

2.KafkaRequestHandlerPool

KafkaRequestHandlerPool 本质上就是一个线程池,里面包含了 num.io.threads 个 IO 处理线程,默认 为 8 个。KafkaRequestHandlerPool 在内部启动了若干个 KafkaRequestHandler 处理线程,并将 RequestChannel 对象和 KafkaApis 对象传递给了 KafkaRequestHandler 处理线程,因为 KafkaRequestHandler 需要从前者的 requestQueue 中取出 Request,并且利用后者来完成具体的业务逻辑。

3.KafkaApis

KafkaApis 负责具体的业务逻辑,它主要和 Producer、Consumer、Broker Server 交互。 KafkaApis 主要依赖以下四个组件来完成具体的业务逻辑:

LogManager 提供针对 Kafka 的 topic 日志的读取和写入功能。

ReplicaManager 提供针对 topic 分区副本数据的同步功能。

OffsetManager 提供针对提交至 Kafka 偏移量的管理功能。

KafkaSchedule 为其他模块提供定时的调度和管理功能。

3.1LogManager

LogManager 负责提供 Broker Server 上 topic 的分区数据读取和写入功能,负责读取和写入位于 Broker Server 上的所有分区副本数据;如果 Partition 有多个 Replica,则每个 Broker Server 不会存在相同 Partition 的 Replica;如果存在的话,一旦遇到 Broker Server 下线,则会立刻丢失 Partition 的多份副本,失去 了一定的可靠性。

Topic、Partition 和 Replica 三者之间的关联关系:


3.2ReplicaManager

ReplicaManager 负责提供针对 topic 的分区副本数据的同步功能,需要针对不同的变化做出及时响应,例如 Partition 的 Replicas 发送 Leader 切换时,Partition 的 Replicas 所在的 Broker Server 离线的时候,Partition 的 Replicas 发生 Follower 同步 Leader 数据异常的时候,等等。

分区两个名词:AR 和 ISR

AR 是 Assign Replicas 的缩写,代表已经分配给 Partition 的副本。

ISR 是 In-Sync Replicas 的缩写,代表处于同步状态的副本。

并不是所有的 AR 都是 ISR,尤其是当 Broker Server 离线的时候会导致对应 TopicAndPartition 的 Replica 没有及时同步 Leader 状态的 Replica,从而该 Replica 不是 ISR。

a.ReplicaManager 是如何实现 Replica 数据的同步?

主要利用 ReplicaFetcherThread(副本数据拉取线程)和 Height Watermark Mechanism(高水位线机制)来实现数据的同步管理。

b.什么是高水位?

本质上代表的是 ISR 中的所有 replicas 的 last commited message 的最小起始偏移量,即在这偏移之前的数据都被 ISR 所有的 replicas 所接收,但是在这偏移之后的数据被 ISR 中的部分 replicas 所接收。


其中 RecoverPoint 代表的是 recover-point-offset-checkpoint 文件中记录的偏移量,LogEndOffset 代表的是当前 TopicAndPartition 的 replica 所接收到消息的最大偏移量,HeightWatermark 代表的是已经同步给所有 ISR 的最小偏移量。Replica 的 HeightWatermark 发生更新在以下两种情况:

(1)Leader 状态的 Replica 接收到其他 Follower 状态的 Replica 的 FetchRequest 请求时,会选择性得更新 HeightWatermark。

(2)Follower 状态的 Replica 接收到来自 Leader 状态的 Replica 的 FetchResponse 时,会选择性更新 HeightWatermark,即 ReplicaFetcherThread 内部的 processPartitionData 流程。

4.OffsetManager

4.1Kafka 提供两种保存 Consumer 偏移量的方法:

(1)将偏移量保存到 Zookeeper 中。

(2)将偏移量保存至 Kafka 内部一个名为_consumer_offsets 的 Topic 里面。

将偏移量保存至 Zookeeper 中是 kafka 一直就支持的,但是考虑到 zookeeper 并不太适合大批量的频繁写入操作,因此 kafka 开始支持将 Consumer 的偏移量保存再 Kafka 内部的 topic 中,即_consumer_offsets Topic。当用户配置 offsets.storage=kafka 时,大数据培训高级消费者会将偏移量保存至 Topic 里面,同时通过 OffsetManager 提供对这些偏移量的管理。

4.2 OffsetManager 主要功能

缓存最新的偏移量。

提供对偏移量的查询。

Compact,保留最新的偏移量,以此来控制 Topic 日志的大小。

Kafka 如何将 Consumer Group 产生的偏移量信息保存在_consumer_offsets 的不同分区?

本质是通过计算不同 Consumer Group 的 hash 值和_consumer_offsets 的分区数的模数,其结果作为指定分区的索引。

5.KafkaScheduler

KafkaScheduler 为其他模块提供定时任务的调度和管理,例如 LogManager 内部的 cleanupLogs 定时任务,flushDirtyLogs 定时任务和 checkpointRecoverPointOffsets 定时任务;ReplicaManager 模块内部的 maybeShrinkIsr 定时任务;OffsetManager 内部的 offsets-cache-compactor 定时任务等等。KafkaScheduler 内部是基于 ScheduledThreadPoolExecutor 实现的,对外封装了任务调度的接口 schedule,线程个数由参数 background.threads 决定,默认值为 10。

6.KafkaHealthcheck

KafkaHealthcheck 主要提供 Broker Server 健康状态的上报。Broker Server 健康状态本质上就是指 Broker Server 是否在线,如果 Broker Server 在线,说明处于健康状态,如果 Broker Server 离线,说明处于死亡状态。

Broker Server 如何上报健康状态?

BrokerChangeListener 通过监听目录为/brokers/ids 的 zookeeper 路径,当发生有数据变化时,则获取当前目录下的数据,从而获取当前集群的在线 Broker Server 列表。而 KafkaHealthcheck 正是提供了在目录为/brokers/ids 的 Zookeeper 路径上注册节点的能力,该节点所在路径为 EphemeralPath(非永久路径),当 Broker Server 由于异常情况导致下线时,此 EphemeralPath 随着 Broker Server 和 zookeeper 链接的断开而消失。

7.TopicConfigManager

kafka 提供对 topic 配置参数的在线修改能力,修改完成之后无需重新启动 kafka 集群,在线生效。Topic 配置参数包括:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;

Topic 的配置参数位于路径为/config/topics/[topic]的 zookeeper 上,Broker Server 内部为了避免针对每个 Topic 都在相关路径上建立监听器,对外提供了一个被通知的路径,其位于/brokers/config_changes,如果检测到该路径 上发生变化,则读取该路径上的数据,获取配置文件待更新的 Topic,然后再从/config/topics/[topic]上加载最新的配置文件。

用户头像

编程江湖

关注

IT技术分享 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
Kafka-Broker的基本模块