写点什么

大数据培训:Spark 性能调优与参数配置

作者:@零度
  • 2022 年 3 月 14 日
  • 本文字数:4818 字

    阅读完需:约 16 分钟

Spark 性能调优-基础篇

众所周知,正确的参数配置对提升 Spark 的使用效率具有极大助力,帮助相关数据开发、分析人员更高效地使用 Spark 进行离线批处理和 SQL 报表分析等作业。

推荐参数配置模板如下:

  1. spark-submit 提交方式脚本

/xxx/spark23/xxx/spark-submit --master yarn-cluster \

--name ${mainClassName} \

--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \

--conf spark.yarn.maxAppAttempts=2 \

--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \

--driver-memory 2g \

--conf spark.sql.shuffle.partitions=1000 \

--conf hive.metastore.schema.verification=false \

--conf spark.sql.catalogImplementation=hive \

--conf spark.sql.warehouse.dir=${warehouse} \

--conf spark.sql.hive.manageFilesourcePartitions=false \

--conf hive.metastore.try.direct.sql=true \

--conf spark.executor.memoryOverhead=512M \

--conf spark.yarn.executor.memoryOverhead=512 \

--executor-cores 2 \

--executor-memory 4g \

--num-executors 50 \

--class 启动类 \

${jarPath} \

-M ${mainClassName}


2. spark-sql 提交方式脚本

option=/xxx/spark23/xxx/spark-sql

export SPARK_MAJOR_VERSION=2

${option} --master yarn-client \

--driver-memory 1G \

--executor-memory 4G \

--executor-cores 2 \

--num-executors 50 \

--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \

--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \

--conf spark.sql.auto.repartition=true \

--conf spark.sql.autoBroadcastJoinThreshold=104857600 \

--conf "spark.sql.hive.metastore.try.direct.sql=true" \

--conf spark.dynamicAllocation.enabled=true \

--conf spark.dynamicAllocation.minExecutors=1 \

--conf spark.dynamicAllocation.maxExecutors=200 \

--conf spark.dynamicAllocation.executorIdleTimeout=10m \

--conf spark.port.maxRetries=300 \

--conf spark.executor.memoryOverhead=512M \

--conf spark.yarn.executor.memoryOverhead=512 \

--conf spark.sql.shuffle.partitions=10000 \

--conf spark.sql.adaptive.enabled=true \

--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \

--conf spark.sql.parquet.compression.codec=gzip \

--conf spark.sql.orc.compression.codec=zlib \

--conf spark.ui.showConsoleProgress=true

-f pro.sql

pro.sql 为业务逻辑脚本


Spark 性能调优-进阶篇

针对有意愿了解 Spark 底层原理的读者,本文梳理了 standalone、Yarn-client、Yarn-cluster 等 3 种常见任务提交方式的交互图,以帮助相关使用者更直观地理解 Spark 的核心技术原理、为阅读接下来的进阶篇内容打好基础。

standalone



1) spark-submit 提交,通过反射的方式构造出 1 个 DriverActor 进程;

2) Driver 进程执行编写的 application,构造 sparkConf,构造 sparkContext;

3) SparkContext 在初始化时,构造 DAGScheduler、TaskScheduler,jetty 启动 webui;

4) TaskScheduler 有 sparkdeployschedulebackend 进程,去和 Master 通信,请求注册 Application;

5) Master 接受通信后,注册 Application,使用资源调度算法,通知 Worker,让 worker 启动 Executor;

6) worker 会为该 application 启动 executor,executor 启动后,会反向注册到 TaskScheduler;

7) 所有 Executor 反向注册到 TaskScheduler 后,Driver 结束 sparkContext 的初始化;

8) Driver 继续往下执行编写的 application,每执行到 1 个 action,就会创建 1 个 job;

9) job 会被提交给 DAGScheduler,DAGScheduler 会对 job 划分为多个 stage(stage 划分算法),每个 stage 创建 1 个 taskSet;

10) taskScheduler 会把 taskSet 里每 1 个 task 都提交到 executor 上执行(task 分配算法);

11) Executor 每接受到 1 个 task,都会用 taskRunner 来封装 task,之后从 executor 的线程池中取出 1 个线程,来执行这个 taskRunner。(task runner:把编写的代码/算子/函数拷贝,反序列化,然后执行 task)。

