写点什么

vivo Pulsar 万亿级消息处理实践 (2)- 从 0 到 1 建设 Pulsar 指标监控链路

  • 2025-06-12
    广东
  • 本文字数:3620 字

    阅读完需:约 12 分钟

作者:vivo 互联网大数据团队- You Shuo


本文是《vivo Pulsar 万亿级消息处理实践》系列文章第 2 篇,Pulsar 支持上报分区粒度指标,Kafka 则没有分区粒度的指标,所以 Pulsar 的指标量级要远大于 Kafka。在 Pulsar 平台建设初期,提供一个稳定、低时延的监控链路尤为重要。


系列文章:

  1. vivo Pulsar万亿级消息处理实践-数据发送原理解析和性能调优


本文是基于 Pulsar 2.9.2/kop-2.9.2 展开的。


一、背景

作为一种新型消息中间件,Pulsar 在架构设计及功能特性等方面要优于 Kafka,所以我们引入 Pulsar 作为我们新一代的消息中间件。在对 Pulsar 进行调研的时候(比如:性能测试、故障测试等),针对 Pulsar 提供一套可观测系统是必不可少的。Pulsar 的指标是面向云原生的,并且官方提供了 Prometheus 作为 Pulsar 指标的采集、存储和查询的方案,但是使用 Prometheus 采集指标面临以下几个问题


  1. Prometheus 自带的时序数据库不是分布式的,它受单机资源的限制;

  2. Prometheus 在存储时序数据时消耗大量的内存,并且 Prometheus 在实现高效查询和聚合计算的时候会消耗大量的 CPU。


除了以上列出的可观测系统问题,Pulsar 还有一些指标本身的问题,这些问题包括


  1. Pulsar 的订阅积压指标单位是 entry 而不是条数,这会严重影响从 Kafka 迁移过来的用户的使用体验及日常运维工作;

  2. Pulsar 没有 bundle 指标,因为 Pulsar 自动均衡的最小单位是 bundle,所以 bundle 指标是调试 Pulsar 自动均衡参数时重要的观测依据;

  3. kop 指标上报异常等问题。


针对以上列出的几个问题,我们在下面分别展开叙述。


二、Pulsar 监控告警系统架构

在上一章节我们列出了使用 Prometheus 作为观测系统的局限,由于 Pulsar 的指标是面向云原生的,采用 Prometheus 采集 Pulsar 指标是最好的选择,但对于指标的存储和查询我们使用第三方存储来减轻 Prometheus 的压力,整个监控告警系统架构如下图所示:


在整个可观测系统中,各组件的职能如下:

  • Pulsar、bookkeeper 等组件提供暴露指标的接口

  • Prometheus 访问 Pulsar 指标接口采集指标

  • adaptor 提供了服务发现、Prometheus 格式指标的反序列化和序列化以及指标转发远端存储的能力,这里的远端存储可以是 Pulsar 或 Kafka

  • Druid 消费指标 topic 并提供数据分析的能力

  • vivo 内部的检测告警平台提供了动态配置检测告警的能力


基于以上监控系统的设计逻辑,我们在具体实现的过程中遇到了几个比较关键的问题:


一、adaptor 需要接收 Pulsar 所有线上服务的指标并兼容 Prometheus 格式数据,所以在调研 Prometheus 采集 Pulsar 指标时,我们基于 Prometheus 的官方文档开发了 adaptor,在 adaptor 里实现了服务加入集群的发现机制以及动态配置 prometheus 采集新新加入服务的指标:


在可以动态配置 Prometheus 采集所有线上正在运行的服务指标之后,由于 Prometheus 的指标是基于 protobuf 协议进行传输的,并且 Prometheus 是基于 go 编写的,所以为了适配 Java 版本的 adaptor,我们基于 Prometheus 和 go 提供的指标格式定义文件(remote.proto、types.proto 和 gogo.proto)生成了 Java 版本的指标接收代码,并将 protobuf 格式的指标反序列化后写入消息中间件。


二、Grafana 社区提供的 Druid 插件不能很好的展示 Counter 类型的指标,但是 bookkeeper 上报的指标中又有很多是 Counter 类型的指标,vivo 的 Druid 团队对该插件做了一些改造,新增了计算速率的聚合函数。

