写点什么

Netty 核心概念之 ChannelHandler&Pipeline&ChannelHandlerContext

  • 2022 年 1 月 24 日
  • 本文字数:2827 字

    阅读完需:约 9 分钟

主要流程

作为一个 Producer 来说其实核心是梳理 2 个东西:Sender 和 RecordAccumulator

  • Sender: 是 kafka 发送流程的主要服务,负责接收数据并将其放置到 RecordAccumulator,或者从 RecordAccumulator 中取出数据发送到 Kafka 的服务端,或者负责更新一些 meta 服务等情况。

  • RecordAccumulator:kafka 的整个发送流程是异步的,主要目的是为了 batch 一些数据以增大吞吐,而 RecordAccumulator 则是主要负责进行对数据缓存进行管理的主要对象



作为 Sender 单次循环体内的核心的流程大如上图所示,我们可以按照图中的流程自顶向下拆解出各个步骤的细。上述流程在 Sender#sendProducerData 中

如何判断和获取可以发送的 kafka 节点

首先在 RecordAccumulator 内部,数据是以 Map<TopicPartition, Deque>的形式缓存的:

TopicPartition 是很显然指 topic-partion

ProducerBatch 则是需要同一批发送的 Record 请求,ProducerBatch 本身不是线程安全的,实际操作时会以所在的 Deque 粒度进行上锁。在 ProducerBatch 内,实际的 recrod 以 MemoryRecordsBuilder 的形式维护,同时 ProducerBatch 也会为何很多其他数据,比如一些 request 的数据回调等等,如果后面我们可以继续聊,现阶段还是先回归主流程的分析

final long createdMs;final TopicPartition topicPartition;final ProduceRequestResult produceFuture;
private final List<Thunk> thunks = new ArrayList<>();private final MemoryRecordsBuilder recordsBuilder;private final AtomicInteger attempts = new AtomicInteger(0);private final boolean isSplitBatch;private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
int recordCount;int maxRecordSize;private long lastAttemptMs;private long lastAppendTime;private long drainedMs;private boolean retry;private boolean reopened;复制代码
复制代码

判断这哪些数据是 ready 的核心代码在 kafka 的 RecordAccumulator 的 ready 的部分:

首先需要服务端满足一定的条件:

  • 需要被发送的 partion 的 leader 是已知的,如果包含未知的 leader 则需要访问 kafka 的服务端查询元数据,不过这部分内容会 block 整体流程,因此实际上会做成异步的

  • 当前待发送的 partion 并且没有被 muted,也就是没有被设置为阻塞状态。

  • 当前 partion 不处于 backoff 状态,这里主要指当前的 partion 有正在触发重试的状态。

其次则是当前 partion 的 batch 的需要满足一定条件

  • 当前 batch 的距离上一次发送过去的时间的等待时间> 允许等待的时延(如果是首次尝试则使用 lingerMs,如果是重试逻辑则使用 retryBackoffMs)

  • 当前双端队列是否存在已满的 batch,比如队列中的原始的数量大于 1,或者仅有一个元素但是 size 满足发送条件

  • 当前 Producer 已经处于 close 状态

  • 总体内存已满:我们已经知道 Producer 的数据是需要缓存一段时间的,Producer 内部有一个控制内存的内存池即 BufferPool,如果内存不够用了则会排队申请,如果这对队列不为空则说明总内存不够了

  • 存在正在刷新的线程:这里稍微难理解一点,等我比较确定了再补充。

  • 事务完成,(高版本 kakfa 支持的事务模型,暂不赘述)

如何获取待发送的 Batch 数据

主要逻辑概括的说:

遍历 RecordAccumulator 中的 ConcurrentMap<TopicPartition, Deque>,针对每个 TopicPartition 尝试获取不高于 maxRequestSize 的 batch 列表,将这些 Batch 关闭并放入待发送列表中。

但是在实现中还是有一些逻辑需要注意。

我们都知道基本的 kafka 的 broker 和 kafka 的 topic-partion 的概念,不同的 partion 可能分配到同一个 broker 上。在 kafka 的实现中,每次 drain 的过程只会从当前的 node 节点中调出一个 partion 进行发送消息。