Yarn-client


1) 发送请求到 ResourceManager(RM),请求启动 ApplicationMaster(AM);

2) RM 分配 container 在某个 NodeManager(NM)上,启动 AM,实际是个 ExecutorLauncher;

3) AM 向 RM 申请 container;

4) RM 给 AM 分配 container;

5) AM 请求 NM 来启动相应的 Executor;

6) executor 启动后,反向注册到 Driver 进程;

7) 后序划分 stage,提交 taskset 和 standalone 模式类似。

Yarn-cluster


1) 发送请求到 ResourceManager(RM),请求启动 ApplicationMaster(AM);

2) RM 分配 container 在某个 NodeManager(NM)上,启动 AM;

3) AM 向 RM 申请 container;

4) RM 给 AM 分配 container;

5) AM 请求 NM 来启动相应的 Executor;

6) executor 启动后,反向注册到 AM;

7) 后序划分 stage,提交 taskset 和 standalone 模式类似。

理解了以上 3 种常见任务的底层交互后,接下来本文从存储格式、数据倾斜、参数配置等 3 个方面来展开,为大家分享个推进行 Spark 性能调优的进阶姿势。

存储格式(文件格式、压缩算法)

众所周知,不同的 SQL 引擎在不同的存储格式上,其优化方式也不同,比如 Hive 更倾向于 orc,Spark 则更倾向于 parquet。同时,在进行大数据作业时,点查、宽表查询、大表 join 操作相对频繁,这就要求文件格式最好采用列式存储,并且可分割。因此我们推荐以 parquet、orc 为主的列式存储文件格式和以 gzip、snappy、zlib 为主的压缩算法。在组合方式上,我们建议使用 parquet+gzip、orc+zlib 的组合方式,这样的组合方式兼顾了列式存储与可分割的情况,相比 txt+gz 这种行式存储且不可分割的组合方式更能够适应以上大数据场景的需求。

个推以线上 500G 左右的数据为例,在不同的集群环境与 SQL 引擎下,大数据培训对不同的存储文件格式和算法组合进行了性能测试。测试数据表明:相同资源条件下,parquet+gz 存储格式较 text+gz 存储格式在多值查询、多表 join 上提速至少在 60%以上。

结合测试结果,我们对不同的集群环境与 SQL 引擎下所推荐使用的存储格式进行了梳理,如下表:



同时,我们也对 parquet+gz、orc+zlib 的内存消耗进行了测试。以某表的单个历史分区数据为例,parquet+gz、orc+zlib 比 txt+gz 分别节省 26%和 49%的存储空间。

完整测试结果如下表:


可见,parquet+gz、orc+zlib 确实在降本提效方面效果显著。那么,如何使用这两种存储格式呢?步骤如下:

➤hive 与 spark 开启指定文件格式的压缩算法

spark:

set spark.sql.parquet.compression.codec=gzip;

set spark.sql.orc.compression.codec=zlib;

hive:

set hive.exec.compress.output=true;

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

➤建表时指定文件格式

parquet 文件格式(序列化,输入输出类)

CREATE EXTERNAL TABLE `test`(rand_num double)

PARTITIONED BY (`day` int)

ROW FORMAT SERDE

'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'

STORED AS INPUTFORMAT

'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'

;

orc 文件格式(序列化,输入输出类)

ROW FORMAT SERDE

'org.apache.hadoop.hive.ql.io.orc.OrcSerde'

STORED AS INPUTFORMAT

'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

;

➤线上表调

ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');

ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');

➤ctas 建表

create table tablename stored as parquet as select ……;

create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB') as select ……;

数据倾斜

数据倾斜分为 map 倾斜和 reduce 倾斜两种情况。本文着重介绍 reduce 倾斜,如 SQL 中常见的 group by、join 等都可能是其重灾区。数据倾斜发生时,一般表现为:部分 task 显著慢于同批 task,task 数据量显著大于其他 task,部分 taskOOM、spark shuffle 文件丢失等。如下图示例,在 duration 列和 shuffleReadSize/Records 列,我们能明显发现部分 task 处理数据量显著升高,耗时变长,造成了数据倾斜:


如何解决数据倾斜?

