八股 MQ004——聊聊 Producer
写在前面
月亮湾大道的车顺着画好的笔直的路线昂首前冲。他们的路是建好了的。
但有的路不是建好的。是要走的。
往前走,走的七拐八歪,走的忐忑起伏,然后留下走的痕迹。
这不叫路,这叫什么呢?不重要,重要的是,只要往前,它的终点就不是起点。
Producer 基础知识
一般的生产环节有:配置参数与实例化->构造消息->发送消息->关闭实例
配置参数
Producer 的配置参数有这些:
bootstrap.servers:与 Consumer 初始化时类似,该参数用于发现 Kafak 集群信息。一般设置两个以上的地址,但不要求全部。
key.serializer 与 value.serializer:发送到 Broker 的消息底层都是以字节流传输的,因此需要指定消息的 key 与 value 的序列化方法,以便将消息转化为对应的字节流。这一点和 Consumer 是相反的,Consumer 正好需要将字节流反序列化为消息的 key 与 value。
client.id:Producer 对应的客户端 ID,默认为空,Kafka 会自动生成一个字符串,以"producer-N",N 为 Producer 的序列编号。
发送消息
Producer 发送消息有三种模式:
发后即忘:只触发消息投递到 Kafka 中,但不关心投递的结果。性能最高,可靠性最差。
同步:阻塞到获取消息投递的结果,成功或失败,失败可捕获异常并处理。
异步:投递消息时指定了一个 Callback,kafka 有对投递的响应时就会触发 Callback。
发送消息的原理大致如下:
Producer 会创建一个双端队列 RecordAccumulator,用来缓存要发送的消息;
Producer 调用 send 方法之后,将消息序列化,会确定一个 Partition,然后将字节序列放在 RecordAccumulator 对应的 batch 中;
当 batch 到达一定大小或者时间限制之后,或者 flush 方法,就会通知 sender 线程;
sender 会从 RecordAccumulator 中拉取对应的 batch,找到对应的 broker,然后发送;
发送后会根据 Producer 配置的同步或者异步发送消息模式,来决定:如果是同步,就等到 sender 线程收到 broker 的 ack 之后,才会返回结果;如果是异步,就不等待 ack,直接返回给 Producer。
RecordAccumulator
RecordAccumulator 是一个线程安全的双端队列(它也是一个小型的生产者-消费者模型),用于缓存需要发送到 Broker 的消息。
RecordAccumulator 会由多个 Producer 线程并发写入,而由一个 Sender 线程读取;
RecordAccumulator 内维护了一个 ConcurrentMap,其中 key 为 Topic-Partition 维度的标识,唯一定位某个 Topic 的某个 Partition,Value 为 ArrayDeque,队列里存储 batch 的消息,缓存的消息就被批量缓存在这里。
可以通过设置 batch.size 与 linger.ms 两个参数来控制 RecordAccumulator 需要等待 batck 内消息个数到达多少 或者 需要等待多长时间,就触发 Sender 线程发送消息。
RecordAccumulator 会根据 Producer 的 compression.type 参数来设置压缩算法,有:none(不设置)、gzip、snappy、lz4 或 zstd。
RecordAccumulator 可以根据 Producer 配置的 retries 参数来决定是否重试:0,不重试;大于 0,为重试的次数;小于 0,为一直重试。重试时可以设置 retry.backoff.ms 来设置重试的间隔;max.in.flight.requestes.per.connection 来设置每个链接允许的未确认请求数量;ack 参数来设置确认机制:0:只管发消息,不论 Leader 或 Follower 落盘成功均 ack;1: 只需要 Leader 落盘成功即可 ack;-1:需要 Leader 与 Follower 都落盘成功才可以 ack(注意这里可能:Follower 同步完成,到 Broker 发送 Ack 之前,Leader 故障,重新选举 Leader,而 Ack 未发送,导致重试,就会有同样的消息发送到新的 Leader 中,有重复生产的情况。)。这里更多的 ack 机制后面再说。
Interceptor、Serializer、Partitioner
Kafka 的 Producer 生产的消息会经过拦截器、序列化器、分区器之后才会真正到达 Kafka Broker 的 Partition 中。
Interceptor
拦截器有两种:1. 生产者拦截器;2. 消费者拦截器。拦截器支持链式调用,生产者拦截器可以在消息发送到 Broker 之前与 Broker 返回发送结果(成功或失败)之后定制调用各种特殊业务逻辑。(注意,拦截器的链式调用是在消息生产的主链路中,过重的逻辑会影响消息生产的 TPS)。
Serializer
序列化器是 Kafka 用于序列化生产消息的 Key 与 Value。序列化器可以通过指定 key.serializer 与 value.serializer 来实现。也可以自定义序列化器。在此不赘述。
Partitioner
分区器是 Kafka 用于将消息映射到不同分区的工具。它使用有以下几种情况:
如果消息指定了 key,那么会使用 key 的哈希值对分区数值取模来制定分区;
如果消息没有指定 key,那么会使用轮询的方式来分区;
如果消息指定了 partition,那么会直接以该 partition 来作为分区;
如果自定义了 partition.class 参数,那么会按照自定义的规则来执行分区逻辑
Sender 线程
Kafka 的 producer 有两个线程:主线程与 Sender 线程。
主线程所做的事情有:
构造要生产的消息、依次执行拦截器、序列化、分区器等操作、将消息添加到 RecordAccumulator 中。
RecordAccumulator 维护了一个 BufferPool 来管理缓存池,每一个缓存的大小由 batch.size 来指定。当追加一条生产消息时,会从寻找/新建 RecordAccumulator 的双端队列,从其尾部获取一个 ProducerBatch,判断当前消息的大小是否可以写入该批次(由 batch.size 指定)中。若可以写入则写入;若不可以写入,则新建一个 ProducerBatch。
Sender 线程核心有一个 run 方法完成。它主要做的事情如下:
Sender 线程在 running 状态下,会循环调用 runOnce()方法,即主要的将 RecordAccumulator 里的缓存消息向 Broker 中发送的业务逻辑;
如果主动关闭 Sender 线程,且非强制关闭,并且 RecordAccumulator 中还有消息待发送,则会额外调用一次 runOnce()方法,将剩余的消息发送完成之后再退出 Sender 线程;
但如果是强制关闭,则直接拒绝提交剩下未完成的消息;
关闭 Kafka 网络通信对象。
上面核心的发送消息的业务逻辑封装在 runOnce()中。它主要顺序依次调用 sendProducerData()与 poll()两个方法
sendProducerData 主要做了以下几件事情:
从 metadata 中获取集群和分区的信息。包括确认消息与目标 Partition 的路由关系;Broker 实际可用的网络状态良好的 Partition;
根据已准备好的 Partition 从 RecordAccumulator 中抽取待发送的消息批次(ProducerBatch),按照 BrokerNodeID:List-ProducerBatch 的格式组织;
将 2 中整理好的消息格式进一步写入 Map-TopicPartition:List-ProducerBatch 的 inFlightBatches 数据结构中。(因为这个结构是以 TopicPartition 为维度,Value 为这个分区待发送的消息详情,所以可以通过这个结构来感知 Sender 线程中不同分区消息积压的情况。进而可以通过 max.in.flight.requests.per.connection 参数来控制某个队列发送限流量级)
从 inFlightBatches 中找到需要发送的 ProducerBatch(通过计算 Batch 是否过期来判断)。
按照 BrokerNodeID 维度将需要多个 ProducerBatch 组装成一个 Request 准备进行发送。接下就来到 poll()方法
poll()方法的关键点:
更新 metadata;
触发真正的网络通信(通过 NIO 的 Selector#select()方法),对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。
针对消息发送、消息接收、断开链接、超时等异步处理结果进行收集;
根据接收的异步处理结果进行响应,会将响应结果设置到 KafkaProducer#send 方法返回的响应中,唤醒 Producer,至此完成一次完整的消息发送流程。
ACK 机制
Producer 生产的消息投递到 Broker 的 Topic Partition 之后,需要 Partition 返回代表投递结果的 ACK,而后 Producer 才可以根据投递的 ACK 与否来决定是继续下一轮的生产还是因为失败而重新触发生产消息。
前面有说过,Kafka 可以配置 Producer 的 acks 参数来控制消息投递时 ACK 的逻辑,具体如下:
acks=0,Producer 在成功写入消息之后,不会等待任何来自 Broker 的响应;
acks=1,集群的 leader 分区副本收到消息之后,就会向 Producer 发送一个成功响应;
acks=-1,只有所有 ISR 的分区副本都收到消息之后,才会向 Producer 发送一个成功响应;
这里有两点需要补充。
分区副本机制
Topic 是逻辑概念,实际一个 Topic 里所有的消息是存储在多个 Partition 中,而 Kafka 为了保持高可用,允许每一个 Partition 创建多个 Replica,Replica 有两种类型:Leader 与 Follower。
分区在创建时就会创建多个副本,同时多个副本会选举出一个 Leader 与若干 Follower(默认为 2)。
Leader 承接所有的读写请求,Follower 只负责从 Leader 异步拉取数据,以实现与 Leader 副本的数据同步
ISR(In-Sync Replicas)
继续上面提到,与 Leader 同步的副本就是 ISR(一个集合),其他与 Leader 不同步的副本就不在 ISR 中。与 ISR 对应的就是 OSR(Out-Sync Replicas)。
Broker 可通过 replica.lag.time.max.ms 参数设置 Follower 与 Leader 落后的最长时间间隔,默认为 10s。当 Follower 落后 Leader 数据时间间隔在这个参数设定的范围内,这个 Follower 就可以认为在 ISR 中。
ISR 是动态变化的,当某个 Follower 追上 Leader 的数据同步进度,那么这个 Follower 可以加入到 ISR 中。
当 acks=-1 时,需要完成同步的 Replicas 就是 ISR 集合中所有的副本。
综合
Producer 有哪些重要的参数?
acks:略
max.request.size:Producer 能生产的最大消息大小,默认为 1M,这个参数设置也需要考虑 Broker 端的 message.max.bytes。
compression.type:消息压缩的类型,none,表示不压缩,其他可选 gzip、snappy 等。
retries 和 retry.backoff.ms:分别对应生产失败的重试次数与重试间隔。
batch.size:每个 Batch 要存放 batch.size 大小的数据后,才可以发送出去,比如说 batch.size 默认值是 16KB,那么里面凑够 16KB 的数据才会发送。
linger.ms:这个参数用来指定生产者发送 ProducerBatch 之前等待等待更多消息(ProducerRecord)的加入 ProducerBatch 的时间
partition.class:自定义分区器。
Producer 如何保证消息有序?
如果要保证全局消息有序,那么需要一个 Topic 只有一个 Partition,这样所有的消息会都发往同一个 Partition,但这样会降低消息生产与消费的吞吐量,限制 Consumer 只有一个;
如果要保证局部有序,如保证满足某种业务场景下(可以通过某些业务字段来区分),那么可以在生产消息的时候指定 PartitionKey,Kafka 会根据 PartitionKey 来做 Hash 计算,保证相同的 PartitionKey 放在同一个 Partition 中,这样就能保证局部有序;
以上说的是正常情况,当消息生产出现异常时,因为会有重试场景发生,所以如果先生产的消息失败,后生产的消息成功,而重试的消息生产成功之后,这样就会产生异常时间点前后的乱序。这样可以通过设置 max.in.flight.requests.per.connection 为 1,来确保发送消息时针对每个 Broker Partition 的链接,这样能保证先生产的消息可以确定等到最终重试成功之后,才进行下一个消息的生产。当然也可以考虑关闭重试机制,同时做好消费端的因为乱序消费的记录,而后通过定时脚本轮询失败任务,并同时做好报警监控。
Kafka 的事务生产?
Kafka 的事务消息是指在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败 1。Kafka 的事务消息主要有以下几个特点:
Kafka 的事务消息需要生产者开启幂等性和指定唯一的 transactional.id,
Kafka 的事务消息需要生产者调用 initTransactions(), beginTransaction(), commitTransaction()或 abortTransaction()等方法来管理事务的开始、提交或回滚。
Kafka 的事务消息可以支持消费-转换-生产模式,即生产者可以在事务中消费一个主题的消息,转换后发送到另一个主题,并且提交消费位移到事务中。
Kafka 的事务消息依赖于 Broker 端的事务协调器(TransactionCoordinator)来处理生产者的事务请求,并将事务状态和元数据存储在一个特殊的主题__transaction_state 中。
Kafka 的事务消息在提交或回滚时,会向涉及到的分区发送一个事务标记(Transaction Marker),用来标识该分区中哪些消息属于该事务,以及该事务是成功还是失败。
Kafka 的事务消息在消费时,需要消费者设置 isolation.level 为 read_committed,以过滤掉未提交或回滚的事务消息。
参考资料
你绝对能看懂的Kafka源代码分析-RecordAccumulator类代码分析_futurerecordmetadata_爱码叔的博客-CSDN博客
4、深潜KafkaProducer —— RecordAccumulator - 腾讯云开发者社区-腾讯云 (tencent.com)
https://www.jianshu.com/p/57d82f4db8b8
Kafka-之Producer生产者(含拦截器、分区器、序列化器及异步消息发送模式)_kafkaproducer<k, v>_稳哥的哥的博客-CSDN博客
KafkaProducer Sender 线程详解(含详细的执行流程图) - 中间件兴趣圈 - 博客园 (cnblogs.com)
版权声明: 本文为 InfoQ 作者【Codyida】的原创文章。
原文链接:【http://xie.infoq.cn/article/bac6e511d0c7e18ef8f51fe48】。文章转载请联系作者。
评论