速度提升 10 倍,腾讯基于 Iceberg 的数据治理与优化实践

导读:本文主要介绍腾讯是如何基于 Apache Iceberg 进行数据的入湖、治理以及后面的一些优化。将从数据入湖、数据治理服务、数据查询优化以及未来展望四个方面展开介绍。
1 数据入湖
本部分主要介绍 Apache Iceberg 基本概念以及结合 Flink 构建实时数据入湖链路。
1. Apache Iceberg 是什么?

iceberg 其实就是在存储和计算层之间的一个表格式,表格式的作用主要是对计算引擎提供一个访问存储层的接口,能够提供一些 ACID 语义和 MVCC 的能力,以及像历史回溯之类的功能。像传统的 Hive 表都会带一些 partition 或者是数据的格式、压缩格式、目录信息等,但这些信息都存储在 Hive Metastore 里,这里也可以将 Metastore 理解为一种文件的组织格式。从图中可以看到,下层存储层这块是一个比较开放的存储层,可以支持传统的 HDFS、对象存储,文件格式也是支持行和列。
2. Apache Iceberg 的特性
基于快照的读写分离和回溯
流批统一的写入和读取
不强绑定计算存储引擎
ACID 语义以及数据多版本
表,模式及分区的变更
Iceberg 很重要的一个特性是支持快照的读写分离回溯以及不绑定任何计算存储引擎,可以方便用户快速接入自己的存储引擎,比如 Spark、Flink 或者 Presto,在 Iceberg 上用的时候可以基于 Iceberg 的 API 做一些 connector。还有一个很重要的功能就是支持 ACID 语义及数据多版本控制,并且也支持表、模式及分区的一个变更。这些功能对于后面我们来构建准实时的数据入湖是一些非常重要的特性。
3. Apache Iceberg 文件组织

图中上层可以看到 Commit 的一个 Timeline,Iceberg 每次写 Commit 操作都会生成一个新的 Snapshot,比如 snapshot-1 其实是包含 snapshot-0 的所有数据,这里可以想象我们平时用 git 的时候,git 的每一次新的提交都包含之前提交的所有信息,通过 git log 就可以看到,这里我们也可以这么理解 snapshot 的 Timeline。下层是 Iceberg 基本的文件组织格式,每一个 manifest 管理 n 个 DataFiles,DataFiles 在 manifest 记录的是 DataFiles 的路径信息。通过这样的一个文件组织格式,在读取的时候就可以很方便地做 Commit TimeLine,比如说现在是 11 点,Commit 的是 Snapshot-1,如果想读 snapshot-0 的话其实只需要指定 Snapshot-id,就可以很方便的实现数据的回溯。
4. Apache Iceberg 读写流程

接下来这个是在写 Iceberg 的时候的一个简单读写过程。从这个链路上我们可以看到每次 Iceberg 的写操作,比如说正在写第一个 S1 的时候是没有办法读的,只有发生了 Commit C 之后 S1 才是可读的,这个时候如果有 n 个线程同时在读但有一些线程在写的时候,可以做到只有 commit 完整的数据之后,对用户的读操作才能被用户的读线程所看到。这也是 Iceberg 里面非常重要的特性,即读写分离。在对 S4 进行写操作的时候,S3、S2、S1 的读操作是不受影响的,同时这个时候 S4 是没有办法读得到的,只有 Commit 之后 S4 才能够读得到。Current Snapshot 这时候就会指向 S4,默认 Iceberg 读的时候都是从最新的 Current Snapshot 开始读 Iceberg 的数据。那么如果要读前面的数据其实就可以指定 Snapshot 的 id 去进行数据回溯的读。
5. 增量读取