我们总结了 7 种数据倾斜解决方案,能够帮助大家解决常见的数据倾斜问题:

解决方案一:使用 Hive ETL 预处理数据

即在数据血缘关系中,把倾斜问题前移处理,从而使下游使用方无需再考虑数据倾斜问题。

该方案适用于下游交互性强的业务,如秒级/分钟级别提数查询。

解决方案二:过滤少数导致倾斜的 key

即剔除倾斜的大 key,该方案一般结合百分位点使用,如 99.99%的 id 记录数为 100 条以内,那么 100 条以外的 id 就可考虑予以剔除。

该方案在统计型场景下较为实用,而在明细场景下,需要看过滤的大 key 是否为业务所侧重和关注。

解决方案三:提高 shuffle 操作的并行度

即对 spark.sql.shuffle.partitions 参数进行动态调整,通过增加 shuffle write task 写出的 partition 数量,来达到 key 的均匀分配。SparkSQL2.3 在默认情况下,该值为 200。开发人员可以在启动脚本增加如下参数,对该值进行动态调整:

conf spark.sql.shuffle.partitions=10000

conf spark.sql.adaptive.enabled=true

conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728

该方案非常简单,但是对于 key 的均匀分配却能起到较好的优化作用。比如,原本 10 个 key,每个 50 条记录,只有 1 个 partition,那么后续的 task 需要处理 500 条记录。通过增加 partition 数量,可以使每个 task 都处理 50 条记录,10 个 task 并行跑数,耗时只需要原来 1 个 task 的 1/10。但是该方案对于大 key 较难优化,比如,某个大 key 记录数有百万条,那么大 key 还是会被分配到 1 个 task 中去。

解决方案四:将 reducejoin 转为 mapjoin

指的是在 map 端 join,不走 shuffle 过程。以 Spark 为例,可以通过广播变量的形式,将小 RDD 的数据下发到各个 Worker 节点上(Yarn 模式下是 NM),在各个 Worker 节点上进行 join。

该方案适用于小表 join 大表场景(百 G 以上的数据体量)。此处的小表默认阈值为 10M,低于此阈值的小表,可分发到 worker 节点。具体可调整的上限需要小于 container 分配的内存。

解决方案五:采样倾斜 key 并分拆 join 操作

如下图示例:A 表 join B 表,A 表有大 key、B 表无大 key,其中大 key 的 id 为 1,有 3 条记录。



如何进行分拆 join 操作呢?

  • 首先将 A 表、B 表中 id1 单独拆分出来,剔除大 key 的 A' 和 B' 先 join,达到非倾斜的速度;

  • 针对 A 表大 key 添加随机前缀,B 表扩容 N 倍,单独 join;join 后剔除随机前缀即可;

  • 再对以上 2 部分 union。

该方案的本质还是减少单个 task 处理过多数据时所引发的数据倾斜风险,适用于大 key 较少的情况。

解决方案六:使用随机前缀和扩容 RDD 进行 join

比如,A 表 join B 表,以 A 表有大 key、B 表无大 key 为例:

  • 对 A 表每条记录打上[1,n] 的随机前缀,B 表扩容 N 倍,join。

  • join 完成后剔除随机前缀。

该方案适用于大 key 较多的情况,但也会增加资源消耗。

解决方案七:combiner

即在 map 端做 combiner 操作,减少 shuffle 拉取的数据量。

该方案适合累加求和等场景。

在实际场景中,建议相关开发人员具体情况具体分析,针对复杂问题也可将以上方法进行组合使用。

Spark 参数配置

针对无数据倾斜的情况,我们梳理总结了参数配置参照表帮助大家进行 Spark 性能调优,这些参数的设置适用于 2T 左右数据的洞察与应用,基本满足大多数场景下的调优需求。


总结

目前,Spark 已经发展到了 Spark3.x,最新版本为 Spark 3.1.2 released (Jun 01, 2021)。Spark3.x 的许多新特性,如动态分区修剪、Pandas API 的重大改进、增强嵌套列的裁剪和下推等亮点功能,为进一步实现降本增效提供了好思路。未来,个推也将继续保持对 Spark 演进的关注,并持续展开实践和分享。

文章来源于数据仓库与 Python 大数据

用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据培训:Spark性能调优与参数配置_大数据_@零度_InfoQ写作平台