spark-sql 优化简述
本文分享自天翼云开发者社区《spark-sql优化简述》,作者:徐****东
1、自适应中 reduce 参数控制
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 用于控制任务 Shuffle 后的目标输入大小(以字节为单位)。
spark.sql.adaptive.minNumPostShufflePartitions 用于控制自适应执行中使用的 shuffle 后最小的分区数,可用于控制最小并行度。
spark.sql.adaptive.maxNumPostShufflePartitions 来控制 Shuffle 后分区的最大数量。
2、合理设置单 partition 读取数据量
SET spark.sql.files.maxPartitionBytes=xxxx;
3、合理设置 shuffle partition 的数量
SET spark.sql.shuffle.partitions=xxxx
4、使用 coalesce & repartition 调整 partition 数量
SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
5、使用 broadcast join
6、开启 Adaptive Query Execution(Spark 3.0)
6.1、动态合并分区: spark 会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率
spark.sql.adaptive.enabled: 是否开启 AQE 优化
spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数
spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数
当 RDD 的分区数处于 spark.sql.adaptive.coalescePartitions.initialPartitionNum 与 spark.sql.adaptive.coalescePartitions.minPartitionNum 范围内才会合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小
6.2、动态切换 join 策略: 在 join 的时候,会动态选择性能最高的 join 策略,提高效率
spark.sql.adaptive.enabled: 是否开启 AQE 优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行 shuffle 重分区时,尝试使用本地 shuffle 读取器。将 sort-meger join 转换为广播 join
6.3、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启 AQE 优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启 shuffle 状态跟踪
6.4、动态 join 数据倾斜: join 的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。
spark.sql.adaptive.enabled: 是否开启 AQE 优化
倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N
倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M
拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes
G [代表优化之后,分区数数据的预期大小]
sparksql 判断出现数据倾斜的依据[需要两个条件同时满足]:
当某个分区处理的数据量>= N * 所有 task 处理数据量的中位数
当某个分区处理的数据量>= M
7、文件与分区
SET spark.sql.files.maxPartitionBytes=xxx //读取文件的时候一个分区接受多少数据;
spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值
8、CBO 优化
spark.sql.cbo.enabled: 是否开启 cbo 优化
spark.sql.cbo.joinReorder.enabled: 是否调整多表 Join 的顺序
spark.sql.cbo.joinReorder.dp.threshold: 设置多表 jion 的表数量的阈值,一旦 join 的表数量超过该阈值则不优化多表 join 的顺序
9、hints 优化
hints 预防主要用在分区和 join 上。
Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE
Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
10、缓存表
对于一条 SQL 语句中可能多次使用到的表,可以对其进行缓存,使用 SQLContext.cacheTable(TableName)或者 DataFrame.cache 即可,SparkSQL 会用内存列存储的格式进行表的缓存,然后 SparkSQL 就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和 GC 的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用 SQLContext.setConf()设置,可以通过
spark.sql.inMemoryColumnarStorage.batchSize
这个参数,默认 10000,配置列存储单位。
永久视图 view:永久保存一段查询语句的逻辑,而不是查询语句的数据,永久有效,查询这个视图,相当于查询一个 SQL 语句,如果保存的查询逻辑复杂,这查询视图也耗时长。支持重新覆盖 create or replace view view1 as
临时视图 temporary view:只在当前会话生效,如果会话结束,则临时视图失效,支持重新覆盖 create or replace temporary view temp_view1 as,类似于 SparkSQL 中的 DataFrame.createOrReplaceTempView('视图名'),hive 不支持这个语法
缓存表 cache table:只在当前会话有效,将一段查询结果集缓存到内存,并赋予一个表名。
table:永久有效,保存数据结构和数据本身到磁盘。
with as:当子查询的嵌套层数太多时,可以用 with as 增加可读性。
11、group by 优化
为了提高 group by 查询的性能,可以尝试以下几种方法:
仅选择必要的字段进行 group by 操作,避免选择过多的字段。
尽可能将 group by 字段类型保持一致,以减少数据转换的开销。
如果可能,可以将 group by 字段进行哈希分区,以减少数据传输和处理的开销。
如果使用的是字符串类型,可以考虑使用哈希函数来减少字符串比较的开销。
12、优化倾斜连接
数据偏斜会严重降低联接查询的性能。此功能通过将倾斜的任务拆分(按需复制)为大小大致相等的任务来动态处理排序合并联接中的倾斜。同时启用 spark.sql.adaptive.enabled 和 spark.sql.adaptive.skewJoin.enabled 配置时,此选项才生效。
评论