写点什么

八股 MQ003——聊聊 Consumer

作者:Codyida
  • 2023-05-04
    广东
  • 本文字数:3566 字

    阅读完需:约 12 分钟

Consumer 是什么?

以 Kafka 为例(以下皆是),Consumer 是 Kafka 重要的组成部分,它可以从 Kafka 的 Topic 中读取消息并做处理。Consumer 是消息消费能力的实现者。它可以单独存在,也可以由多个 Consumer 组合成 Consumer Group,以此来实现消费能力的横向拓展。

Consumer 的生命周期是怎样的

Kafka 的 Consumer 的生命周期从创建到关闭,整个过程可以大概分为以下几个阶段:


  1. 初始化:Consumer 在 Kafka 中是一个类,其创建也就是对应类的实例,需要设置相关的参数去影响 Consumer 消费消息的行为和性能。

  2. 订阅:Consumer 通过 subscribe()方法,指定要消费的 Topic,并获取 Broker 分配的 Partition。除了 subscribe()方法,还可以通过正则表达式来实现动态监听满足条件的多个 Topic,或者通过实现 ConsumerRebalanceListener 来动态地订阅主题和分区。

  3. 拉取:Consumer 通过 poll()方法,从 Broker 获取批量的消息。就像鲨鱼需要一直游动才不会窒息死亡,Consumer 也需要不停调用 poll()方法来维持活跃、获取消息。poll()方法除了拉取批量消息以外,还需要给 Broker 的 Group Coordinator 发送 HeartBeat,用来维持 Group 内的成员关系。Rebalance 过程中,Group Coordinator 也会通过 poll()方法向 Consumer 传达如:Rebalance 开始、Group Leader、重分配的 Partition 等信息。

  4. 消费:Consumer 根据自身业务需求,对拉取到的消息进行各种业务逻辑处理、如:数据存储、消息加工、调用下游、数据计算等。消费需要注意 Consumer 是批量获取的消息,批量处理的整体时长正常时不能超过 max.poll.interval.ms 的间隔,不然这会导致 Rebalance 的发生。

  5. 提交:Consumer 在处理完消息之后,需要通过 commitSync() 或 commitAsync() 方法向 Broker 提交消费完成的 offset。offset 被 Broker 用来记录消息被成功消费了的位置信息,用于重启或故障恢复时继续消费新的消息。可以通过 enable.auto.commit 参数来自动或手动提交 offset。手动提交 offset 会有更好的灵活性。

  6. 关闭:在不需要消费数据时,调用 close()方法,关闭 KafkaConsumer 对象。这会释放资源,取消订阅,提交位移,离开消费者组等。

初始化

Consumer 的初始化就是 KafkaConsumer 类的实例化。实例化时,有以下几个重要的参数可以注意:


  1. bootstrap.servers:这个参数除了在 Consumer 的初始化时会用到,Producer 初始化时也会用到。它是用于发现 Kafka 集群信息的。它形态上是以“,”连接的多个“broker:port”字符串。

  2. key.deserializer 与 value.deserializer:用于将消息的 key 与 value 从二进制字节流反序列化为对象的方式。消息的 key 用于唯一定义一条消息,决定这条消息被生成到 Topic 里的哪个 Partition,消息的 value 对应的是实际生成的消息内容。

  3. group.id:用于判断 Consumer 是否属于同一个 Group 的参数。

  4. fetch.min.bytes:设置 Consumer 从 Kafka Fetch 的最小消息大小。当 Topic 没有满足该参数大小的消息时,poll 会阻塞。

  5. fetch.max.wait.ms:Consumer 等待 Kafka 返回消息的最大等待时间,默认值:500ms。该参数与 fetch.min.bytes 会共同影响 Kafka 返回消息给 Consumer 的时机。

  6. max.partition.fetch.bytes:该参数控制 Kafka 允许的 Topic 的每个 Partition 能返回的最大消息大小。该参数乘以 Partition 数量,相当于 Topic 消费的最大消息总量,再除以 Consumer 的个数,便是每个 Consumer 需要承载的最大消息大小。

  7. session.timeout.ms:和 Rebalance 中介绍的一样,该参数是 Consumer 与 Broker 维持会话的最大超时时间,默认 10s,一般设置为 Consumer HeartBeat 心跳间隔的 3~5 倍。

  8. auto.offset.reset:该参数用于设定 Consumer 初始消费位置。如:latest:从最新位置开始消费;earliest:从最老的消息开始消费。此外,可以通过运维脚本来修改这个值。

  9. enable.auto.commit:用于指定是否启用自动提交偏移量的机制,若为 true,则 Consumer 会在每次 poll 之后或者每隔一段时间(auto.commit.interval.ms 设定)就提交最大的 offset 给 kafka;若为 false,则 Consumer 需要通过 commitSync 或者 commitAsync 方法来提交 offset

  10. partition.assignment.strategy:分区分配策略,可见下面。

  11. max.poll.records:Consumer 的 poll()方法返回的最大消息数量。


具体来说,常见的 Consumer 初始化步骤如下:


  1. 构造 Propertity,进行 consumer 相关的配置;

  2. 创建 KafkaConsumer 的对象 consumer;

  3. 订阅相应的 topic 列表;

  4. 调用 consumer 的 poll 方法拉取订阅的消息。

订阅消息

consumer 通过 subcribe() 方法订阅 Topic。这个方法可以接受多个 topic 作为一个 list,或者一个正则表达式来匹配多个 Topic。