我们前面知道 Iceberg 每个不同的 Snapshot 都包含了之前的所有数据,比如说像图中 S2 是包含了 S1 的数据,在每次读取的时候就可以指定 S1 之间的增量即紫色这部分的数据,不需要每次重复地读全量的数据。增量数据在后面构建准实时的入湖链路是非常重要的,因为从构建一个 Flink 的 job 比如从 Kafka 写入数据到 Iceberg,写入之后下游的任务可以继续读 Iceberg,读的时候就可以选择增量的读取,在整个链路上就可以实现实时的入湖链路。
6. Apache Iceberg Flink Sink

在 Iceberg 实时入湖的链路上我们用的是现在比较流行的实时计算 Flink,我们知道上游 InputStream 是会源源不断地往下游去写的,如果在写的时候不做多个并发写的话对整个性能会有非常大的影响,因此把 Iceberg Flink Sink 拆成 Writer 和 Commiter 两个部分。那么为什么只有一个 Commiter 呢?我们知道 Iceberg 的 commit 操作其实要到 Hive MetaStore 去获得一个锁,如果进行多个 commit 的话,每个 commit 都会到 MetaStore 获取那个锁,对 MetaStore 来说不管有多少 commit 操作都会进行排队。所以这里只有一个并发 commit 是为了让 Iceberg 前面 n 个 Writer 所写的数据一次性从不可见的状态变成可见的状态。其实到 commit 状态的数据已经都到了存储上了,只是现在的状态是不可见,这对准实时的数据接入有非常大的帮助,比如说 HDFS 写数据的时候需要在一个 temp 里面把整个 temp 目录 move 到可见的目录上,这里其实数据已经全部都写到存储上了,所做的操作仅仅是把它的状态从原来的不可见状态变成了可见的状态,也就是前面我们所说的每个 snapshot commit 操作。
7. 近实时数据入湖

上图是我们内部大量采用的 Iceberg 实时入湖的一个简单链路。上游消费 Kafka 或者 Binlog 的数据,中间采用 Flink 将数据写入到 Iceberg,下游可以基于 Iceberg 基础上继续再接 Flink 去做一些 ETL 或者其他的操作,也可以直接在 Iceberg 的基础上跑 Spark 或者 Presto 的一些任务。
8. 实时入湖平台化建设

这块是我们内部在整个实时入湖链路的基础上为了方便用户而构建的一个任务管理平台,可以非常方便地帮助用户去新建一个端到端的入湖任务,也可以看到一些任务运行的状态。
2 数据治理服务
我们知道 Flink 任务跟传统的任务不同的是,Flink 任务是一个实时任务,实时任务的特点是常驻性,起了一个 Flink 任务就长时间运行,理想状态下是不会中断的。这种情况下,上游数据是源源不断地进来的,Flink 任务会源源不断地进行 Commit 操作,如果对数据的实效性要求比较高的话,比如说 Flink 任务运行的时间是一分钟、五分钟或者是十分钟级别。当运行了几天或者是一两个星期之后,在磁盘上发生 Commit 的次数就会非常地多,如果根据 partition 进行分区的话,磁盘上的文件数量会膨胀的非常大。如果是传统的批任务的话,跑完一批之后在后面再跑一次 compaction 任务进行 compact。实时任务因为是不中断的,所以就会遇到小文件数量膨胀、元数据膨胀等的一些问题。
1. 实时数据入湖遇到的问题

我们知道实时任务为了保证实时性进行高频的 commit 操作引起的小文件数量以及元数据数目膨胀引起查询性能的降低,还有数据本身缺乏生命周期管理。有时候写很多的数据到 HDFS 上,一段时间就要根据实际业务场景的需求对过去的数据进行清理,比如清理掉两个星期前的数据,这时候就需要额外的服务化平台去帮助用户去做这个事情。还有一点就是数据的实时写入并不能够根据用户真实的查询条件进行分布,因为写入只能根据写入的条件去写入,但是查询条件比如说 where 或者过滤的条件可能是不一样的,这个时候如果某些查询经常频繁发生的话,就会导致访问这 n 个节点的查询性能不太高,后面的服务也需要对数据做一个合理的重分布。
2. 不合适的小文件合并方案

