Spark 入门介绍与基础案例 (二)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
在本章中,你可以收获如何设置 Spark,并通过三个简单步骤来启动你编写的第一个独立的应用程序。
我们将使用本地模式,在单节点服务上安装 Spark 服务,本章内容中所有数据处理脚本会通过 Spark Shell 实现,这是学习框架的简单方法,这样的好处是能够为执行 Spark 迭代操作快速得到结果。使用 Spark shell 的优点是简单快速,而且可以在编写复杂的 Spark 应用程序之前使用小数据集进行 Spark 计算和验证测试,而不需要去编写复杂的应用程序,打包提交再执行,但是在实际的工作中,对于大型数据集还是需要依赖分布式处理的并行计算能力,本地模式就显得不适合,通常你需要使用 YARN 或 Kubernetes 部署模式。
虽然 Spark shell 只支持 Scala、Python 和 R,但是你可以使用其他任何受支持的语言(包括 Java)编写 Spark 应用程序,并在 Spark SQL 中发出查询。我们确实希望你能熟悉你所选择的语言。
第一步:下载 Apache Spark 安装包
在开始之前,请转到 Spark 下载页面,从步骤 2 的下拉菜单中选择“预构建的 Apache Hadoop2.7”安装包,然后在步骤 3 中点击“下载 Spark”链接(图 2-1)。
然后就能够下载到 spark-3.0.0-preview2-bin-hadoop2.7.tgz 文件,它包含了在本地模式上运行 Spark 所需要的所有与 hadoop 相关的二进制文件。或者,如果要将其安装在已经具有 HDFS 或 Hadoop 的环境上,则可以从下拉菜单中选择和 Hadoop 版本适配的部署包。至于如何从源代码进行构建这个问题不在本书讨论的范围,但你可以在文档中阅读更多关于它的内容。
当这本书快要出版时,Apache Spark3.0 仍然处于预览模式,但是你可以使用相同的下载方法和说明下载最新的 Spark3.0。
自从 Apache Spark2.2 发布以来,开发人员只关心在 Python 中学习 Spark,通过从 PyPI 仓库中安装 PySpark。如果你纯粹使用 Python 编写程序,则不必安装运行 Scala、Java 或 R 所需的所有其他软件库;这样可以使得二进制文件更加轻量化,想要通过 PyPI 安装 PySpark,只要运行 pip install pyspark 指令即可。
还可以为 SQL、ML 和 MLlib 安装一些额外的依赖项,通过 pip install pyspark[sql、ml、mllib]或者 pip install pyspark pyspark[sql](如果只需要 SQL 依赖项的话)安装依赖。
你需要在你的计算机上安装 Java 8 或更高版本,并设置 JAVA_HOME 环境变量。有关如何下载和安装 Java 的说明,请参阅本文档。
如果你想要在 shell 模式下运行 R,则必须安装 R 依赖,然后运行 sparkR 脚本。此外你还可以使用 R 社区的开源项目 sparklyr 从而使用 R 进行分布式计算。
Spark 的目录和文件
我们假设你在你的本地计算机或集群上运行了 Linux 或 macOS 操作系统,本书中所有关于命令的使用和说明都是在上面运行的。一旦你完成了 Spark 安装包下载,cd 到下载的目录,用 tar -xf spark-3.0.0-preview2-binhadoop2.7.tgz 解压成安装目录,然后 cd 到该目录,并查看内容:
让我们简要说明一下这些文件和目录的用途和目的。在 Spark 2.x 和 3.0 中添加了新项目,并且还对一些现有文件和目录的内容进行了更改:
README.md:
使用说明,文件包含有关如何使用 Spark Shell、从源代码构建 Spark、运行独立的 Spark 示例、Spark 文档详细链接和配置指南以及贡献 Spark 的详细说明。
bin:
顾名思义,此目录包含用于与 Spark 交互的大多数可执行脚本,包括 Spark Shell(spark-sql、 pyspark、spark shell 和 sparkR)。在本章后面,我们将使用此目录中的这些 shell 和可执行文件,使用 spark 提交一个独立的 Spark 应用程序,并编写脚本构建和推送 Docker 映像,在支持 Spark 运行在 Kubernetes 上。
sbin:
此目录中的大多数脚本都是管理集群的,用于在各种部署模式下启动和停止集群中的 Spark 服务。有关部署模式的详细信息,请参见第 1 章表 1-1 中的表格。
Kubernetes:
自 Spark2.4 发布以来,此目录包含用于在 Kubernetes 上创建 Spark 分布式服务的 DockerFile 镜像文件。同时它还包含一个说明文件,提供关于如何在构建 Docker 镜像之前构建分布式 Spark 的说明。
data:
此目录下包含了许多以*.txt 结尾的数据文件,这些文件可以作为 Spark 组件的测试数据:MLlib、Structured Streaming 和 GraphX。
examples:
对于任何开发人员来说,快速学习一个新的平台需要冲两个地方入手,第一个就是示例代码,还有就是比较全面的使用文档,具备这两个条件能够让我们更快的上手使用。Spark 提供了 Java、Python、R 和 Scala 的示例,你完全可以在学习框架时使用它们。我们将在本章和后续章节中介绍其中的一些例子。
第二步:使用 Scala 或者 PySpark Shell
如前所述,Spark 提供了四个广泛使用的解释器,类似于交互式“ Shell”,并实现即席查询数据分析,包括 pyspark、Spark shell、spark sql 和 spark R。在许多方面,如果你能够使用 Python、Scala、R、SQL、Unix 操作系统 Shell(如 bash 或 Bourneshell),它们的交互性会类似于你已经熟悉的 shell。
这些 shell 已经得到增强以支持连接到集群,并允许你将分布式数据加载到 Spark Executor 的内存中。无论你处理的是 GB 数据还是小型数据集,Spark Shell 都有助于快速学习 Spark。
要启动 PySpark,cd 到 bin 目录,并通过 pySpark 启动 shell。如果你已经从 PyPI 安装了 PySpark,那么只需输入 pySpark 就足够了:
$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
20/02/16 19:28:48 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> spark.version
'3.0.0-preview2'
>>>
要使用 Scala 启动类似的 Spark shell,cd 到 bin 目录执行 spark-shell 指令.
$ spark-shell
20/05/07 19:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.0.1.7:4040
Spark context available as 'sc' (master = local[*], app id = local-1581910231902)
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.version
res0: String = 3.0.0-preview2
scala>
使用本地机器
现在你已经在本地计算机上下载并安装了 Spark,在本章的其余部分中,你将在本地使用 Spark Shell。也就是说,Spark 将以本地模式运行。
有关在本地模式中运行的提醒,请参见第 1 章的表 1-1。
如上一章所述,Spark 计算被表示为算子操作。然后将这些操作转换为基于 RDD 的任务集,并分发给 Spark 的 Executor 执行计算。
让我们看看一个简短的例子,读取文本文件返回一个 DataFrame,显示已读取的字符串,并计算文件中的总行数。这个简单的示例说明了高级结构化 API 的使用,我们将在下一章中介绍它。DataFrame 上的 show(10,false)操作仅显示前 10 行而不进行截断;默认情况下,截断布尔标志为 true。以下是 Scala Shell 的语法结构:
scala> val strings = spark.read.text("../README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]
scala> strings.show(10, false)
+------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized |
|engine that supports general computation graphs for data analysis. It also |
|supports a rich set of higher-level tools including Spark SQL for SQL and |
|DataFrames, MLlib for machine learning, GraphX for graph processing, |
| and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+------------------------------------------------------------------------------+
only showing top 10 rows
scala> strings.count()
res2: Long = 109
scala>
非常简单,让我们看看使用 Python 交互式 shell pyspark 的一个类似示例:
$ pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview2
/_/
Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> strings = spark.read.text("../README.md")
>>> strings.show(10, truncate=False)
+------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It |
|provides high-level APIs in Scala, Java, Python, and R, and an optimized |
|engine that supports general computation graphs for data analysis. It also |
|supports a rich set of higher-level tools including Spark SQL for SQL and |
|DataFrames, MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+------------------------------------------------------------------------------+
only showing top 10 rows
>>> strings.count()
109
>>>
要退出任何 Spark Shell,请按 Ctrl-D。如所见,与 Spark shell 的快速交互不仅有利于快速学习,而且有利于快速原型化,测试数据非常方便。
在前面的示例中,请注意跨 Scala 和 Python 之间的 API 语法和签名奇偶校验。在 Spark 从 1.x 演变的整个过程中,这是持续改进的功能点之一.
还要注意,我们使用高级结构化 API 将文本文件读取到 Spark DataFrame 中,而不是 RDD。在整本书中,我们将更加关注这些结构化的 API;因为到了 Spark2.x,rdd 被封装成低级 API 使用,更加通用。
在高级结构化 API 中表示的每个计算都被拆分为低级且优化后并生成 RDD 操作,然后转换为 Executor 的 JVM 的 Scala 字节码。由此生成的 RDD 操作代码不提供用户访问,它和面向用户的 RDD 应用程序也不一样。
第三步:理解 Spark 应用的概念
现在你已经下载了 Spark,以独立模式安装在本地电脑上,同时启动了 Spark shell,并以交互方式执行了一些简短的代码示例,接下来就是需要将这些内容整合成完整的应用程序。
为了更好地了解我们的示例代码中涉及的一起操作和原理,你需要熟悉 Spark 应用程序的一些关键概念和如何跨 Spark Executor 作为任务将代码逻辑做转换和执行。我们将首先定义一些重要的术语:
Application(应用):
也就是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 Executor 代码。
SparkSession:
它为用户提供了一个统一的切入点来使用 Spark 的各项功能,并允许使用它的 API 对 Spark 进行编程。在交互式 Spark shell 中,Spark 驱动程序会为你实例化一个 SparkSession,而在 Spark 应用程序中,你需要自己创建一个 SparkSession 对象。
Job:
作业,由多个 task 组成的一个并行计算, 这些 task 产生自一个 Spark action (比如, save, collect) 操作. 在 driver 的日志中可以看到 job 这个术语.。
Stage:
阶段,每个 job 被分解为多个 stage, 每个 stage 其实就是一些 task 的集合, 这些 stage 之间相互依赖 (与 MapReduce 中的 map 与 reduce stage 类似)。
Task:
任务,stage 根据数据的分区个数又可以具体分为多个 task,task 可以并行执行,所以 stage 又称为 taskSet(工作集合)。让我们更详细地了解这些概念。
Spark 应用程序和 SparkSession
Spark 驱动程序(Driver)是每个 Spark 应用程序的核心,它负责创建 SparkSession 对象。使用 Spark Shell 时,驱动程序是 shell 的一部分,并为你创建 SparkSession 对象(通过变量 Spark 访问),正如你在启动 shell 之前的示例中看到的那样。
在这些示例中,因为你在本地电脑上启动了 Spark shell,所以所有操作都在本地一个 JVM 中运行。但是你可以基于本地模式轻松地启动 Spark shell 继而并行分析数据。spark shell --help 命令或 pyspark --help 命令将向你介绍如何连接到 Spark 集群管理器。图 2-2 显示了在完成此操作后,Spark 在集群上的执行方式。
一旦有了 SparkSession,你就可以使用 API 编程 Spark 以执行 Spark 操作。
Spark Jobs
在与 Spark shell 的交互式 Session 期间,驱动程序会将 Spark 应用程序转换为一个或多个 Spark 作业(图 2-3)。然后,它会将每个作业转换为一个 DAG 图。这本质上是 Spark 的执行计划,其中 DAG 中的每个节点都可以是单个或多个 Spark 阶段。
Spark Stages
作为 DAG 节点的一部分,根据 RDD 的依赖关系,可以分为宽依赖和窄依赖,根据宽依赖将 DAG 图分为多个 stage。其中宽依赖一般涉及 shuffle 操作,所以也可以认为 shuffle 是 stage 的一个划分依据,stage 是根据每个可以序列或并行形成的操作来创建(如图 2-4)。并非所有的 Spark 操作都可以在一个阶段进行,因此它们可以分为多个阶段。通常在算子的计算边界上划分,在 Spark Executor 之间控制数据传输。
Spark Tasks
每个阶段由 Spark Task(执行单元)组成,然后跨 Spark Executor 联合计算;每个任务映射到一个核心,在一个数据分片上计算(图 2-5)。因此,具有 16 个核的 Executor 可以有 16 个或更多任务并行处理 16 个或更多分区,使得 Spark 任务可以并行执行!
转换、操作和懒加载(transformation、action、Lazy Evaluation)
对分布式数据集来说 Spark 操作可以分为两种类型:转换和操作。转换,顾名思义,可以将一个 Spark DataFrame 转换为一个新的 DataFrame 而不改变原始数据,从而赋予它不可变性的特性。换句话说,select() 或 filter()等操作不会更改原始 DataFrame,而是将转换后的操作结果返回为新的 DataFrame。
值得一提的是所有的转换都是懒性计算的,也就是说懒加载,它们并不会直接计算出来结果,而是作为一种血缘关系被记录或记住。记录的血缘关系允许 Spark 在以后的执行计划中重排某些转换、控制分区数量或将优化 stage 转换操作,以实现更高效的执行。懒加载是 Spark 延迟执行的策略,所有的转换操作只有当调用 action 操作时才会触发结果计算(从磁盘读取或写入磁盘)。
一个操作会触发对所有记录的转换的惰性计算。在图 2-6 中,记录所有转换 T,直到调用动作 A。每个转换 T 都会产生一个新的 DataFrame。
虽然懒加载策略允许 Spark 通过查看链式转换来优化查询,但血缘关系和数据不变性提供了容错能力。因为 Spark 记录了其血缘关系中的每个转换操作,而 DataFrame 在转换之间是不可变的,所以它可以通过重放记录的血缘关系来重现其原始状态,使它在发生故障时具有弹性,也就是容错性。
表 2-1 列出了一些转换和操作的示例。
这些操作和转换有助于创建 Spark 查询计划,我们将在下一章中介绍。在调用 action 操作之前,不会执行查询计划中的任何操作。在下面 Python 和 Scala 的示例中,有两个转换操作-read()和 filter(),还有一个 count()操作,count 操作将触发对作为查询执行计划的一部分而记录的转换操作。在本示例中,在 shell 中执行 filtered.count()之前不会发生结果计算:
# In Python
>>> strings = spark.read.text("../README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
// In Scala
scala> import org.apache.spark.sql.functions._
scala> val strings = spark.read.text("../README.md")
scala> val filtered = strings.filter(col("value").contains("Spark"))
scala> filtered.count()
res5: Long = 20
窄依赖和宽依赖的转换
如前所述,转换是指对 Spark 进行惰性计算的操作。懒加载方案的一个巨大提升是,Spark 可以检查你的计算执行计划,并确定如何优化它。这种优化可以通过连接或流水线化某些操作并将它们分配到一个阶段,或者通过确定哪些操作需要跨集群进行洗牌或交换数据,将它们分成各个阶段来实现。
转换可以被归类为具有窄依赖或宽依赖关系。任何可以从单个输入分区计算出单个输出分区的转换都是一个窄依赖的转换。例如,在前面的代码段中,filter()和 contains()表示窄依赖转换操作,因为它们可以在单个分区上操作并产生结果输出分区,而无需任何数据交换。
但是,groupBy()或者 orderBy()在 spark 中属于宽依赖操作,对来自其他分区的数据进行读取、组合最后写入磁盘。由于每个分区都有自己的计数,即在其数据行中包含“Spark”单词,因此一个 count(groupBy())操作会强制对来自集群中每个 Executor 分区的数据洗牌。在此转换中,orderBy()操作需要其他分区的输出才能完成最终聚合计算。
图 2-7 说明了这两种类型的依赖关系。
Spark UI 界面
Spark 包括一个图形用户界面,你可以用于检查或监视 Spark 应用程序的各个分解阶段,即作业、阶段和任务。根据 Spark 的部署方式,驱动程序将启动默认在端口 4040 上启动 WebUI,在页面上你可以查看各个指标项以及详细信息,例如:
l 调度程序的阶段和任务的列表
l RDD 大小和内存的摘要
l 有关运行环境的信息
l 有关正在运行的 Executor 的信息
l 所有的 Spark SQL 查询
启动 spark-shell 时,部分输出日志会显示要在端口 4040 访问的本地主机 URL。
让我们检查上一节中的 Python 示例是如何转换为作业、阶段和任务的。要查看 DAG 的可视化界面,请单击 Web 用户界面中的“DAG Visualization”。如图 2-8 所示,驱动程序创建了一个作业和一个阶段。
请注意,因为只有一个阶段,所以 Executor 之间的数据不需要进行数据交换。Spark 的各个操作都会显示在蓝色的方框中。
阶段 0 由一个任务组成。如果你有多个任务,则它们将被并行执行。你可以在“Stages”选项卡中查看每个阶段的详细信息,如图 2-9 所示。
我们将在第 7 章中更详细地介绍 SparkUI。现在,请注意,UI 界面为 Spark 的内部工作提供了一个显微透镜,作为调试和检查的工具。
Databricks 是一家在云中提供托管的 Apache Spark 平台的公司。除了使用本地机器在本地模式下运行 Spark 外,你还可以使用免费的 Databricks 社区版来尝试本章和其他章节中的一些示例(图 2-10)。作为 Apache Spark 的学习工具,社区版有许多值得注意的教程和示例。除了使用 Python、R、Scala 或 SQL 编写自己的 notebook 指外,还可以导入其他 notebook,包括 Jupyter notebook。
要获取帐户,请访问https://databricks.com/try并按照说明免费试用社区版。注册后,你可以从其 GitHub 仓库中导入本书的 notebook。
运行第一个 Spark 应用程序
为了便于学习和探索,Spark 包为 Spark 的每个组件提供了一组样本应用程序。欢迎你浏览安装位置的示例目录,了解可用内容。
从本地计算机上的安装目录中,提供了几个简单的 Java 和 Scala 样例,可以使用命令运行 bin/run example <class> [params]。例如:
$ ./bin/run-example JavaWordCount README.md
运行之后控制台上显示输出消息,以及 readme.md 文件中每个单词的列表及其计数(统计单词数量是分布式计算领域的“Hello,World”)。
统计巧克力豆的数量
在上一个示例中,我们计算了一个文件中的单词。如果文件很大,它会被划分成小的数据块分发到集群各个节点中,我们的 Spark 应用程序将计算任务进行分发,在每个分区中计算每个单词的数量,并返回最终的聚合计数结果。但这个例子已经有点陈词滥调了。
如果让我们解决类似的问题,但是使用更大的数据集并使用更多的 Spark 的分布式功能和 DataFrame API。我们将在后面的章节中介绍在这个 Spark 应用程序中使用的 API,但现在我们先继续往下阅读。
这本书的作者中有一位数据科学家,他喜欢烤饼干,并加入巧克力豆,然后经常奖励她在美国不同洲际的学生,她经常用一批这些饼干形象类比机器学习和数据科学课程。但她显然是通过数据驱动的,很明显,她希望确保自己为不同州的学生在分配到对应颜色巧克力豆的饼干(如图 2-11),每种颜色代表一个州的学生。
接下来让我们编写一个 Spark 程序,用于读取超过 100000 个条目(每行都具有<state、mnm_color、count>),并计算和聚合每种颜色和洲际的计数。这些汇总的计数结果告诉我们每个州学生喜欢的巧克力豆颜色。在示例 2-1 中提供了完整的 Python 实现逻辑。
Example 2-1. Counting and aggregating M&Ms (Python version)
# Import the necessary libraries.
# Since we are using Python, import the SparkSession and related functions
# from the PySpark module.
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: mnmcount <file>", file=sys.stderr)
sys.exit(-1)
# Build a SparkSession using the SparkSession APIs.
# If one does not exist, then create an instance. There
# can only be one SparkSession per JVM.
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# Get the M&M data set filename from the command-line arguments
mnm_file = sys.argv[1]
# Read the file into a Spark DataFrame using the CSV
# format by inferring the schema and specifying that the
# file contains a header, which provides column names for comma-
# separated fields.
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
# We use the DataFrame high-level APIs. Note
# that we don't use RDDs at all. Because some of Spark's
# functions return the same object, we can chain function calls.
# 1. Select from the DataFrame the fields "State", "Color", and "Count"
# 2. Since we want to group each state and its M&M color count,
# we use groupBy()
# 3. Aggregate counts of all colors and groupBy() State and Color
# 4 orderBy() in descending order
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(count("Count").alias("Total"))
.orderBy("Total", ascending=False))
# Show the resulting aggregations for all the states and colors;
# a total count of each color per state.
# Note show() is an action, which will trigger the above
# query to be executed.
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
36 | Chapter 2: Downloading Apache Spark and Getting Started # While the above code aggregated and counted for all
# the states, what if we just want to see the data for
# a single state, e.g., CA?
# 1. Select from all rows in the DataFrame
# 2. Filter only CA state
# 3. groupBy() State and Color as we did above
# 4. Aggregate the counts for each color
# 5. orderBy() in descending order
# Find the aggregate count for California by filtering
ca_count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.where(mnm_df.State == "CA")
.groupBy("State", "Color")
.agg(count("Count").alias("Total"))
.orderBy("Total", ascending=False))
# Show the resulting aggregation for California.
# As above, show() is an action that will trigger the execution of the
# entire computation.
ca_count_mnm_df.show(n=10, truncate=False)
# Stop the SparkSession
spark.stop()
你可以使用你最喜欢的工具将此代码写成一个名为 mnmcount.py 的 Python 文件中,从本书的 GitHub 仓库中下载 mnn_dataset.csv 文件,并使用安装箱目录中的 submit-Spark 脚本将其作为 Spark 作业提交。将 SPARK_HOME 环境变量设置为在本地计算机上安装 Spark 的根目录。
前面的代码使用 DataFrame API,它读取起来类似于高级 DSL 查询。我们将在下一章中介绍这个和其他 API;现在,你可以指示 Spark 做什么,而不是如何做,不像 RDD API。真是太酷了!
为了避免将详细的 INFO 信息打印到控制台,请将 log4j.properties.template 文件复制到 log4j.properties,并在 conf/log4j.properties 文件中设置 log4j.rootCategory=WARN。
让我们使用 Python API 提交我们的第一个 Spark 作业(有关代码的说明,请阅读示例 2-1 中的内联注释):
$SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv
+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA |Yellow|1807 |
|WA |Green |1779 |
|OR |Orange|1743 |
| 37|TX |Green |1737 |
|TX |Red |1725 |
|CA |Green |1723 |
|CO |Yellow|1721 |
|CA |Brown |1718 |
|CO |Green |1713 |
|NV |Orange|1712 |
|TX |Yellow|1703 |
|NV |Green |1698 |
|AZ |Brown |1698 |
|CO |Blue |1695 |
|WY |Green |1695 |
|NM |Red |1690 |
|AZ |Orange|1689 |
|NM |Yellow|1688 |
|NM |Brown |1687 |
|UT |Orange|1684 |
|NM |Green |1682 |
|UT |Red |1680 |
|AZ |Green |1676 |
|NV |Yellow|1675 |
|NV |Blue |1673 |
|WA |Red |1671 |
|WY |Red |1670 |
|WA |Brown |1669 |
|NM |Orange|1665 |
|WY |Blue |1664 |
|WA |Yellow|1663 |
|WA |Orange|1658 |
|NV |Brown |1657 |
|CA |Orange|1657 |
|CA |Red |1656 |
|CO |Brown |1656 |
|UT |Blue |1655 |
|AZ |Yellow|1654 |
|TX |Orange|1652 |
|AZ |Red |1648 |
|OR |Blue |1646 |
|UT |Yellow|1645 |
|OR |Red |1645 |
|CO |Orange|1642 |
|TX |Brown |1641 |
|NM |Blue |1638 |
|AZ |Blue |1636 |
|OR |Green |1634 |
|UT |Brown |1631 |
|WY |Yellow|1626 |
|WA |Blue |1625 |
|CO |Red |1624 |
|OR |Brown |1621 |
|TX |Blue |1614 |
|OR |Yellow|1614 |
|NV |Red |1610 |
|CA |Blue |1603 |
|WY |Orange|1595 |
|UT |Green |1591 |
|WY |Brown |1532 |
+-----+------+-----+
Total Rows = 60
+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA |Yellow|1807 |
|CA |Green |1723 |
|CA |Brown |1718 |
|CA |Orange|1657 |
|CA |Red |1656 |
|CA |Blue |1603 |
+-----+------+-----+
首先,我们将看到每个洲际的每个巧克力豆颜色的所有聚合结果,下面是仅针对 CA 的聚合(其首选颜色为黄色)。
如果你想使用这个相同的 Spark 程序的 Scala 版本,该怎么办呢?API 相似;在 Spark 中,在支持的语言中有很好的同等性,很多都是相通的,但是会有细微的语法差异。示例 2-2 是该程序的 Scala 版本。可以先看一看,在下一节中,我们将向你展示如何构建和运行该应用程序。
Example 2-2. Counting and aggregating M&Ms (Scala version)
package main.scala.chapter2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
* Usage: MnMcount <mnm_file_dataset>
*/
object MnMcount {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("MnMCount")
.getOrCreate()
if (args.length < 1) {
print("Usage: MnMcount <mnm_file_dataset>")
sys.exit(1)
}
// Get the M&M data set filename
val mnmFile = args(0)
// Read the file into a Spark DataFrame
val mnmDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnmFile)
// Aggregate counts of all colors and groupBy() State and Color
// orderBy() in descending order
val countMnMDF = mnmDF
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(count("Count").alias("Total"))
.orderBy(desc("Total"))
// Show the resulting aggregations for all the states and colors
countMnMDF.show(60)
println(s"Total Rows = ${countMnMDF.count()}")
println()
// Find the aggregate counts for California by filtering
val caCountMnMDF = mnmDF
.select("State", "Color", "Count")
.where(col("State") === "CA")
.groupBy("State", "Color")
.agg(count("Count").alias("Total"))
.orderBy(desc("Total"))
// Show the resulting aggregations for California
caCountMnMDF.show(10)
// Stop the SparkSession
spark.stop()
}
}
用 Scala 构建独立的 Spark 应用程序
现在我们将向你展示如何使用 Scala 构建工具(sbt)构建第一个 Scala Spark 程序。
因为 Python 是一种声明式语言,并且没有像先编译这样的步骤(尽管可以在 pyc 中将 Python 代码编译成字节码),我们不会在这里执行此步骤。有关如何使用 Maven 构建 JavaSpark 程序的详情,请参考 Apache Spark 网站上的指南。为了简洁起见,在这本书中,我们主要介绍了 Python 和 Scala 中的例子。
build.sbt 是编译构建文件,类似于 makefile,描述并指示 Scala 编译器构建与 Scala 相关的任务,如 jars、package、依赖关系以及在哪里查找它们的规范文件。在我们的例子中,我们有一个简单的 sbt 文件用于构建巧克力豆代码(示例 2-3)。
Example 2-3. sbt build file
// Name of the package
name := "main/scala/chapter2"
// Version of our package
version := "1.0"
// Version of Scala
scalaVersion := "2.12.10"
// Spark library dependencies
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.0.0-preview2",
"org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
)
假设你已经安装了 Java 开发工具包(JDK)和 sbt,并设置了 JAVA_HOME 和 SPARK_HOME,仅使用一个命令就可以构建 Spark 应用程序:
$ sbt clean package
[info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/
project/build.properties: set sbt.version to 1.2.8
[info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/project
[info] Updating
[info] Done updating.
...
[info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/
chapter2/scala/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/
scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM
成功构建后,可以运行巧克力豆计数实例的 Scala 版本,如下示例:
$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount \
jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
...
...
20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4:
Stage finished
20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49,
took 0.264579 s
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
| CA|Yellow| 1807|
| CA| Green| 1723|
| CA| Brown| 1718|
| CA|Orange| 1657|
Your First Standalone Application | 41| CA| Red| 1656|
| CA| Blue| 1603|
+-----+------+-----+
输出的结果与 Python 程序时相同。可以动手试试!
正如上面你所看到的——我们的数据科学家作者会非常乐意使用这些数据来决定为她教的不同洲际的学生提供什么颜色的巧克力豆饼干。
总结
在本章中,我们介绍了你开始使用 Apache Spark 时需要采用的三个简单步骤:下载安装包,熟悉 Scala 或 PySpark 交互式 shell,以及掌握高级 Spark 应用程序的概念和术语。我们简要概述了使用转换(transformation)和操作(action)编写 Spark 应用程序的过程,并简要介绍了使用 SparkUI 来检查创建的作业、阶段和任务。
最后,通过一个简短的示例,我们向你展示如何使用高级结构化 API 来告诉 Spark 该做什么,这将在下一章做进一步介绍,我们将更详细地检查这些 API。
版权声明: 本文为 InfoQ 作者【数据与智能】的原创文章。
原文链接:【http://xie.infoq.cn/article/d4fe53676be37acf2d42ab0fe】。文章转载请联系作者。
评论