写点什么

大数据 -57 Kafka 高级特性 Producer 消息发送流程与核心配置详解

作者:武子康
  • 2025-08-01
    山东
  • 本文字数:3930 字

    阅读完需:约 13 分钟

大数据-57 Kafka 高级特性 Producer 消息发送流程与核心配置详解

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-30-新发布【1T 万亿】参数量大模型!Kimi‑K2 开源大模型解读与实践,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 07 月 28 日更新到:Java-83 深入浅出 MySQL 连接、线程、查询缓存与优化器详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Java 添加 POM 依赖

  • Java 操作 Kafka 的 API、SpringBoot

  • 实现对 Kafka 消息发送和消息消费


基本流程

Kafka Producer 消息发送流程详解

Producer 初始化阶段

  • 在创建 KafkaProducer 实例时,会同步初始化以下核心组件:

  • 创建并启动 Sender 线程,该线程负责实际的消息发送,被设置为守护线程(daemon thread)

  • 初始化 RecordAccumulator 消息缓冲区,默认大小 32MB

  • 加载配置的拦截器链(Interceptor)、序列化器(Serializer)和分区器(Partitioner)

消息发送流程

  • 当调用 send()方法时,实际是异步处理流程:a. 拦截处理:消息首先通过配置的 ProducerInterceptor 链进行处理(可用于添加消息头、监控等)示例:可添加 traceId 等分布式追踪信息

  • b. 序列化阶段:分别对消息 key 和 value 进行序列化

  • 默认使用 ByteArraySerializer,也可配置为 StringSerializer 等

  • c. 分区路由:通过 Partitioner 确定目标分区

  • 默认使用 RoundRobin 或 Key 哈希策略

  • 自定义分区器需实现 Partitioner 接口

消息缓冲与批次发送

  • 消息会被暂存到 RecordAccumulator 缓冲区

  • 触发批次发送的条件(满足任一即触发):

  • 批次大小达到 batch.size(默认 16KB)

  • 等待时间达到 linger.ms(默认 0ms,表示立即发送)应用场景:高吞吐场景可适当调大 linger.ms(如 50ms)以提升批量效果

  • 每个分区对应独立的批次队列

网络传输与 Broker 处理

  • Sender 线程将批次消息通过网络发送到目标分区 Leader 副本

  • Broker 端处理流程:

  • 写入 Leader 副本的 PageCache

  • 同步到 ISR 副本集

  • 根据 acks 配置等待副本同步确认

异常处理机制

  • 重试条件:

  • retries 参数>0(默认 Integer.MAX_VALUE)

  • 错误类型可重试(如网络异常、NOT_LEADER 等)

  • 重试策略:

  • 具备 backoff 退避机制(通过 retry.backoff.ms 配置)

  • 避免无效重试(如消息过大等不可恢复错误)

响应处理

  • 同步模式:通过 Future.get()阻塞等待响应

  • 异步模式:通过 Callback 接口处理响应

  • 响应元数据包含:

  • 主题、分区信息

  • 消息 offset

  • 时间戳等信息

关键配置参数

Broker 配置

这里是 Broker 的常见配置:


bootstrap.servers

生产者客户端与 broker 集群建立初始链接需要 Broker 的地址列表,由该初始连接发现 Kafka 集群中其他的所有 Broker,该地址列表不需要写全部的 Kafka 集群地址,但也不要只写一个防止宕机不可用。

key.serializer

实现了 org.apache.kafka.common.serialization.Serializer 的 key 序列化类

value.serializer

实现了 org.apache.kafka.common.serialization.Serializer 的 value 序列化类

acks

该项控制着已发消息的持久性。


  • acks=0,生产者不等待 Broker 的任何消息确认。

  • acks=1,Leader 将记录写到它本地的地址,就相应客户端的消息,而不等待 Follower 的副本的确认。

  • acks=all,Leader 等待所有有同步副本消息的确认,保证了只要有一个同步副本存在,消息就不会丢失。

  • acks=-1,等价于 acks=all默认值为1

compression.type

生产者生成数据的压缩格式,默认是 none(无压缩)。可选:


  • none

  • gzip

  • snappy

  • lz4


默认是none

Broker 配置补充

额外的配置还有下图的这些内容:




retry.backoff.ms

在向一个指定的主题分区重发消息的时候,控制重试之间的等待时间间隔。这个参数非常重要,它可以避免在遇到临时性故障时过于频繁地重试,从而减轻系统负担。


  • 参数作用:当生产者发送消息失败时,会在指定的时间间隔后自动重试

  • 工作方式:采用指数退避策略,每次重试的间隔会逐渐增加

  • 使用场景:在网络不稳定或 broker 负载较高时特别有用

  • 实际示例:假设设置为 100ms,第一次重试等待 100ms,第二次可能等待 200ms,第三次可能等待 400ms


long型 默认 100

retries

retries 参数配置生产者发送消息时的重试次数。


  • 核心功能:当消息发送出现可重试错误时(如网络问题、leader 选举等),系统会自动重新发送消息

  • 注意事项

  • 如果同时需要保证消息的有序性,必须设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1

  • 重试可能导致消息重复,消费者端需要做好幂等处理

  • 推荐值:生产环境通常设置为 3-5 次

  • 错误类型:只对可恢复的异常进行重试(如 NetworkException、NotLeaderForPartitionException 等)

request.timeout.ms

