腾讯云原生实时数仓建设实践
作者:龙逸尘,腾讯 CSIG 高级工程师
腾讯云原生实时数仓建设实践
实时数仓面临的挑战
实时数仓被广泛应用于腾讯各大业务,涉及的平台众多,从统计信息中可以看出,集群规模庞大,数据量极大。
复杂的使用场景和超大的数据量,导致我们在实时数仓的建设与使用过程中遇到许多挑战。
时效性
数仓使用者对时效性有非常强烈的诉求:希望查询响应更快,看板更新更及时,指标开发更快完成。因为时效性越高,数据价值也就越高。如何保障数仓的时效性是首要难题。
架构复杂度
如何在保障时效性的同时,降低架构复杂度以减少开发和维护成本,是需要重点考虑的问题。
数据准确性
保证数仓中数据和指标的准确性,不能存在异常或者错误,是对实时数仓的基础要求。
成本
在实际的生产使用中,计算与存储资源并不是免费的,如何优化实时数仓的使用成本也是值得关注的问题。
实时数仓的演进
实时数仓功能需求
总结来看,对于实时数仓,我们最期待的功能是以下四点:
保证全流程 T+0 级别时效性
统一并简化数仓架构
保障数据准确性
降低计算与存储成本
为了解决上述挑战,满足实时数仓的功能需求,我们进行了大量探索和实践,也见证了实时数仓架构的演进过程。
离线数仓
数据仓库一般采用分层架构来构建。在数仓发展的早期阶段,流处理引擎未成熟之前,离线数仓处于主流地位。
离线数仓采用 T+1 级别的离线 ETL 导入数据,在 ODS、DWD 和 DWS 层使用 Hive、Spark 等批处理引擎计算前一天的数据。为了提升时效性,在 ADS 层引入了 ClickHouse 提供秒级查询能力。然而总体上还是只能提供 T+1 级别的时效,无法满足我们对时效性的需求。
Lambda 架构
随着业务的发展,演化出了 Lambda 架构。Lambda 架构在原有离线数仓的基础上,增加了实时层。实时层从源头开始做实时化改造,完成数据的实时、增量计算。离线层与实时层并行运行,最终由统一的数据服务层将计算结果合并。
Lambda 架构充分发挥实时计算的优势,提升了数仓的时效性。同时,还可以利用离线批处理的数据重放能力保障数据的准确性,例如实时流处理遇到了故障导致输出的结果不够精确时,离线批处理系统可以把数据重跑一次,用更精确的数据来覆盖实时流结果。
但是 Lambda 架构的缺点也很明显。首先,它存在批处理和流处理两个相互独立的数据处理流程,同一套业务逻辑代码需要适配性的开发两次,开发成本高;而且需要同时维护实时和离线两套引擎,架构复杂,运维成本高。另外,Lambda 架构对存量数据更新不友好,需要重跑整个离线链路,消耗大量资源。
Kappa 架构
为了解决 Lambda 架构带来的诸多问题, Kappa 架构诞生了。
Kappa 架构将流和批融为一体,不再分为两条数据处理链路。数仓各层使用消息队列作为存储,数据经过 Flink 处理后通过消息队列传递,保障了 T+0 级别时效。
Kappa 架构解决了 Lambda 架构中离线层和实时层之间由于引擎不同,导致的开发运维成本高昂的问题,整体架构简洁明了。但 Kappa 架构也有其痛点。
Kappa 架构基于消息队列的数据回放能力以及流处理引擎提供的 Exactly-Cnce 语义完成历史数据的回溯,然而回溯过程中,流处理引擎的吞吐量是比不上批处理的,这可能导致一些延迟。
Kappa 架构强依赖支持回放的消息队列作为底层存储,为了保证作业崩溃或逻辑修改后可以随时回溯历史数据,消息需要有很长的保存期,大大增加了存储成本。
Kappa 架构数仓中间层没有采用可落盘的文件存储,当前无法使用 OLAP 引擎直接分析数仓中间层的数据,通常需要启动一个单独的作业来导出数据才能进行分析,灵活度欠佳。
Kappa 架构无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系,需要重新实现自己的体系。
经过分析,我们发现 Kappa 架构的主要问题出在它的存储上,因此若想要对其优化,就需要更新其存储架构。那么是否存在一种存储技术,既能够实现分钟级到秒级的数据接入和处理,保障数仓时效性;又能实现数据的流批统一读写,简化数仓架构;还可以支持高效数据回溯和历史数据更新,保证数据准确性呢?
仔细研究后,我们发现基于 Iceberg 的数据湖能很好地满足我们的需求。
Iceberg 的引入
Iceberg 支持流式读写和批量读写,可以统一数仓架构。同时,Iceberg 可以通过小批量的数据增量读写,将数仓整体延迟减小到分钟级甚至是秒级。对于流作业崩溃等情形,可以借助 Iceberg 高效的历史数据回溯能力,从特定的快照开始重新消费数据。
Iceberg 还支持对大规模数据集进行更新删除,因此数据回溯时无需全量重新计算,降低了数据更新延迟。此外,它还支持超长的数据保存期,不必担心数据保存期过短,历史数据被清理而难以回溯等传统数据管道会遇到的难题。
而对于需要分析中间层的数据等需求,Iceberg 支持 parquet 等列式存储格式,因此可以使用常见的批处理分析工具来直接分析 Iceberg 数据文件,结合谓词下推等 OLAP 优化策略,非常高效、便捷。
Iceberg 可以在一定程度上代替 Kafka 等传统流式数据管道作为数仓存储。它的引入,弥补了 Kappa 架构的许多不足。
基于数据湖 Iceberg 的架构
基于数据湖 Iceberg 的实时数仓架构可以很好地满足我们对实时数仓的需求:
支持流式写入增量拉取,可以实现全流程 T+0 级的时效
实现存储层的流批统一,简化架构
完美支持数据高效回溯,保障数据准确性
相比于消息队列基于日志的存储架构,支持列式存储,大大降低存储成本
引入 Iceberg 后,实时数仓的发展已经进入了新的阶段,但当前方案也并非毫无缺陷。
云原生的价值
当前 Iceberg 底层存储仍然以 HDFS 为主,这种基于 Hadoop 生态与 HDFS 存储的的传统实时数仓体系,面临着存算耦合的问题,导致弹性能力不足、资源利用率低,同时存储与计算资源的错配使得成本居高不下。
而随着云原生技术的普及和落地,云原生架构的核心优势可以很好地解决传统实时数仓的痛点。
云原生架构,本质上就是存算分离的。底层是对象存储等分布式共享存储,上层是无状态的分布式共享计算池,借助 K8s 等资源编排引擎,实现计算资源的弹性伸缩。
借助云原生架构提供的弹性计算能力,我们能够削峰填谷地利用计算资源,业务繁忙时迅速增加资源,业务空闲时及时释放多余资源,可以极大提升资源利用率,节省大量成本。
同时,云原生带来的规模经济,使得存储等基础设施变得廉价,为用户提供了更多选择。拿对象存储来举例,相比于 HDFS,对象存储不存在 NameNode 单点问题,可以提供更大的容量和更低的成本,避免了自建存储集群的维护代价,因此受到越来越多用户的青睐。
可以说云原生架构相比于传统 Hadoop 生态架构,最具优势的点在于存算分离特性、资源弹性调度能力以及对象存储等新型基础设施提供的成本优势。
云原生实时数仓架构
在原数据湖数仓架构的基础上,基于云原生的实时数仓应运而生。我们充分发挥云原生的优势,引入 K8s 作为资源调度引擎,提供弹性计算能力,并将底层存储替换为对象存储,提供更大容量与更低成本。
云原生实时数仓充分采用存算分离思想,整个生态体系由计算层,存储层与公共服务组成。
存储层使用对象存储作为底层的数据存储,支持 Parquet、Avro 等存储格式,并基于 Iceberg 更有效率地组织数据。
计算层利用 Alluxio 缓存加速数据的读取,提供数据本地化能力,K8S 提供资源弹性调度的能力,支持上层的 Flink 与 ClickHouse 等计算引擎。
公共服务包括租户管理、权限管理、元数据管理、数据血缘、数据质量等。
云原生实时数仓可以很好地满足我们对实时数仓的功能需求,还提供了更强的弹性计算能力与更低的成本。
云原生实时数仓建设实践
云原生实时数仓建设实践核心是以下三个部分,弹性计算实践 Flink on k8s,存算分离实践 ClickHouse 以及弹性存储实践 Iceberg。
Flink on Kubernetes 实践
Flink on Kubernetes 面临的挑战
Flink 已经成为实时计算的事实标准,为了使其与 K8s 更好地结合,发挥 K8s 弹性调度的优势,社区已经有了很多实践,然而当前仍然存在着一些挑战。
Serverless
当前大多数 K8s 集群仍然是部署在物理机或者云虚拟机上,因此弹性能力受到集群机器的容量限制,无法真正做到 Serverless。
调度性能
对于云原生实时数仓场景,原生的 K8s 调度器无法提供所需的功能,比如保证多租户资源隔离、资源公平调度、资源优先级感知等,急需提升调度能力。
调优诊断困难
在 K8s 环境下,一旦 Flink 作业出现故障,需要面对的是成千上万个运行中的容器和复杂的网络环境。而且随着 pod 的退出,故障现场很可能丢失。这也导致 Flink on K8s 作业很难进行诊断与调优。
扩缩容速度
Flink 作业进行弹性扩缩容操作时,需要对作业进行重启,导致数据断流。用户希望能提升扩缩容速度,更快地启动作业,减少停流时间。
这些问题推动我们对 Flink on K8s 进行了进一步的优化。
Flink on Kubernetes 整体方案
首先看下 Flink on K8s 的整体方案,腾讯内部采用的是 Flink Application Mode on Native K8s 的架构。
Application Mode 允许用户作业的 JobGraph 在 Flink Master 中编译,在 JobManager 中运行 main 方法。因此无需将所有依赖下载到 Client 端,节省大量带宽的同时,将 client 端负载均匀分散到集群的每个节点上,使得 Client 更轻量化并且具有可扩展能力,能更好地适应 Native K8s 环境。
Flink on Kubernetes 实践 - Serverless
之前提到,当前大多数 k8s 集群仍然是部署在物理机或者云虚拟机上,需要提前购买并维护机器,导致弹性能力受到很大限制。另外机器存在闲置时间,降低资源利用率,增加了成本。
因此我们引入了弹性容器服务 EKS,用户无须管理任何计算节点,而是从整个腾讯的资源池中申请资源,这些资源是以 pod 形式交付的,按需申请,实时交付,能完美适配原生 K8s 的接口请求。
这种无服务器形态带来了灵活的弹性伸缩能力,真正做到 Serverless,为用户实现更高的资源利用率和更低的成本。
Flink on Kubernetes 实践 - 自定义调度器
针对 K8s 默认调度器的能力缺陷,我们实现了自定义调度器。
自定义调度器内部与 yarn 队列类似,可以对多租户进行资源隔离。管理员可以为各租户配置保障资源与最大资源,并且预留弹性资源以保证一定的弹性能力。多个租户可能同时发起作业调度请求,此时调度器会根据租户资源队列使用状况、租户优先级策略等信息,为多租户的作业进行有序调度。除此之外自定义调度器还实现了反亲和策略,会将 jobmanager 和 taskmanager 尽可能调度到不同的 k8s 节点上。
自定义调度器补足了 k8s 默认调度器的能力缺陷,满足了云原生实时数仓作业调度的需求。
Flink on Kubernetes 实践 - AutoPilot 自动诊断调优
为了增强 Flink on k8s 作业的诊断与调优能力,我们实现了 AutoPilot 机制。整体的架构如下图所示。
首先为了快速感知作业是否异常,我们设计了作业状态感知方案,包括以下要点:
扩展 Flink 内核,增加实时事件推送能力
除了采集 Flink 的各种指标,还采集 k8s 的 deployment 和 pod 事件
引入 LogListener 和 DiagnosisDelivery 采集作业现场
打造事件中心,事件中心采取主动拉取与被动接收推送相结合的方式,将多源的事件汇集成一个综合事件
通过作业状态感知,我们现在可以采集到作业几乎所有的运行状态与事件。但如果基于事后的观察来进行作业指标调优的话,得到的结果往往是滞后的,我们希望根据历史指标对作业未来的运行状态进行预测。因此我们会保存一段时间内的历史指标,根据历史指标与预测模型生成预测指标,并结合当前的实时指标,综合起来提供给 AutoPilot,AutoPilot 学习开发人员输入的经验与规则,对指标进行分析。它判断作业是否发生了异常,异常原因是什么;作业是否需要参数调优,要调整到什么配置;作业是否需要扩缩容,扩缩容的大小等。比如,发现作业发生重启,直接原因是 Checkpoint 超时,根本原因是作业中数据倾斜,修复方案就是提高作业并行度,随后进行行动,增加作业 TaskManager 数量。
开发人员通过制定各类规则,将他们的工作经验和知识沉淀到 AutoPilot 中。AutoPilot 可以极大减少人工的投入,将开发运维人员从繁琐的故障定位、作业配置调优等工作中解放出来。
Flink on Kubernetes 实践 - 加速作业扩缩容
引入 Auto pilot 之后,在作业的运行过程中,随着数据量和数据特征的改变,作业可能会发生多次自动扩缩容。用户希望能提升扩缩容速度,更快地启动作业,减少停流时间。
分析整个作业的调度流程,发现所有 TaskManager 的 Slot 分配并注册完成后,作业才能启动,而根据木桶效应,耗时最长的 TaskManager 就会成为短板,影响作业的整体启动时间。
TaskManager 注册阶段的耗时主要集中在 Pod 启动以及 Slot 注册两个阶段。
Pod 启动时需要下载作业镜像和依赖,整个过程主要受带宽与作业镜像的大小影响。首先通过弹性网卡(Elastic Network Interface, ENI)直接通信,减少带宽损耗;并且定制化 Flink 镜像,按需裁剪,获得最简单的镜像,减少下载内容的大小;另外将用户依赖与 Flink 镜像分离,采取多线程方式同时下载;此外还可以预加载作业依赖与镜像,延迟绑定 Pod。这些措施使得 Pod 启动耗时减少了 50% 左右。
而在 Slot 注册阶段,如果 JobManager 繁忙无法及时回应,可能导致注册失败。此时 TaskManager 会采取指数退避的方式,不断尝试重新注册 Slot。在此过程中 DNS 反解析的耗时较长,造成 JobManager 处理线程拥塞,导致注册失败。但是此处的反解析只是为了友好化日志打印,并不是一定要在注册 Slot 时进行。因此提供选项直接跳过,后续用到时按需加载。
除此之外我们还会针对慢节点申请冗余 TaskManager ,以提升 TaskManager 启动速度,已注册的 TaskManager 充足后及时释放冗余的节点,减少资源占用。
通过以上优化,我们成功将 99% 50CU 内的作业的启动时间降低到 1 分钟以内。
ClickHouse 存算分离实践
ClickHouse 面临的挑战
ClickHouse 采用 Shared-Nothing 架构,存储计算一体化。每个节点都有自己的本地存储,每个节点的计算资源只关注处理本节点存储的数据。正是因为 ClickHouse 的架构简单而纯粹,为其带来了强大的性能。
然而这种存储与计算资源耦合的设计也造成了使用的局限性。ClickHouse 集群新增节点后数据无法自动均衡,也没有办法简单地卸载掉多余的计算资源,最终导致 ClickHouse 不具备弹性计算的能力。
ClickHouse 存算分离方案设计
为了支持弹性伸缩,我们对 ClickHouse 进行了关键改造,将 ClickHouse 架构分为三层。
元数据服务层,主要存储 ClickHouse 集群的关键元数据,包括表的 Schema 数据 以及数据分布的映射关系。并且通过心跳机制排除失效节点,保证数据重分布。
计算层每个节点都是无状态的存在。每个计算节点都能完整执行 SQL,且具备本地缓存,以及运行所需的索引数据等。具备秒级弹性能力。
存储层使用云原生共享存储服务作为底层存储,例如对象存储 COS 等,提供全局一致的数据视角。
ClickHouse 每个实例都可以成为一个分片(Shard),ClickHouse 集群由若干个 Shard 构成,我们将 Shard 内部的数据切分成了更小的 Bucket。Bucket 和 Shard 的映射关系由元数据服务统一管理。弹性伸缩时, Bucket 与节点所属关系会重新计算。
左图集群缩容时 Shard3 被剔除,原先处于 Shard3 上的 Bucket6/7 将被重新分配到 Shard2 上;而右图集群扩容 新增 Shard4 时,将原先位于 Shard2 上的 Bucktet3 重新分配到新节点 Shard4 上。
Bucket 与节点的所属关系通过心跳告知现有节点,如果发现自己的数据分布关系有变化,则会直接去元数据服务层取出对应 Bucekt 的元数据并加载。
存储层基于云原生的共享存储 COS,提供全局一致的数据视角。由于 Bucket 的真实数据都保存在 COS 中,因此扩缩容只需要加载 Bucket 的元数据,并不需要对数据进行复制与同步,实现了数据的 Zero-Copy。基于 COS 的存储层也带来了可观的成本降低。
上述改造使得 Clickhouse 具有秒级弹性伸缩能力,同时降低了成本,可以在云原生实时数仓的 OLAP 分析中发挥重要作用。
Flink ClickHouse Connector 设计
为了使得 ClickHouse 可以更好地与 Flink 结合使用,我们对 Flink ClickHouse Connector 也进行了改进与优化。
首先基于 FLIP-27 构造了 ClickHouse Source,将 ClickHouse 的读取任务也使用 Flink 来完成,可以更优雅地完成 ClickHouse 集群间的数据复制与同步工作,也便于在实时开发平台管理任务。
写入 ClickHouse 时,既可以写分布式表,也可以直接写本地表。为了减少数据延迟、节省网络带宽,我们事先获取各节点的连接地址,通过写本地表的方式直接写入各个分片,支持随机、轮询、散列等多种写分片方式。
另外我们注意到,流式数据通常会包含大量的更新和删除操作。为了支持频繁变更的数据,可以将 Flink 的 Retract Stream(回撤流)、Upsert Stream(更新-插入流)等含有状态标记的数据流,写入到 ClickHouse 的 CollapsingMergeTree 引擎表中,实现数据更新的语义。
考虑到 ClickHouse 擅长大批量写入的特点,还需要对 Flink ClickHouse Sink 增加攒批写入的支持,避免频繁写入造成的性能下降问题;此外还有故障重试策略、Flink 与 ClickHouse 之间的 SQL 类型映射等需要关注的点。
这些优化使得 Flink ClickHouse Connector 的性能和稳定性得到提升。
Iceberg 结合对象存储的实践
Iceberg 面临的挑战
基于对象存储的 Iceberg 面临的挑战可以总结为以下几点:
底层存储局限:对于海量数据存储的场景,HDFS 和对象存储都存在局限性。
小文件问题:Iceberg 采用实时方式写入会导致大量小文件的生成,大量小文件会影响存储性能。
查询性能不足:对象存储对于海量文件的操作能力有限,导致查询能力不足。如何加速 Iceberg 的查询也是值得探讨的问题。
针对以上的挑战,结合实时数仓数据存储的特征,腾讯大数据团队在 Iceberg 结合对象存储方案的稳定性和性能方面做了大量的优化。
Iceberg 结合对象存储的优势
首先我们进行了底层存储的替换,将 HDFS 替换为对象存储。相较于传统的 HDFS 存储,对象存储不存在 Namenode 的单点问题,可以支持海量文件的存储,同时允许存储容量和计算引擎各自独立地进行扩展,并且采用按量计费方式,成本可控。
然而对象存储也存在一定的局限。例如没有原生的 Rename 语义、List 操作性能弱、失去数据本地化优势导致查询缓慢等。
Iceberg 在数据组织方式上充分考虑了对象存储的特性,补齐了对象存储能力的不足。支持多版本无需进行 Rename 操作;自身存储分区文件列表避免耗时的 Listing 操作;引入稀疏索引,结合谓词下推,减少文件扫描量,加速查询。
这些设计使得 Iceberg 在与对象存储的适配上更有优势。
Iceberg 实践 - 数据优化服务
为了应对 Flink 实时、大量并发写入 Iceberg 导致的小文件问题,我们提供了数据优化服务,包括实时小文件合并、过期快照清理、遗留文件清理三种服务。
通过小文件合并服务,将文件数控制在一个比较稳定的范围内。同时通过过期快照清理、遗留文件清理保证数据的有效性。
数据优化服务可以增强 Iceberg 的可用性,提升用户体验。
Iceberg 实践查询三级加速
为了提升基于对象存储的 Iceberg 的查询性能,我们针对实时数仓的数据 IO 特点做了进一步性能优化,采取三级加速,分别在计算端、可用区端、存储端提供了性能加速能力。
在数据 Scan 阶段,可以用很少的缓存来撬动很好的加速效果。因此在计算端引入了缓存,与计算节点混合部署,利用计算集群本地的内存或磁盘资源缓存热数据,提供 Data Localization 能力,利用高速缓存功能解决存储性能问题。
可用区端引入数据加速器,这是在各可用区单独部署的高速缓存集群,采用全 SSD 存储介质,提供超大带宽与超低时延,加速同可用区的数据处理与查询,减少数据复制等场景的延迟。
而在数据端,在对象存储之上,我们构建了可扩展的元数据服务,为上层计算业务提供兼容 HDFS ⽂件系统语义的元数据操作能⼒。有了元数据加速能力的加持,就可以直接将对象存储当做 HDFS 用,用文件系统语义来访问对象存储服务。一方面,这一能力极大地提升了 List 等大数据文件系统操作的性能;另一方面,也提供了 Rename、Truncate 等典型的文件系统操作指令,提供了大数据生态兼容支持。
三级加速位于 Iceberg 和对象存储 COS 之间,将数据从对象存储移动到距离数据应用更近的位置,使数据更容易被访问到。这种层次化的加速架构,使得基于对象存储的 Iceberg 的查询性能比原生方案具有显著提升。
云原生实时数仓收益总结
云原生实时数仓建设的收益可以总结为以下几点:
云原生实时数仓实现存算分离,将存储与计算资源解耦,提升了架构可拓展性与灵活性
基于 K8s 提供弹性计算能力,按需使用存储与计算资源,实现自动、弹性扩缩容
存储与计算按使用量付费,并且使用对象存储代替传统 hdfs 存储,大大降低了使用成本。
云原生实时数仓展望
当前腾讯云原生实时数仓建设取得了一些成果,也将在未来进行进一步的升级优化
我们希望对实时数仓的流批一体能力进行持续演进,提升各个数仓组件的内核能力,适配 AI、物联网等更多场景。
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 产品在实时数仓领域长期深耕,也将批流融合数仓也作为重点发展方向。在不久的将来,流计算 Oceanus 会提供全套实时数仓构建的解决方案,助力企业数据价值最大化,加速企业实时化数字化的建设进程。
腾讯云大数据
长按二维码关注我们
版权声明: 本文为 InfoQ 作者【腾讯云大数据】的原创文章。
原文链接:【http://xie.infoq.cn/article/2e1ca668a1ab6e4c3f3358564】。未经作者许可,禁止转载。
评论