本文首发于 8 月。内容已用一种抽象的方式做成了视频,喜欢看视频的同学可以在 B 站上搜索“抽象狗哥”观看相应的内容。
0.前言
前阵子团队里出了个大故障,本质是因为其他语言实现的 client 有问题,非常频繁的请求大量元数据,而 Kafka 服务端这边也没有做什么限制,导致 Kafka Broker 宕了。
在相关的复盘报告中,复盘方提到了我这边的监控程序(用于观察线上实时作业的堆压)会频繁的去获取一些元数据,也是在间接的增加 Kafka 集群的压力,建议修改成消费__consumer_offsets的方式。(我这边用的是 AdminClient#listConsumerGroupOffsets 和 AdminClient.listOffsets 来获取 commit 和 end 的 offset)
有点像https://stackoverflow.com/questions/60067622/how-to-get-latest-offset-size-of-a-kafka-topic-using-kafkaadminclient-java-for中Adán Escobar提供的答案。
这个事老哥之前有和我沟通过几次,那时我问他:你这边有什么根据吗?他没有正面回答我——听说这老哥之前在别的地方维护过很大的 Kafka 集群,对此我半信半疑的在网上搜索过一阵子,但是并没有找到对应的答案。
直到这次,我这边的监控程序被要求整改。对此我觉得莫名其妙,于是有了这篇文章——我们来扒一扒源码。
本文的代码基于 Kafka 3.9。
消费__consumer_offsets本质上来说就是 Consumer 顺序读 Broker 上的日志,消费过程这块网上源码解析非常多,总体来说代价也不大,就不再赘述了。我们直接来看 AdminClient 上的实现。
1.AdminClient 相关源码分析
1.1 AdminClient#listConsumerGroupOffsets
|--ListConsumerGroupOffsetsHandler
|--ApiKeys.OFFSET_FETCH
\--handleOffsetFetchRequest
\--handleOffsetFetchRequestFromCoordinator
\--handleOffsetFetchRequestFromZookeeper
复制代码
在早期版本中,kafka 的元数据是保存在的 zk 里的。为了更全面的带大家阅读代码,我们把两个实现都读一遍。
From KRaft
\--fetchOffsetsForGroup
|--GroupCoordinatorAdapter
\-- fetchOffsets
\--handleFetchOffset
|--GroupCoordinator
\--handleFetchOffsets
|--GroupmetadataManager
\--getOffsets
复制代码
那么从 getOffsets 的实现为:
def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) topicPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) topicPartition -> partitionData }.toMap } else { val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
topicPartitions.map { topicPartition => if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { val partitionData = group.offset(topicPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } topicPartition -> partitionData } }.toMap } } } }
复制代码
在这里我们可以看到,相关的信息其实从groupMetadataCache这个内存缓存中获取的, 并不是一个很重的操作。而缓存的 load 方法是loadGroupsAndOffsets,因为篇幅原因,不再展开,有兴趣的同学可以自行阅读。
From Zookeeper
逻辑非常简单,直接粘代码:
private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = { val header = request.header val offsetFetchRequest = request.body[OffsetFetchRequest]
def createResponse(requestThrottleMs: Int): AbstractResponse = { val offsetFetchResponse = // reject the request if not authorized to the group if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId)) offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED) else { val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests")) val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized( offsetFetchRequest.partitions.asScala, request.context)
// version 0 reads offsets from ZK val authorizedPartitionData = authorizedPartitions.map { topicPartition => try { if (!metadataCache.contains(topicPartition)) (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) else { val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition) payloadOpt match { case Some(payload) => (topicPartition, new OffsetFetchResponse.PartitionData(payload, Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE)) case None => (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) } } } catch { case e: Throwable => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e))) } }.toMap
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) CompletableFuture.completedFuture[Unit](()) }
复制代码
首先检查用户是否被授权访问指定的组,如果没有授权,则返回授权失败的错误响应。然后根据请求中的分区信息,将分区分为授权和未授权的分区。对于授权的分区,尝试从 ZooKeeper 中获取消费偏移量,并根据结果生成相应的分区数据。如果出现异常,则生成一个包含无效偏移量的分区数据。最后将授权和未授权的分区数据合并,并将响应发送回客户端。
1.2 AdminClient#listOffsets
|--ListOffsetsHandler
|--ApiKeys.LIST_OFFSETS
\--handleListOffsetRequest
\--handleListOffsetRequestV0
\--handleListOffsetRequestV1AndAbove
复制代码
这里也分成了两个版本,引入 KRaft 之前是handleListOffsetRequestV0,之后则是handleListOffsetRequestV1AndAbove。除了在部分功能支持的差异和错误处理更加细致外,核心调用的replicaManager.fetchOffsetForTimestamp并无变化。而这个函数的底层实现本质是调用 Kafka Log,即去 Broker 的 Log 上查询相关的信息。
2.小结
listConsumerGroupOffsets这个命令在 KRaft 之前的实现是读取 Zookeeper,但由于 ZK 存储的特性,小量点查的代价并不大。如果在启用 KRaft 的情况下,并不是什么性能瓶颈。
而listOffsets则是通过 Kafka Broker 读取对应 Topic Partition 中的 Log 实现的,相比 Consumer 消费__consumer_offsets来说,性能在其之下——如果进行大频次的读,本质上来说是在做随机 IO 读,是比不上消费__consumer_offsets的顺序读的。如果高频次的做读取操作,是一定会引起 IO 压力的。
2.1 其他答疑
以下问题来自于一些视频号底下的提问,这边统一回答。
Q1:Kafka 百万吞吐,几个查询接口就查挂了?
A:
高吞吐基于顺序读写与 PageCache 等特性。seek 多个 topic parition 的 end offset 是没法利用以上特性的,和高吞吐毫无相关。
并没有说因为调 admin client API 导致 KAFKA 挂了。但这里面的确是有可优化的点。
Q2:不就是查这么点信息吗?能消耗多少资源?我管过的集群多了,定时任务半小时查一次,从来没见查挂过。
A:文中提到了高频次的读取操作是分钟级的。实际上我们的 Kafka 也不小,正是因为故障影响面大,所以我这边也有幸参与了复盘。
Q3:获取元数据会导致集群压力,认真的嘛?我怎么记得 Kafka 发消息前都会检查一次当前 topic 的元数据
A:我们这里的获取元数据特指 seek 到 kafka log 的对应位置去获取 end offset。
评论