为了避免每次投递的时候都从 0 开始投递从而导致序列化较大的 partion 会饥饿,客户端虚拟出了一个 drainIndex,在每次 drain 的过程中会递增,实际的其实节点从 start 开始。

int start = drainIndex = drainIndex % parts.size();复制代码
复制代码

不过这里有一点我没太看懂,为什么 drainInde 是全局的,如果是我做可能就做 nodeId 维度的了,不太清楚这里考虑的点是什么?如果是全局的 drainIndex,其实还存在如果单个 Node 的 partion 太多远远多余其他的 Node 从而导致饥饿?

另一个有意思的问题是当有一些比较极端的 case,比如单个 Batch 里面只有一个 message,但是这个 message 的 size 已经大于 request size 的限制了,这时候就会尝试将这条消息单独作为一个 batch 发送,为了实现在这一点,kafka 的 client 只在待发送列表不为空时检测当前待发送+nextbatch 的 size 之和是否大于 request size

上述代在:RecordAccumulator#drainBatchesForOneNode 中

如何检测过期数据

检测其实分成 2 个部分,一部分是 RecordAccumulator 中的 buffer 的数据 ConcurrentMap<TopicPartition, Deque>中的过期数据,一部分是 Sender 中的待发送的数据 Map<TopicPartition, List> inFlightBatches

比较的时间是 deliveryTimeoutMs,和当前时间-创建时间的差值。

对于失效的数据会调用失败的 callBack。

数据发送和打包

完成了上述数据过滤的数据会打包成 ProduceRequest 交给 client 进行发送。

配置和限制条件

梳理完上述的条件之后我们来一起看下有哪些配置控制了上述的一些流程:

  • batch.size:这里指的是每个双端队列的 ProducerBatch 的 size 大小

  • buffer.memory:这里指的是 RecordAccumulator 的 Buffer 的 size 大小

  • max.request.size:这里指的是在 drain 的过程中发给每个 Node 的 size 大小,如果是单个 messge 大于这个值是会跳过检测的,但是会影响打包的方式。

  • linger.ms:在非重试的场景下数据从 ProducerBatch 开始创建到 drain 到待发送区域之间,在 buffer 中驻留的时间

  • retry.backoff.ms:基本同上,区别在于是在重试场景下允许驻留在 buffer 和待发送区域的最大时间,这个配置实际上是为了避免一些极端的场景,比如在重试的场景下,可能是由于服务端有问题,如果我们不增加 client 在内存驻留的时间,则可能在非常短的时间内把重试次数耗尽。

  • delivery.timeout.ms:这个配置指的的是从数据从 add 到 kafka 的客户端到 client 开始处理发送流程的总耗时,包括驻留在 buffer 中的时间和待发送列表中的时间。

  • request.timeout.ms:这部分时间实际上指的是 client 开始发送 request 到收到 response 之间的时间。



比如我所遇到的线上问题 :e.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

从代码来看是卡在了申请内存的阶段,实际上就是 buffer 的 size 不够了。对照了一下 Producer 的配置发现了 batch.size 设置过大导致了下游的 topicpartion 的数量 batch.size 之后远超过 buffer.memory,也就是 buffer 最多只能放部分 partion 的数据,进而导致整个 Producer 的生产流程阻塞。

最后

如果你觉得此文对你有一丁点帮助,点个赞。或者可以加入我的开发交流群:1025263163 相互学习,我们会有专业的技术答疑解惑

如果你觉得这篇文章对你有点用的话,麻烦请给我们的开源项目点点 star:http://github.crmeb.net/u/defu不胜感激 !

PHP 学习手册:https://doc.crmeb.com技术交流论坛:https://q.crmeb.com

用户头像

还未添加个人签名 2021.11.02 加入

CRMEB就是客户关系管理+营销电商系统实现公众号端、微信小程序端、H5端、APP、PC端用户账号同步,能够快速积累客户、会员数据分析、智能转化客户、有效提高销售、会员维护、网络营销的一款企业应用

评论

发布
暂无评论
Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext