写点什么

你说通过 Kafka AdminClient 获取 Lag 会有性能问题?尊嘟假嘟 0.o

作者:泊浮目
  • 2025-12-16
    浙江
  • 本文字数:4145 字

    阅读完需:约 14 分钟

你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o


本文首发于 8 月。内容已用一种抽象的方式做成了视频,喜欢看视频的同学可以在 B 站上搜索“抽象狗哥”观看相应的内容。

0.前言

前阵子团队里出了个大故障,本质是因为其他语言实现的 client 有问题,非常频繁的请求大量元数据,而 Kafka 服务端这边也没有做什么限制,导致 Kafka Broker 宕了。


在相关的复盘报告中,复盘方提到了我这边的监控程序(用于观察线上实时作业的堆压)会频繁的去获取一些元数据,也是在间接的增加 Kafka 集群的压力,建议修改成消费__consumer_offsets的方式。(我这边用的是 AdminClient#listConsumerGroupOffsets 和 AdminClient.listOffsets 来获取 commit 和 end 的 offset)


有点像https://stackoverflow.com/questions/60067622/how-to-get-latest-offset-size-of-a-kafka-topic-using-kafkaadminclient-java-forAdán Escobar提供的答案。


这个事老哥之前有和我沟通过几次,那时我问他:你这边有什么根据吗?他没有正面回答我——听说这老哥之前在别的地方维护过很大的 Kafka 集群,对此我半信半疑的在网上搜索过一阵子,但是并没有找到对应的答案。


直到这次,我这边的监控程序被要求整改。对此我觉得莫名其妙,于是有了这篇文章——我们来扒一扒源码。


本文的代码基于 Kafka 3.9。


消费__consumer_offsets本质上来说就是 Consumer 顺序读 Broker 上的日志,消费过程这块网上源码解析非常多,总体来说代价也不大,就不再赘述了。我们直接来看 AdminClient 上的实现。

1.AdminClient 相关源码分析

1.1 AdminClient#listConsumerGroupOffsets


|--ListConsumerGroupOffsetsHandler
|--ApiKeys.OFFSET_FETCH
\--handleOffsetFetchRequest
\--handleOffsetFetchRequestFromCoordinator
\--handleOffsetFetchRequestFromZookeeper
复制代码


在早期版本中,kafka 的元数据是保存在的 zk 里的。为了更全面的带大家阅读代码,我们把两个实现都读一遍。

From KRaft

    \--fetchOffsetsForGroup
|--GroupCoordinatorAdapter
\-- fetchOffsets
\--handleFetchOffset
|--GroupCoordinator
\--handleFetchOffsets
|--GroupmetadataManager
\--getOffsets
复制代码


那么从 getOffsets 的实现为:


  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))    val group = groupMetadataCache.get(groupId)    if (group == null) {      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>        val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,          Optional.empty(), "", Errors.NONE)        topicPartition -> partitionData      }.toMap    } else {      group.inLock {        if (group.is(Dead)) {          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>            val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,              Optional.empty(), "", Errors.NONE)            topicPartition -> partitionData          }.toMap        } else {          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
topicPartitions.map { topicPartition => if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { val partitionData = group.offset(topicPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } topicPartition -> partitionData } }.toMap } } } }
复制代码


在这里我们可以看到,相关的信息其实从groupMetadataCache这个内存缓存中获取的, 并不是一个很重的操作。而缓存的 load 方法是loadGroupsAndOffsets,因为篇幅原因,不再展开,有兴趣的同学可以自行阅读。

From Zookeeper

逻辑非常简单,直接粘代码:


  private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = {    val header = request.header    val offsetFetchRequest = request.body[OffsetFetchRequest]
def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = // reject the request if not authorized to the group if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId)) offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED) else { val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests")) val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized( offsetFetchRequest.partitions.asScala, request.context)
// version 0 reads offsets from ZK val authorizedPartitionData = authorizedPartitions.map { topicPartition => try { if (!metadataCache.contains(topicPartition)) (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) else { val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition) payloadOpt match { case Some(payload) => (topicPartition, new OffsetFetchResponse.PartitionData(payload, Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE)) case None => (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) } } } catch { case e: Throwable => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e))) } }.toMap
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) CompletableFuture.completedFuture[Unit](()) }
复制代码


首先检查用户是否被授权访问指定的组,如果没有授权,则返回授权失败的错误响应。然后根据请求中的分区信息,将分区分为授权和未授权的分区。对于授权的分区,尝试从 ZooKeeper 中获取消费偏移量,并根据结果生成相应的分区数据。如果出现异常,则生成一个包含无效偏移量的分区数据。最后将授权和未授权的分区数据合并,并将响应发送回客户端。

1.2 AdminClient#listOffsets


|--ListOffsetsHandler
|--ApiKeys.LIST_OFFSETS
\--handleListOffsetRequest
\--handleListOffsetRequestV0
\--handleListOffsetRequestV1AndAbove
复制代码


这里也分成了两个版本,引入 KRaft 之前是handleListOffsetRequestV0,之后则是handleListOffsetRequestV1AndAbove。除了在部分功能支持的差异和错误处理更加细致外,核心调用的replicaManager.fetchOffsetForTimestamp并无变化。而这个函数的底层实现本质是调用 Kafka Log,即去 Broker 的 Log 上查询相关的信息。

2.小结

listConsumerGroupOffsets这个命令在 KRaft 之前的实现是读取 Zookeeper,但由于 ZK 存储的特性,小量点查的代价并不大。如果在启用 KRaft 的情况下,并不是什么性能瓶颈。


listOffsets则是通过 Kafka Broker 读取对应 Topic Partition 中的 Log 实现的,相比 Consumer 消费__consumer_offsets来说,性能在其之下——如果进行大频次的读,本质上来说是在做随机 IO 读,是比不上消费__consumer_offsets的顺序读的。如果高频次的做读取操作,是一定会引起 IO 压力的。

2.1 其他答疑

以下问题来自于一些视频号底下的提问,这边统一回答。

Q1:Kafka 百万吞吐,几个查询接口就查挂了?

A:


  1. 高吞吐基于顺序读写与 PageCache 等特性。seek 多个 topic parition 的 end offset 是没法利用以上特性的,和高吞吐毫无相关。

  2. 并没有说因为调 admin client API 导致 KAFKA 挂了。但这里面的确是有可优化的点。

Q2:不就是查这么点信息吗?能消耗多少资源?我管过的集群多了,定时任务半小时查一次,从来没见查挂过。

A:文中提到了高频次的读取操作是分钟级的。实际上我们的 Kafka 也不小,正是因为故障影响面大,所以我这边也有幸参与了复盘。

Q3:获取元数据会导致集群压力,认真的嘛?我怎么记得 Kafka 发消息前都会检查一次当前 topic 的元数据

A:我们这里的获取元数据特指 seek 到 kafka log 的对应位置去获取 end offset。

发布于: 刚刚阅读数: 3
用户头像

泊浮目

关注

You will, my hands. 2018-03-16 加入

Infra && Big Data,从业8年,毕业就创业,开源爱好者,带过团队进过厂子。全网同名,B站/微信视频号搜“抽象狗哥”,公众号“狗哥琐话”。

评论

发布
暂无评论
你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o_kafka_泊浮目_InfoQ写作社区