写点什么

个推 Spark 性能调优实战分享:性能提升 60%↑ 成本降低 50%↓

用户头像
个推
关注
发布于: 15 小时前
个推Spark性能调优实战分享:性能提升60%↑ 成本降低50%↓

前言


Spark 是目前主流的大数据计算引擎,功能涵盖了大数据领域的离线批处理、SQL 类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。作为一种内存计算框架,Spark 运算速度快,并能够满足 UDF、大小表 Join、多路输出等多样化的数据计算和处理需求。


作为国内专业的数据智能服务商,个推从早期的 1.3 版本便引入 Spark,并基于 Spark 建设数仓,进行大规模数据的离线和实时计算。由于 Spark 在 2.x 版本之前的优化重心在计算引擎方面,而在元数据管理方面并未做重大改进和升级。因此个推仍然使用 Hive 进行元数据管理,采用 Hive 元数据管理+ Spark 计算引擎的大数据架构,以支撑自身大数据业务发展。个推还将 Spark 广泛应用到报表分析、机器学习等场景中,为行业客户和政府部门提供实时人口洞察、群体画像构建等服务。



▲个推在实际业务场景中,分别使用 SparkSQL 和 HiveSQL 对一份 3T 数据进行了计算,上图展示了跑数速度。数据显示:在锁死队列(120G 内存,<50core)前提下, SparkSQL2.3 的计算速度是 Hive1.2 的 5-10 倍。


对企业来讲,效率和成本始终是其进行海量数据处理和计算时所必须关注的问题。如何充分发挥 Spark 的优势,在进行大数据作业时真正实现降本增效呢?个推将多年积累的 Spark 性能调优妙招进行了总结,与大家分享。


Spark 性能调优-基础篇

众所周知,正确的参数配置对提升 Spark 的使用效率具有极大助力。因此,针对不了解底层原理的 Spark 使用者,我们提供了可以直接抄作业的参数配置模板,帮助相关数据开发、分析人员更高效地使用 Spark 进行离线批处理和 SQL 报表分析等作业。


推荐参数配置模板如下:

  1. spark-submit 提交方式脚本

/xxx/spark23/xxx/spark-submit --master yarn-cluster  \--name ${mainClassName} \--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \--conf spark.yarn.maxAppAttempts=2 \--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \--driver-memory 2g \--conf spark.sql.shuffle.partitions=1000 \--conf hive.metastore.schema.verification=false \--conf spark.sql.catalogImplementation=hive \--conf spark.sql.warehouse.dir=${warehouse} \--conf spark.sql.hive.manageFilesourcePartitions=false \--conf hive.metastore.try.direct.sql=true \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--executor-cores 2 \--executor-memory 4g \--num-executors 50 \--class 启动类 \${jarPath} \-M ${mainClassName} 
复制代码


2. spark-sql 提交方式脚本

option=/xxx/spark23/xxx/spark-sqlexport SPARK_MAJOR_VERSION=2${option} --master yarn-client \--driver-memory 1G \--executor-memory 4G \--executor-cores 2 \--num-executors 50 \--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \--conf spark.sql.auto.repartition=true \--conf spark.sql.autoBroadcastJoinThreshold=104857600 \--conf "spark.sql.hive.metastore.try.direct.sql=true" \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.dynamicAllocation.maxExecutors=200 \--conf spark.dynamicAllocation.executorIdleTimeout=10m \--conf spark.port.maxRetries=300 \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.sql.shuffle.partitions=10000 \--conf spark.sql.adaptive.enabled=true \--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \--conf spark.sql.parquet.compression.codec=gzip \--conf spark.sql.orc.compression.codec=zlib \--conf spark.ui.showConsoleProgress=true-f pro.sql
pro.sql 为业务逻辑脚本
复制代码


Spark 性能调优-进阶篇

针对有意愿了解 Spark 底层原理的读者,本文梳理了 standalone、Yarn-client、Yarn-cluster 等 3 种常见任务提交方式的交互图,以帮助相关使用者更直观地理解 Spark 的核心技术原理、为阅读接下来的进阶篇内容打好基础。


standalone

1)   spark-submit 提交,通过反射的方式构造出 1 个 DriverActor 进程;

2)   Driver 进程执行编写的 application,构造 sparkConf,构造 sparkContext;

3)   SparkContext 在初始化时,构造 DAGScheduler、TaskScheduler,jetty 启动 webui;

4)   TaskScheduler 有 sparkdeployschedulebackend 进程,去和 Master 通信,请求注册 Application;

5)   Master 接受通信后,注册 Application,使用资源调度算法,通知 Worker,让 worker 启动 Executor;

6)   worker 会为该 application 启动 executor,executor 启动后,会反向注册到 TaskScheduler;

