写点什么

kafka 高性能设计之内存池

  • 2023-05-08
    湖南
  • 本文字数:2067 字

    阅读完需:约 7 分钟

Kafka 的内存池是一个用于管理内存分配的缓存区域。它通过在内存上保留一块固定大小的内存池,用于分配消息缓存、批处理缓存等对象,以减少频繁调用内存分配函数的开销。


Kafka 内存池的实现利用了 Java NIO 中的 ByteBuffer。当需要创建一个新的缓存对象时,内存池会取出一块固定大小的内存块,并在存储内存池对象的池中保存该内存块的引用。当该内存块不再被使用时,内存池将把它收回,以供下一次使用。


使用内存池可以提高 Kafka 生产者的性能,因为对象 kafka 这样的消息中间件,需要频繁地创建对象,我们知道频繁地创建对象很消耗内存,使用内存池可以减少内存的消耗,此外,内存池还可以减少内存碎片的产生,提高内存使用效率。

实现

下面我们从几个方面来对象内存池的实现进行详细介绍。

创建内存池

在 kafka 初始化的时候,会对内存池进行初始化,在 Kafka Producer 端,有一个 BufferPool,与它相关的配置参数是buffer.memorybatch.sizebuffer.memory它代表缓冲区内存的大小,默认为 32M,batch.size代表消息批次的大小,默认为 16kb,在 BufferPool 中,batch.size其实就是代表一个 ByteBuffer 的大小,因为 BufferPool 只管理batch.size大小的 ByteBuffer,在 kafka 初始化的时候,就会创建缓冲区(new BufferPool),如下,在创建消息收集器 RecordAccumulator 的时候,就创建了 BufferPool。

this.accumulator = new RecordAccumulator(logContext,                    batchSize,                    this.compressionType,                    lingerMs(config),                    retryBackoffMs,                    deliveryTimeoutMs,                    partitionerConfig,                    metrics,                    PRODUCER_METRIC_GROUP_NAME,                    time,                    apiVersions,                    transactionManager,                    new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
复制代码

分配内存

我们知道 kafka 的消息不是直接发送到 broker,而是先发送到消息收集器 RecordAccumulator,而消息发送到 RecordAccumulator,是需要先申请内存的,如果消息的大小大于内存池 BufferPool 的大小,那么这是不允许的,会抛出异常,比如我的消息的大小时 40M,但是内存池的大小是 32M,那么显然 BufferPool 装不下消息,就会报错。


上一篇中,我们说了消息是被存储在队列中,以 ProducerBatch 的形式,当发送消息时,获取分区对应的队列,入队队列不存在,就创一个队列,这个队列就是装 ProducerBatch 的队列,为 Deque,然后从队列中取出一个 ProducerBatch,如果存在 ProducerBatch,那么就判断这个 ProducerBatch 是否足够装得下消息,如果能够装得下,那么就将消息装入,如果装不下,那么就重新创建一个 ProducerBatch,然后将消息加入新创建的这个 ProducerBatch,最后将这个 ProducerBatch 加入队列中,然后释放掉 ProducerBatch,其实就是释放掉 ByteBuffer 中的 ProducerBatch,因为 ProducerBatch 本身就是由 ByteBuffer 来进行承载。


如果消息的长度大于 16kb(注意,这个16kb是batch.size参数的默认值,如果我们对batch.size进行设置,那么就按照我们设置的值来算),那么就按消息的实际大小来进行创建,如果小于或等于 16kb,那么就按照 16kb 来进行创建,如下代码所示,会将 batchSize 和我们消息的大小进行比较,选出最大的,然后去分配 Buffer。

我们知道 ProducerBatch 是放在 ByteBuffer 中,所以在创建 ProducerBatch 的时候,会去申请一个 ByteBuffer,如果我们的消息小于或者等于batch.size(默认为 16kb),那么就会去缓冲池 BufferPool 中取一块 ByteBuffer 来给 ProducerBatch 使用,如上图所示,这些 ByteBuffer 都被缓冲池 BufferPool 管理起来,如果我们的消息大于batch.size,那么就无法使用缓冲池中的 ByteBuffer 了。


如下,在 allocate 方法中,如果我们消息所需要的 ByteBuffer 的大小等于poolableSize并且 BufferPool 中存在 ByteBuffer,那么久直接从 BufferPool 的队列中获取一个 ByteBuffer,poolableSize其实就是batch.size

释放内存

当我们消息发送完以后,就需要释放 ByteBuffer,然后再将 ByteBuffer 加入到 BufferPool 中,以供后面使用,注意,只有batch.size大小的ByteBuffer才能加入BufferPool中,后面才能复用,大于batch.size的ByteBuffer不能加入BufferPool中,大于 batch.size 的则和非缓冲池的内存有关,和 nonPooledAvailableMemory 这个值有关,就不去详细说它,如下,通过 buffer.clear()清空 ByteBuffer,然后将清空后的 buffer 加入队列中。

总结

上面我们对 kafka 的为什么使用内存池,使用内存池的好处进行了分析,然后对它怎么实现进行了分析,分别从创建,使用和释放去进行详细说明,不过我们应该记住的是,kafka 使用内存池的条件是我们的消息的大小必须小于等于batch.size的值,这样内存池才能发挥它的作用,如果我们的消息很大,然而也没对batch.size进行设置,使用的是默认值,那么将不能使用内存池,不能发挥它的性能。


作者:刘牌

链接:https://juejin.cn/post/7230469903122743351

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
kafka高性能设计之内存池_Java_做梦都在改BUG_InfoQ写作社区