我们在 FlinkIcebergSink 这边尝试了很多小文件合并的方案。我们知道 Flink Sink 上游每次都会做 commit 操作告诉当前 commit 操作是 commit 到了哪个 Snapshot,snapshot 里面增加了哪些文件,这些文件其实都是当前 Snapshot 里面的。比如说真实的数据文件在下游再接上一个 operator 的话,就可以对每次 commit 操作的文件进行 compaction 的操作。这里的 rewrite 也是同样的道理,比如上游 commit 了 90 个文件,假设 rewrite 分到了 30 个文件,会对这 30 个文件进行 rewrite 操作把这 30 个文件 rewrite 成一个文件,把文件 rewrite 成一个文件的时候就会把 rewrite 的文件数量告诉下面的 replace,replace 知道当前的事物比如说新增 30 个 rewrite,就会对 30 个 rewrite 文件再次进行 commit 操作,也就是说 replace 和 sink 其实做的都是 commit 操作,只是 replace commit 的是 rewrite 的结果,而 sink commit 的是上游写下来的数据。replace 之后生成新的 snapshot 的文件数量就是 3 而不是之前的 90。这个合并方案我们之前也做过非常多的尝试以及生产和测试环境的大量测试,实际证明这个其实是不合适的。因为下游所有的逻辑都是跟着 Flink 的任务走的,下游的不管是 replace、rewrite 或者是 ScanTaskGen 都要占用 Flink TaskManager 的计算资源。在计算资源有限的情况下,在后面再接上任务的 compaction 写一些任务的话都会大量的占用整个集群的计算资源。如果是同步的任务,下游的 rewrite 都会阻塞掉上游数据的输入,假设把它改造成异步的去跑后台的线程,后台的线程也要占用 task 的计算资源,所以整体在生产上面通过观察发现,每次如果 rewrite 操作的时候整个集群主链路上的数据处理都会受到大大的影响,我们为了保证用户对小文件合并的透明,就想到了要提出一个完整的数据治理的服务。
3. 架构总览

上图是我们数据治理的整体架构,中间蓝色的四个框是我们主要要做的业务逻辑,我们把数据治理服务主要分为四大块,第一块是 Compaction Service,Compaction Service 是为了解决小文件过多的问题,我们知道 Iceberg 是读写分离的,我们可以对 Iceberg 实时链路上写到磁盘上的一些小文件进行异步的 compaction,这个 compaction 需要独立的一部分计算资源。这样的话,计算资源能够帮助解决一些小文件的问题,又不会影响到主链路上的数据。Compaction Service 服务会定期的根据 Iceberg 上游的表决定进行多大力度的合并,比方说文件合并的 target 是 128M,每个 snapshot 文件数量是 n,那就会根据这样的数值去判断当前的 Iceberg 写入 snapshot 里面这些值的状态是多少,以此决定要不要去触发异步文件 compaction 的操作。Expiration Service 是为了定期清理 snapshot,比如说现在的 snapshot 里面的文件是 1、2、3,合并之后的文件假设是 1+2+3=6,我们知道 6 其实是包含 1、2、3 的一些数据,那么现在 6 的 snapshot 的数据就跟 1、2、3 的 snapshot 数据是重复的,在磁盘上是存在 Double 的数据,这个时候就需要定期的跑 snapshot 把合并之前的那些 snapshot 数据进行定期的清理操作,删除一些冗余的数据可以大大的减少存储压力。第三个就是 Clustering Service,对某些查询比较频繁的操作可以通过 Clustering Service 进行数据的重分布,比如说根据某些查询的列进行数据的一些聚合,这样的话在某些查询经常发生时尽可能避免扫描过多的文件,对查询的性能会有极大的提升。最后是 Cleaning Service,针对某些用户会有一些对过去数据的清理操作,后台会有一个 Service 根据用户会去配置表里,比如说配置表里面的 TTL 是 3 天或者 30 天,Cleaning Service 就会定期地根据用户配置的去清理这些过期的数据,这点比较类似 kafka,kafka 也有一些类似数据超时的机制。
4. 总体流程

