写点什么

kafka 生产者分区策略演进

作者:布兰特
  • 2022 年 7 月 10 日
  • 本文字数:5717 字

    阅读完需:约 19 分钟

kafka 生产者分区策略演进

分区策略迭代

kafka 2.4.0 之前的版本,producer 客户端的消息分区策略分别为

  • 当 record 的 key 存在时并且指定了分区策略,则直接写入到相应的分区

  • 当 record 的 key 存在时但并未指定分区,则根据 key 的 hash 对分区数取模,获得相应的分区

  • 当 record 的 key 为 null 并未指定分区,则使用 round-robin 轮询策略


kafka 2.4.0 ~ kafka 3.2.0 版本,producer 客户端的消息分区策略分别为

  • 当 record 的 key 存在时并且指定了分区策略,则直接写入到相应的分区

  • 当 record 的 key 存在时但并未指定分区,则根据 key 的 hash 对分区数取模,获得相应的分区

  • 当 record 的 key 为 null 并未指定分区,则使用 Sticky Partitioner 黏着分区策略


kafka 3.2.0 版本,producer 客户端的消息分区策略分别为

  • 当 record 的 key 存在时并且指定了分区策略,则直接写入到相应的分区

  • 当 record 的 key 存在时但并未指定分区,则根据 key 的 hash 对分区数取模,获得相应的分区

  • 当 record 的 key 为 null 并未指定分区,生产者根据 RecordAccumulator 中记录的信息来选择分区


我司目前 kafka 版本是 1.x,前阵子在解答同事问题的时候,提到了 kafka 生产者在向 broker 写消息时,是按照什么策略写的?因为我们目前 kafka 中大部分的数据,都是由运维利用日志采集工具直接转发到 kafka,并没有指定消息的 key。或者说, 有些研发同学对 kafka 的熟悉程度不高,在使用 kafka 的过程中,并不知道在有的一些场景下,合理利用好消息的 key,是对下游业务处理是有帮助的。所以在面对上面同事的问题,在 key 为 null 并未指定分区,则使用 round-robin 轮询策略。

目前所能看到的 kafka 博客,除了 0.8 版本很少见到了,剩下基于什么版本的都有。去 GitHub 上看了下,最新版本已经是 3.3.0 了。在 kafka 最新社区代码里,producer 客户端的消息分区策略还是发生了很大的变化的。最主要的就是默认的消息分区策略已经不是 round-robin 轮询了,而是在 kafka 2.4.0 版本中正是发布的 Sticky Partitioner,相关 kafka 优化提案KIP-480

Sticky Partitioner 背景

生产者发送给 broker 的数据 batch 的大小会影响到延迟。这里的延迟指的是生产者端的延迟,生产者端延迟通常定义为生产者客户端生成的消息被 Kafka 确认所需的时间。俗话说,时间就是金钱,最好尽可能减少延迟,以使系统运行得更快。当生产者能够更快地发送消息时,整个系统都会受益。

消息以 batch 的方式发送

每个 Kafka 主题包含一个或多个分区。当 Kafka 生产者向主题发送记录时,它需要决定将其发送到哪个分区。如果我们几乎同时将几条记录发送到同一个分区,它们可以作为一个 batch 发送。处理每个 batch 需要一些开销,batch 中的每条记录都会导致该成本。小批量的记录每条记录的有效成本更高。通常,较小的 batch 会导致更多的请求 requests 和排队 queuing,从而导致更高的延迟。

一个 batch 在达到一定大小 (batch.size) 或经过一段时间 (linger.ms) 后,触发发送。 batch.size 和 linger.ms 都在生产者中配置。 batch.size 的默认值为 16,384 字节,linger.ms 的默认值为 0 毫秒。一旦达到 batch.size 或至少 linger.ms 时间已经过去,系统将尽快发送批处理。


乍一看,似乎将 linger.ms 设置为 0 只会导致产生的 batch 只包含一条记录。但是,通常情况并非如此。即使 linger.ms 为 0,生产者也会在大约同一时间将记录生成到同一分区时将它们分组。这是因为系统需要一点时间来处理每个请求,当系统无法立即处理它们时,就会形成批处理。低吞吐量通常会给系统注入延迟,因为如果没有足够的记录来填充一个 batch,那么直到 linger.ms 才会发送该 batch。所以应该找到一种方法来增加 batch 大小以在 linger.ms 之前触发发送将进一步减少延迟。


目前,在没有指定 partition 和 key 的情况下,默认分区器以 round-robin 方式对数据进行分区。这意味着一系列连续记录中的每条记录都将被发送到不同的分区,直到我们用完分区并重新开始。虽然这会在分区之间均匀分布记录,但也会导致更多的 batch 更小。似乎最好让所有记录都转到指定的分区(或几个分区)并以更大的 batch 一起发送。


Sticky Partitioner 尝试在分区器中实现此行为。通过“黏着”一个分区直到批处理已满(或在 linger.ms 启动时发送),与默认分区程序相比,我们可以创建更大的批处理并减少系统中的延迟。即使在 linger.ms 为 0 并且我们立即发送的情况下,我们也看到改进的批处理和减少的延迟。发送 batch 后,粘性分区发生变化。随着时间的推移,记录应该在所有分区中均匀分布。


Netflix 也有类似的想法,并创建了一个粘性分区器,它选择一个分区并将所有记录发送给它一段给定的时间,然后再切换到一个新的分区。https://medium.com/netflix-techblog/kafka-inside-keystone-pipeline-dd5aeabaf6bb


另一种方法是在创建新 batch 时更改粘性分区。这个想法是最大程度地减少可能发生在不合时宜的分区切换上的空 batch。此方法用于此处介绍的粘性分区器。


优化方案

在没有指定分区策略、key = null 的情况下更改默认分区,为给定 Topic 选择 Sticky Partitioner 分区。当记录累加器 accumulator 为给定分区上的主题分配 new batch 时,“黏着”分区会更改。


同时添加一个名为 UniformStickyPartitioner 的新分区器,以允许对所有记录进行粘性分区,即使是那些 key 不为空的记录。这就像 RoundRobinPartitioner 如何对所有记录使用循环分区策略,包括带有 key 的记录。

性能测试

一般来说,与当前代码相比,这个分区器经常看到延迟减少了一半。在最坏的情况下,Sticky Partitioner 的执行与 round-robin 相同。一种趋势是随着分区的增加看到更多的好处。尽管如此,对于 16 个分区,还是有明显的好处。在 1000 msg/sec 的吞吐量下,延迟仍然是默认值的一半左右。



观察到的另一个趋势,尤其是在刷新情况下,随着发送的消息数量从低吞吐量增加到中等吞吐量,延迟会减少更多。好处部分取决于每秒消息与分区的比率。


最后,在 linger.ms 不为零且吞吐量足够低以至于默认代码需要在 linger.ms 上等待的情况下,有一个明显的好处。例如,使用 1 个生产者、16 个分区和 1000 msg/sec 以及 linger.ms = 1000 运行时,粘性分区程序的 p99 延迟为 204,而默认为 1017。这大约是延迟的 1/5,这是由于批次不必等待 linger.ms。


除了延迟之外,与默认代码相比,Sticky Partitioner 还发现 CPU 利用率有所降低。在观察到的情况下,与默认值相比,粘性分区节点的 CPU 利用率通常降低 5-15%(例如从 9-17% 到 5-12.5% 或从 30-40% 到 15-25%)代码的节点。




Sticky Partitioner 总结

Sticky Partitioner 的主要目标是增加每个 batch 中的记录数,以减少 batch 总数并消除过度排队。当 batch 较少且每个 batch 中的记录较多时,每条记录的成本较低,并且使用粘性分区策略可以更快地发送相同数量的记录。数据表明,这种策略在使用空键的情况下确实减少了延迟,并且当分区数量增加时效果变得更加明显。此外,在使用粘性分区策略时,CPU 使用率通常会降低。通过坚持分区并发送更少但更大的批次,生产者看到了巨大的性能改进。


Kafka 3.3.0

随后社区内有开发人员提出,UniformStickyPartitioner 导致 Kafka 分区不均匀。


110 个生产者,550 个分区,550 个消费者,5 个节点 Kafka 集群,生产者使用 null key+stick partitioner,约 100w tps ,观察到的 partition 延迟异常,消息分布不均匀,导致消息较多的分区的生产和消费延迟最大异常。我找不到在这种 tps 下坚持会使消息分布不均匀的原因。我不能切换到 round-robin 分区器,这会增加延迟和 cpu 成本。是不是 stick partationer 的设计导致消息分布不均,还是这个不正常。如何解决?


引入了 UniformStickyPartitioner 并使其成为默认分区器。存在一个问题:它实际上将更多记录分配给速度较慢的 broker,并且可能导致“失控”问题,当 broker 的临时缓慢倾斜分布使得 broker 获取更多的记录并因此变得更慢,这反过来又使分布更加倾斜,并且问题持续存在。


问题的发生是因为“粘性”时间是由新的 batch 创建驱动的,这与 broker 延迟成反比——较慢的 broker 消耗批次的速度较慢,因此它们比速度更快的代理获得更多的“粘性”时间,从而扭曲了分布。场景的细节在这里描述得很好https://issues.apache.org/jira/browse/KAFKA-10888?focusedCommentId=17285383&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17285383。(将要提交 batch 提交延迟,导致不能生成 new batch,无法切换下一个写入分区,后续堆积的消息仍然会提交到当前分区)


