写点什么

Spark 合并 Iceberg 小文件内存溢出问题定位和解决方案

  • 2022 年 1 月 29 日
  • 本文字数:3659 字

    阅读完需:约 12 分钟

问题描述

此问题来源于客户 POC 测试现场,我们提供了合并小文件的 driver 程序 RewriteDatafile,现场有一个 5 亿数据的 Iceberg 表,其中包括 5 千万删除数据,需要通过 Spark 合并小文件,进而提升 Trino 的查询速度。但是合并过程中任务必中断,任务被 kill。

spark 任务脚本:

/opt/spark-3.0.2-bin-hadoop3.2/bin/spark-submit --class RewriteDatafile

--deploy-mode cluster

--master spark://spark-master-service:35090

--executor-memory 110g

--executor-cores 4

--num-executors 2

--driver-memory 4g

--driver-cores 1

--conf "spark.memory.fraction=0.8"

--conf "spark.storage.memoryFraction=0.3"

--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC"

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"

--jars /opt/iceberg-spark-jars/iceberg-spark3-runtime-0.12.0.jar /opt/test-rewrite-spark-1.0-SNAPSHOT.jar

-r thrift://10.102.133.13:9083

-w hdfs://10.100.100.55:8020/deepexi/dlink/catalogmanager/iceberg_default_9

-m iceberg_default_9

-d cx

-t t_iceb_10_hist03

-p 2

-s 12800000

-x 10

-maxFileSize 180000000

-minFileSize 5000000


spark 作业合并小文件参数说明:

usage: >>>>>>>> java RewriteDatafile [OPTION]

-d,--database <arg> target database. [must be set]

-h,--help Print this usage information

-m,--catalogMappingName <arg> hive catalog mapping. [must be set]

-p,--parallel <arg> maxParallelism

-r,--hmsUri <arg> hive metastore uri. [must be set]

-s,--targetfilesize <arg> target data file size in bytes. [must be set]

-t,--table <arg> target table. [must be set]

-w,--warehouse <arg> warehouse. [must be set]

-x,--maxcommit <arg> partial-progress.max-commits

-f,--minFileSize <arg> Files smaller than minFileSize will be considered for rewriting

-F,--maxFileSize <arg> Files larger than maxFileSize will be considered for rewriting


问题定位过程

分析日志

收集到的日志:


发现 executor 失联


通过远程同时也看到 worker 节点出现 OOM 日志,jstat 分析 heap 发现堆内老年区使用达到 99%。



通过现场收集到的信息基本可以判断问题直接原因是因为 executor 内存溢出导致 exectuor 无法响应 driver 的心跳,出现失去连接的问题。

问题复现

由于现场无法进行直接调试定位,所以要现在公司内复现这个问题。

刚开始并没有复现出这个问题,先后排除数据量、代码版本的差异,最后发现难以复现的主要原因是删除数据的写入方式不一样,现场采用先导入新增数据,再导入删除数据的方式,内部复现的过程是在每一个事务内写入删除数据,这两种方式的区别是前者是 equality delete,后者是 position delete,在查询或合并时,equality delete 加载的数据量要比 position delete 多。

另外数据工具生成的数据也有一定要求,不能简单的采用数字或者大量重复的字符字段,这样压缩率比较高,也会有差异。

导入方式跟现场保持一致后,就能复现这个问题了。


内存分析

对于内存溢出这种问题提出了一些基本的猜测:

  1. 待合并的数据量太大

结合现场的数据量(共 8G 左右数据)和 spark 集群的资源(每个 executor 50G 左右内存),这种情况下仍然会内存溢出,考虑到 parquet 文件的高压缩率,以及 java 对象导致的数据膨胀,加载的数据量过大也是有可能的,但是从 spark 读取 iceberg 文件的原理上分析,文件的读取是通过迭代器读取的,不会一次性加载一个文件到内存中,同时 spark 也有自己的数据落盘机制,所以这种猜测并不符合实际。通过调低 max-file-group-size-bytes 和 max-concurrent-file-group-rewrites 这两个参数值,降低任务的并发度,但是收效甚微,所以并不是合并的数据量过大导致的内存溢出。

  1. 代码有缺陷,有内存泄漏

为了更直观的观察内存使用情况,通过 jmap 拿到 executor 内存 dump 文件,使用 JProfiler 对 executor 的 dump 文件进行分析,在内存使用率很高的情况下,内存中大对象的占用情况如下:


占用内存的大户还是 RowReader,文件扫描的对象,所以认为可能是这些对象存在内存泄漏,选择最大的那个,进一步剖析其内部具体是哪些对象占用比较多:


其中这个 HashSet 中的数据量大概在 126 万左右:


目前为止还不能确定是内存泄漏,也有可能是数据膨胀造成的,能确定的是这些大对象都和 delete 有关,DeleteFilter 是在读取文件时对数据文件进行过滤,删除 delete file 中的数据,提供给现场的 iceberg 已经合入 delete compaction 功能,对于这部分代码逻辑还不熟,所以需要进一步代码分析。


代码分析