consumer 订阅 topic 之后,会加入到 Consumer Group 之中。一个 Group 内的多个 Consumer 会负载均衡地消费 Topic 内的多个 Partition。Kafka 的分区分配策略是:


  1. RangeAssignor:按照 Consumer 数量与 Partition 数量进行均分。

  2. RoundRobinAssignor:将 Consumer 与订阅的 Topic 所有分区分别按照字典序进行排序,然后通过轮询的方式将每个分区逐个分配给 Consumer,

  3. StickyAssignor:该策略有两个目的:1. 尽可能均分分区;2. 尽可能保障每次分配的结果变化不大。这个逻辑比较复杂,就先按下不表。

poll 模型

poll 方法是 consumer 的心脏。poll 方法会处理 Group Coordinate、Rebalance、HeartBeat、消息抓取、离开 Group 等的所有信息。consumer 在创建之后,也是在 poll 之后,才会去连接到 Kafak 的实际集群。


首先大概看一下 poll 方法的原理。它主要做了以下几件事情:


  1. 检查 Consumer 是否有订阅 Topic 与 Partition;如果没有,这里会抛出异常;

  2. 调用实际的 pollOnce() 方法获取消息 records;

  3. 返回数据之前,发送下次的 fetch()请求,避免下次请求时被 pullOnce 阻塞;

  4. 如果在一定时间内没有获取到数据,则返回拉取的 records 为空;


pollOnce()方法大概做了以下几件事情:


  1. 获取 GroupCoordinator 的地址,与之建立相应的 TCP 连接。发送 JoinGroup 与 SyncGroup 请求。至此 Consumer 完成了与 Kafka 集群的连接,并加入了一个 ConsumerGroup。

  2. 更新需要拉取的 Partition 的 fetch-position offset,准备拉取消息。

  3. 获取 fetcher 拉取的消息,如果此时获取到消息,直接返回。如果为空则需要重新发送 sendFetch 的请求。

  4. 调用底层 NetworkClient 的 poll 方法,等待 send 的 fetch 完成所有消息的抓取。

  5. Rebalance 检查:判断当前 Consumer 分配的 Topic-Partition 信息是否有变化。有的话,这次 poll 方法直接返回空,Consumer 等待 GroupCoordinator 发起 Rebalance 流程。

消费

消息就是 Consumer 代表的实际业务逻辑了。在风控场景中,一般会有风控物料组装、风控模型请求、策略逻辑计算、策略日志留存等等。

提交

Consumer 在完成消费时,会向 Kafka 同步自己完成消费的消息的位置,这个位置会保留在 Partition 中,用于记录这个分区消息消费的进度。


Consumer 完成 commit offset 有如下几种方法:


  1. 自动 Commit:通过配置 enable.auto.commit=true 与设置 auto.commit.interval.ms 自动 commit 间隔,来控制这个自动的过程。自动 commit 机制也与 poll 模型绑定,当 Consumer 每次 poll 时,Consumer 便会检查是否满足自动 Commit 的要求。

  2. 手动 Commit:通过 CommitSync 与 CommitAsync 来实现同步与异步 Commit。在 Consumer 完成消息消费之后,如果调用 CommitSync,那么会阻塞消费进程直到 Commit 成功。而如果使用 CommitAsync,则会异步触发,立即开始下一次 poll。CommitAsync 可以通过 Callback 方法,实现异步 Commit 失败时的感知。也可以考虑结合使用两种 Commit 方式,如每次批处理完成后,使用异步方法,不阻塞下一次批处理,提升 Consumer 处理效率;但当 Consumer 退出或者程序异常时,使用同步 Commit,确保 Consumer 最新的消费进度能被同步到 Partition 上。

关闭

Consumer 的关闭,也可以说退出 Group,从使用上来看,会有以下几种情况:


  1. 通过 consumer.Close() 或者 consumer.Shutdown()方法,关闭网络连接与 socket,触发 Rebalance。但 Close()方法会完成 offset 的提交。

  2. 还可以通过设置 Consumer 未接收新消息的时间来完成 Consumer 的关闭,没使用过,这里不深入。


Consumer 退出时,大概会做以下几件事:


  1. 向 GroupCoordinator 发送一个 LeaveGroupRequest,告知自己要离开 Group。

  2. GroupCoordinator 收到 LeaveGroupRequest 后,会检查 Consumer 是否是 Group 的成员,如果是,就将其从成员列表中移除,并更新 Group 的元数据。

  3. GroupCoordinator 还会检查消费者是否是消费组的 leader,如果是,就会从剩余的成员中选择一个新的 leader,并通知它。

  4. GroupCoordinator 还会检查 Group 是否还有其他成员,如果没有,就会删除消费组的元数据,并释放相关的资源。

  5. GroupCoordinator 还会触发一次 Rebalance,将原来由 Consumer 负责的分区分配给其他消费者。

  6. Consumer 关闭时,还会关闭网络连接和套接字,释放所有的资源,并根据配置,提交最后的 offset 到存储(Kafka 或 Zookeeper)。

参考资料

  1. Chapter 4. Kafka Consumers: Reading Data from Kafka

  2. 你真的了解bootstrap.servers这个参数么?

  3. Kafka 源码解析之 Consumer Poll 模型(七) - 知乎 (zhihu.com)

发布于: 2023-05-04阅读数: 2
用户头像

Codyida

关注

还未添加个人签名 2017-12-21 加入

还未添加个人简介

评论

发布
暂无评论
八股MQ003——聊聊Consumer_后端_Codyida_InfoQ写作社区