写点什么

图解 Kafka Producer 中的消息缓存模型

  • 2022 年 9 月 17 日
    江西
  • 本文字数:3260 字

    阅读完需:约 11 分钟

图解Kafka Producer中的消息缓存模型

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

大家好,我是彦祖啊~0.0

在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。

  1. 发送消息的时候, 当 Broker 挂掉了,消息体还能写入到消息缓存中吗?

  2. 当消息还存储在缓存中的时候, 假如 Producer 客户端挂掉了,消息是不是就丢失了?

  3. 当最新的 ProducerBatch 还有空余的内存,但是接下来的一条消息很大,不足以加上上一个 Batch 中,会怎么办呢?

  4. 那么创建 ProducerBatch 的时候,应该分配多少的内存呢?

1 什么是消息累加器 RecordAccumulator

kafka 为了提高 Producer 客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。

而缓存这个消息的就是 RecordAccumulator 类.

整体模型图

上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。

2 消息缓存模型

在这里插入图片描述

上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。

  1. 每条消息,我们按照 TopicPartition 维度,把他们放在不同的Deque<ProducerBatch> 队列里面。TopicPartition 相同,会在相同Deque<ProducerBatch> 的里面。

  2. ProducerBatch : 表示同一个批次的消息, 消息真正发送到 Broker 端的时候都是按照批次来发送的,这个批次可能包含一条或者多条消息。

  3. 如果没有找到消息对应的 ProducerBatch 队列, 则创建一个队列。

  4. 找到 ProducerBatch 队列队尾的 Batch,发现 Batch 还可以塞下这条消息,则将消息直接塞到这个 Batch 中

  5. 找到 ProducerBatch 队列队尾的 Batch,发现 Batch 中剩余内存,不够塞下这条消息,则会创建新的 Batch

  6. 当消息发送成功之后, Batch 会被释放掉。

ProducerBatch 的内存大小

那么创建 ProducerBatch 的时候,应该分配多少的内存呢?

先说结论: 当消息预估内存大于batch.size的时候,则按照消息预估内存创建, 否则按照batch.size的大小创建(默认 16k).

我们来看一段代码,这段代码就是在创建 ProducerBatch 的时候预估内存的大小

RecordAccumulator#append

    /**     * 公众号: 石臻臻的杂货铺     * 微信:szzdzhp001     **/       // 找到 batch.size 和 这条消息在batch中的总内存大小的 最大值       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));       // 申请内存       buffer = free.allocate(size, maxTimeToBlock);


复制代码
  1. 假设当前生产了一条消息为 M, 刚好消息 M 找不到可以存放消息的 ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的 ProducerBatch 了

  2. 预估消息的大小 跟batch.size 默认大小 16384(16kb). 对比,取最大值用于申请的内存大小的值。

那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗?

DefaultRecordBatch#estimateBatchSizeUpperBound

预估需要的 Batch 大小,是一个预估值,因为没有考虑压缩算法从额外开销

    /**    * 使用给定的键和值获取只有一条记录的批次大小的上限。    * 这只是一个估计,因为它没有考虑使用的压缩算法的额外开销。    **/    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);    }

复制代码
  1. 预估这个消息 M 的大小 + 一个 RECORD_BATCH_OVERHEAD 的大小

  2. RECORD_BATCH_OVERHEAD 是一个 Batch 里面的一些基本元信息,总共占用了 61B

  3. 消息 M 的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD

  4. MAX_RECORD_OVERHEAD:一条消息头最大占用空间, 最大值为 21B

也就是说创建一个 ProducerBatch,最少就要 83B .

比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384) 相比取最大值。 那么申请内存的时候取最大值 16384 。

关于 Batch 的结构和消息的结构,我们回头单独用一篇文章来讲解

3 内存分配

我们都知道 RecordAccumulator 里面的缓存大小是一开始定义好的, 由buffer.memory控制, 默认 33554432 (32M)

当生产的速度大于发送速度的时候,就可能出现 Producer 写入阻塞。

而且频繁的创建和释放 ProducerBatch,会导致频繁 GC, 所有 kafka 中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。

PS:以下 16k 指得是 batch.size 的默认值.

Batch 的创建和释放

1. 内存 16K 缓存池中有可用内存

①. 创建 Batch 的时候, 会去缓存池中,获取队首的一块内存 ByteBuffer 使用。

②. 消息发送完成,释放 Batch, 则会把这个 ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据。以便下次重复使用

在这里插入图片描述

2. 内存 16K 缓存池中无可用内存

①. 创建 Batch 的时候, 去非缓存池中的内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池 nonPooledAvailableMemory 减少 16K 的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。

②. 消息发送完成,释放 Batch, 则会把这个 ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据, 以便下次重复使用

在这里插入图片描述

3. 内存非 16K 非缓存池中内存够用

①. 创建 Batch 的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。

②. 消息发送完成,释放 Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。 当然这个 Batch 会被 GC 掉

在这里插入图片描述

4. 内存非 16K 非缓存池内存不够用

①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建 Batch 了

②. 创建 Batch 的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。

③. 消息发送完成,释放 Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。 当然这个 Batch 会被 GC 掉

例如: 下面我们需要创建 48k 的 batch, 因为超过了 16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为 0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。

释放第一个 ByteBuffer(16k) 不够,则继续释放第二个,直到释放了 3 个之后总共 48k,发现内存这时候够了, 再去创建 Batch。

在这里插入图片描述

注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。

4 问题和答案

  1. 发送消息的时候, 当 Broker 挂掉了,消息体还能写入到消息缓存中吗?

当 Broker 挂掉了,Producer 会提示下面的警告⚠️, 但是发送消息过程中

这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。


WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available

复制代码


在这里插入图片描述

  1. 当最新的 ProducerBatch 还有空余的内存,但是接下来的一条消息很大,不足以加上上一个 Batch 中,会怎么办呢?

那么会创建新的 ProducerBatch。

  1. 那么创建 ProducerBatch 的时候,应该分配多少的内存呢?

触发创建 ProducerBatch 的那条消息预估大小大于 batch.size ,则以预估内存创建。否则,以 batch.size 创建。

还有一个问题供大家思考:

当消息还存储在缓存中的时候, 假如 Producer 客户端挂掉了,消息是不是就丢失了?

发布于: 2022 年 09 月 17 日阅读数: 49
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
图解Kafka Producer中的消息缓存模型_Kakfa_石臻臻的杂货铺_InfoQ写作社区