写点什么

vivo Pulsar 万亿级消息处理实践(3)-KoP 指标异常修复

  • 2025-07-10
    广东
  • 本文字数:7779 字

    阅读完需:约 26 分钟

作者:vivo 互联网大数据团队- Chen Jianbo

本文是《vivo Pulsar 万亿级消息处理实践》系列文章第 3 篇。

Pulsar 是 Apache 基金会的开源分布式流处理平台和消息中间件,它实现了 Kafka 的协议,可以让使用 Kafka API 的应用直接迁移至 Pulsar,这使得 Pulsar 在 Kafka 生态系统中更加容易被接受和使用。KoP 提供了从 Kafka 到 Pulsar 的无缝转换,用户可以使用 Kafka API 操作 Pulsar 集群,保留了 Kafka 的广泛用户基础和丰富生态系统。它使得 Pulsar 可以更好地与 Kafka 进行整合,提供更好的消息传输性能、更强的兼容性及可扩展性。vivo 在使用 Pulsar KoP 的过程中遇到过一些问题,本篇主要分享一个分区消费指标缺失的问题。


系列文章:

  1. vivo Pulsar万亿级消息处理实践(1)-数据发送原理解析和性能调优

  2.  vivo Pulsar万亿级消息处理实践(2)-从0到1建设Pulsar指标监控链路


文章太长?1 分钟看图抓住核心观点👇

一、问题背景

在一次版本灰度升级中,我们发现某个使用 KoP 的业务 topic 的消费速率出现了显著下降,具体情况如下图所示:

什么原因导致正常的升级重启服务器会出现这个问题呢?直接查看上报采集的数据报文:

kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188
复制代码

我们看到,KoP 消费指标 kop_server_MESSAGE

_OUT、kop_server_BYTES_OUT 是有上报的,但指标数据里的 group 标签变成了空串(缺少消费组名称),分区的消费指标就无法展示了。是什么原因导致了消费组名称缺失?


二、问题分析

1、找到问题代码

我们去找下这个消费组名称是在哪里获取的,是否逻辑存在什么问题。根据 druid 中的 kop_subscription 对应的消费指标 kop_server_

MESSAGE_OUT、kop_server_BYTES_OUT,找到相关代码如下:

private void handleEntries(final List<Entry> entries,                               final TopicPartition topicPartition,                               final FetchRequest.PartitionData partitionData,                               final KafkaTopicConsumerManager tcm,                               final ManagedCursor cursor,                               final AtomicLong cursorOffset,                               final boolean readCommitted) {....        // 处理消费数据时,获取消费组名称        CompletableFuture<String> groupNameFuture = requestHandler                .getCurrentConnectedGroup()                .computeIfAbsent(clientHost, clientHost -> {                    CompletableFuture<String> future = new CompletableFuture<>();                    String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId());                    requestHandler.getMetadataStore()                            .get(requestHandler.getGroupIdStoredPath() + groupIdPath)                            .thenAccept(getResultOpt -> {                                if (getResultOpt.isPresent()) {                                    GetResult getResult = getResultOpt.get();                                    future.complete(new String(getResult.getValue() == null                                            ? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));                                } else {                                    // 从zk节点 /client_group_id/xxx 获取不到消费组,消费组就是空的                                    future.complete("");                                }                            }).exceptionally(ex -> {                                future.completeExceptionally(ex);                                return null;                            });                    returnfuture;                });
        // this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor thread        groupNameFuture.whenCompleteAsync((groupName, ex) -> {            if (ex != null) {                log.error("Get groupId failed.", ex);                groupName = "";            }.....            // 获得消费组名称后,记录消费组对应的消费指标            decodeResult.updateConsumerStats(topicPartition,                    entries.size(),                    groupName,                    statsLogger);
复制代码

代码的逻辑是,从 requestHandler 的 currentConnectedGroup(map)中通过 host 获取 groupName,不存在则通过 MetadataStore(带缓存的 zk 存储对象)获取,如果 zk 缓存也没有,再发起 zk 读请求(路径为/client_group_id/host-clientId)。读取到消费组名称后,用它来更新消费组指标。从复现的集群确定走的是这个分支,即是从 metadataStore(带缓存的 zk 客户端)获取不到对应 zk 节点/client_group_id/xxx。


2、查找可能导致 zk 节点/client_group_id/xxx 节点获取不到的原因

有两种可能性:一是没写进去,二是写进去但是被删除了。

    @Override    protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,                                                CompletableFuture<AbstractResponse> resultFuture) {...        // Store group name to metadata store for current client, use to collect consumer metrics.        storeGroupId(groupId, groupIdPath)                .whenComplete((stat, ex) -> {                    if (ex != null) {                        // /client_group_id/xxx节点写入失败                        log.warn("Store groupId failed, the groupId might already stored.", ex);                    }                    findBroker(TopicName.get(pulsarTopicName))                            .whenComplete((node, throwable) -> {                                ....                            });                });...
复制代码

从代码看到,clientId 与 groupId 的关联关系是通过 handleFindCoordinatorRequest(FindCoordinator)写进去的,而且只有这个方法入口。由于没有找到 warn 日志,排除了第一种没写进去的可能性。看看删除的逻辑:

protected void close(){    if (isActive.getAndSet(false)) {        ...        currentConnectedClientId.forEach(clientId -> {            String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);            // 删除zk上的 /client_group_id/xxx 节点            metadataStore.delete(path, Optional.empty())                    .whenComplete((__, ex) -> {                        if (ex != null) {                            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {                                if (log.isDebugEnabled()) {                                    log.debug("The groupId store path doesn't exist. Path: [{}]", path);                                }                                return;                            }                            log.error("Delete groupId failed. Path: [{}]", path, ex);                            return;                        }                        if (log.isDebugEnabled()) {                            log.debug("Delete groupId success. Path: [{}]", path);                        }                    });        });    }}
复制代码

删除是在 requsetHandler.close 方法中执行,也就是说连接断开就会触发 zk 节点删除。

但有几个疑问:

  • /client_group_id/xxx 到底是干嘛用的?消费指标为什么要依赖它

  • 为什么要在 handleFindCoordinatorRequest 写入?

  • 节点/client_group_id/xxx 为什么要删除,而且是在连接断开时删除,删除时机是否有问题?

首先回答第 1 个问题,通过阅读代码可以知道,/client_group_id/xxx 这个 zk 节点是用于在不同 broker 实例间交换数据用的(相当 redis cache),用于临时存放 IP+clientId 与 groupId 的映射关系。由于 fetch 接口(拉取数据)的 request 没有 groupId 的,只能依赖加入 Group 过程中的元数据,在 fetch 消费时才能知道当前拉数据的 consumer 是哪个消费组的。


3、复现

若要解决问题,最好能够稳定地复现出问题,这样才能确定问题的根本原因,并且确认修复是否完成。

因为节点是在 requsetHandle.close 方法中执行删除,broker 节点关闭会触发连接关闭,进而触发删除。假设:客户端通过 brokerA 发起 FindCoordinator 请求,写入 zk 节点/client_group

_id/xxx,同时请求返回 brokerB 作为 Coordinator,后续与 brokerB 进行 joinGroup、syncGroup 等交互确定消费关系,客户端在 brokerA、brokerB、brokerC 都有分区消费。这时重启 brokerA,分区均衡到 BrokerC 上,但此时/client_group_id/xxx 因关闭 broker 而断开连接被删除,consumer 消费刚转移到 topic1-partition-1 的分区就无法获取到 groupId。

按照假设,有 3 个 broker,开启生产和消费,通过在 FindCoordinator 返回前获取 node.leader()的返回节点 BrokerB,关闭 brokerA 后,brokerC 出现断点复现,再关闭 brokerC,brokerA 也会复现(假设分区在 brokerA 与 brokerC 之间转移)。

复现要几个条件:

  1. broker 数量要足够多(不小于 3 个)

  2.  broker 内部有 zk 缓存 metadataCache 默认为 5 分钟,可以把时间调小为 1 毫秒,相当于没有 cache

  3.  findCoordinator 返回的必须是其他 broker 的 IP

  4.  重启的必须是接收到 findCoordinator 请求那台 broker,而不是真正的 coordinator,这时会从 zk 删除节点

  5. 分区转移到其他 broker,这时新的 broker 会重新读取 zk 节点数据

到此,我们基本上清楚了问题原因:连接关闭导致 zk 节点被删除了,别的 broker 节点需要时就读取不到了。那怎么解决?


三、问题解决

方案一

既然知道把消费者与 FindCoordinator 的连接进行绑定不合适的,那么是否应该把 FindCoordinator 写入 zk 节点换成由 JoinGroup 写入,断连即删除。

consumer 统一由 Coordinator 管理,由于 FindCoordinator 接口不一定是 Coordinator 处理的,如果换成由 Coordinator 处理的 JoinGroup 接口是否就可以了,这样 consumer 断开与 Coordinator 的连接就应该删除数据。但实现验证时却发现,客户端在断连后也不会再重连,所以没法重新写入 zk,不符合预期。


方案二

还是由 FindCoordinator 写入 zk 节点,但删除改为 GroupCoordinator 监听 consumer 断开触发。

因为 consumer 统一由 Coordinator 管理,它能监听到 consumer 加入或者离开。GroupCoordinator 的 removeMemberAndUpdateGroup 方法是 coordinator 对 consumer 成员管理。

private void removeMemberAndUpdateGroup(GroupMetadata group,                                        MemberMetadata member) {    group.remove(member.memberId());    switch (group.currentState()) {        case Dead:        case Empty:            return;        case Stable:        case CompletingRebalance:            maybePrepareRebalance(group);            break;        case PreparingRebalance:            joinPurgatory.checkAndComplete(new GroupKey(group.groupId()));            break;        default:            break;    }    // 删除 /client_group_id/xxx 节点    deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());}
复制代码


调用入口有两个,其中 handleLeaveGroup 是主动离开,onExpireHeartbeat 是超时被动离开,客户端正常退出或者宕机都可以调用 removeMemberAndUpdateGroup 方法触发删除。

public CompletableFuture<Errors> handleLeaveGroup(    String groupId,    String memberId) {    return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error ->        CompletableFuture.completedFuture(error)    ).orElseGet(() -> {        return groupManager.getGroup(groupId).map(group -> {            return group.inLock(() -> {                if (group.is(Dead) || !group.has(memberId)) {                    return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);                } else {                    ...                                    // 触发删除消费者consumer                    removeMemberAndUpdateGroup(group, member);                    return CompletableFuture.completedFuture(Errors.NONE);                }            });        })        ....    });}
复制代码


void onExpireHeartbeat(GroupMetadata group,                       MemberMetadata member,                       long heartbeatDeadline) {    group.inLock(() -> {        if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {            log.info("Member {} in group {} has failed, removing it from the group",                member.memberId(), group.groupId());            // 触发删除消费者consumer            removeMemberAndUpdateGroup(group, member);        }        return null;    });}
复制代码

但这个方案有个问题是,日志运维关闭 broker 也会触发一个 onExpireHeartbeat 事件删除 zk 节点,与此同时客户端发现 Coordinator 断开了会马上触发 FindCoordinator 写入新的 zk 节点,但如果删除晚于写入的话,会导致误删除新写入的节点。我们干脆在关闭 broker 时,使用 ShutdownHook 加上 shuttingdown 状态防止关闭 broker 时删除 zk 节点,只有客户端断开时才删除。

这个方案修改上线半个月后,还是出现了一个客户端的消费指标无法上报的情况。后来定位发现,如果客户端因 FullGC 出现卡顿情况,客户端可能会先于 broker 触发超时,也就是先超时的客户端新写入的数据被后监听到超时的 broker 误删除了。因为写入与删除并不是由同一个节点处理,所以无法在进程级别做并发控制,而且也无法判断哪次删除对应哪次的写入,所以用 zk 也是很难实现并发控制。


方案三

其实这并不是新的方案,只是在方案二基础上优化:数据一致性检查。

既然我们很难控制好写入与删除的先后顺序,我们可以做数据一致性检查,类似于交易系统里的对账。因为 GroupCoordinator 是负责管理 consumer 成员的,维护着 consumer 的实时状态,就算 zk 节点被误删除,我们也可以从 consumer 成员信息中恢复,重新写入 zk 节点。

private void checkZkGroupMapping(){      for (GroupMetadata group : groupManager.currentGroups()) {          for (MemberMetadata memberMetadata : group.allMemberMetadata()) {              String clientPath = GroupIdUtils.groupIdPathFormat(memberMetadata.clientHost(), memberMetadata.clientId());              String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath;              // 查找zk中是否存在节点            metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> {                  if (!resultOpt.isPresent()) {                      // 不存在则进行补偿修复                    metadataStore.put(zkGroupClientPath, memberMetadata.groupId().getBytes(UTF\_8), Optional.empty())                              .thenAccept(stat -> {                                  log.info("repaired clientId and group mapping: {}({})",                                          zkGroupClientPath, memberMetadata.groupId());                              })                              .exceptionally(ex -> {                                  log.warn("repaired clientId and group mapping failed: {}({})",                                          zkGroupClientPath, memberMetadata.groupId());                                  return null;                              });                  }              }).exceptionally(ex -> {                  log.warn("repaired clientId and group mapping failed: {} ", zkGroupClientPath, ex);                  return null;              });          }      }  }
复制代码

经过方案三的优化上线,即使是历史存在问题的消费组,个别分区消费流量指标缺少 group 字段的问题也得到了修复。具体效果如下图所示:


四、总结

经过多个版本的优化和线上验证,最终通过方案三比较完美的解决了这个消费指标问题。在分布式系统中,并发问题往往难以模拟和复现,我们也在尝试多个版本后才找到有效的解决方案。如果您在这方面有更好的经验或想法,欢迎提出,我们共同探讨和交流。

发布于: 刚刚阅读数: 5
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
vivo Pulsar 万亿级消息处理实践(3)-KoP指标异常修复_Java_vivo互联网技术_InfoQ写作社区