上图是整体数据的流程,可以看到 Compaction 整个服务中的数据流。我们先从 Compaction 服务来简单的介绍,比如说用户对 Iceberg 进行操作,我们在 Iceberg 接口这边已经为 Iceberg 实现了 Iceberg 的 Metrics 汇报到外接系统的功能,首先 Metrics 的 Reporter 会将 Iceberg 的一些建表、删除、更新或者任何 Commit 操作所产生的 snapshot 创建 snapshot 的 summery 汇报到 iceberg 的 Metrics Event Handler 那边,Metrics Event Handler 接收到不同的事件之后会根据不同事件的类型将这些事件存储到 MySQL。这里我们做了一个改造采用一个消息队列来保证事件的时效性,并且对消息队列里面的数据定期的保存在 CheckPoint 中。我们知道表其实是有两种状态,DDL 状态或者 DML 状态,表的一些基础的记录信息比如表在 compact 之前/后的文件数量、以及表的文件数、操作的类型比如新建、commit、delete 这些表所能提供的一些 metrics 信息,当数据通过消息队列发送到中间阶段的时候,中间阶段内部有个规则管理器会去配置大量的规则,比如一些用户希望表在每产生 100 个 10M 文件的时候就进行一次合并。这些规则的接口其实是开放给用户去配置的,这些接口配置之后会将配置传给下游的任务调度器,任务读取器会读取上游发送过来的一些规则,以决定现在要根据这些规则去起一个什么样的任务。图中我们可以看到下游会有很多不同的任务,比如 JOB1、JOB2、JOB3 这些任务目前是采用离线的 Spark 任务去跑上游发送过来的信息。执行的频率有 5 分钟、10 分钟和 60 分钟,主要就是根据用户所配置的表。用户的表里面如果要保留过去 100 个文件,这个时候在监控里面看到用户会一直频繁地在提交,那么在单位时间内所产生的文件数量会非常非常的多,这个时候就需要更低的频率比如 5 分钟去对用户的表执行一次 compact 操作。有些用户比是 10 分钟或者 20 分钟才 commit 一次,这个时候可能只需要跑小时级别或者跑 5 个小时调度一次去做这个文件的处理,这边是针对不同用户的表的一些 metrics 的情况来决定应该将用户的表放给哪一个粒度的调度任务去执行。那么我们知道每个 job 可以看到很多用户的表,一个 job 可能会处理三四个表,每次处理完之后会将这三四个表的处理逻辑通过 Metrics System 消息队列再反馈给刚刚我们记录的 MySQL,然后再通过 Grafana 或者一些监控的工具就可以看到整个任务的 compaction 的运行情况,包括 compact 之后表是什么状态这里都可以看得到。还有一个记录重要的点是每次 compact 的表任务执行的过程中花了多少时间,这样就可以通过 Job Handler 动态地调整每个 Job 所负责的表的数量,比方说一个 job 执行的表是 1、2、3 三个表,发现 1 表跟 2 表执行 compact 任务花了 10 秒钟就执行完了,3 表执行了 5 分钟,因为整个任务是并发提交的,所以需要等到第三个表执行完之后这个任务才能够继续调度下一次。这个时候就可以在下次调度的时候把 3 表调度到其他的任务区,1 表就可以在一分钟之内进行不断地做文件数量的处理。
5. 实践效果

对于用户来说要使用 compaction 服务其实是非常简单的,只需要创建一个表然后在表里面配置文件处理的参数,图中表示的是 snapshot 保留过去 10 分钟的 snapshot,或者是过去 10 个 snapshot 的数量,metadata 的文件保存过去 10 个 metadata 文件,每新产生 5 个 snapshot 就触发一次 rewrite 操作。这个时候用户只需要去配置后端的 metrics 的汇报和文件的 compaction 以及文件的一些 expiration,这些所有的动作在这个时候全部对用户是透明的,用户只需要去配置这个表,后面我们的服务都会自动地帮用户去做好。

