图解 Kafka Producer 中的消息缓存模型
作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源! 。
大家好,我是彦祖啊~0.0
在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。
发送消息的时候, 当 Broker 挂掉了,消息体还能写入到消息缓存中吗?
当消息还存储在缓存中的时候, 假如 Producer 客户端挂掉了,消息是不是就丢失了?
当最新的 ProducerBatch 还有空余的内存,但是接下来的一条消息很大,不足以加上上一个 Batch 中,会怎么办呢?
那么创建 ProducerBatch 的时候,应该分配多少的内存呢?
1 什么是消息累加器 RecordAccumulator
kafka 为了提高 Producer 客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。
而缓存这个消息的就是 RecordAccumulator 类.
整体模型图
上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。
2 消息缓存模型
在这里插入图片描述
上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。
每条消息,我们按照 TopicPartition 维度,把他们放在不同的
Deque<ProducerBatch>
队列里面。TopicPartition 相同,会在相同Deque<ProducerBatch>
的里面。ProducerBatch
: 表示同一个批次的消息, 消息真正发送到 Broker 端的时候都是按照批次来发送的,这个批次可能包含一条或者多条消息。如果没有找到消息对应的 ProducerBatch 队列, 则创建一个队列。
找到 ProducerBatch 队列队尾的 Batch,发现 Batch 还可以塞下这条消息,则将消息直接塞到这个 Batch 中
找到 ProducerBatch 队列队尾的 Batch,发现 Batch 中剩余内存,不够塞下这条消息,则会创建新的 Batch
当消息发送成功之后, Batch 会被释放掉。
ProducerBatch 的内存大小
那么创建 ProducerBatch 的时候,应该分配多少的内存呢?
先说结论: 当消息预估内存大于batch.size
的时候,则按照消息预估内存创建, 否则按照batch.size
的大小创建(默认 16k).
我们来看一段代码,这段代码就是在创建 ProducerBatch 的时候预估内存的大小
RecordAccumulator#append
假设当前生产了一条消息为 M, 刚好消息 M 找不到可以存放消息的 ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的 ProducerBatch 了
预估消息的大小 跟
batch.size
默认大小 16384(16kb). 对比,取最大值用于申请的内存大小的值。
那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗?
DefaultRecordBatch#estimateBatchSizeUpperBound
预估需要的 Batch 大小,是一个预估值,因为没有考虑压缩算法从额外开销
预估这个消息 M 的大小 + 一个 RECORD_BATCH_OVERHEAD 的大小
RECORD_BATCH_OVERHEAD 是一个 Batch 里面的一些基本元信息,总共占用了 61B
消息 M 的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
MAX_RECORD_OVERHEAD:一条消息头最大占用空间, 最大值为 21B
也就是说创建一个 ProducerBatch,最少就要 83B .
比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384)
相比取最大值。 那么申请内存的时候取最大值 16384 。
关于 Batch 的结构和消息的结构,我们回头单独用一篇文章来讲解。
3 内存分配
我们都知道 RecordAccumulator 里面的缓存大小是一开始定义好的, 由buffer.memory
控制, 默认 33554432 (32M)
当生产的速度大于发送速度的时候,就可能出现 Producer 写入阻塞。
而且频繁的创建和释放 ProducerBatch,会导致频繁 GC, 所有 kafka 中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。
PS:以下 16k 指得是 batch.size 的默认值.
Batch 的创建和释放
1. 内存 16K 缓存池中有可用内存
①. 创建 Batch 的时候, 会去缓存池中,获取队首的一块内存 ByteBuffer 使用。
②. 消息发送完成,释放 Batch, 则会把这个 ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear
清空数据。以便下次重复使用
在这里插入图片描述
2. 内存 16K 缓存池中无可用内存
①. 创建 Batch 的时候, 去非缓存池中的内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池 nonPooledAvailableMemory 减少 16K 的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。
②. 消息发送完成,释放 Batch, 则会把这个 ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear
清空数据, 以便下次重复使用
在这里插入图片描述
3. 内存非 16K 非缓存池中内存够用
①. 创建 Batch 的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。
②. 消息发送完成,释放 Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。 当然这个 Batch 会被 GC 掉
在这里插入图片描述
4. 内存非 16K 非缓存池内存不够用
①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建 Batch 了
②. 创建 Batch 的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch. 注意:这里说的获取内存给 Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后 Batch 正常创建就行了, 不要误以为好像真的发生了内存的转移。
③. 消息发送完成,释放 Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。 当然这个 Batch 会被 GC 掉
例如: 下面我们需要创建 48k 的 batch, 因为超过了 16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为 0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。
释放第一个 ByteBuffer(16k) 不够,则继续释放第二个,直到释放了 3 个之后总共 48k,发现内存这时候够了, 再去创建 Batch。
在这里插入图片描述
注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。
4 问题和答案
发送消息的时候, 当 Broker 挂掉了,消息体还能写入到消息缓存中吗?
当 Broker 挂掉了,Producer 会提示下面的警告⚠️, 但是发送消息过程中
这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。
在这里插入图片描述
当最新的 ProducerBatch 还有空余的内存,但是接下来的一条消息很大,不足以加上上一个 Batch 中,会怎么办呢?
那么会创建新的 ProducerBatch。
那么创建 ProducerBatch 的时候,应该分配多少的内存呢?
触发创建 ProducerBatch 的那条消息预估大小大于 batch.size ,则以预估内存创建。否则,以 batch.size 创建。
还有一个问题供大家思考:
当消息还存储在缓存中的时候, 假如 Producer 客户端挂掉了,消息是不是就丢失了?
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/c26de03b5efedd5d32d77c4ff】。未经作者许可,禁止转载。
评论