8 张图带你全面了解 kafka 的核心机制
kafka 是目前企业中很常用的消息队列产品,可以用于削峰、解耦、异步通信。特别是在大数据领域中应用尤为广泛,主要得益于它的高吞吐量、低延迟,在我们公司的解决方案中也有用到。既然 kafka 在企业中如此重要,那么本文就通过几张图带大家全面认识一下 kafka,现在我们不妨带入 kafka 设计者的角度去思考该如何设计,它的架构是怎么样的、都有哪些组件组成、如何进行扩展等等。
kafka 基础架构
现在假如有 100T 大小的消息要发送到 kafka 中,数据量非常大,一台机器存储不下,面对这种情况,你该如何设计呢?
很简单,分而治之,一台不够,那就多台,这就形成了一个 kafka 集群。如下图所示,一个 broker 就是一个 kafka 节点,100T 数据就有 3 个节点分担,每个节点约 33T,这样就能解决问题了,还能提高吞吐量。
Topic: 可以理解为一个队列,一个 kafka 集群中可以定义很多的 topic,比如上图中的
topicA
。Partition: 为了实现扩展性,提高吞吐量,一个非常大的
topic
可以分布到多个broker
(即服务器)上,一个topic
可以分为多个partition
,每个partition
是一个有序的队列。比如上图中的 topicA 被分成了 3 个partition
。Replica: 副本,如果数据只放在一个
broker
中,万一这个broker
宕机了怎么办?为了实现高可用,一个topic
的每个分区都有若干个副本,一个Leader
和若干个Follower
。比如上图中的虚线连接的就是它的副本。Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是
Leader
。Follower: 每个分区多个副本中的“从”,实时从
Leader
中同步数据,保持和Leader
数据的同步。Leader
发生故障时,某个Follower
会成为新的Leader
。Producer: 消息生产者,就是向
Kafka broker
发消息的客户端,后面详细讲解。Consumer: 消息消费者,向
Kafka broker
取消息的客户端,多个Consumer
会组成一个消费者组,后面详细讲解。Zookeeper:用来记录 kafka 中的一些元数据,比如 kafka 集群中的 broker,leader 是谁等等,但
Kafka
2.8.0 版本以后也支持非 zk 的方式,大大减少了和 zk 的交互。
kafka 生产者流程
前面通过一张图片讲解了 kafka 整体的架构,那现在我们来看看 kafka 生产者发送的整个过程,这里面也是大有文章。
在消息发送的过程中,涉及到了两个线程——main
线程和 Sender
线程。在 main
线程中创建了一个双端队列 RecordAccumulator
。main
线程将消息发送给 RecordAccumulator
,Sender
线程不断从 RecordAccumulator
中拉取消息发送到 Kafka Broker
。
在主线程中由
kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator
, 也称为消息收集器)中。
拦截器: 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
序列化器: 用于在网络传输中将数据序列化为字节流进行传输,保证数据不会丢失。
分区器: 用于按照一定的规则将数据分发到不同的 kafka broker 节点中
Sender
线程负责从RecordAccumulator
获取消息并将其发送到Kafka
中。
RecordAccumulator
主要用来缓存消息以便Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator
缓存的大小可以通过生产者客户端参数buffer.memory
配置,默认值为33554432B
,即32M
。主线程中发送过来的消息都会被迫加到
RecordAccumulator
的某个双端队列(Deque
)中,RecordAccumulator
内部为每个分区都维护了一个双端队列,即Deque<ProducerBatch>
, 消息写入缓存时,追加到双端队列的尾部。Sender
读取消息时,从双端队列的头部读取。ProducerBatch
是指一个消息批次;与此同时,会将较小的ProducerBatch
凑成一个较大ProducerBatch
,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch
大小可以通过batch.size
控制,默认16kb
。Sender
线程会在有数据积累到batch.size
,默认 16kb,或者如果数据迟迟未达到batch.size
,Sender
线程等待linger.ms
设置的时间到了之后就会获取数据。linger.ms
单位ms
,默认值是0ms
,表示没有延迟。
Sender
从RecordAccumulator
获取缓存的消息之后,会将数据封装成网络请求<Node,Request>
的形式,这样就可以将Request
请求发往各个Node
了。请求在从
sender
线程发往Kafka
之前还会保存到InFlightRequests
中,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求。InFlightRequests
默认每个分区下最多缓存 5 个请求,可以通过配置参数为max.in.flight.request.per. connection
修改。请求
Request
通过通道Selector
发送到kafka
节点。发送后,需要等待 kafka 的应答机制,取决于配置项
acks
.
0:生产者发送过来的数据,不需要等待数据落盘就应答。
1:生产者发送过来的数据,
Leader
收到数据后应答。-1(all):生产者发送过来的数据,Leader 和副本节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
Request
请求接受到 kafka 的响应结果,如果成功的话,从InFlightRequests
清除请求,否则的话需要进行重发操作,可以通过配置项retries
决定,当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647
。清理消息累加器
RecordAccumulator
中的数据。
kafka 消费者流程
原来 kafka 生产者发送经过了这么多流程,我们现在来看看 kafka 消费者又是如何进行的呢?
Kafka 中的消费是基于拉取模式的。消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
kafka 是以消费者组进行消费的,一个消费者组,由多个 consumer 组成。形成一个消费者组的条件,是所有消费者的 groupid 相同。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
那么问题来了,kafka 是如何指定消费者组的每个消费者消费哪个分区?每次消费的数量是多少呢?
一、如何制定消费方案
消费者 consumerA,consumerB, consumerC 向 kafka 集群中的协调器
coordinator
发送JoinGroup
的请求。coordinator
主要是用来辅助实现消费者组的初始化和分区的分配。
coordinator
老大节点选择 =groupid
的hashcode
值 % 50(__consumer_offsets
内置主题位移的分区数量)例如:groupid
的 hashcode 值 为 1,1% 50 = 1
,那么__consumer_offsets
主题的 1 号分区,在哪个broker
上,就选择这个节点的coordinator
作为这个消费者组的老大。消费者组下的所有的消费者提交offset
的时候就往这个分区去提交offset
。
选出一个
consumer
作为消费中的leader
,比如上图中的ConsumerB
。消费者
leader
制定出消费方案,比如谁来消费哪个分区等把消费方案发给
coordinator
最后
coordinator
就把消费方 案下发给各个consumer
, 图中只画了一条线,实际上是有下发各个consumer
。
注意,每个消费者都会和coordinator
保持心跳(默认 3s),一旦超时(session.timeout.ms=45s
),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms
=5 分钟),也会触发再平衡,也就是重新进行上面的流程。
二、消费者消费细节
现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。
消费者创建一个网络连接客户端
ConsumerNetworkClient
, 发送消费请求,可以进行如下配置:
fetch.min.bytes
: 每批次最小抓取大小,默认 1 字节fetch.max.bytes
: 每批次最大抓取大小,默认 50Mfetch.max.wait.ms
:最大超时时间,默认 500ms
发送请求到 kafka 集群
成功的回调,会将数据保存到
completedFetches
队列中消费者从队列中抓取数据,根据配置
max.poll.records
一次拉取数据返回消息的最大条数,默认 500 条。获取到数据后,需要经过反序列化器、拦截器等。
kafka 的存储机制
我们都知道消息发送到 kafka,最终是存储到磁盘中的,我们看下 kafka 是如何存储的。
一个topic
分为多个partition
,每个 partition 对应于一个log
文件,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,每个partition
分为多个segment
。每个segment
包括:“.index
”文件、“.log
”文件和.timeindex
等文件,Producer
生产的数据会被不断追加到该 log 文件末端。
上图中 t1 即为一个topic
的名称,而“t1-0/t1-1”则表明这个目录是 t1 这个topic
的哪个partition
。
kafka 中的索引文件以稀疏索引(sparseindex
)的方式构造消息的索引,如下图所示:
根据目标
offset
定位segment
文件找到小于等于目标
offset
的最大offset
对应的索引项定位到
log
文件向下遍历找到目标
Record
注意:index 为稀疏索引,大约每往log
文件写入4kb
数据,会往index
文件写入一条索引。通过参数log.index.interval.bytes
控制,默认4kb
。
那 kafka 中磁盘文件保存多久呢?
kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
log.retention.hours
,最低优先级小时,默认 7 天。log.retention.minutes
,分钟。log.retention.ms
,最高优先级毫秒。log.retention.check.interval.ms
,负责设置检查周期,默认 5 分钟。
总结
其实 kafka 中的细节十分多,本文也只是对 kafka 的一些核心机制从理论层面做了一个总结,更多的细节还是需要自行去实践,去学习。
作者:JAVA 旭阳
链接:https://juejin.cn/post/7233809309150740541
来源:稀土掘金
评论