接下来可以看到整体的数据文件和 meta 文件数量,根据刚刚我们配置的值,如果在长时间运行 compaction 之后是能够控制在一个比较合理的范围。我们可以看到下面 meta 文件夹里面放的其实是 iceberg 的 meta 信息,包括像 m0、avro 这些都是 snapshot 的信息。上层的 parquet 是真实的数据文件,我们可以看到第二位有 140、269、286 这些的文件其实都是执行的 compact 之后的 rewrite 之后的文件。
3 数据查询优化
从刚刚这个合并里面我们可以知道,在做 rewrite 的时候只是把这些文件进行简单的重写,比方说将三个文件写成一个文件,对整个的查询性能其实是已经能够得到一定的提升,因为相当于扫描的文件数量得到大大的降低,但是如果说真的要对某些频繁发生的查询性能进一步优化的话,这样是远远不够的。所以我们接下去会介绍我们在数据查询优化方面所做的一些工作,首先介绍基于空间曲线算法优化 iceberg 的数据查询效率。
1. 空间填充曲线简介

首先介绍一下什么是空间查询曲线?
在数学分析里面,空间填充曲线是一个参数化的组合函数,会将单位区间内的区间映射到单位的正方形或者立方体中。比如说在一个二维的空间里,可以通过一维的一条曲线穿过这个空间里面的每一个点,直到填充满整个二维的空间平面,如果曲线所填充的粒度越来越密的话,其实整个二维平面会被填充满,这个是数学的一个重要特性。查询优化这块其实就是基于这样的特性,我们可以看到空间填充曲线的话,因为在二维平面里面它经过了二维平面的每一个点所以我们就可以将整个二维平面的空间降成一维,将多维的一个空间点转化成一维对于后面的数据查询优化算法是非常大的帮助,今天我们讲的一个重点其实是利用了图中第四个 Z-Order 的算法。
2. GeoHash 算法介绍

我们知道 GeoHash 算法就是基于 Geo 的特性来做的,我们可以看到图中 GeoHash 算法其实是一个地理位置编码将空间分成一个网格,在网格中可以定位某一些点以及哪些点离这个点最近。这个算法常用的一个场景是点评、外卖查看附近有多少外卖商家。从图中我们可以看到,对于生成一个 z order 地址来说,比如说黑色的虚线所画的这块,四个地址我们就可以认为是靠的比较近的。一个点附近 hash 字符串如果前缀是一样的我们就认为它的点是靠的比较近的,我们可以看到每个虚线框里面的前缀都是 100,通过这样的一个 z 地址可以把二维平面里面的数据进行降维,可以让降维之后的距离变得比较近,通过这种算法我们就可以很方便的进行多维数据的聚合操作。
3. 为什么需要多维数据聚合

我们知道在 N 列数排序的时候比如 order by FirstName,第一列的效果往往是比较好的,越往后可能效果会越差,到 n 列之后整个数据可能就是离散的。如果查询条件比较多的情况下,文件过滤效果是比较差的,因为可能需要扫描表的所有数据才能去读。数据如果呈现自然聚集的话会有几个特点,比如单调递增的 id 或者其他根据数据写入的时间或是写入前对数据进行的排序,这在这个例子中可以看到越往后几列数据越乱。
4. Iceberg 表多维聚合