配置客户端等待请求响应的最长时间。


  • 关键作用:控制生产者等待 broker 响应的时间阈值

  • 配置建议

  • 必须大于 replica.lag.time.max.ms(默认 10000ms)

  • 在网络延迟较高的环境中需要适当增大

  • 超时处理

  • 超时后会触发重试机制(如果配置了 retries>0)

  • 连续超时达到重试次数上限后会抛出 TimeoutException

  • 影响因素:受网络状况、broker 负载、消息大小等因素影响


int型 默认 30000

interceptor.classes

配置生产者拦截器链,用于在消息发送前进行预处理。


  • 执行时机:在消息序列化之前,即将发送到 Kafka 集群时执行

  • 主要用途

  • 消息内容审计和修改

  • 发送统计和监控

  • 附加元数据信息

  • 实现要求

  • 必须实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口

  • 需要定义 onSend()和 onAcknowledgement()方法

  • 配置方式:通过 Map[String, Object] configs 中的 List 集合配置多个拦截器

  • 执行顺序:按照配置顺序依次执行

  • 典型应用

  • 日志打点

  • 消息加密

  • 请求追踪


默认没有拦截器

acks

同上,不介绍了。

batch.size

当多个消息发送到同一个分区时,生产者会尝试将多个记录合并为一个批处理(batch)进行发送。这种批处理机制能显著提高客户端和服务端的处理效率,减少网络请求次数。该配置项以字节为单位控制默认批处理的大小,其具体行为和影响如下:


  1. 批处理大小限制:

  2. 所有批处理的大小都小于或等于该配置值

  3. 例如设置为 16KB(16384)时,每个批处理最大不超过 16KB

  4. 请求发送机制:

  5. 发送给 Broker 的请求将包含多个批次

  6. 每个分区对应一个批次

  7. 请求中只包含当前可发送的数据批次

  8. 配置影响:

  9. 过小值的影响:

  10. 会限制系统的吞吐量

  11. 极端情况下(设置为 0)会完全关闭批处理功能

  12. 示例:设置为 1KB 可能导致频繁的小批量请求

  13. 过大值的影响:

  14. 会占用较多内存资源

  15. 可能导致消息延迟发送

  16. 示例:设置为 100MB 时可能造成内存浪费

  17. 典型场景:

  18. 高吞吐场景建议设置较大的值(如 64KB-1MB)

  19. 低延迟场景可适当减小该值(如 16KB-32KB)

  20. 需根据消息平均大小和分区数量进行权衡

client.id

client.id 是生产者在发送请求时传递给 Broker 的身份标识字符串,其主要作用和特性包括:


  1. 核心功能:

  2. 用于在 Broker 的请求日志中追踪消息来源

  3. 帮助识别是哪个应用程序发送了特定消息

  4. 在监控和故障排查时提供重要依据

  5. 命名规范:

  6. 通常采用与业务相关的描述性字符串

  7. 示例:订单服务可能使用"order-service-producer"

  8. 建议包含应用名称和环境标识(如 dev/prod)

  9. 实际应用:

  10. 在分布式系统中识别不同生产者实例

  11. 结合监控系统实现细粒度的性能监控

  12. 用于配额管理和访问控制

  13. 最佳实践:

  14. 保持唯一性和可读性

  15. 避免使用敏感信息

  16. 在微服务架构中建议采用标准命名规范

compression.type

同上,不介绍了。

send.buffer.bytes

TCP 发送数据的时候用的缓冲区的大小,若设置为 0,则用操作系统默认的。

buffer.memory

生产者可以用来缓存等待发送到服务器的记录的总内存字节,如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时间,此后将引发异常。此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。

connections.max.idle.ms

当连接空闲时间达到这个值,就关闭连接。long型 默认 540000

linger.ms

生产者发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次发送,一般情况是消息的发送速度比消息积累的速度要慢。有时候客户端需要减少请求次数,即使在负载不大的情况下。该配置设置了一个延迟,生产者消息不会立即将消息送到 Broker,而是等待这么一段时间以积累消息,然后将这段消息之类的消息作为一个批次发送,该设置是批处理的另一个上限,一旦此消息达到了 batch.size 指定的值,消息批会立即发送,如果积累的消息字节数达不到 batch.size 的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。默认值是0

max.block.ms

控制 KafkaProducer.send()和 KafkaProducer.partitionFor()阻塞时长,当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。long 型值,默认 60000

max.request.size

单个请求的最大字节数,该设置会限制单个请求总消息批的个数,以免单个请求发送太多的数据,服务器有自己的限制批大小的设置,与该配置可能不一样int 型 默认 1048576

partitioner.class

实现了接口 org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes

TCP 接收缓存(SO_RECVBUF),设置为 01,则使用操作系统默认的值。int型 默认32768

security.protocol

跟 Broker 通信的协议:PLAINTEXT、SSL、SASL_PLAINTEXT、ASAL_SSLString型 默认 PLAINTEXT

max.in.flight.requests.per.connection

单个连接上未确认请求的最大数量,达到这个数量,客户端阻塞。如果该值大于 1,则存在失败的请求,在重试的时候消息顺序不能保证。int型 默认5

reconnect.backoff.max.ms

对于每个连续的连接失败,每台主机退避将成倍增加,直到达到此最大值。

reconnect.backoff.ms

尝试重连指定主机的基础等待时间,避免该主机的密集重连。

发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-57 Kafka 高级特性 Producer 消息发送流程与核心配置详解_Java_武子康_InfoQ写作社区