写点什么

字节跳动基于 Doris 的湖仓分析探索实践

  • 2022 年 9 月 29 日
    北京
  • 本文字数:3407 字

    阅读完需:约 11 分钟

字节跳动基于Doris的湖仓分析探索实践

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群


Doris 简介

Doris 是一种 MPP 架构的分析型数据库,主要面向多维分析,数据报表,用户画像分析等场景。自带分析引擎和存储引擎,支持向量化执行引擎,不依赖其他组件,兼容 MySQL 协议。

Apache Doris 具备以下几个特点:

  • 良好的架构设计,支持高并发低延时的查询服务,支持高吞吐量的交互式分析。多 FE 均可对外提供服务,并发增加时,线性扩充 FE 和 BE 即可支持高并发的查询请求。

  • 支持批量数据 load 和流式数据 load,支持数据更新。支持 Update/Delete 语法,unique/aggregate 数据模型,支持动态更新数据,实时更新聚合指标。

  • 提供了高可用,容错处理,高扩展的企业级特性。FE Leader 错误异常,FE Follower 秒级切换为新 Leader 继续对外提供服务。

  • 支持聚合表和物化视图。多种数据模型,支持 aggregate,replace 等多种数据模型,支持创建 rollup 表,支持创建物化视图。rollup 表和物化视图支持动态更新,无需用户手动处理。

  • MySQL 协议兼容,支持直接使用 MySQL 客户端连接,非常易用的数据应用对接。

Doris 由 Frontend(以下简称 FE)和 Backend(以下简称 BE)组成,其中 FE 负责接受用户请求,编译,优化,分发执行计划,元数据管理,BE 节点的管理等功能,BE 负责执行由 FE 下发的执行计划,存储和管理用户数据。

数据湖格式 Hudi 简介

Hudi 是下一代流式数据湖平台,为数据湖提供了表格式管理的能力,提供事务,ACID,MVCC,数据更新删除,增量数据读取等功能。支持 Spark,Flink,Presto,Trino 等多种计算引擎。


Hudi 根据数据更新时行为不同分为两种表类型:


针对 Hudi 的两种表格式,存在 3 种不同的查询类型:

Doris 分析 Hudi 数据的技术背景

在数仓业务中,随着业务对数据实时性的要求越来越高,T+1 数仓业务逐渐往小时级,分钟级,甚至秒级演进。实时数仓的应用也越来越广,也经历了多个发展阶段。目前存在着多种解决方案。

Lambda 架构

Lambda 将数据处理流分为在线分析和离线分析分为两条不同的处理路径,两条路径互相独立,互不影响。

离线分析处理 T+1 数据,使用 Hive/Spark 处理大数据量,不可变数据,数据一般存储在 HDFS 等系统上。如果遇到数据更新,需要 overwrite 整张表或整个分区,成本比较高。

在线分析处理实时数据,使用 Flink/Spark Streaming 处理流式数据,分析处理秒级或分钟级流式数据,数据保存在 Kafka 或定期(分钟级)保存到 HDFS 中。

该套方案存在以下缺点:

  • 同一套指标可能需要开发两份代码来进行在线分析和离线分析,维护复杂

  • 数据应用查询指标时可能需要同时查询离线数据和在线数据,开发复杂

  • 同时部署批处理和流式计算两套引擎,运维复杂

  • 数据更新需要 overwrite 整张表或分区,成本高

Kappa 架构

随着在线分析业务越来越多,Lambda 架构的弊端就越来越明显,增加一个指标需要在线离线分别开发,维护困难,离线指标可能和在线指标对不齐,部署复杂,组件繁多。于是 Kappa 架构应运而生。

Kappa 架构使用一套架构处理在线数据和离线数据,使用同一套引擎同时处理在线和离线数据,数据存储在消息队列上。

Kappa 架构也有一定的局限:

  • 流式计算引擎批处理能力较弱,处理大数据量性能较弱

  • 数据存储使用消息队列,消息队列对数据存储有有效性限制,历史数据无法回溯

  • 数据时序可能乱序,可能对部分对时序要求比较严格的应用造成数据错误

  • 数据应用需要从消息队列中取数,需要开发适配接口,开发复杂

基于数据湖的实时数仓

针对 Lambda 架构和 Kappa 架构的缺陷,业界基于数据湖开发了 Iceberg, Hudi, DeltaLake 这些数据湖技术,使得数仓支持 ACID, Update/Delete, 数据 Time Travel, Schema Evolution 等特性,使得数仓的时效性从小时级提升到分钟级,数据更新也支持部分更新,大大提高了数据更新的性能。兼具流式计算的实时性和批计算的吞吐量,支持的是近实时的场景。

以上方案中其中基于数据湖的应用最广,但数据湖模式无法支撑更高的秒级实时性,也无法直接对外提供数据服务,需要搭建其他的数据服务组件,系统较为复杂。基于此背景下,部分业务开始使用 Doris 来承接,业务数据分析师需要对 Doris 与 Hudi 中的数据进行联邦分析,此外在 Doris 对外提供数据服务时既要能查询 Doris 中数据,也要能加速查询离线业务中的数据湖数据,因此我们开发了 Doris 访问数据湖 Hudi 中数据的特性。

Doris 分析 Hudi 数据的设计原理

