原理剖析:一文搞懂 Kafka Producer(下)
编辑导读:本篇是 Kafka Producer 原理剖析的下篇,对 Kafka Producer 的幂等性、实现细节和常见问题进行了详细的解读。AutoMQ 是与 Apache Kafka 100% 完全兼容的新一代 Kafka,可以帮助用户降低 90%以上的 Kafka 成本并且进行极速地自动弹性。作为 Kafka 生态的忠实拥护者,我们也会持续致力于传播 Kafka 技术,欢迎关注我们。
引言
在之前的文章原理剖析:一文搞懂 Kafka Producer(上)中,我们介绍了介绍 Kafka Producer 的使用方法与实现原理。这篇将继续介绍 Kafka Producer 的实现细节与常见问题。
幂等性
在一个分布式的消息系统中,各个角色均有可能发生故障。以 Apache Kafka 为例,Broker 和 Client 都有可能会崩溃,Broker 与 Client 之间的网络请求与响应都有可能丢失。根据 Producer 处理这类故障时采取的策略,可以分为以下几种语义:
至少一次(At Least Once):当发生请求超时或者服务端错误时,Producer 重复尝试发送消息直至成功。这样做可以保证每条消息都被写入 Topic,但是可能会发生重复。
至多一次(At Most Once):在超时或报错时 Producer 不进行重试,每条消息仅发送一次。这样做可以避免消息重复,但也可能会导致消息丢失。
精确一次(Exactly Once):Producer 进行适当的重试,以确保每条消息会且仅会被写入 Topic 一次,既不重复,也不遗漏。Exactly Once 的语义是最理想的实现,它可以满足绝大多数业务场景的需求;但同时也是最难以实现的,它需要 Client 与 Broker 之间的密切配合。
Apache Kafka Producer 提供了两个级别的 Exactly Once 的语义实现:
幂等性(Idempotence):确保 Producer 在向某个 Partition 发送消息时,该消息会且仅会被持久化一次。
事务性(Transaction):当 Producer 同时向多个 Partition 发送消息时,确保这些消息要么都被持久化,要么都不被持久化。
这里我们主要介绍 Kafka Producer 幂等性的使用与实现,关于事务消息的实现原理可以参阅我们之前的文章《原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息》。
开启幂等性
Kafka Producer 开启幂等性是非常简单的,它只需要设置几个配置项,而无需修改任何其他代码(Producer 的接口并没有变化)。
相关配置项有:
acks
当指定数量的副本收到消息后,Producer 才会认为消息写入完成。默认为 "all"
acks=0
Producer 不会等待任何 broker 的响应,消息写入网络层后即认为写入成功
acks=1
Producer 会等待 leader broker 的响应
acks=all
Producer 会等待所有同步的(in sync)副本的响应
enable.idempotence
开启幂等性,即,保证每条消息写入且仅被写入一次,同时保证消息按照发送顺序写入。默认为 "true"
开启此配置时,需要保证 max.in.flight.requests.per.connection 不大于 5,retries 大于 0,acks 设置为 "all"
在使用时需要注意,幂等 Producer 仅能避免由 Producer 内部的重试策略(Producer、Broker 或网络出错)导致的消息重复,它无法处理以下几种情况:
幂等 Producer 仅保证 Session 级别的不重不漏,当 Producer 发生重启时,不能保证重启后与重启前发送的消息不重复。
幂等 Producer 仅保证 Partition 级别的不重不漏,不能保证向多个 Partition 发送的消息不重复。
当 Producer 出于各种原因发送超时时,即,发送耗时超过了 delivery.timeout.ms,Producer 会抛出
TimeoutException
;此时无法保证对应的消息是否已经被 Broker 持久化,需要由上层根据情况进行处理。
实现原理
为了实现幂等性,Kafka 引入了以下两个概念:
Producer ID(以下简称 PID):Producer 的唯一标识。PID 由 Idempotent Producer 在首次发送消息前,请求 Broker 分配获得,是全局唯一的。PID 仅在 Producer 和 Broker 内部使用,不会暴露给 Client 使用者。
Sequence Number(以下简称 SEQ):消息的序列号。该序列号在 (PID, Partition) 维度上严格递增。事实上,SEQ 会存储在 Record Batch 的头中,作为 Batch 中第一条消息的 SEQ,Batch 中其它消息的 SEQ 依次递增。
值得一提的是,PID 与 SEQ 均会跟随消息持久化到 Log 中。
事实上,除了前述两个属性外,还有 Producer Epoch,它与 PID 结合才会唯一标识一个 Producer。它的在不同的场景下有不同的用途:
对于开启事务能力的 Producer(配置了 "transactional.id"),Producer Epoch 同样由 Broker 分配。这样做可以保证,多个具有相同 Transactional ID 的 Producer 中仅会有一个生效,即 "Fence Producer"。
而对于没有开启事务能力的 Producer,Producer Epoch 则由 Producer 自己维护,它会在需要重置序列号(Reset SEQ,后文会详细介绍)时增长,并将 SEQ 重置到 0。
![](https://static001.geekbang.org/infoq/86/868d453ed54134013c9b210d58c3ca20.png)
下面分别介绍为了实现幂等性,服务端(Broker)与客户端(Producer)分别做了哪些操作。
服务端
Broker 会在内存中记录每个 Producer 的状态信息,包括 Producer Epoch 与每个 Partition 最新写入的 5 个 Record Batch 的元数据(包括 SEQ、offset、timestamp 等),用于判断 Producer 发送的请求是否存在重复或者遗漏。
此外,这些状态信息也会定期进行快照,Broker 在重启时会基于快照与 Log 中的信息恢复出这些状态信息。
值得一提的是,这里硬编码的 5 也是 Producer 配置 max.in.flight.requests.per.connection 的上限。具体原因在后文介绍。
当 Broker 收到一个 Record Batch 后,在进行完必要前置操作后、真正持久化到 Log 前,会检查该 Batch 上的 PID、Producer Epoch 与 SEQ。具体地说:
检查该 Record Batch 是否与本地记录的 5 个 Record Batch 一致,若是,则认为 Producer 出于某些原因重复发送了该 Record Batch,不进行任何操作,直接返回本地记录的元数据(主要为 offset)
检查之前是否记录了该 PID 对应的状态信息,若没有,检查 SEQ 是否为 0
若是,则认为这是一个全新的 Producer,记录该 Producer 相关信息,并写入 Record Batch
若否,则报错
UnknownProducerIdException
检查 Producer Epoch 是否与本地记录一致,若不一致,检查 SEQ 是否为 0
若是,则认为该 Producer 出于某些原因重置了 SEQ,更新记录,并写入 Record Batch
若否,则报错
OutOfOrderSequenceException
检查 SEQ 是否与最近一次写入的 Record Batch 的 SEQ 连续
若是,则缓存该 Record Batch 的元数据,并写入
若否,则报错
OutOfOrderSequenceException
经过上述处理,可以确保在客户端侧,由同一个 Producer 向同一个 Partition 写入的 Record Batch 都是连续的(基于 SEQ),不会存在遗漏或重复。
客户端
Producer 对于幂等性的处理则相对更加复杂,主要有以下两个难点:
Producer 在发送时可能会发生超时,在超时时,可能存在两种可能——“Broker 没有收到请求”或“Broker 处理了请求,但 Producer 没有收到响应”。这就导致 Producer 难以确认当某个 Produce 请求超时时,Broker 是否已经进行了持久化。
Producer 可能会向同一个 Broker 同时发送多个 Produce 请求,当其中一个或多个报错时,需要根据不同情况,对它们以及后续的请求采取不同的处理方式。
基本概念
在介绍 Producer 发送流程前,先介绍几个基本概念:
在途 Batch(Inflight Batch)
Producer 会按 Partition 维度,记录已经发送请求但尚未收到响应的 Batch;特别地,对于幂等 Producer,还会额外记录每个 Inflight Batch 的 SEQ,并按照 SEQ 排序。
未解决的 Batch(Unresolved Batch)
之前提到,Producer 在发送消息时会进行数次重试,直至总耗时超出 delivery.timeout.ms。如果某个 Batch 发生了 Delivery Timeout,则认为其为 Unresolved。
当某个 Batch 被标记为 Unresolved 时,Producer 无法判断 Broker 是否已经持久化这个 Batch,则只能通过检查这个 Batch 的后续 Batch 是否被 Broker 持久化(亦或报错
OutOfOrderSequenceException
):若后续 Batch 写入成功,则认为它之前的 Unresolved Batch 也已经写入完成;否则,则认为前面的 Unresolved Batch 没有写入完成,需要重置 SEQ。提升 Epoch(Bump Epoch)与重置 SEQ(Reset Sequence Number)
当 Producer 遇到无法通过重试解决的问题时(例如,Inflight Batch 均响应完成,但仍存在 Unresolved Batch 时;Broker 报错
UnknownProducerIdException
时),会执行 Bump Epoch & Reset SEQ 的操作。具体地说,会将 Producer Epoch 加一,并将出错 Partition 的所有 Inflight Batch 从零开始重新编号重新发送,并清空 Unresolved Batch。
发送流程
幂等 Producer 发送一个 Batch 的流程如下:
在发送 Batch 的过程中,Producer 还会驱动处理一些其他事件(例如处理超时 Batch),这些步骤会用括号标出。
(判断 Unresolved Batches 的状态)
如果确认 Unresolved Batch 实际已写入完成,则将其从 Unresolved Batches 中移除
如果确认 Unresolved Batch 实际并没有写入(判断条件:Inflight Batches 为空),则 Bump Epoch & Reset SEQ
检查目前该 Partition 能否发送新的 Batch,不能发送的场景:
存在 Unresolved Batch
之前发生了 Bump Epoch 且仍存在老的 Epoch 的 Inflight Batch
之前某个 Batch 正在重试(也就是说,幂等 Producer 在重试时,Inflight 始终为 1)
(如果之前发生了 Bump Epoch,且已经不存在老 Epoch 的 Inflight Batch,则 Reset SEQ)
获取对应 Partition 的下一个 SEQ,并设置到 Batch 中
将 Batch 加入到 Inflight Batches 中
(检查是否存在 Delivery Timeout 的 Batch,若存在,则将其加入到 Unresolved Batches 中)
向 Broker 发送 Produce 请求,等待响应
收到响应后,检查 Error Code
若为不可重试错误(例如
AuthorizationException
),则 Bump Epoch & Reset SEQ,并向上层报错若为可重试错误(例如
TimeoutException
),则加入重试队列,等待下次发送。此外,如果报错为UnknownProducerIdException
且之前没有 Reset SEQ,则 Bump Epoch & Reset SEQ 并重试;否则直接重试OutOfOrderSequenceException
且“Unresolved Batch 为空”或“该 Batch 恰好为 SEQ 最大的 Unresolved Batch 的下一个”,则 Bump Epoch & Reset SEQ 并重试;否则直接重试从 Inflight Batches 中移除,并向上层返回成功
Inflight Request 上限
前文中提到,Producer 的配置 max.in.flight.requests.per.connection 存在上限 5,这同时也是 Broker 缓存每个 PID 在每个 Partition 发送过的最新的 Batch 的数量。这样做的原因是,当 Inflight Request 数量(例如 2)超过 Broker 缓存的 Batch 数量(例如 1)时,存在以下反例:
Producer 向 Broker 先后发送了两个 Produce Request,且这两个请求中,均包含一个发送给 Partition p1 的 Batch,记为 b1 与 b2,其中 b1 SEQ < b2 SEQ
Broker 将 b1 与 b2 依次持久化完成(此时,Broker 缓存中会记录 b2 的元数据),但由于网络问题,Producer 没有收到响应
Producer 发现超时后重试,重新发送包含 b1 的 Produce Request
Broker 收到 Request 后发现 b1 SEQ 小于缓存中的 b2 的 SEQ,可以推测出该消息为重复的,不应写入,而是直接返回 offset 等信息;但由于缓存中并没有 b1 相关元数据,Broker 也就无法返回 offset 信息
以上就是 Inflight Request 数量不能超过 5 的原因。
其他细节
Producer Epoch 溢出的处理
当 Producer Epoch 溢出时(类型为
short
,最大值为 32767),Producer 会将 PID 与 Epoch 重置,并向 Broker 请求分配一个新的 PID 与 Epoch,并 Reset SEQ。SEQ 溢出的处理
当 SEQ 溢出时(类型为
int
,最大值为 2147483647),下一条消息的 SEQ 会轮转回 0。考虑到 Inflight Batch 的数量与 Batch 中消息的数量的限制,不会发生问题。UnknownProducerIdException
的处理UnknownProducerIdException
报错常发生于以下场景:由于 Log Retention 限制,Broker 将 Log 中某个 Producer 发送的消息均删除了,此时 Broker 重启后缓存中不再有该 Producer 的状态信息。如果此时 Producer 尝试接着之前的 SEQ 发送消息,由于 Broker 无法识别 PID,则会报错。为了处理这种情况,Producer 只需 Bump Epoch 并 Reset SEQ,重新发送消息即可。
示例
下面通过两个示例来帮助理解 Kafka Producer 幂等性的实现。
Broker 没有收到 Produce 请求
![](https://static001.geekbang.org/infoq/1a/1a624397994623acaf61a71ad1f14d12.png)
Producer 没有收到 Produce 响应
![](https://static001.geekbang.org/infoq/47/47a13f9ed5ef1f013e5af25a1a970ad1.png)
实现细节
下面介绍一些前文未涉及的 Kafka Producer 的实现细节。
消息压缩
Kafka Producer 支持在客户端对消息进行压缩,以减少消息的网络传输成本与存储成本。可以通过 Producer 配置 compression.type
来指定压缩时使用的算法,支持的选项有 none、gzip、snappy、lz4、zstd,默认为 none,即不进行压缩。
开启压缩后,可以节约网络带宽与 Broker 存储空间,但是会增加 Producer 与 Broker 的 CPU 消耗。此外,由于压缩是以 Batch 维度进行的,更好的攒批(更大的 Batch)会带来更好的压缩效果。
在实现消息压缩时,会存在这样一个矛盾:只要在真正将消息压缩到 Batch 中之后,才能判断它实际(压缩后)占用了多大的大小;但为了不超过 batch.size 的限制,需要在消息写入 Batch 之前就判断其压缩后的大小。
为了解决这个问题,Kafka 提出了一个自适应的压缩率估计算法。其逻辑如下:
维护一个 Map,其中记录了每个 Topic 上各个压缩算法的“估计压缩率”,初始值为 1.0
当某个 Batch 写满并压缩完成后,计算其“实际压缩率”(压缩后大小 / 压缩后大小)
基于这个实际压缩率调整估计压缩率
如果实际压缩率 < 估计压缩率,将估计压缩率向实际压缩率靠近,最大减少 0.005
如果实际压缩率 > 估计压缩率,将估计压缩率向实际压缩率靠近,最大增加 0.05
在尝试向新的 Batch 写入消息时,将使用新的估计压缩率 * 1.05 作为估算值
除此之外,为了应对极端情况(消息可压缩性波动导致估计值大幅偏离实际值),Kafka 还支持了 Batch 分裂的逻辑。
Batch 分裂
Batch 分裂(Split Batch)是 Kafka Producer 为了应对如下场景实现的功能:当上文中提到的压缩率估计值大幅低于实际值时,可能会导致在一个 Batch 中写入了过多的消息以至于超出了 Broker 或 Topic 的限制(message.max.bytes
或 max.message.bytes
),Broker 会拒绝写入并报错 MESSAGE_TOO_LARGE
。
当发生这样的问题时,就需要 Producer 将过大的 Batch 拆分开并重新发送,具体流程如下:
Producer 收到
MESSAGE_TOO_LARGE
报错重置前文中提到的“估计压缩率”至 max(1.0, 该过大 Batch 的实际压缩率)
将该 Batch 解压,并将解压出的消息基于 batch.size 重新攒批(由于重置了估计压缩率,这会产生多个 Batch),并重新加入发送队列
(如果开启了幂等性或事务性)为新的多个 Batch 设置 SEQ
释放老的 Batch 所使用的内存
监控指标
Kafka Producer 暴露了一些监控指标,可以通过 Producer 配置 metrics.recording.level
来指定 metrics 级别,支持的选项有 INFO、DEBUG、TRACE,默认为 INFO。目前 Kafka Producer 中各监控指标级别均为 INFO,即无论配置如何均会采集。
下面是 Producer 暴露的 metrics 及其含义。
batch-size-avg, batch-size-max:每个 Batch 的大小,如果开启了消息压缩,则为压缩后大小
batch-split-rate, batch-split-total:Batch 分裂的频率与次数
bufferpool-wait-time-ns-total:从 Buffer Pool 中等待分配内存的耗时
buffer-exhausted-rate, buffer-exhausted-total:从 Buffer Pool 中分配内存超时的频率与次数
compression-rate-avg:Batch 的平均压缩率
node-{node}.latency:指定 Node 响应 Produce 请求的延时(从发送请求到收到响应),包括成功与失败的所有请求
record-error-rate, record-error-total:发送消息(而非 Batch)失败的频率与数量,包括同步调用阶段失败与异步调用阶段失败
record-queue-time-avg, record-queue-time-max:Batch 从创建到发送等待的耗时
record-retry-rate, record-retry-total:重试发送消息的频率和数量,不包含 Split Batch 导致的重试
record-send-rate, record-send-total:发送消息的频率和数量
record-size-avg, record-size-max:每个 Batch 中最大的消息(压缩前)的平均大小与最大大小。注意,record-size-avg 并不是消息的平均大小
records-per-request-avg:每个 Produce 请求中消息的数量
request-latency-avg, request-latency-max:Broker 响应 Produce 请求的延时(从发送请求到收到响应),包括成功与失败的所有请求
topic.{topic}:Topic 粒度的各 metrics,包括
.records-per-batch:每个 Batch 中的消息数量
.bytes:同 batch-size-avg, batch-size-max
.compression-rate:同 compression-rate-avg
.record-retries:同 record-retry-rate, record-retry-total
.record-errors:同 record-error-rate, record-error-total
{operation}-time-ns-total:Client 中各接口的总执行耗时,包括
flush:
KafkaProducer#flush
耗时metadata-wait:向 Broker 请求刷新 Topic Metadata 的耗时
txn-init:
KafkaProducer#initTransactions
耗时txn-begin:
KafkaProducer#beginTransaction
耗时txn-send-offsets:
KafkaProducer#sendOffsetsToTransaction
耗时txn-commit:
KafkaProducer#commitTransaction
耗时txn-abort:
KafkaProducer#abortTransaction
耗时
常见问题
下面是一些在使用 Kafka Producer 时常遇到的问题与原因。
发送超时
Producer 发送超时的可能原因有很多,例如网络问题、Broker 负载过高,下面介绍两种由 Producer 导致的发送超时的情况。
Callback 耗时过长:Producer 支持在发送消息时注册回调,但该回调会在 Producer 的 sender 线程中执行,如果用户编写的回调方法执行了一些“重操作”,阻塞了 sender 线程的话,会导致该 Producer 的其它消息无法被及时发送,进而超时。
Callback 死锁:在 Callback 中同步调用
send
方法会导致死锁。举例如下,在 Callback 方法中检查是否发生错误,如果发生错误则调用prdocuer.send().get()
;正如前文所述,Callback 会在 sender 线程中执行,这样做会导致“阻塞 sender 线程的同时,等待 sender 线程执行”,发生死锁。
发送线程被阻塞
尽管 Kafka Producer 在发送消息时是异步的,但仍有一小部分操作是同步执行的。当这些同步操作出于某些原因被阻塞时,会导致调用 KafkaProducer#send
方法的线程也被阻塞。常见的发生阻塞的原因有:
刷新 Metadata 超时:在某些情况下,Producer 在发送消息前需要请求 Broker 刷新 Topic 元数据,该操作会在
send
的同步阶段执行。如果 Broker 出于某些原因无法提供服务或响应超时,会导致 Producer 被阻塞直至超时。Producer Buffer 满:当 Producer 发送消息的速率过快以至于超过 Broker 的处理能力或被 Broker 限流时,未被发送的消息会积攒在内存(Buffer Pool)中。当 Producer Buffer 被耗尽时,
send
方法将被阻塞,直至出现可用 Buffer 或超时。
CPU / 内存占用高
会有很多原因导致 Kafka Producer 的 CPU 与内存占用升高,下面介绍一些由 Kafka Producer 内部导致的 CPU 或内存占用升高的可能情况,实际排查时还应通过采集火焰图等手段准确定位问题。
CPU 占用高
Producer 攒批的大小越小,发送 Batch 的频率越高,CPU 占用越高。
开启消息压缩会导致 Producer CPU 占用升高。
Producer 会缓存历史一段时间使用过的 Partition 信息及其 Leader Node,并在发送消息遍历所有 Node 并检查是否存在待发送的 batch。所以 Producer 涉及的 Partition 分散的 Node 越多,其 CPU 占用越高。
内存占用高
Producer 发送消息的速率超出了 Broker 的承载能力,导致消息堆积在 Buffer Pool 中,这会导致内存占用升高。
Producer 攒批的大小越大,(由于每次创建 Batch 会分配一整块内存)内存“浪费”越多,内存占用越高。
开启消息压缩时,执行压缩操作需要额外的缓冲区,这会导致内存占用升高。
Fatal Error 后无法发送
当 Kafka Producer 开启了事务时(配置了 transactional.id
),如果在执行事务操作时发生了 Fatal Error,例如 ProducerFencedException
,会导致该 Producer 的后续所有消息均发送失败(无论是否使用了事务),只能重启 Producer 解决问题。
结语
至此,我们已经对 Apache Kafka 的生产者进行了完整和深度的解析。如果这些内容对你有所收获,也欢迎访问AutoMQ官方网站,关注我们的官方社交媒体。
评论