优化和调整 Spark 应用程序 (七)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
在上一章中,我们详细介绍了如何在 Java 和 Scala 中使用数据集。我们探讨了 Spark 如何管理内存以将数据集结构作为其统一的高级 API 的一部分,并考虑了与使用数据集相关的成本以及如何降低这些成本。
除了降低成本外,我们还想考虑如何优化和调整 Spark。在本章中,我们将讨论一组启用优化的 Spark 配置,查看 Spark 的一系列 join 策略,并检查 Spark 用户界面,寻找有关不良行为的线索。
优化和调整 Spark 效率
虽然 Spark 有许多可供调优的配置,但这本书将只涵盖少数最重要和通常被调优的配置。要获得按功能主题分组的完整列表,可以阅读官网文档。
查看和设置 Apache Spark 配置
你可以通过三种方式获取和设置 Spark 属性。首先是通过一组配置文件。在部署 $SPARK_HOME 目录(安装 Spark 的位置)中,有许多配置文件:conf / spark-defaults.conf.template,conf / log4j.properties.template 和 conf / spark-env.sh.template。更改这些文件中的默认值,保存为不带.template 后缀的配置文件。这样一来 spark 会自动加载修改后的配置文件,使得修改后的值生效。
conf / spark-defaults.conf 文件中修改后的配置适用于 Spark 集群以及所有提交给集群的 Spark 应用程序。
第二种方法是使用 spark-submit 命令,在提交应用程序的时候使用--conf 标识符直接在 Spark 应用程序中或在命令中指定 Spark 配置,如下所示:
spark-submit --conf spark.sql.shuffle.partitions = 5
--conf “ spark.executor.memory = 2g”
--class main.scala.chapter7.SparkConfig_7_1 jars / main-
scala-chapter7_2.12-1.0.jar
下面是在 Spark 应用程序本身中调整配置的方法:
// In Scala
import org.apache.spark.sql.SparkSession
def printConfigs(session: SparkSession) = {
// Get conf
val mconf = session.conf.getAll
// Print them
for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
}
def main(args: Array[String]) {
// Create a session
val spark = SparkSession.builder
.config("spark.sql.shuffle.partitions", 5)
.config("spark.executor.memory", "2g")
.master("local[*]")
.appName("SparkConfig")
.getOrCreate()
printConfigs(spark)
spark.conf.set("spark.sql.shuffle.partitions",
spark.sparkContext.defaultParallelism)
println(" ****** Setting Shuffle Partitions to Default Parallelism")
printConfigs(spark)
}
spark.driver.host -> 10.8.154.34
spark.driver.port -> 55243
spark.app.name -> SparkConfig
spark.executor.id -> driver
spark.master -> local[*]
spark.executor.memory -> 2g
spark.app.id -> local-1580162894307
spark.sql.shuffle.partitions -> 5
第三个选项是通过 Spark shell 的编程接口实现的。与 Spark 中的所有其他内容一样,API 是交互的主要方法。通过 SparkSession 对象,你可以访问大多数 Spark 配置。
在 Spark REPL 中,例如,这个 Scala 代码显示在本地主机上 Spark 以本地模式启动的 Spark 配置(详情上可用的不同的模式,请参阅第一章中的“部署模式”):
// In Scala
// mconf is a Map[String, String]
scala> val mconf = spark.conf.getAll
...
scala> for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
spark.driver.host -> 10.13.200.101
spark.driver.port -> 65204
spark.repl.class.uri -> spark://10.13.200.101:65204/classes
spark.jars ->
spark.repl.class.outputDir -> /private/var/folders/jz/qg062ynx5v39wwmfxmph5nn...
spark.app.name -> Spark shell
spark.submit.pyFiles ->
spark.ui.showConsoleProgress -> true
spark.executor.id -> driver
spark.submit.deployMode -> client
spark.master -> local[*]
spark.home -> /Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7
spark.sql.catalogImplementation -> hive
spark.app.id -> local-1580144503745
你还可以仅查看特定于 Spark SQL 的 Spark 配置:
// In Scala
spark.sql("SET -v").select("key", "value").show(5, false)
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
+------------------------------------------------------------+
|key |value |
+------------------------------------------------------------+
|spark.sql.adaptive.enabled |false |
|spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin |0.2 |
|spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled|true |
|spark.sql.adaptive.shuffle.localShuffleReader.enabled |true |
|spark.sql.adaptive.shuffle.maxNumPostShufflePartitions |<undefined>|
+------------------------------------------------------------+
only showing top 5 rows
另外,你可以通过 Spark UI 的“Environment”选项卡访问 Spark 的当前配置,只不过是作为只读值,我们将在本章稍后讨论该选项卡,如图 7-1 所示。
要以编程方式设置或修改现有配置,请首先检查该属性是否可修改。spark.conf.isModifiable(“<config_name>”)将返回 true 或 false。所有可修改的配置都可以使用 API 设置为新的值。
// In Scala
scala> spark.conf.get("spark.sql.shuffle.partitions")
res26: String = 200
scala> spark.conf.set("spark.sql.shuffle.partitions", 5)
scala> spark.conf.get("spark.sql.shuffle.partitions")
res28: String = 5
# In Python
>>> spark.conf.get("spark.sql.shuffle.partitions")
'200'
>>> spark.conf.set("spark.sql.shuffle.partitions", 5)
>>> spark.conf.get("spark.sql.shuffle.partitions")
'5'
在设置 Spark 属性的所有方式中,优先级顺序决定采用哪些值。优先级最低的是 spark-defaults.conf 中定义的配置项的值或标志,其次读取 spark-submit 命令行中传递的配置项的值或标志,最后读取 SparkSession 在 Spark 应用程序中通过 SparkConf 设置的值或标志。总结下来优先级的高低为:配置文件 < spark-submi 命令 < 程序配置。最终所有这些属性都会被合并,并且优先在 Spark 应用程序中重置的所有重复属性。同样,命令行上提供的配置项的值将替换配置文件中对应配置项的设置,前提是这些值不会被应用程序中的相同配置覆盖。
调整或设置正确的配置有助于提高性能,正如你将在下一节中看到的那样。这里的建议来自社区中从业人员的经验,着重于如何最大程度地利用 Spark 的集群资源以适应大规模工作负载。
扩展 Spark 以应对高负载
大型 Spark 工作负载通常是批处理工作,有些工作是每晚执行的,有些则是每天定期执行的。无论哪种情况,这些作业都可能处理数十个 TB 字节甚至更多的数据。为了避免由于资源匮乏或性能逐渐下降而导致作业失败,可以启用或更改一些 Spark 配置。这些配置影响三个 Spark 组件:Spark 驱动程序,Executor 和在 Executor 上运行的 shuffle 服务。
Spark 驱动程序的职责是与集群管理器协调,从而在集群中启动 Executor,并在其上调度 Spark 任务。对于大型工作负载,你可能有数百个任务。本节说明了一些可以调整或启用的配置,以优化资源利用率,并行化任务从而避免大量任务的瓶颈。一些优化想法和见解来自诸如 Facebook 之类的大数据公司,这些公司以 TB 的数据规模使用 Spark,并在 Spark + AI Summit 大会上与 Spark 社区共享了这些优化方式和见解。
静态与动态资源分配
当你像之前一样将计算资源指定为 spark-submit 命令行参数时,相当于把资源配置写死了。这意味着,如果由于工作负载超出预期而导致以后在驱动程序中排队任务时需要更多资源,Spark 将无法容纳或分配额外的资源。
相反,如果你使用 Spark 的动态资源分配配置,则随着大型工作负载的需求不断增加或减少,Spark 驱动程序可以请求更多或更少的计算资源。在工作负载是动态的情况下(即,它们对计算能力的需求各不相同),使用资源动态分配有助于解决突然出现峰值的情况。
一个有用的用例是流,其中数据流量可能不均匀。另一个是按需数据分析,在高峰时段你可能会有大量的 SQL 查询。启用动态资源分配可以使 Spark 更好地利用资源,在不使用 executor 时释放它们,并在需要时获取新的 executor。
以及在处理大型或变化的工作负载时,动态分配在多租户环境中也很有用,在该环境中,Spark 可以与 YARN,Mesos 或 Kubernetes 中的其他应用程序或服务一起部署。但是请注意,Spark 不断变化的资源需求可能会同时影响其他需要资源的应用程序。
要启用和配置动态分配,可以使用如下设置。注意这里的数字是任意的;适当的设置将取决于你的工作负载的性质,因此应进行相应的调整。其中一些配置无法在 Spark REPL 内设置,因此你必须以编程方式进行设置:
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
默认情况下 spark.dynamicAllocation.enabled 设置为 false。当启用以上显示的设置时,Spark 驱动程序将要求集群管理器启动的时候至少创建两个 Executor 进行初始化(spark.dynamicAllocation.minExecutors--executor 最小值)。随着任务队列积压的增加,每次超过积压超时时间(spark.dynamicAllocation.schedulerBacklogTimeout)时,都会请求新的 Executor。在这种情况下,每当有未调度的待处理任务超过 1 分钟时,驱动程序将请求启动新的 Executor 以调度积压的任务,最多 20 个(spark.dynamicAllocation.maxExecutors)。相反,如果 Executor 完成一项任务并且空闲了 2 分钟(spark.dynamicAllocation.executorIdleTimeout),Spark 驱动程序将终止该任务。
配置 SPARK Executor 的内存和 shuffle 服务
仅启用动态资源分配是不够的。你还必须了解 Spark 如何配置和使用 Executor 内存的,以便 Executor 不会因内存不足而受 JVM 垃圾回收的困扰。
每个 executor 可用的内存由 spark.executor.memory 来控制。如图 7-2 所示,它分为三个部分:execution memory, storage memory, and reserved memory。在保留 300 MB 的预留内存之后,默认内存划分为 60%的 execution memory 和 40%的 storage memory,以防止 OOM 错误。Spark 文档声明此方法适用于大多数情况,但是你可以通过 spark.executor.memory 参数调整你期望的比例。当不使用存储内存时,Spark 可以获取它以供执行内存用于执行目的,反之亦然。
执行内存用于 Spark shuffle,join,排序和聚合。由于不同的查询可能需要不同的内存,因此可用内存的比例(spark.mem ory.fraction 默认为 0.6)可能很难设置一个合适的值,但很容易调整。相比之下,存储内存主要用于缓存用户数据结构和从 DataFrame 派生的分区。
在 map 和 shuffle 操作期间,Spark 会写入和读取本地磁盘的 shuffle 文件,因此 I/O 活动频繁。这可能会导致瓶颈,因为对于大型 Spark 作业,默认配置不理想。知道要如何调整不合理的配置可以减轻 Spark 作业各个阶段的风险。
在表 7-1 中,我们抓取了一些建议的配置来进行调整,以便这些操作过程中的 map、spill 和合并过程不受效率低下的 I/O 所困扰,并使这些操作能够在将最终的 shuffle 分区写入磁盘之前使用缓冲区内存。调整在每个 executor 上运行的 shuffle 服务也有助于提高大型 Spark 工作负载的整体性能。
该表中的建议并非适用于所有情况,但是它们应该使你了解如何根据工作负载来调整这些配置。与性能调整中的所有其他内容一样,你必须进行尝试,直到找到合适的平衡。
最大化 SPARK 并行性
Spark 的效率很大程度上是因为它能够大规模并行运行多个任务。要了解如何最大程度地提高并行度(即尽可能并行读取和处理数据),你必须研究 Spark 如何将数据从存储中读取到内存中以及分区对 Spark 意味着什么。
在数据管理用语中,分区是一种将数据排列成可配置和可读的数据块或磁盘上的连续数据块的方式。这些数据子集可以独立读取或处理,如果有必要,可以通过一个进程中的多个线程并行读取或处理。这种独立性很重要,因为它允许数据处理的大量并行性。
Spark 在并行处理任务方面非常高效。正如你在第 2 章中了解到的那样,对于大规模工作负载,Spark 作业将具有多个阶段,并且在每个阶段内将有许多任务。Spark 最多会为每个内核的每个任务分配一个线程,并且每个任务将处理一个不同的分区。为了优化资源利用并最大程度地提高并行度,理想的情况是分区至少与 Executor 上内核的数量一样多,如图 7-3 所示。如果每个 Executor 上的分区数量多于内核数量,则所有内核都将保持繁忙状态。你可以将分区视为并行性的基本单位:在单个内核上运行的单个线程可以在单个分区上工作。
如何创建分区
如前所述,Spark 的任务将数据从磁盘读取到内存中。磁盘上的数据按块或连续的文件块排列。默认情况下,数据存储上的文件块大小从 64 MB 到 128 MB 不等。例如,在 HDFS 和 S3 上,默认大小为 128 MB(可配置)。这些连续块的集合构成一个分区。
Spark 中分区的大小由 spark.sql.files.maxPartitionBytes 决定。默认值为 128 MB。你可以减小大小,但这可能会导致所谓的“小文件问题”,即许多小分区文件,由于文件系统操作,例如,打开,关闭和列出目录,而引入了过多的磁盘 I/O 和性能下降。在分布式文件系统上可能会很慢。
当你显式使用 DataFrame API 的某些方法时,也会创建分区。例如,在创建大型 DataFrame 或从磁盘读取大型文件时,可以显式指示 Spark 创建一定数量的分区:
// In Scala
val ds = spark.read.textFile("../README.md").repartition(16)
ds: org.apache.spark.sql.Dataset[String] = [value: string]
ds.rdd.getNumPartitions
res5: Int = 16
val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
numDF.rdd.getNumPartitions
numDF: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res12: Int = 16
最后,在 shuffle 阶段创建 shuffle 分区。默认情况下,spark.sql.shuffle.partitions 中的 shuffle 分区的数量设置为 200 。你可以根据拥有的数据集的大小来调整此数值,以减少通过网络发送给 Executor 任务的小分区的数量。
spark.sql.shuffle.partitions 对于较小的工作流或流工作负载的默认值太大;你可能希望将其减小到一个较低的值,例如 executor 上的内核数或更少。
在诸如 groupBy()或 join()的操作(也称为宽转换,wide transformations)期间创建的 shuffle 分区会占用网络和磁盘 I/O 资源。在执行这些操作期间,shuffle 会将结果分发到 spark.local.directory 中指定位置的 Executor 的本地磁盘上。使用高性能的 SSD 磁盘来执行此操作将提高性能。
对于 shuffle 阶段设置的 shuffle 分区数量没有通用的计算公式。该数值可能取决于你的用例、数据集、核数和可用的 executor 内存,这是一种反复试验的方法。
除了为大型工作负载扩展 Spark 外,要提高性能,你还可以考虑缓存或持久存储经常访问的 DataFrames 或表。在下一节中,我们将探讨各种缓存和持久性选项。
数据缓存和持久化
缓存和持久化有什么区别?在 Spark 中,它们是同义词。两个 API 调用 cache()和 persist()提供了这些功能。后者可以更好地控制数据的存储方式和位置——在内存和磁盘中(序列化和非序列化)。两者都有助于提高频繁访问的 DataFrame 或表的性能。
DataFrame.cache()
cache()将在内存允许的范围内存储跨 Spark Executor 读取的所有分区(见图 7-2)。尽管 DataFrame 可能被部分缓存,但是分区不能被部分缓存(例如,如果你有 8 个分区,但内存中只能容纳 4.5 个分区,那么将仅缓存 4 个)。但是,如果不是所有分区都被缓存,则当你要再次访问数据时,必须重新计算未缓存的分区,这会降低 Spark 作业的速度。
让我们看一个示例,该示例说明在访问 DataFrame 时如何缓存大型 DataFrame 可以提高性能:
// In Scala
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // Cache the data
df.count() // Materialize the cache
res3: Long = 10000000
Command took 5.11 seconds
df.count() // Now get it from the cache
res4: Long = 10000000
Command took 0.44 seconds
第一个 count()实例化了缓存,而第二个 count()访问了缓存,从而使该数据集的访问时间快了近 12 倍。
当你使用 cache()或时 persist(),直到你调用遍历每条记录的操作(例如 count()),DataFrame 才会被完全缓存。如果你使用类似的操作 take(1),则只有一个分区将被缓存,因为 Catalyst 意识到你不必为了检索一条记录而计算所有分区。
观察 DataFrame 如何跨本地主机上的一个 executor 存储,如图 7-4 所示,我们可以看到它们都完全放在了内存中(记住较低级别的 DataFrame 由 RDD 支持)。
DataFrame.persist()
persist(StorageLevel.LEVEL)具有细微差别,可通过 StorageLevel 来控制如何缓存数据的级别。表 7-2 总结了不同的存储级别。磁盘上的数据总是使用 Java 或 Kryo 序列化进行序列化。
每个 StorageLevel(除外 OFF_HEAP)都有一个等价的 LEVEL_NAME_2,这意味着在两个不同的 Spark Executor 上重复两次:MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2 等。虽然此选项使用成本很高,但它允许在两个地方进行数据局部化,从而提供了容错能力,并让 Spark 可以选择将任务调度到数据副本的本地执行。
让我们看与上一节相同的示例,但是使用 persist()方法:
// In Scala
import org.apache.spark.storage.StorageLevel
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // Serialize the data and cache it on disk
df.count() // Materialize the cache
res2: Long = 10000000
Command took 2.08 seconds
df.count() // Now get it from the cache
res3: Long = 10000000
Command took 0.38 seconds
从图 7-5 中可以看到,数据保留在磁盘上,而不是内存中。要取消持久化缓存的数据,只需调用 DataFrame.unpersist()。
最后,不仅可以缓存 DataFrame,还可以缓存从 DataFrame 派生的表或视图。使它们在 Spark UI 中更具可读性。例如:
// In Scala
df.createOrReplaceTempView("dfTable")
spark.sql("CACHE TABLE dfTable")
spark.sql("SELECT count(*) FROM dfTable").show()
+--------+
|count(1)|
+--------+
|10000000|
+--------+
Command took 0.56 seconds
何时缓存和持久化
缓存的常见应用场景是重复访问大数据集以进行查询或转换。一些示例包括:
DataFrames 常用于迭代机器学习训练中
DataFrames 频繁被用于 ETL 期间进行频繁转换或建立数据管道
什么时候不缓存和持久化
并非所有用例都规定了需要缓存,有一些场景是不需要访问 DataFrame 的,比如下面的例子:
l DataFrame 太大而内存无法满足需求
l 在 DataFrame 上进行廉价不频繁的转换,而无需考虑它的大小
通常,应谨慎使用内存缓存,因为它可能会导致序列化和反序列化从而导致资源消耗,这取决于所使用的 StorageLevel。
接下来,我们将重点转移到讨论几个常见的 Spark 连接操作上,这些操作会触发高代价的数据移动,要求集群提供计算和网络资源,以及如何通过组织数据来减轻这种移动。
Spark 连接策略
连接操作是大数据分析中一种常见的转换类型,其中两个以表或 DataFrames 形式的数据集通过一个公共的配对键合并。与关系型数据库的表关联类似,Spark DataFrame 和 Dataset API 以及 Spark SQL 提供了一系列连接转换:内部连接,外部连接,左连接,右连接等。所有的这些操作都会触发 Spark Executor 之间的数据移动。
这些转换的核心是 Spark 如何计算和要生成什么数据,以及将相关联的数据写入到磁盘中,以及如何将这些 key 和数据传输到节点上进行一系列操作,如 groupBy(),join(),agg(),sortBy()和 reduceByKey()。以上这些操作我们通常称为 shuffle 操作,也就是常说的“洗牌”。
Spark 具有五种不同的连接策略,通过它可以在 Executor 之间交换,移动,排序,分组和合并数据:
Shuffle Hash Join(SHJ):shuffle 哈希连接
Broadcast Hash Join(BHJ):广播哈希连接
Sort Merge Join(SMJ):排序合并连接
Cartesian Join(CJ):笛卡尔积连接
Broadcast Nested Loop Join(BNLJ):广播嵌套连接
在这里,我们仅关注其中的两种策略(BHJ 和 SMJ),也是我们在开发中最常见的两种连接策略。
广播哈希连接(BHJ)
map-side-only join 也称为“仅在 map 端的连接”,当需要将两个数据集和另一个足够大数据集结合使用时,其中一个数据集较小,适合加载到 Driver 和 Executor 内存中,为了避免大规模数据移动,采用广播哈希连接。使用 Spark 广播变量,较小的数据集由驱动程序广播到所有 Spark Executor,如图 7-6 所示,随后将其与每个 Executor 上的较大数据集合并。这种策略避免了大量的数据交换。
默认情况下,如果较小的数据集小于 10 MB,Spark 将使用广播连接。该配置 spark.sql.autoBroadcastJoinThreshold 进行设置; 你可以根据每个 executor 和驱动程序中的内存大小来进行动态调整。如果你确信有足够的内存,则可以对大于 10 MB(甚至最大 100 MB)的 DataFrame 使用广播连接。
一个常见的用例是,当你在两个 DataFrame 之间拥有一组通用的 key 时,其中一个 DataFrame 包含的信息少于另一个 DataFrame,并且这时候你需要将两者合并成视图。例如,考虑一个简单的情况,你拥有世界各地大量的足球运动员的数据集 playersDF 以及球员所在足球俱乐部的数据集 clubsDF,其中 clubsDF 数据集较小,并且你希望通过一个公共的 key 将两者连接起来:
// In Scala
import org.apache.spark.sql.functions.broadcast
val joinedDF = playersDF.join(broadcast(clubsDF), "key1 === key2")
在此代码中,我们强制 Spark 进行广播连接,但是默认情况下,但如果较小的数据集的大小小于 spark.sql.autoBroadcastJoinThreshold,那么会默认使用这种连接策略。
BHJ 是 Spark 提供的最简单,最快的连接策略,因为它不涉及任何数据集。经过 spark 广播之后,所有数据都可以在本地供 Executor 使用。你只需要确保 Spark 驱动程序和 Executor 都具有足够的内存,就可以将较小的数据集保存在内存中。
在执行该操作之后的任何时间,你都可以通过执行以下命令查看物理计划中执行了哪些连接操作:
joinDF.explain(mode)
在 Spark 3.0 中,你可以使用 joinedDF.explain('mode') 显示一个可读的和易于理解的输出,该模式包括了'simple', 'extended', 'codegen', 'cost'和'formatted'这几种类型。
何时使用广播哈希连接
在以下条件下使用这种类型的连接以获得最大利益:
当较小和较大数据集中的每个键被 Spark 散列到同一分区时
当一个数据集比另一个数据集小得多时(并且在默认配置 10 MB 内;如果有足够的内存,则更多;如果不超过 10 MB,则默认配置为 10 MB)
当你只想执行等值连接时,根据匹配的未排序 key 关联两个数据集
当你不必担心使用过多的网络带宽资源或者 OOM 错误时,因为较小的数据集将广播给所有 Spark Executor
在 Spark 中指定 spark.sql.autoBroadcastJoinThreshold 的值为-1,则会导致 Spark 一直采用 shuffle 排序合并连接策略(SMJ),我们将在下一节中讨论。
Shuffle 排序合并连接(SMJ)
排序合并算法是基于某个相同的 key 合并两个大的数据集的有效方法,该 key 是可排序,唯一的、且可以分配给或存储在同一个分区上,也就是说,两个数据集的公共哈希 key 最终会落在同一分区上。从 Spark 的角度来看,这意味着每个数据集中具有相同 key 的所有行都将散列在同一 Executor 的同一分区上。显然,这意味着数据必须在 Executor 之间进行协调或交换。
顾名思义,此连接方案有两个阶段:排序阶段和合并阶段。排序阶段根据连接的 key 对每个数据集进行排序;合并阶段则是从每个数据集中迭代行中的每个 key,如果两个 key 匹配,则合并这些行。
默认情况下,通过 spark.sql.join.preferSortMergeJoin 启用 SortMergeJoin。以下是本书的 GitHub repo 中可用于独立应用程序 notebook 中的代码段。主要思想是提取两个具有一百万条记录的大型 DataFrame,并将它们通过公共的 key 进行连接,即 uid == users_id。
虽然该数据是合成的,但也能说明了这一点:
// In Scalaimport scala.util.Random
// Show preference over other joins for large data sets
// Disable broadcast join// Generate data...spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// Generate some sample data for two data sets
var states = scala.collection.mutable.MapInt,String
var items = scala.collection.mutable.MapInt,String
val rnd = new scala.util.Random(42)
// Initialize states and items purchasedstates += (0 -> "AZ", 1 -> "CO", 2-> "CA", 3-> "TX", 4 -> "NY", 5-> "MI")items += (0 -> "SKU-0", 1 -> "SKU-1", 2-> "SKU-2", 3-> "SKU-3", 4 -> "SKU-4", 5-> "SKU-5")
// Create DataFrames
val usersDF = (0 to 1000000).map(id => (id, s"user_{id}'
s"user_${id}@databricks.com", states(rnd.nextInt(5))))
.toDF("uid", "login", "email", "user_state")
val ordersDF = (0 to 1000000)
.map(r => (r, r, rnd.nextInt(10000), 10 * r* 0.2d,
states(rnd.nextInt(5)), items(rnd.nextInt(5)))).
toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")
// Do the join
val usersOrdersDF = ordersDF.join(usersDF, "uid")
// Show the joined resultsusersOrdersDF.show(false)
+--------------+--------+--------+--------+-----+-----+---+---+-------|transaction_id|quantity|users_id|amount|state|items|uid|...|user_state|
+--------------+--------+--------+--------+-----+-----+---+---+-------
|3916 |3916 |148 |7832.0 |CA |SKU-1|148|...|CO |
|36384 |36384 |148 |72768.0 |NY |SKU-2|148|...|CO |
|41839 |41839 |148 |83678.0 |CA |SKU-3|148|...|CO |
|48212 |48212 |148 |96424.0 |CA |SKU-4|148|...|CO |
|48484 |48484 |148 |96968.0 |TX |SKU-3|148|...|CO |
|50514 |50514 |148 |101028.0|CO |SKU-0|148|...|CO |
|65694 |65694 |148 |131388.0|TX |SKU-4|148|...|CO |
|65723 |65723 |148 |131446.0|CA |SKU-1|148|...|CO |
93125 |93125 |148 |186250.0|NY |SKU-3|148|...|CO |
|107097 |107097 |148 |214194.0|TX |SKU-2|148|...|CO |
|111297 |111297 |148 |222594.0|AZ |SKU-3|148|...|CO |
|117195 |117195 |148 |234390.0|TX |SKU-4|148|...|CO |
|253407 |253407 |148 |506814.0|NY |SKU-4|148|...|CO |
|267180 |267180 |148 |534360.0|AZ |SKU-0|148|...|CO |
|283187 |283187 |148 |566374.0|AZ |SKU-3|148|...|CO |
|289245 |289245 |148 |578490.0|AZ |SKU-0|148|...|CO |
|314077 |314077 |148 |628154.0|CO |SKU-3|148|...|CO |
|322170 |322170 |148 |644340.0|TX |SKU-3|148|...|CO |
|344627 |344627 |148 |689254.0|NY |SKU-3|148|...|CO |
|345611 |345611 |148 |691222.0|TX |SKU-3|148|...|CO |
+--------------+--------+--------+--------+-----+-----+---+---+-----
only showing top 20 rows
检查我们的最终执行计划,我们注意到 Spark 使用了 SortMerge Join 来连接两个 DataFrame。该 Exchange 操作是对每个 Executor 上的 map 操作的结果的重新排列:
usersOrdersDF.explain()
== Physical Plan ==
InMemoryTableScan [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16]
+- InMemoryRelation [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16],
StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(3) SortMergeJoin [users_id#42], [uid#13], Inner
:- *(1) Sort [users_id#42 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(users_id#42, 16), true, [id=#56]
: +- LocalTableScan [transaction_id#40, quantity#41, users_id#42,
amount#43, state#44, items#45]
+- *(2) Sort [uid#13 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(uid#13, 16), true, [id=#57]
+- LocalTableScan [uid#13, login#14, email#15, user_state#16]
此外,Spark UI(我们将在下一节中讨论)显示了整个作业的三个阶段:Exchange 和 Sort 操作在最后阶段进行,然后合并结果,如图 7-7 和 7-8 所示。。这样做交换的成本很昂贵,并且需要在 Executor 之间通过网络对分区进行 shuffle。
优化 shuffle 排序合并连接
如果我们为常见的排序键或列创建分区桶,则可以从该方案中省去 Exchange 步骤。也就是说,我们可以创建大量的存储桶来存储特定的排序列(每个存储桶一个键)。通过这种方式对数据进行预分类和重组可以提高性能,因为它使我们可以跳过昂贵的数据交换操作并直接进行操作 WholeStageCodegen。
在本章 notebook 的以下代码片段中(在本书的 GitHub repo 中可以找到),我们将按连接的 users_id 和 uid 列进行排序和分桶,并将桶以 Parquet 格式保存为 Spark 管理表:
// In Scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
// Save as managed tables by bucketing them in Parquet format
usersDF.orderBy(asc("uid"))
.write.format("parquet")
.bucketBy(8, "uid")
.mode(SaveMode.OverWrite)
.saveAsTable("UsersTbl")
ordersDF.orderBy(asc("users_id"))
.write.format("parquet")
.bucketBy(8, "users_id")
.mode(SaveMode.OverWrite)
.saveAsTable("OrdersTbl")
// Cache the tables
spark.sql("CACHE TABLE UsersTbl")
spark.sql("CACHE TABLE OrdersTbl")
// Read them back in
val usersBucketDF = spark.table("UsersTbl")
val ordersBucketDF = spark.table("OrdersTbl")
// Do the join and show the results
val joinUsersOrdersBucketDF = ordersBucketDF
.join(usersBucketDF, $"users_id" === $"uid")
joinUsersOrdersBucketDF.show(false)
+--------------+--------+--------+---------+-----+-----+---+---+------
|transaction_id|quantity|users_id|amount|state|items|uid|...|user_state|
+--------------+--------+--------+---------+-----+-----+---+---+------
|144179 |144179 |22 |288358.0 |TX |SKU-4|22 |...|CO |
|145352 |145352 |22 |290704.0 |NY |SKU-0|22 |...|CO |
|168648 |168648 |22 |337296.0 |TX |SKU-2|22 |...|CO |
|173682 |173682 |22 |347364.0 |NY |SKU-2|22 |...|CO |
|397577 |397577 |22 |795154.0 |CA |SKU-3|22 |...|CO |
|403974 |403974 |22 |807948.0 |CO |SKU-2|22 |...|CO |
|405438 |405438 |22 |810876.0 |NY |SKU-1|22 |...|CO |
|417886 |417886 |22 |835772.0 |CA |SKU-3|22 |...|CO |
|420809 |420809 |22 |841618.0 |NY |SKU-4|22 |...|CO |
|659905 |659905 |22 |1319810.0|AZ |SKU-1|22 |...|CO |
|899422 |899422 |22 |1798844.0|TX |SKU-4|22 |...|CO |
|906616 |906616 |22 |1813232.0|CO |SKU-2|22 |...|CO |
|916292 |916292 |22 |1832584.0|TX |SKU-0|22 |...|CO |
|916827 |916827 |22 |1833654.0|TX |SKU-1|22 |...|CO |
|919106 |919106 |22 |1838212.0|TX |SKU-1|22 |...|CO |
|921921 |921921 |22 |1843842.0|AZ |SKU-4|22 |...|CO |
|926777 |926777 |22 |1853554.0|CO |SKU-2|22 |...|CO |
|124630 |124630 |22 |249260.0 |CO |SKU-0|22 |...|CO |
|129823 |129823 |22 |259646.0 |NY |SKU-4|22 |...|CO |
|132756 |132756 |22 |265512.0 |AZ |SKU-2|22 |...|CO |
+--------------+--------+--------+---------+-----+-----+---+---+-----
only showing top 20 rows
连接的输出按 uid 和 users_id 做排序,因为我们保存的表是升序排列的。因此,在 SortMergeJoin 期间无需进行排序。查看 Spark UI(图 7-9),我们可以看到我们跳过了 Exchange 并直接转到 WholeStageCodegen。
物理计划还显示,与插入前的物理计划相比,没有执行 Exchange:Exchange 与存储之前的物理计划相比,该物理计划还显示未执行任何操作:
joinUsersOrdersBucketDF.explain()
== Physical Plan ==
*(3) SortMergeJoin [users_id#165], [uid#62], Inner
:- *(1) Sort [users_id#165 ASC NULLS FIRST], false, 0
: +- *(1) Filter isnotnull(users_id#165)
: +- Scan In-memory table `OrdersTbl` [transaction_id#163, quantity#164,
users_id#165, amount#166, state#167, items#168], [isnotnull(users_id#165)]
: +- InMemoryRelation [transaction_id#163, quantity#164, users_id#165,
amount#166, state#167, items#168], StorageLevel(disk, memory, deserialized, 1
replicas)
: +- *(1) ColumnarToRow
: +- FileScan parquet
...
何时使用 shuffle 排序合并连接
在以下条件下使用这种类型的连接以获得最大利益:
当两个大型数据集中的每个键可以排序并通过 Spark 散列到同一分区时。
当你只想执行等值连接,基于匹配的排序键组合两个数据集时。
当你要防止 Exchange 和 Sort 操作,以夸网络节省大量的 shuffle 操作时。
到目前为止,我们已经介绍了与调整和优化 Spark 有关的操作方面,以及 Spark 如何在两次常见的连接操作期间交换数据。我们还演示了如何通过使用桶来避免大量的数据交换从而提高 shuffle 排序合并连接操作的性能。
正如你在前面的图中所看到的,Spark UI 是可以对这些操作进行可视化分析的有效渠道。它显示了收集到的指标和程序状态,揭示了有关可能的性能瓶颈的大量信息以及线索。在本章的最后部分,我们讨论在 Spark UI 中可以查看哪些内容。
查看 Spark UI
Spark 提供了精心设计的 Web UI,使得我们能够检查应用程序的各个组件。它提供了有关内存使用情况、作业、阶段和任务的详细信息,以及事件时间表,日志以及各种指标和统计信息,可让你深入了解 Spark 应用程序中在 Spark 驱动程序级别和单个 Executor 中发生的情况。
spark-submit 作业同时会启动 Spark UI,你可以在本地主机上(在本地模式下)或通过默认端口 4040 上的 Spark 驱动程序(在其他模式下)进行访问。
学习 Spark UI 选项卡
Spark UI 有六个选项卡,如图 7-10 所示,每个选项卡都给我们提供了探索的机会。让我们看一下每个选项卡向我们展示的内容。
本讨论适用于 Spark 2.x 和 Spark 3.0。虽然 Spark 3.0 中的大部分 UI 相同,但它还添加了第七个选项卡,即“Structured Streaming”。我买将在第 12 章中进行预览。
Jobs 和 Stages
正如你在第 2 章中了解到的那样,Spark 将应用程序细分为作业、阶段和任务。通过“Jobs 和 Stages”选项卡,你可以浏览这些内容并向下钻取一个细粒度的级别,以检查各个任务的详细信息。你可以查看它们的完成状态并查看与 I/O、内存消耗以及执行时间等相关的指标。
图 7-11 显示了展开的事件时间线的“Jobs”选项卡,显示了 Executor 何时被添加到集群或从集群中删除了 。它还提供了集群中所有已完成作业的表格列表。“Duration”列表示完成每个作业所花费的时间(由第一列中的 JobID 标识)。如果该时间耗时很长,则表明你需要分析该作业的各个阶段,以查看哪些任务可能会导致延迟。通过这个摘要页面,你还可以访问每个作业的详细信息页面,包括 DAG 可视化和已完成阶段的列表。
Stages”选项卡提供了应用程序中所有作业的所有阶段的当前状态的摘要。你还可以访问每个阶段的详细信息页面,提供有关其任务的 DAG 和指标(图 7-12)。除了其他一些可选的统计信息之外,你还可以看到每个任务的平均持续时间,在垃圾回收(GC)上花费的时间以及读取的 shuffle 字节/记录数。如果从远程 executor 读取 shuffle 数据,则较高的 shuffle 读取阻塞时间会发出 I/O 问题的信号。较高的 GC 时间表示堆上的对象太多(你的 Executor 可能内存不足)。如果一个阶段的最大任务时间远远大于中位数,则可能是由于分区中数据分布不均而导致数据倾斜。让我们找出一些有说服力的现象来说明问题。
你还可以在此页面上看到每个执行者的聚合指标以及每个任务的明细。
Executors
“Executors”选项卡提供为应用程序创建的 Executor 的有关信息。正如你在图 7-13 看到的,你可以深入了解有关资源使用情况(磁盘,内存,内核)、在 GC 上花费的时间以及 shuffle 过程中写入和读取的数据量等详细信息。
除了汇总统计数据,你还可以查看每个 executor 如何使用内存以及用于什么目的。这还有助于当你在 DataFrame 或托管表上使用 cache()或 persist()方法时查看资源使用情况,我们将在下面讨论这些问题。
Storage
在“shuffle 排序合并连接”中的 Spark 代码中,在关联后缓存了两个托管表。如图 7-14 所示,“Storage”选项卡提供了有关应用程序使用 cache()或 persist()方法而缓存的任何表或 DataFrame 的信息。
单击图 7-14 中的“ In-memory table`UsersTbl`”链接,可以进一步了解该表是如何在 1 个 Executor 和 8 个分区上的内存和磁盘上缓存的,这个数字对应于我们为该表创建的桶的数量(请参见图 7-15)。
SQL
通过 SQL 选项卡可以跟踪和查看作为 Spark 应用程序的一部分而执行的 Spark SQL 查询的效果。你可以查看执行查询的时间,执行了哪些作业及其持续时间。例如,在 SortMergeJoin 示例中,我们执行了一些查询;所有这些查询都显示在图 7-16 中,其链接可以进一步钻取。
单击查询描述将显示所有物理操作的执行计划的详细信息,如图 7-17 所示。根据该计划,在这里,每个物理运算符 Scan In-memory table、HashAggregate 和 Exchange 都是 SQL 指标。
当我们要检查物理操作符的详细信息并探索发生了什么事情:扫描了多少行,写入了多少 shuffle 字节等等,这些度量标准非常有用。
Environment
如图 7-18 所示,“Environment”选项卡与其他选项卡一样重要。了解你的 Spark 应用程序运行的环境,会发现许多对故障排除有用的线索。实际上,必须知道设置了哪些环境变量,包括了哪些 jar,设置了哪些 Spark 属性(以及它们各自的值,特别是如果你对“优化和调整 Spark 效率”中提到的某些配置进行了调整),设置什么系统属性,使用哪种运行时环境(例如 JVM 或 Java 版本)等。所有这些只读详细信息都是非常重要的信息,如果你发现 Spark 应用程序中有任何异常行为,可以以此作为依据进行排查和调整。
SPARK 应用程序调试
在本节中,我们浏览了 Spark UI 中的各个选项卡。如你所见,UI 提供了大量信息,可用于调试和解决 Spark 应用程序中的问题。除了我们在这里介绍的内容之外,它还提供对驱动程序和 Executor stdout / stderr 日志的访问,在其中你可能已记录了部分调试信息。
通过 UI 进行调试与在你最喜欢的 IDE 中逐步执行应用程序不同,过程更像侦查,跟踪线索,尽管你更喜欢这种方法,也可以在本地诸如 IntelliJ IDEA 之类的 IDE 中调试 Spark 应用程序。
“ Spark 3.0 UI”选项卡显示了发生情况的有价值的线索,以及访问驱动程序和 Executor stdout / stderr 的日志,你可能已在其中记录了某些调试信息。
最初,大量的信息可能会使新手不知所措。但是随着时间的流逝,你将了解在每个选项卡中查找的内容,并且可以更快地检测和诊断异常。这样的调试模式将变得清晰明了,在运行一些 Spark 示例后,通过经常访问这些选项卡并熟悉它们,你将习惯于通过 UI 调整和检查 Spark 应用程序。
总结
在本章中,我们讨论了许多用于优化 Spark 应用程序的优化技术。如你所见,通过调整一些默认的 Spark 配置,可以改善大型工作负载的伸缩性,增强并行性,并最大程度地减少 Spark Executor 之间的内存不足。你还可以了解如何使用适当级别的缓存和持久化策略来加快对常用数据集的访问,并且我们研究了 Spark 使用的两个常用连接进行复杂聚合,并演示了 DataFrames 如何按 key 排序进行分桶,借此跳过 shuffle 操作。
最后,为了更直观地了解性能,Spark UI 提供了可视化界面。尽管 UI 内容丰富且详细,但它并不等效于 IDE 中的逐步调试。但是我们展示了如何通过 Spark UI 的六个选项卡检查和收集指标和统计数据,包括计算和内存使用数据以及 SQL 查询执行跟踪等信息。
在下一章中,我们将深入探讨结构化流,并向你展示在前几章中了解到的结构化 API 如何使你连续地编写流应用程序和批处理应用程序,从而使你能够构建可靠的数据湖和管道。
版权声明: 本文为 InfoQ 作者【数据与智能】的原创文章。
原文链接:【http://xie.infoq.cn/article/28b0538c0fb68cc2aee7a336d】。文章转载请联系作者。
评论