7)   所有 Executor 反向注册到 TaskScheduler 后,Driver 结束 sparkContext 的初始化;

8)   Driver 继续往下执行编写的 application,每执行到 1 个 action,就会创建 1 个 job;

9)   job 会被提交给 DAGScheduler,DAGScheduler 会对 job 划分为多个 stage(stage 划分算法),每个 stage 创建 1 个 taskSet;

10)  taskScheduler 会把 taskSet 里每 1 个 task 都提交到 executor 上执行(task 分配算法);

11)  Executor 每接受到 1 个 task,都会用 taskRunner 来封装 task,之后从 executor 的线程池中取出 1 个线程,来执行这个 taskRunner。(task runner:把编写的代码/算子/函数拷贝,反序列化,然后执行 task)。


Yarn-client


1)   发送请求到 ResourceManager(RM),请求启动 ApplicationMaster(AM);

2)   RM 分配 container 在某个 NodeManager(NM)上,启动 AM,实际是个 ExecutorLauncher;

3)   AM 向 RM 申请 container;

4)   RM 给 AM 分配 container;

5)   AM 请求 NM 来启动相应的 Executor;

6)   executor 启动后,反向注册到 Driver 进程;

7)   后序划分 stage,提交 taskset 和 standalone 模式类似。


Yarn-cluster


1)   发送请求到 ResourceManager(RM),请求启动 ApplicationMaster(AM);

2)   RM 分配 container 在某个 NodeManager(NM)上,启动 AM;

3)   AM 向 RM 申请 container;

4)   RM 给 AM 分配 container;

5)   AM 请求 NM 来启动相应的 Executor;

6)   executor 启动后,反向注册到 AM;

7)   后序划分 stage,提交 taskset 和 standalone 模式类似。


理解了以上 3 种常见任务的底层交互后,接下来本文从存储格式、数据倾斜、参数配置等 3 个方面来展开,为大家分享个推进行 Spark 性能调优的进阶姿势。


存储格式(文件格式、压缩算法)


众所周知,不同的 SQL 引擎在不同的存储格式上,其优化方式也不同,比如 Hive 更倾向于 orc,Spark 则更倾向于 parquet。同时,在进行大数据作业时,点查、宽表查询、大表 join 操作相对频繁,这就要求文件格式最好采用列式存储,并且可分割。因此我们推荐以 parquet、orc 为主的列式存储文件格式和以 gzip、snappy、zlib 为主的压缩算法。在组合方式上,我们建议使用 parquet+gzip、orc+zlib 的组合方式,这样的组合方式兼顾了列式存储与可分割的情况,相比 txt+gz 这种行式存储且不可分割的组合方式更能够适应以上大数据场景的需求。


个推以线上 500G 左右的数据为例,在不同的集群环境与 SQL 引擎下,对不同的存储文件格式和算法组合进行了性能测试。测试数据表明:相同资源条件下,parquet+gz 存储格式较 text+gz 存储格式在多值查询、多表 join 上提速至少在 60%以上。


结合测试结果,我们对不同的集群环境与 SQL 引擎下所推荐使用的存储格式进行了梳理,如下表:


同时,我们也对 parquet+gz、orc+zlib 的内存消耗进行了测试。以某表的单个历史分区数据为例,parquet+gz、orc+zlib 比 txt+gz 分别节省 26%和 49%的存储空间。


完整测试结果如下表:



可见,parquet+gz、orc+zlib 确实在降本提效方面效果显著。那么,如何使用这两种存储格式呢?步骤如下:


➤hive 与 spark 开启指定文件格式的压缩算法

spark:set spark.sql.parquet.compression.codec=gzip;set spark.sql.orc.compression.codec=zlib;
hive:set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
复制代码


➤建表时指定文件格式

parquet 文件格式(序列化,输入输出类)CREATE EXTERNAL TABLE `test`(rand_num double)PARTITIONED BY (`day` int)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';

orc 文件格式(序列化,输入输出类) ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.orc.OrcSerde'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
复制代码


➤线上表调



ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');
复制代码


➤ctas 建表



create table tablename stored as parquet as select ……;create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB')  as select ……;
复制代码


数据倾斜


数据倾斜分为 map 倾斜和 reduce 倾斜两种情况。本文着重介绍 reduce 倾斜,如 SQL 中常见的 group by、join 等都可能是其重灾区。数据倾斜发生时,一般表现为:部分 task 显著慢于同批 task,task 数据量显著大于其他 task,部分 taskOOM、spark shuffle 文件丢失等。如下图示例,在 duration 列和 shuffleReadSize/Records 列,我们能明显发现部分 task 处理数据量显著升高,耗时变长,造成了数据倾斜:


如何解决数据倾斜?

我们总结了 7 种数据倾斜解决方案,能够帮助大家解决常见的数据倾斜问题:


解决方案一:使用 Hive ETL 预处理数据

即在数据血缘关系中,把倾斜问题前移处理,从而使下游使用方无需再考虑数据倾斜问题。

该方案适用于下游交互性强的业务,如秒级/分钟级别提数查询。


解决方案二:过滤少数导致倾斜的 key

即剔除倾斜的大 key,该方案一般结合百分位点使用,如 99.99%的 id 记录数为 100 条以内,那么 100 条以外的 id 就可考虑予以剔除。

该方案在统计型场景下较为实用,而在明细场景下,需要看过滤的大 key 是否为业务所侧重和关注。


解决方案三:提高 shuffle 操作的并行度

即对 spark.sql.shuffle.partitions 参数进行动态调整,通过增加 shuffle write task 写出的 partition 数量,来达到 key 的均匀分配。SparkSQL2.3 在默认情况下,该值为 200。开发人员可以在启动脚本增加如下参数,对该值进行动态调整:

conf spark.sql.shuffle.partitions=10000 conf spark.sql.adaptive.enabled=true conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 
复制代码


该方案非常简单,但是对于 key 的均匀分配却能起到较好的优化作用。比如,原本 10 个 key,每个 50 条记录,只有 1 个 partition,那么后续的 task 需要处理 500 条记录。通过增加 partition 数量,可以使每个 task 都处理 50 条记录,10 个 task 并行跑数,耗时只需要原来 1 个 task 的 1/10。但是该方案对于大 key 较难优化,比如,某个大 key 记录数有百万条,那么大 key 还是会被分配到 1 个 task 中去。


解决方案四:将 reducejoin 转为 mapjoin

指的是在 map 端 join,不走 shuffle 过程。以 Spark 为例,可以通过广播变量的形式,将小 RDD 的数据下发到各个 Worker 节点上(Yarn 模式下是 NM),在各个 Worker 节点上进行 join。

该方案适用于小表 join 大表场景(百 G 以上的数据体量)。此处的小表默认阈值为 10M,低于此阈值的小表,可分发到 worker 节点。具体可调整的上限需要小于 container 分配的内存。


解决方案五:采样倾斜 key 并分拆 join 操作

如下图示例:A 表 join B 表,A 表有大 key、B 表无大 key,其中大 key 的 id 为 1,有 3 条记录。

如何进行分拆 join 操作呢?

  • 首先将 A 表、B 表中 id1 单独拆分出来,剔除大 key 的 A' 和 B' 先 join,达到非倾斜的速度;

  • 针对 A 表大 key 添加随机前缀,B 表扩容 N 倍,单独 join;join 后剔除随机前缀即可;

  • 再对以上 2 部分 union。

该方案的本质还是减少单个 task 处理过多数据时所引发的数据倾斜风险,适用于大 key 较少的情况。


解决方案六:使用随机前缀和扩容 RDD 进行 join

比如,A 表 join B 表,以 A 表有大 key、B 表无大 key 为例:

  • 对 A 表每条记录打上[1,n] 的随机前缀,B 表扩容 N 倍,join。

  • join 完成后剔除随机前缀。

该方案适用于大 key 较多的情况,但也会增加资源消耗。


解决方案七:combiner

即在 map 端做 combiner 操作,减少 shuffle 拉取的数据量。

该方案适合累加求和等场景。


在实际场景中,建议相关开发人员具体情况具体分析,针对复杂问题也可将以上方法进行组合使用。


Spark 参数配置


针对无数据倾斜的情况,我们梳理总结了参数配置参照表帮助大家进行 Spark 性能调优,这些参数的设置适用于 2T 左右数据的洞察与应用,基本满足大多数场景下的调优需求。



总结

目前,Spark 已经发展到了 Spark3.x,最新版本为 Spark 3.1.2 released (Jun 01, 2021)。Spark3.x 的许多新特性,如动态分区修剪、Pandas API 的重大改进、增强嵌套列的裁剪和下推等亮点功能,为进一步实现降本增效提供了好思路。未来,个推也将继续保持对 Spark 演进的关注,并持续展开实践和分享。

用户头像

个推

关注

数据智能服务商,致力于构建数据智能新生态 2021.05.27 加入

感谢您的关注。个推(每日互动,股票代码:300766)作为中国数据智能A股上市公司,将在这里分享数据洞察报告、产品最新动态、运营增长干货、线上线下活动等。更多信息可查看个推官网www.GeTui.com。

评论

发布
暂无评论
个推Spark性能调优实战分享:性能提升60%↑ 成本降低50%↓