基于以上背景,我们设计了 Apache Doris 中查询数据湖格式 Hudi 数据,因 Hudi 生态为 java 语言,而 Apache Doris 的执行节点 BE 为 C++环境,而 C++ 无法直接调用 Hudi java SDK,针对这一点,我们有四种解决方案:

  1. 实现 Hudi C++ client,在 BE 中直接调用 Hudi C++ client 去读写 Hudi 表。

该方案需要完整实现一套 Hudi C++ client,开发周期较长,后期 Hudi 行为变更需要同步修改 Hudi C++ client,维护较为困难。

  1. BE 通过 thrift 协议发送读写请求至 Broker,由 Broker 调用 Hudi java client 读取 Hudi 表。

该方案需要在 Broker 中增加读写 Hudi 数据的功能,目前 Broker 定位仅为 fs 的操作接口,引入 Hudi 打破了 Broker 的定位。第二,数据需要在 BE 和 Broker 之间传输,性能较低。

  1. 在 BE 中使用 JNI 创建 JVM,加载 Hudi java client 去读写 Hudi 表。

该方案需要在 BE 进程中维护 JVM,有 JVM 调用 Hudi java client 对 Hudi 进行读写。读写逻辑使用 Hudi 社区 java 实现,可以维护与社区同步;同时数据在同一个进程中进行处理,性能较高。但需要在 BE 维护一个 JVM,管理较为复杂。

  1. 使用 BE arrow parquet c++ api 读取 hudi parquet base file,hudi 表中的 delta file 暂不处理。

该方案可以由 BE 直接读取 hudi 表的 parquet 文件,性能最高。但当前不支持 base file 和 delta file 的合并读取,因此仅支持 COW 表 Snapshot Queries 和 MOR 表的 Read Optimized Queries,不支持 Incremental Queries。

综上,我们选择方案四,第一期实现了 COW 表 Snapshot Queries 和 MOR 表的 Read Optimized Queries,后面联合 Hudi 社区开发 base file 和 delta file 合并读取的 C++接口。

Doris 分析 Hudi 数据的技术实现

Doris 中查询分析 Hudi 外表使用步骤非常简单。

创建 Hudi 外表

建表时指定 engine 为 Hudi,同时指定 Hudi 外表的相关信息,如 hive metastore uri,在 hive metastore 中的 database 和 table 名字等。

建表仅仅在 Doris 的元数据中增加一张表,无任何数据移动。

建表时支持指定全部或部分 hudi schema,也支持不指定 schema 创建 hudi 外表。指定 schema 时必须与 hiveMetaStore 中 hudi 表的列名,类型一致。

Example:

   CREATE TABLE example_db.t_hudi     ENGINE=HUDI    PROPERTIES (    "hudi.database" = "hudi_db",    "hudi.table" = "hudi_table",    "hudi.hive.metastore.uris"  =  "thrift://127.0.0.1:9083"    );            CREATE TABLE example_db.t_hudi (    column1 int,    column2 string)    ENGINE=HUDI    PROPERTIES (    "hudi.database" = "hudi_db",    "hudi.table" = "hudi_table",    "hudi.hive.metastore.uris"  =  "thrift://127.0.0.1:9083"    );
复制代码

查询 Hudi 外表

  • 查询 Hudi 数据表时,FE 在 analazy 阶段会查询元数据获取到 Hudi 外表的的 hive metastore 地址,从 Hive metastore 中获取 hudi 表的 schema 信息与文件路径。

  • 获取 hudi 表的数据地址

  • FE 规划 fragment 增加 HudiScanNode。HudiScanNode 中获取 Hudi table 对应的 data file 文件列表。

  • 根据 Hudi table 获取的 data file 列表生成 scanRange

  • 下发 HudiScan 任务至 BE 节点

  • BE 节点根据 HudiScanNode 指定的 Hudi 外表文件路径调用 native parquet reader 进行数据读取。

后期规划

目前 Apche Doris 查询 Hudi 表已合入社区,当前已支持 COW 表的 Snapshot Query,支持 MOR 表的 Read Optimized Query。对 MOR 表的 Snapshot Query 暂时还未支持,流式场景中的 Incremental Query 也没有支持。

后续还有几项工作需要处理,我们和社区也在积极合作进行中:

  1. MOR 表的 Snapshot Query。MOR 表实时读需要合并读取 Data file 与对应的 Delta file,BE 需要支持 Delta file AVRO 格式的读取,需要增加 avro 的 native 读取方式。

  2. COW/MOR 表的 Incremental Query。支持实时业务中的增量读取。

  3. BE 读取 Hudi base file 和 delta file 的 native 接口。目前 BE 读取 Hudi 数据时,仅能读取 data file,使用的是 parquet 的 C++ SDK。后期我们和联合 Hudi 社区提供 Huid base file 和 delta file 的 C++/Rust 等语言的读取接口,在 Doris BE 中直接使用 native 接口来查询 Hudi 数据。


本文为字节跳动数据平台研发工程师在 DataFunSummit 大会演讲实录,关注字节跳动数据平台微信公众号,回复【0929】,领取本次分享 PPT。


立即跳转火山引擎E-MapReduce官网了解更多信息

用户头像

公众号byte-dataplatform 2021.12.29 加入

字节跳动数据平台团队,赋能字节跳动各业务线,对内支持字节绝大多数业务线,对外发布了火山引擎品牌下的数据智能产品,服务行业企业客户。同名公众号欢迎了解。

评论

发布
暂无评论
字节跳动基于Doris的湖仓分析探索实践_数据仓库_字节跳动数据平台_InfoQ写作社区