druid 插件的安装可以参考官方文档(详情


三、由于 Prometheus 比较依赖内存和 CPU,而我们的机器资源组又是有限的,在使用远端存储的基础上,我们针对该问题优化了一些 Prometheus 参数,这些参数包括:

  • --storage.tsdb.retention=30m:该参数配置了数据的保留时间为 30 分钟,在这个时间之后,旧的数据将会被删除。

  • --storage.tsdb.min-block-duration=5m:该参数配置了生成块(block)的最小时间间隔为 5 分钟。块是一组时序数据的集合,它们通常被一起压缩和存储在磁盘上,该参数间接控制 Prometheus 对内存的占用。

  • --storage.tsdb.max-block-duration=5m:该参数配置了生成块(block)的最大时间间隔为 5 分钟。如果一个块的时间跨度超过这个参数所设的时间跨度,则这个块将被分成多个子块。

  • --enable-feature=memory-snapshot-on-shutdown:该参数配置了在 Prometheus 关闭时,自动将当前内存中的数据快照写入到磁盘中,Prometheus 在下次启动时读取该快照从而可以更快的完成启动。


三、Pulsar 指标优化

Pulsar 的指标可以成功观测之后,我们在日常的调优和运维过程中发现了一些 Pulsar 指标本身存在的问题,这些问题包括准确性、用户体验、以及性能调优等方面,我们针对这些问题做了一些优化和改造,使得 Pulsar 更加通用、易维护。


3.1 Pulsar 消费积压指标

原生的 Pulsar 订阅积压指标单位是 entry,从 Kafka 迁移到 Pulsar 的用户希望 Pulsar 能够和 Kafka 一样,提供以消息条数为单位的积压指标,这样可以方便用户判断具体的延迟大小并尽量不改变用户使用消息中间件的习惯。


在确保配置 brokerEntryMetadataInterceptors=

org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor 情况下,Pulsar broker 端在往 bookkeeper 端写入 entry 前,通过拦截器往 entry 的头部添加索引元数据,该索引在同一分区内单调递增,entry 头部元数据示例如下:

biz-log-partition-1 -l 24622961 -e 6Batch Message ID: 24622961:6:0Publish time: 1676917007607Event time: 0Broker entry metadata index: 157398560244Properties:"X-Pulsar-batch-size    2431""X-Pulsar-num-batch-message    50"
复制代码

以分区为指标统计的最小单位,基于 last add confirmed entry 和 last consumed entry 计算两个 entry 中的索引差值,即是订阅在每个分区的数据积压。下面是 cursor 基于订阅位置计算订阅积压的示意图,其中 last add confirmed entry 在拦截器中有记录最新索引,对于 last consumed entry,cursor 需要从 bookkeeper 中读取,这个操作可能会涉及到 bookkeeper 读盘,所以在收集延迟指标的时候可能会增加采集的耗时。



效果

上图是新订阅积压指标和原生积压指标的对比,新增的订阅积压指标单位是条,原生订阅积压指标单位是 entry。在客户端指定单条发送 100w 条消息时,订阅积压都有明显的升高,当客户端指定批次发送 100w 条消息的时候,新的订阅积压指标会有明显的升高,而原生订阅积压指标相对升高幅度不大,所以新的订阅积压指标更具体的体现了订阅积压的情况。



3.2 Pulsar bundle 指标

Pulsar 相比于 Kafka 增加了自动负载均衡的能力,在 Pulsar 里 topic 分区是绑定在 bundle 上的,而负载均衡的最小单位是 bundle,所以我们在调优负载均衡策略和参数的时候比较依赖 bunlde 的流量分布指标,并且该指标也可以作为我们切分 bundle 的参考依据。我们在开发 bundle 指标的时候做了下面两件事情:


统计当前 Pulsar 集群非游离状态 bundle 的负载情况对于处于游离状态的 bundle(即没有被分配到任何 broker 上的 bundle),我们指定 Pulsar leader 在上报自身 bundle 指标的同时,上报这些处于游离状态的 bundle 指标,并打上是否游离的标签。


效果


上图就是 bundle 的负载指标,除了出入流量分布的情况,我们还提供了生产者/消费者到 bundle 的连接数量,以便运维同学从更多角度来调优负载均衡策略和参数。


3.3 kop 消费延迟指标无法上报

在我们实际运维过程中,重启 kop 的 Coordinator 节点后会偶发消费延迟指标下降或者掉 0 的问题,从 druid 查看上报的数据,我们发现在重启 broker 之后消费组就没有继续上报 kop 消费延迟指标。


(1)原因分析

由于 kop 的消费延迟指标是由 Kafka lag exporter 采集的,所以我们重点分析了 Kafka lag exporter 采集消费延迟指标的逻辑,下图是 Kafka-lag-exporter 采集消费延迟指标的示意图:


其中,kafka-lag-exporter 计算消费延迟指标的逻辑会依赖 kop 的 describeConsumerGroups 接口,但是当 GroupCoordinator 节点重启后,该接口返回的 member 信息中 assignment 数据缺失,kafka-lag-exporter 会将 assignment 为空的 member 给过滤掉,所以最终不会上报对应 member 下的分区指标,代码调试如下图所示:



为什么 kop/Kafka describeConsumerGroups 接口返回 member 的 assignment 是空的?因为 consumer 在启动消费时会通过 groupManager.storeGroup 写入__consumer_

offset,在 coordinator 关闭时会转移到另一个 broker,但另一个 broker 并没有把 assignment 字段反序列化出来(序列化为 groupMetadataValue,反序列化为 readGroupMessageValue),如下图:



(2)解决方案

在 GroupMetadataConstants#readGroup-

MessageValue()方法对 coordinator 反序列化消费组元数据信息时,将 assignment 字段读出来并设置(序列化为 groupMetadataValue,反序列化为 readGroupMessageValue),如下图:


四、总结

在 Pulsar 监控系统构建的过程中,我们解决了与用户体验、运维效率、Pulsar 可用性等方面相关的问题,加快了 Pulsar 在 vivo 的落地进度。虽然我们在构建 Pulsar 可观测系统过程中解决了一部分问题,但是监控链路仍然存在单点瓶颈等问题,所以 Pulsar 在 vivo 的发展未来还会有很多挑战。

用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
vivo Pulsar 万亿级消息处理实践(2)-从0到1建设 Pulsar 指标监控链路_大数据_vivo互联网技术_InfoQ写作社区