同理,我们在 iceberg 表中间做多维聚合时,首先将不同的 snapshot 的文件进行合并写入小文件,然后进行 optimize 优化数据的分布,其实就是刚刚我们说的基于 z order 算法。我们将原先分散在集群中不同地方的文件进行重分布,这样在查询的时候只需要根据查询 optimize 之后的结果文件就可以了。在这个例子中绿色的点可以理解为是符合过滤条件的,红色是不符合过滤条件的,过滤条件指的是在做 where 的时候的过滤条件。可以看到在 Snapshot N 的时候数据是处于上游写入的状态,在第二个阶段的时候进行 optimize 的时候可能是第一次进行 optimize 操作它的 strategy 是 all 需要扫描所有的文件,这个时候不符合过滤条件的都被聚集到 m1 和 m4,m1、m4 里面都是红点不符合聚合条件。当然它也不能保证所有的红点都 scan 到某些文件,因为数据要保证相同的过滤条件尽可能的聚集在一个文件里。在第二个阶段比如说这个时候会在第一次进行 optimize 之后还会进行一次写,因为上游只要是实时的就会不断的往 Snapshot 里面写入数据,比如 f2001 到 f3000 这一段写了 n 个数据文件,在 Snapshot N+3 阶段执行的是 incremental 的 optimize 就只去优化新写入的这些文件,经过这样的操作之后,文件的数量会大大的减少,在查询的时候就可以避免非常多的没有用的文件扫描操作。

我们在图中可以看到,在 Spark sql 这边其实是支持这样的一个语法,比如说在 optimize table employee zorder by first_name, last_name,我们假设 first_name 和 last_name 是二维的,因为是两个字段其实就是二维的。首先根据 first_name 和 last_name 会先计算它的分区 id,计算的规则目前我们实现的是一个固定的 partition 值,然后将这个分区 id 转换成一个二进制,然后基于 GEOHash 算法。这里可以看到是交错位去生成 z 地址的,Thomas 0 放奇数位,More 0 放偶数位,以奇偶交错的方式生成 z 地址。这样生成 z 地址可以看到 Thomas More 和 Thomas Alva Edison 以及 Melisa Kort 在第二列的排序都不是有序的。经过 ZOrder 之后前缀都是 16 个 0,这时候就可以将这几个聚合在一个文件里。当根据 FirstName 或者 LastName 进行查找的时候就可以很方便的根据 ZOrder 的地址进行查询操作避免其他文件的扫描。可以看到它其实是根据多维 Column 生成 Z 地址,从 f2 到 f1000 的数据先根据里面的数据进行线性扫描,扫描生成一次地址,生成地址之后,接下来要做的一个很重要的事情就是 Repartition。

根据 Z 地址进行 Range 重分区,因为只有根据 Z 地址进行 Range 重分区之后我们才能够将原先分布在不同的点的数据的文件聚合到同一个点上。比如根据 Z 地址重分区之后可能生成了两个 partition,在查询的时候就必须对数据进行写回存储。repartition 之后要进行一个重写操作,重写之后生成一个新的 snapshot N+1,这个过程也就是刚刚 S N 到 S N+1 的一个中间发生的详细的过程。

经过事物回写存储之后,在查询的时候就根据 where 条件智慧扫描 m1 和 m3 的数据,因为 m2 里面都是红点不符合查询的条件。
5. 查询性能优化评测

