写点什么

Databend 源码阅读: Storage 概要和 Read Partitions

作者:Databend
  • 2023-11-16
    北京
  • 本文字数:2175 字

    阅读完需:约 7 分钟

Databend 源码阅读: Storage 概要和 Read Partitions

作者:张祖前

Databend Labs 成员,数据库研发工程师

https://github.com/zhyass


❤️ 友情提示:代码演进较快,请注意文档的时效性哦!

引言

Databend 将存储引擎抽象成一个名为 Table 的接口,源码位于 query/catalog/src/table.rs


Table 接口定义了 readappendalteroptimizetruncate 以及 recluster 等方法,负责数据的读写和变更。解释器(interpreter)通过调用 Table trait 的方法生成物理执行的 pipeline


通过实现 Table 接口的方法,可以定义 Databend 的存储引擎,不同的实现对应不同的引擎。


Storage 主要关注 Table 接口的具体实现,涉及表的元信息,索引信息的管理,以及与底层 IO 的交互。

目录

Read Partitions

以下以 fuse 引擎中 read partitions 的实现流程为例,简要分析 Storage 相关源码。


Partitions 的定义位于 query/catalog/src/plan/partition.rs


pub struct Partitions {    // partitions 的分发类型。    pub kind: PartitionsShuffleKind,    // 一组实现了 PartInfo 接口的 partition,    pub partitions: Vec<PartInfoPtr>,    // partitions 是否为 lazy。    pub is_lazy: bool,}
复制代码


Table 接口中的 read_partitions 通过分析查询中的过滤条件,剪裁掉不需要的分区,返回可能满足条件的 Partitions。


#[async_trait::async_trait]impl Table for FuseTable {    #[minitrace::trace]    #[async_backtrace::framed]    async fn read_partitions(        &self,        ctx: Arc<dyn TableContext>,        push_downs: Option<PushDownInfo>,        dry_run: bool,    ) -> Result<(PartStatistics, Partitions)> {        self.do_read_partitions(ctx, push_downs, dry_run).await    }}
复制代码


Fuse 引擎会以 segment 为单位构建 lazy 类型的 FuseLazyPartInfo。通过这种方式,prune_snapshot_blocks 可以下推到 pipeline 初始化阶段执行,特别是在分布式集群模式下,可以有效提高剪裁执行效率。


pub struct FuseLazyPartInfo {    // segment 在 snapshot 中的索引位置。    pub segment_index: usize,    pub segment_location: Location,}
复制代码


分区剪裁流程的实现位于 query/storages/fuse/src/pruning/fuse_pruner.rs 文件中,具体流程如下:


  1. 基于 push_downs 条件构造各类剪裁器(pruner),并实例化 FusePruner

  2. 调用 FusePruner 中的 pruning 方法,创建 max_concurrency 个分批剪裁任务。每个批次包括多个 segment 位置,首先根据 internal_column_pruner 筛选出无需的 segments,再读取 SegmentInfo,并根据 segment 级别的 MinMax 索引进行范围剪裁。

  3. 读取过滤后的 SegmentInfo 中的 BlockMetas,并按照 internal_column_prunerlimit_prunerrange_prunerbloom_prunerpage_pruner 等算法的顺序,剔除无需的 blocks。

  4. 执行 TopNPrunner 进行过滤,从而得到最终剪裁后的 block_metas


pub struct FusePruner {    max_concurrency: usize,    pub table_schema: TableSchemaRef,    pub pruning_ctx: Arc<PruningContext>,    pub push_down: Option<PushDownInfo>,    pub inverse_range_index: Option<RangeIndex>,    pub deleted_segments: Vec<DeletedSegmentInfo>,}
pub struct PruningContext { pub limit_pruner: Arc<dyn Limiter + Send + Sync>, pub range_pruner: Arc<dyn RangePruner + Send + Sync>, pub bloom_pruner: Option<Arc<dyn BloomPruner + Send + Sync>>, pub page_pruner: Arc<dyn PagePruner + Send + Sync>, pub internal_column_pruner: Option<Arc<InternalColumnPruner>>, // Other Fields ...}
impl FusePruner { pub async fn pruning( &mut self, mut segment_locs: Vec<SegmentLocation>, delete_pruning: bool, ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> { ... }}
复制代码


剪裁结束后,以 Block 为单位构造 FusePartInfo,生成 partitions,接着调用 set_partitions 方法将 partitions 注入 QueryContext 的分区队列中。在执行任务时,可以通过 get_partition 方法从队列中取出。


pub struct FusePartInfo {    pub location: String,     pub create_on: Option<DateTime<Utc>>,    pub nums_rows: usize,    pub columns_meta: HashMap<ColumnId, ColumnMeta>,    pub compression: Compression,    pub sort_min_max: Option<(Scalar, Scalar)>,    pub block_meta_index: Option<BlockMetaIndex>,}
复制代码

Conclusion

Databend 的存储引擎设计采用了抽象接口的方式,具有高度的可扩展性,可以很方便地支持多种不同的存储引擎。Storage 模块的主要职责是实现 Table 接口的方法,其中 Fuse 引擎部分尤为关键。


通过对数据的并行处理,以及数据剪裁等手段,可以有效地提高数据的处理效率。鉴于篇幅限制,本文仅对读取分区的流程进行了简单阐述,更深入的解析将在后续的文章中逐步展开。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。


👨‍💻‍ Databend Cloud:databend.cn


📖 Databend 文档:databend.rs/


💻 Wechat:Databend


✨ GitHub:github.com/datafuselab…

发布于: 刚刚阅读数: 5
用户头像

Databend

关注

还未添加个人签名 2022-08-25 加入

还未添加个人简介

评论

发布
暂无评论
Databend 源码阅读: Storage 概要和 Read Partitions_Databend_InfoQ写作社区