kafka 生产者你不得不知的那些事儿
前言
kafka 生产者作为消息发送中很重要的一环,这里面可是大有文章,你知道生产者消息发送的流程吗?知道消息是如何发往哪个分区的吗?如何保证生产者消息的可靠性吗?如何保证消息发送的顺序吗?如果对于这些问题还比较模糊的话,那么很有必要看看这篇文章了,本文主要是基于 kafka3.x 版本讲解。
生产者流程
kafka 生产者最重要的就是消息发送的整个流程,我们来看下究竟是怎么一回事把。
在消息发送的过程中,涉及到了两个线程——main
线程和 Sender
线程。在 main
线程中创建了一个双端队列 RecordAccumulator
。main
线程将消息发送给 RecordAccumulator
,Sender
线程不断从 RecordAccumulator
中拉取消息发送到 Kafka Broker
。
在主线程中由
kafkaProducer
创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator
, 也称为消息收集器)中。
拦截器: 可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
序列化器:用于在网络传输中将数据序列化为字节流进行传输,保证数据不会丢失。
分区器:用于按照一定的规则将数据分发到不同的 kafka broker 节点中
Sender
线程负责从RecordAccumulator
获取消息并将其发送到Kafka
中。
RecordAccumulator
主要用来缓存消息以便Sender
线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator
缓存的大小可以通过生产者客户端参数buffer.memory
配置,默认值为33554432B
,即32M
。主线程中发送过来的消息都会被迫加到
RecordAccumulator
的某个双端队列(Deque
)中,RecordAccumulator
内部为每个分区都维护了一个双端队列,即Deque<ProducerBatch>
, 消息写入缓存时,追加到双端队列的尾部。Sender
读取消息时,从双端队列的头部读取。ProducerBatch
是指一个消息批次;与此同时,会将较小的ProducerBatch
凑成一个较大ProducerBatch
,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch
大小可以通过batch.size
控制,默认16kb
。Sender
线程会在有数据积累到batch.size
,默认 16kb,或者如果数据迟迟未达到batch.size
,Sender
线程等待linger.ms
设置的时间到了之后就会获取数据。linger.ms
单位ms
,默认值是0ms
,表示没有延迟。
Sender
从RecordAccumulator
获取缓存的消息之后,会将数据封装成网络请求<Node,Request>
的形式,这样就可以将Request
请求发往各个Node
了。请求在从
sender
线程发往Kafka
之前还会保存到InFlightRequests
中,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求。InFlightRequests
默认每个分区下最多缓存 5 个请求,可以通过配置参数为max.in.flight.request.per. connection
修改。请求
Request
通过通道Selector
发送到kafka
节点。发送后,需要等待 kafka 的应答机制,取决于配置项
acks
.
0:生产者发送过来的数据,不需要等待数据落盘就应答。
1:生产者发送过来的数据,
Leader
收到数据后应答。-1(all):生产者发送过来的数据,Leader 和副本节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
Request
请求接受到 kafka 的响应结果,如果成功的话,从InFlightRequests
清除请求,否则的话需要进行重发操作,可以通过配置项retries
决定,当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647
。清理消息累加器
RecordAccumulator
中的数据。
生产者重要参数
现在我们来看看 kafka 生产者中常用且关键的配置参数。
bootstrap.servers
生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092
,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker
里查找到其他 broker
信息。
key.serializer
和value.serializer
指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory
RecordAccumulator
缓冲区总大小,默认 32m。
batch.size
缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms
如果数据迟迟未达到 batch.size
,kafka 等待这个时间之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms
之间。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B ,即 lMB 一般情况下,这个默认值就可以满足大多数的应用场景了。
compression.type
这个参数用来指定消息的压缩方式,默认值为“none
",即默认情况下,消息不会被压缩。该参数还可以配置为 "gzip
","snappy
" 和 "lz4
"。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;
acks
acks
的值为 0,1 和-1 或者 all。
0 表示
Producer
往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。1 表示
Producer
往集群发送数据只要Leader
成功写入消息就可以发送下一条,只确保Leader
接收成功。-1 或 all 表示
Producer
往集群发送数据需要所有的ISR Follower
都完成从Leader
的同步才会发送下一条,确保 Leader发送
成功和所有的副本都成功接收。安全性最高,但是效率最低。
max.in.flight.requests.per.connection
允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries
和retry.backoff.ms
当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。在 kafka3.4.0 默认是 int 最大值,2147483647
。如果设置了重试,还想保证消息的有序性,需要设置max.in.flight.requests.per.connection
=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。另外retry.backoff.ms
控制两次重试之间的时间间隔,默认是 100ms。
更多 kafka 生产者的配置可以查阅官网https://kafka.apache.org/documentation/#producerconfigs
。
生产者发送消息 API
生产者发送 demo
通常情况下,生产者发送消息分为以下 4 个步骤:
(1)配置生产者客户端参数及创建相应的生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例
我们直接上代码。
引入 maven 依赖
核心发送逻辑
消息对象
ProducerRecord
kafka 发送时主要构造出ProducerRecord
对象,包含发送的主题,partition,key,value 等。
三种发送模式
kafka 提供了 3 种发送消息的模式,发后即忘,同步发送和异步发送,我们直接上代码。
发后即忘(
fire-and-forget
)
发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。 在大多数情况下,这种发送方式没有问题。 不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。 这种发送方式的性能最高,可靠性最差。
同步发送(
sync
)
只需在上面种发送方式的基础上,再调用一下 get()方法即可,该方法时阻塞的。
带回调异步发送(
async
)
回调函数会在 producer
收到 ack
时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata
和Exception
,如果 Exception
为 null
,说明消息发送成功,如果 Exception
不为 null
,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
生产者发送核心机制
生产者分区机制
kafka 设计上存在分区的,它有下面两个好处:
便于合理使用存储资源,每个
Partition
在一个Broker
上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker
上。合理控制分区的任务,可以实现负载均衡的效果。提高并行度和吞吐量,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
那究竟生产者是按照什么样的策略发往到不同的分区呢?
根据生产者的发送流程,其中会经过分区器,默认情况下是使用DefaultPartitioner
,具体逻辑如下:
按指定分区发送
kafka
发送消息的时候构造消息对象ProducerRecord
,可以传入指定的partition
, 那么消息就会发送这个指定的分区。例如 partition=0,所有数据写入分区 0。
没有指明
partition
值但有key
的情况下,将key
的hash
值与topic
的partition
数进行取余得到partition
值;
例如:key1
的hash
值=5, key2
的hash
值=6 ,topic
的partition
数=2,那么key1
对应的value1
写入 1 号分区,key2
对应的value2
写入 0 号分区。
既没有
partition
值又没有key
值的情况下,Kafka 采用Sticky Partition
(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch
已满或者已完成,Kafka
再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16k)或者linger.ms
设置的时间到, Kafka
再随机一个分区进行使用(如果还是 0 会继续随机)。
自定义分区器
如果默认的分区规则不满足需求,我们也可以自定义一个分区器。比如我们实现一个分区器实现,发送过来的数据中如果包含 alvin
,就发往 0 号分区,不包含 alvin
,就发往 1 号分区。
实现分区器接口
Partitioner
配置分区器
如何提高生产者吞吐量?
对比着前面 kafka 生产者的发送流程,kafka 生产者提供的一些配置参数可以有助于提高生产者的吞吐量。
如何保证生产者消息的可靠性?
为了保证消息发送的可靠性,kafka
在 producer
里面提供了消息确认机制。我们可以通过配置来决 定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 producer
时通过 acks
参数指定。
acks=0
生产者发送过来的数据,不需要等数据落盘应答。
acks=1(默认值)
生产者发送过来的数据,Leader
收到数据后应答。
acks=-1 或者 all
生产者发送过来的数据,Leader
和ISR
队列里面的所有节点收齐数据后应答。
ISR
概念:(同步副本)。每个分区的 leader
会维护一个 ISR
列表,ISR
列表里面就是 follower
副本 的 Borker
编 号 , 只 有 跟 得 上 Leader
的 follower
副 本 才 能 加 入 到 ISR
里 面 , 这 个 是 通 过 replica.lag.time.max.ms
=30000(默认值)参数配置的,只有 ISR
里的成员才有被选为 leader
的可能。
如果Leader
收到数据,所有Follower
都开始同步数据,但有一个Follower
,因为某种故障,迟迟不能与Leader
进行同步,那这个问题怎么解决呢?
Leader
维护了一个动态的in-sync replica set
(ISR
),意为和Leader
保持同步的Follower+Leader
集合(leader:0,isr:0,1,2)
。如果Follower
长时间未向Leader
发送通信请求或同步数据,则该Follower
将被踢出ISR
。该时间阈值由replica.lag.time.max.ms
参数设定,默认30s
。
小结:数据完全可靠条件 = ACK
级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2。
acks=0
,生产者发送过来数据就不管了,可靠性差,效率高;acks=1
,生产者发送过来数据Leader
应答,可靠性中等,效率中等;acks=-1或者all
,生产者发送过来数据Leader
和ISR
队列里面所有Follwer
应答,可靠性高,效率低;
在生产环境中,acks=0
很少使用;acks=1
,一般用于传输普通日志,允许丢个别数据;acks=-1
,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
如何保证消息只发送一次?
kafka 作为分布式消息系统,难免会出现重复消息或者丢消息的情况,会存在 3 种数据传递语义。
最多一次(At Most Once)
ack 级别设置为 0, 可以保证数据不重复,但是不能保证数据不丢失, 所以叫做最多一次。
至少一次(At Least Once)
ack 级别设置为-1 + 分区副本大于等于 2 + ISR
里应答的最小副本数量大于等于 2 可能会出现至少一次的消息。比如下图中在发送过程 Leader 节点宕机,消息就会重试,就有可能出现消息的重复。
At Least Once
可以保证数据不丢失,但是不能保证数据不重复。
精确一次(Exactly Once)
对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。这在 kafka 中可以通过幂等性和事务的特性实现。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) 。
幂等性,简单来说,就是一个操作重复做,每次的结果都一样。开启幂等性功能,参数enable.idempotence
设置为 true 即可,在 3.x 版本中默认情况下也是 true。具体实现原理如下:
每一个
producer
在初始化时会生成一个producer_id
,并为每个目标partition
维护一个“序列号”。producer
每发送一条消息,会将<producer_id
,分区>对应的“序列号”加 1。broker
服务端端会为每一对<producer_id,分区>
维护一个序列号,对于每收到的一条消息,会判断服务端 的SN_old
和接收到的消息中的SN_new
进行对比:
如果
SN_OLD+1
=SN_NEW
,正常情况如果
SN_old+1
>SN_new
,说明是重复写入的数据,直接丢弃如果
SN_old+1
<SN_new
,说明中间有数据尚未写入,或者是发生了乱序,或者是数据丢失,将抛出严重异常:OutOfOrderSequenceException
。
如何保证生产者消息的顺序?
根据前面的生产者发送流程可以知道,要想保证消息投递的顺序性:
首先要保证单分区,因为单分区内是有序的,多分区,分区与分区间无序。
kafka 在 1.x 版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1
kafka 在 1.x 及以后版本保证数据单分区有序,条件如下:
未开启幂等性,
max.in.flight.requests.per.connection
需要设置为 1。开启幂等性,
max.in.flight.requests.per.connection
需要设置小于等于 5。
因为在 kafka1.x 以后,启用幂等后,kafka 服务端会缓存producer
发来的最近 5 个request
的元数据,故无论如何,都可以保证最近 5 个request
的数据都是有序的。
总结
本文总结了 kafka 生产者整个消息发送的流程,只有明白了这个流程以后,那么我们对于一些生产者消息发送的一些问题才有更加深刻的理解。
版权声明: 本文为 InfoQ 作者【JAVA旭阳】的原创文章。
原文链接:【http://xie.infoq.cn/article/8212a5a9a20564c4c4bc4cd7f】。文章转载请联系作者。
评论