写点什么

在大数据量中 Spark 数据倾斜问题定位排查及解决

  • 2023-12-09
    北京
  • 本文字数:2686 字

    阅读完需:约 9 分钟

在大数据量中Spark数据倾斜问题定位排查及解决

1. 开篇

2023 年即将过去,又到了一年一度的技术总结时刻,在这一年,参与了多个大数据项目的开发建设工作,也参与了几个数仓项目的治理优化工作,在这么多的项目中,让我印象比较深刻的就是在使用 Spark 引擎执行任务出现的报错现象,接下来就回顾复盘下这次任务报错现象及具体的解决方案。

2. 问题描述

因为现在大多数的批量任务都是使用 Spark 去执行,所以 Spark 的地位在公司是举足轻重,那么对于 Spark 的深入理解和优化显得尤为重要,部门人员都在深入学习 Spark 的执行过程,底层原理等,以期待遇到问题之后能够快速解决。


下面对于某次 Spark 任务执行过程中报错原因描述。


目前公司 DWD 层及之后的表都是 Iceberg 表,因为我们的业务特性,需要对数据进行行级更新和删除,传统的 Hive 表不支持行级数据操作,粒度都是表级的,如果采用传统 Hive 表形式,每次对数据进行更新的成本是非常高的,需要全表数据参与,后面经过调研,发现 Iceberg 是支持行级更新,并且和 Spark 结合的比较好,经过测试之后发现没有问题,后面数仓整体就迁到了 Iceberg 中。


这次任务的执行语句描述:将 ODS 层的表按照主键去重后插入到 DWD 层中,表为分区表,DWD 层表格式是 iceberg 格式。

insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
select 
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
order by cleandate,etldate;
复制代码


iceberg 格式的表可以不显示的指定表分区字段,但是要求在写入分区表之前根据每个任务(Spark 分区)的分区规范对分区字段进行排序,上述 sql 中 cleandate,etldate 是分区字段。


等待几分钟,报错:


查看 Spark UI:


发现 task id 是 445 的任务处理的数据量远远大于其他的任务。考虑到时数据倾斜问题。


查看此任务的日志:

出现内存溢出。


多次测试上述 sql,在集群资源空闲很多时,偶尔可以执行成功,但是执行时间超过 25 分钟。

3. 分析推断

初步分析 Spark 的每个 task 任务处理的数量和每个分区的数量有关。


以下是统计的表中每个分区的数据量:


下图是 Spark 处理时的 task 任务:

发现:表中的分区数量和 Task 任务数量是对应的,也就是一个分区 Spark 只起了一个 Task 任务。


表中的分区数据分布不均匀,20221213 这个分区的数量是 8000 多万,一个 Task 处理肯定会出现数据倾斜。


但即使分区数据分布均衡,但是每个分区数据量很大也会有问题,假设表就 10 个分区,如果每个分区数据量都是 1 亿数据,那么最终一个 Task 处理 1 亿数据,还是会有内存溢出风险。

4. 调查原因

点击进入 Spark UI 界面,找到 SQL 一栏,进入我们执行的 SQL 语句中:


调查发现,在 Spark 中执行 iceberg 分区表时,如果有全局排序操作,那么会使得同一分区的数据进入到一个 task 任务中进行排序,如果某个分区数据量比较大,就会导致任务执行非常慢,或者报错,如下图,溢写到磁盘中的数据已经 43.9G 了。


5. 解决方案

只要找到原因,解决就比较简单了

解决方案一:

还是采用这种动态分区方式,但将全局排序 order by 改为局部排序 sort by,因为 iceberg 不要求全局排序,只保证每个 reduce 内的数据有序即可。


改造代码如下:

insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
select 
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
sort by cleandate,etldate;
复制代码

上述代码只把 order by 改为了 sort by。


再次执行,在 Spark UI 上查看执行过程,Task 数量变多了,并且每个 Task 的数量分布很均衡:


整个跑批时间缩短至不到 4 分钟:

解决方案二:

既然是排序操作导致的,那就不要排序了,但是如果不要排序,就不能采用动态分区,将出现数据倾斜的分区采用静态分区方式导入,代码如下:


insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
partition (cleandate='20221213',etldate='20221205')
select 
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
and cleandate='20221213' and  etldate = '20221205' 
复制代码

其余分区可以采用动态分区。

6. 总结

对于数据倾斜的问题,在大数据中是很容易出现的,数据倾斜其实就是部分数据分区的数据量远大于其他分区,导致计算任务无法充分利用集群资源,从而影响严重整体性能。


对于数据倾斜问题,结合上述案例做一个总结:


出现数据倾斜第一步就是要进行原因分析

  1. 数据分布不均匀:数据本身存在不均匀的特点,例如某些键值对的数量远远超过其他键值对。

  2. 数据倾斜引起的操作:某些操作(例如 groupByKey、reduceByKey 等)可能会导致数据倾斜,特别是在数据经过多次 shuffle 的情况下。


第二步就是找解决方案

在预处理阶段:

  1. 均匀分布数据:可以通过一些预处理方法来尽量使数据分布均匀,例如使用 salting 技术给键添加随机前缀。

  2. 数据重分区:通过重新分区数据,将数据均匀地分布到多个分区中,减少数据倾斜的可能性。


在运行时阶段:

  1. 增加分区数:通过增加分区数来提高并行度,从而减轻数据倾斜的影响。

  2. 使用聚合操作代替 groupByKey:groupByKey 操作容易导致数据倾斜,可以尝试使用聚合操作(如 reduceByKey、combineByKey)来替代。

  3. 使用自定义分区器:根据数据的特点,编写自定义分区器,将数据均匀地分布到多个分区中。

  4. 增加缓存:对于一些频繁使用的数据,可以将其缓存到内存中,减少重复计算和 shuffle 操作。


第三步就是监控与调优

  1. 监控任务进度:实时监控任务的进度,发现倾斜问题及时采取相应措施。

  2. 数据采样与分析:对倾斜的数据进行采样和分析,找出导致倾斜的原因,有针对性地解决问题。

  3. 动态调整资源:根据倾斜程度,动态调整集群资源分配,增加倾斜数据所在分区的计算资源。

发布于: 29 分钟前阅读数: 7
用户头像

InfoQ签约作者 2020-11-10 加入

文章首发于公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
在大数据量中Spark数据倾斜问题定位排查及解决_大数据_五分钟学大数据_InfoQ写作社区