假设我们有一个生产者写入 linger.ms=0 的 3 个分区,并且一个分区由于某种原因稍微变慢了一点。这可能是领导者更换或一些暂时的网络问题。生产者必须保留该分区的 batch,直到它变得可用。在保留这些 batch 的同时,其他 batch 将开始堆积。这些 batch 中的每一个都可能被填满,因为生产者还没有准备好发送到这个分区。


从 StickyPartitioner 的角度考虑这一点。每次选择慢速分区时,生产者都会完全填满批次。另一方面,由于 linger.ms=0 设置,剩余的“快速”分区可能无法填充它们的批次。只要有一条记录可用,它就可能会被发送。所以更多的数据最终会被写入已经开始积压的分区。即使最初缓慢的原因(例如领导者更换)得到解决,这种不平衡也可能需要一些时间才能恢复。我们相信,如果分区无法赶上额外负载的障碍,这甚至会产生失控效应。


我们分析了一个我们认为可能会发生这种情况的案例。下面我总结了 1 小时内对 3 个分区的写入。这里的分区 0 是“慢”分区。所有分区的批次数量大致相同,但慢速分区的批次大小要大得多。


修改建议自适应分区切换 UniformStickyPartitioner 切换的一个潜在缺点是,如果其中一个 broker 落后(无法维持其吞吐量份额),记录将继续堆积在 accumulator 累加器中,最终会耗尽缓冲池内存并降低生产速度以匹配最慢 broker 的容量。为了避免这个问题,分区切换决策可以适应 broker 负载。


等待发送 batch 的队列大小是 broker 负载的直接指示(负载越多的 broker 将有更长的队列)。选择下一个分区时考虑队列大小的分区切换。选择分区的概率与队列大小成反比(即队列较长的分区不太可能被选择)。


除了基于队列大小的逻辑之外,partitioner.availability.timeout.ms 可以设置为非 0 值,在这种情况下,准备发送批次的分区超过 partitioner.availability.timeout.ms 毫秒,将是标记为不可用于分区,并且在代理能够接受来自分区的下一个就绪批次之前不会被选择。可以通过设置 partitioner.adaptive.partitioning.enable = false 来关闭自适应分区切换。


从实现的角度来看,分区器对队列大小或 broker 准备情况一无所知(但这些信息可以在 RecordAccumulator 对象中收集),因此 KafkaProducer 类将在 RecordAccumulator 中执行分区逻辑,除非显式设置了自定义分区器.请注意,这些更改不会影响有 key 值消息的分区,只会影响 key=null 消息的分区。


DefaultPartitioner 和 UniformStickyPartitioner 将被弃用,它们的行为方式与现在相同。未指定自定义分区器的用户将自动获得新行为。


// Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,// which means that the RecordAccumulator would pick a partition using built-in logic (which may// take into account broker load, the amount of data produced to each partition, etc.).int partition = partition(record, serializedKey, serializedValue, cluster);
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { if (record.partition() != null) return record.partition();
if (partitioner != null) { int customPartition = partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); if (customPartition < 0) { throw new IllegalArgumentException(String.format( "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition)); } return customPartition; }
if (serializedKey != null && !partitionerIgnoreKeys) { // hash the keyBytes to choose a partition return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()); } else { return RecordMetadata.UNKNOWN_PARTITION; } }

复制代码


尝试计算分区,但请注意,在此调用之后,它可以是 RecordMetadata.UNKNOWN_PARTITION (默认 -1),这意味着 RecordAccumulator 将使用内置逻辑选择一个分区(这可能会考虑 broker 的负载、每个 partition 产生的数据量等)。


Partitioner.partition 方法可以返回 -1,以指示默认的分区决策应该由生产者自己做出。现在,Partitioner.partition 需要返回一个有效的分区号。这背离了分区逻辑(包括默认分区逻辑)完全封装在分区器对象中的范式,这种封装不再有效,因为它需要只有生产者(Sender,RecordAccumulator)才能知道的信息(例如队列大小、记录大小、broker 响应能力等)。


当生产者从分区器获得 -1 时,它会自行计算分区。这样自定义分区器逻辑可以继续工作,生产者将使用从分区器返回的分区,但是如果分区器只想使用默认分区逻辑,它可以返回 -1 并让生产者找出分区来利用。


相关链接

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/https://github.com/apache/kafka/pull/12049

https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

https://issues.apache.org/jira/browse/KAFKA-8601

https://issues.apache.org/jira/browse/KAFKA-10888?focusedCommentId=17285383&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17285383

https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

发布于: 刚刚阅读数: 3
用户头像

布兰特

关注

大数据研发工程师 2019.05.10 加入

还未添加个人简介

评论

发布
暂无评论
kafka 生产者分区策略演进_kafka_布兰特_InfoQ写作社区