通过对 DeleteFilter 的代码分析发现,delete file 是要加载到内存中,维护在 HashSet 中,大致过程:


在迭代读取 data file 中的数据,类似数据库中一个大表和一个小表进行 join 的过程,大表是流式表,小表是构建表,遍历 data file 的每一条数据时,在 delete set 中检查是否存在,存在的数据不会写到新的数据文件中,实现合并删除数据的过程。

对 DeleteFilter 和 StructLikeSet 进行代码分析并没有发现内存泄漏的可能,比如 StructLIkeSet 中使用的 ThreadLocal 变量,都在代码中及时的做了 remove 处理


具体删除文件以 parquet 格式存储时有多大?一个数据文件会加载多少 delete 文件?在代码中加入一些自定义日志拿到了这些信息



每个 datafile 对应的删除文件有 0 至 4 个不等,也就是说扫描一个 datafile 要对应加载多个删除文件到内存中。

一个删除文件六十万条记录,文件大小在 1.4mb 左右,根据上一步的内存分析,126 万记录占用 450m 内存,也就是说 3mb 左右 parquet 文件加载到内存后能占用 450mb 内存,膨胀率大概 150 倍,所以 POC 测试现场 110G 处理不了 8G 数据也是有可能的

问题原因总结

根据对内存和代码日志的分析,能得出这样的结论,加载到内存中的 delete 数据是主要的内存占用大户,比如上面的内存分析中,delete 数据共占有 2G 左右内存,只有遍历完一个数据文件时才会回收 delete 集合,spark 在读取数据文件是有一定并发度的,所以在同一时间一直有 delete 数据占用内存,内存不足时也不会落盘,因为 HashSet 不能被序列化


Spark rewrite 中一个 file group 作为一个 job 单元提交,group 之间可以并行,group 内部又划分多个 combined task,combined task 也可以并行执行。

所以解决这个问题的核心是降低同一时间内存中 delete 数据的大小,最直接的方式就是降低 spark 读文件任务的并发度。

解决方案

  1. 通过参数调优解决内存溢出问题

从上面的问题原因分析,解决这个问题就要控制 group 和 combinedTask 的并发度,group 的并发可以直接通过参数 max-concurrent-file-group-rewrites 限制。combinedTask 比较难以控制,一个 group 下的 combinedTask 数量是由 groupSize、targetFileSize、deleteFileSize、dataFileSize 四个变量共同计算的结果(详情可以看 TableScanUtil.planTasks()),groupSize 是 max-file-group-size-bytes 参数,targetFileSize 是 target-file-size-bytes 参数。

如何限制 combinedTask 的数量?既然不能直接设置 combinedTask 的数量,可以把 groupSize 设置的非常小(等于 1),这样每个 groupSize 等于每个文件大小,即 N(group)=N(combinedTask)=N(dataFile),这种情况下一个 group 下的 combinedTask 的并发为 1,每次只处理一个文件,只加载这个文件对应的 delete file。

所以通过参数调优的解决方案如下:

在内存不足的情况下优化 spark rewrite 任务合并小文件功能的解决方案:

(1)通过调整 max-concurrent-file-group-rewrites 控制 group 的并发度(默认值是 1)。

(2)如果调整后的效果不佳,降低 max-file-group-size-bytes 参数值(默认是 100G),内存严重不足的情况下可以调整为 1。

(3)如果任务执行成功,并且 max-file-group-size-bytes < target-file-size-bytes,则需要重新执行一遍任务,参数 max-file-group-size-bytes 重新调整为较大值(一定要大于 target-file-size-bytes),max-concurrent-file-group-rewrites 也可以适当放开,在没有 delete file 的情况下,spark 合并小文件不会消耗大量内存。


为何需要执行两次?因为 max-file-group-size-bytes < target-file-size-bytes 时,一个组下的文件可能不会合并,max-file-group-size-bytes=1 时,所有文件都不会合并,只会过滤删除数据。

在公司服务器上测试的结果:



3 亿数据,3 千万删除数据,executor memory 为 2G,虽然比较慢,但是通过两次任务可以完成文件合并。


如果 delete 数据很多,即使 max-file-group-size-bytes 和 max-concurrent-file-group-rewrites 都设置成 1 也会出现内存不足,那么只能减少 executor 数量,增大 executor 内存。


  1. 优化 Iceberg delete compaction 功能的代码

从代码实现的角度看,有两个方向可以优化:

  • 减少每个数据文件要加载的删除文件。

先加载 data file 中的 bloom filter,遍历 deletefile 的数据时,通过 bloom filter 判断这条数据是否在 data file 中,如果存在就加载 delete file,不存在就不加载,目前正在进行详细设计,后面会贡献到社区。

  • 降低数据膨胀率,使用压缩率更高的方式替代 HashSet。

可以考虑 FST 这种字典数据结构替代 HashSet,FST(Finite State Transducer)在 Lucene 中应用很广泛,优势就是占用空间小,但是实现难度较大。


发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2018.10.30 加入

还未添加个人简介

评论

发布
暂无评论
Spark合并Iceberg小文件内存溢出问题定位和解决方案