经过我们的优化之后,可以看到图底部有条 select 语句计算一个简单的 count,根据 first_name 和 last_name 进行过滤。上部分是在没有优化之前在 HDFS 上和优化之后的性能对比,可以看到性能差距是非常大的,性能优化的一个主要的点就是把大量的小文件扫描的时间优化了。
4 未来展望
Iceberg 内核及数据湖平台化的工作规划
1. Iceberg 内核能力
进一步优化索引系统,提升查询性能。前面说到我们对查询性能进行了 zorder 索引系统的构建以提升查询的性能。但是 zorder 是有一定的局限性的,它需要根据查询条件去进行 re-clusting,如果查询条件发生变化的话需要重新计算。另一个是如果查询条件特别多达到几十个或者上百个的话,zorder 会面临维度膨胀的问题,计算出来的 z 地址会非常的长。这一点我们后面会根据不同的场景需求进行不同的索引来尽量避免过多的文件扫描以及使用 zorder 没有解决的一些问题。
增量读取能力的增强,MOR 方式入湖。我们知道现在的增量读取是每次读取的是 incremental 的 snapshot,但这个时候如果发生 replace 操作的话,产生的 rewrite 之后的 snapshot 在这里的增量读取整个语义目前是没有很好的定义的,我们是希望在引擎层通过 skip 以及通过记录 rewrite 之前的一些 meta 信息来解决这个问题,这块的话也是我们下一步的一个任务的规划。
SQL 能力的增强。用户可能很希望只通过一些 SQL 就能够执行一些任务,比如用户通过我们的平台化建表,我们可以将这个表很方便地纳入平台的管理上,用户自己建的表如果我们没有办法 check 到的话,也是需要提供一些 SQL 增强的能力,方便用户更好的去执行使用 iceberg 过程中遇到的一些数据管理问题。
2. 数据湖平台化建设
持续迭代数据治理服务平台。我们会持续迭代数据治理服务化平台,包括如何更好地去执行小文件合并的策略,包括怎么样尽量避免重复的小文件的 rewrite 操作,已经重写了的小文件什么时候将它合并到我应该要被写的文件列表里,这些都是需要在后面不断地迭代中去不断地优化。
统一的元数据管理,元数据发现。对于用户来说,因为数据湖本身是写后 schema 的模式,所以用户其实并不希望数据只是上传了 CSV 或者 JSON 这样的原始数据对于里面的 schema 其实可能并不知道,希望平台能够帮助他发现这些 schema,这一块也是平台化建设中后面不断地去优化的内容。
与更多数据系统打通,构建入湖+分析平台。比如说数据已经写入 iceberg 里面去了,可能会在上面继续构建一些分析型的任务,比如更好地去优化 presto 的查询性能,或者去进行平台化构建更多分析型的任务,比如 spark 或者 flink 的批任务,这块会跟更多的计算引擎去打通,更方便用户使用从端到端,从入湖到分析的整个的一个链路平台。
5 问答环节
Q:iceberg 的 zorder 有计划提交到社区吗?
A:我们现在是把剩下的一些测试和优化进一步做完善之后,后续有计划反馈给社区。
Q:zorder 优化是针对大量小文件场景下的优化吗?
A:zorder 优化的场景是某些经常发生的查询,查询条件相对固定这种情况下为了提升查询的性能会用 zorder 的算法根据查询的条件将数据进行重新的分布。小文件就是刚刚讲到的上面会有一个 compaction 先将文件进行一个合并,然后 zorder 对数据进行重分布,以此来提升整个的查询性能。
Q:也就是说小文件不多的时候效果也是很好的吗?
A:是的。
Q:hive 全量历史数据的迁移入湖有相关的支持方案吗?
A:这块我们内部构建的一个平台上其实是已经有相应的任务,比如执行了一个 hive 的一个入库,从 hive 表导入到 iceberg 表的链路上,内部其实是会有平台化上面会取一些类似于像 spark 导数任务去做这样的事情。
Q:需要把 hive 的全量导数这部分全量读一遍,然后才能入湖吗?
A:现在我们是这么做的,就是 spark 导数。
Q:Clustering service 是通过数据的冗余存储,把数据以其他的 column 做 partition 或 bucket 或 sort 来提高 file proning 的效果吗?
A:clustering 就刚刚我们说的,它其实根据的是 Column,比如说查询的话是根据 Column 就是我会将这个 column 的数据在某些经常发生的一些查询的条件的 column 数据会把它聚合在同一个文件里面去,其实在 iceberg 上这个阶段如果说针对 clustering 数据生成新的 snapshot,针对去读 snapshot 的时候就会读到下面 clustering 之后的一些文件。如果说是担心数据冗余的话,因为 bean 重新生成后的数据文件肯定是需要通过 snapshot 对外暴露的,可以去跑一些数据定期清理的一些动作去完成这个事情。
今天的分享就到这里,谢谢大家。
评论