Spark SQL 和 DataFrames:内置数据源简介 (四)
写在前面:
大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。
业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。
想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。
内推信息
如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。
免费学习资料
如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!
学习交流群
如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。
在上一章中,我们解释了 Spark 结构化的演变及其合理性。特别是,我们讨论了 Spark SQL 引擎如何为高级 DataFrame 和 Dataset API 提供统一的接口。现在,我们将继续讨论 DataFrame,并探讨其与 Spark SQL 的交互。
本章和下一章还将探讨 Spark SQL 如何与图 4-1 中所示的一些外部组件交互。特别是 Spark SQL:
1.提供了用于构建我们在第 3 章中探讨的高级结构化 API 的引擎。
2.可以读写各种格式的结构化数据(例如,JSON,Hive 表,Parquet,Avro,ORC,CSV)。
3.使你可以使用 JDBC / ODBC 连接器从 Tableau,Power BI,Talend 等外部商业智能(BI)数据源或从 Postgres 和 MySQL 等 RDBMS 来查询数据。
4.提供与 Spark 应用程序中存储为数据库中表或视图的结构化数据进行交互的编程接口。
5.提供一个交互式 shell 程序,用于对结构化数据执行 SQL 查询。
6.支持 ANSI SQL:2003 标准的命令和 HiveQL。
让我们从如何在 Spark 应用程序中使用 Spark SQL 开始入手。
在 Spark 应用程序中使用 Spark SQL
在 Spark2.0 中引入的 SparkSession 为使用结构化 API 编写 Spark 提供了一个统一的切入点。你可以使用 SparkSession 来调用 Spark 的功能:只需导入类并在代码中创建一个实例。
在 SparkSession 上使用 sql()方法实例化 spark 执行 SQL 查询,例如 spark.sql("SELECT * FROM myTableName")。以 spark.sql 这种方式执行的所有查询结果都会返回一个 DataFrame,如果你需要,可以在该 DataFrame 上执行进一步的 Spark 操作----我们在第 3 章中探讨的那些操作以及在本章和下一章中将学到的方法。
基本查询示例
在本节中,我们将通过几个示例查询有关航空公司的航班准点性和航班延误原因的数据集,该数据集包含有关美国航班的数据,包括日期,延误,距离,始发地和目的地。它以 CSV 文件的形式提供,超过一百万条记录。通过定义 schema,我们将数据读取到 DataFrame 并将该 DataFrame 注册为一个临时视图(稍后将在临时视图中进行更多介绍),以便我们可以使用 SQL 查询它。
代码段中提供了查询示例,而本书的 GitHub repo 中提供了包含此处介绍的所有代码的 Python 和 Scala 笔记(notebook)。这些示例将使你了解如何通过 spark.sql 编程接口在 Spark 应用程序中使用 SQL。与声明性风格的 DataFrame API 相似,此接口允许你在 Spark 应用程序中查询结构化数据。
通常,在 Standalone 模式下的 Spark 应用程序中,你可以手动创建一个 SparkSession 实例,如以下示例所示。但是,在 Spark Shell(或 Databricks 笔记)中,默认为你创建了 SparkSession,并赋值给变量 spark,你可以通过 spark 变量进行访问。
接下来让我们开始将数据集读取到一个临时视图中:
// In Scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("SparkSQLExampleApp")
.getOrCreate()
// Path to data set
val csvFile="/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csvFile)
// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")
In Python
from pyspark.sql import SparkSession
Create a SparkSession
spark = (SparkSession
.builder
.appName("SparkSQLExampleApp")
.getOrCreate())
Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
Read and create a temporary view
Infer schema (note that for larger files you
may want to specify the schema)
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
如果要指定 schema,则可以使用 DDL 格式的字符串。例如:
// In Scala
val schema = "date STRING, delay INT, distance INT,
origin STRING, destination STRING"
In Python
schema = "`date` STRING, `delay` INT, `distance` INT,
`origin` STRING, `destination` STRING"
现在我们已经有了一个临时视图,我们可以使用 Spark SQL 执行 SQL 查询。这些查询与你可能针对 MySQL 或 PostgreSQL 数据库中的 SQL 表执行的查询没有什么不同。这里的重点是表明 Spark SQL 提供了一个符合 ANSI:2003 的 SQL 接口,并演示了 SQL 与 DataFrames 之间的相互可操作性。
美国航班延误数据集有五列:
1.date 列包含类似的字符串 02190925。转换后,它映射到 02-19 09:25 am。
2.delay 列以分钟为单位给出了计划的起飞时间与实际起飞时间之间的延迟。提早出发显示负数。3.distance 列给出了从始发机场到目的地机场的距离(以英里为单位)。
4.origin 列包含始发国际航空运输协会机场代码。
5.destination 列包含目的地国际航空运输协会机场代码。考虑到这一点,让我们尝试针对此数据集进行一些示例查询。
首先,我们将查找距离大于 1000 英里的所有航班:
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
+--------+------+-----------+
only showing top 10 rows
结果显示,所有最长的航班都在檀香山(HNL)和纽约(JFK)之间。接下来,我们将查找出旧金山(SFO)和芝加哥(ORD)之间延迟超过两个小时的所有航班:
spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
+--------+------+-----------+
only showing top 10 rows
看来这两个城市之间在不同的日期有很多明显的航班延误。(作为练习,将 date 列转换为可读的格式,并找出这些延迟最常见的日期或月份。思考这些延迟与冬季或假日有关吗?)
让我们尝试一个更复杂的查询,其中在 SQL 语句中使用 CASE 子句。在这个示例中,我们要标记所有美国航班,无论其始发地和目的地如何,以表明其经历的延误:超长延误(> 6 小时),长延误(2–6 小时)等。将这些人类可读的标签添加到名为的新列中 Flight_Delays:
spark.sql("""SELECT delay, origin, destination,
CASE
WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333 |ABE |ATL |Long Delays |
|305 |ABE |ATL |Long Delays |
|275 |ABE |ATL |Long Delays |
|257 |ABE |ATL |Long Delays |
|247 |ABE |DTW |Long Delays |
|247 |ABE |ATL |Long Delays |
|219 |ABE |ORD |Long Delays |
|211 |ABE |ATL |Long Delays |
|197 |ABE |DTW |Long Delays |
|192 |ABE |ORD |Long Delays |
+-----+------+-----------+-------------+
only showing top 10 rows
与 DataFrame 和 Dataset API 一样,通过 spark.sql 接口,你可以执行常见的数据分析操作,如我们在上一章中探讨的那样。该计算经过 Spark SQL 引擎相同的流程(见第三章中“Catalyst 优化器”了解详细信息),最终得到相同的结果。
前面的所有三个 SQL 查询都可以用等效的 DataFrame API 查询表示。例如,第一个查询可以在 Python DataFrame API 中表示为:
In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(desc("distance"))).show(10)
Or
(df.select("distance", "origin", "destination")
.where("distance > 1000")
.orderBy("distance", ascending=False).show(10))
这将产生与 SQL 查询相同的结果:
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
|4330 |HNL |JFK |
+--------+------+-----------+
only showing top 10 rows
作为练习,请尝试将其他两个 SQL 查询转换为 DataFrame API 的形式。
如这些示例所示,使用 Spark SQL 接口查询数据类似于用常规 SQL 查询关系数据库表。尽管查询是在 SQL 中进行的,但你会感觉到与第 3 章中遇到的 DataFrame API 操作在可读性和语义上的相似之处,我们将在下一章中进一步探讨。
为了使你能够如前面的示例中所示查询结构化数据,Spark 在内存和磁盘中创建和管理视图和表的所有复杂性。这就引出了我们的下一个主题:如何创建和管理表和视图。
SQL 表和视图
表中包含数据。与 Spark 中的每个表相关联的是其相关的元数据,它是有关表及其数据的信息:数据结构,描述,表名,数据库名,列名,分区,实际数据所在的物理位置等。所有这些存储在 metastore 中。
默认情况下,Spark 使用位于/user/hive/warehouse 的 Apache Hive Metastore 来保留关于表的所有元数据,而不是为 Spark 表提供单独的元存储。但是,你可以通过将 Spark config 的配置 spark.sql.warehouse.dir 设置为另一个目录来更改默认位置,该位置可以设置为本地目录或外部分布式存储。
托管与非托管表(Managed Versus UnmanagedTables)
Spark 允许你创建两种类型的表:托管表和非托管表。对于托管表,Spark 同时管理文件存储中的元数据和数据。这可以是本地文件系统,HDFS 或对象存储,例如 Amazon S3 或 Azure Blob。对于非托管表,Spark 仅管理元数据,而你自己在外部数据源(例如 Cassandra)中管理数据。
对于托管表,由于 Spark 可以管理所有内容,因此 SQL 命令如 DROP TABLE table_name 会将元数据和数据一起删除。对于非托管表,同一命令将仅删除元数据,而不删除实际数据。在下一节中,我们将介绍一些有关如何创建托管表和非托管表的示例。
创建 SQL 数据库和表
表驻留在数据库中。默认情况下,Spark 在 default 数据库下创建表。要创建自己的数据库名称,可以从 Spark 应用程序或笔记执行 SQL 命令。使用美国航班延误数据集,让我们创建一个托管表和一个非托管表。首先,我们将创建一个名为 learn_spark_db 的数据库,并告诉 Spark 我们要使用该数据库:
// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
从这一点来看,我们在应用程序中执行的用于创建表的任何命令都会作用于 learn_spark_db 数据库下所有的表。
创建托管表
要在 learn_spark_db 数据库中创建托管表,可以执行如下 SQL 查询:
// In Scala/Python
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")
你也可以使用 DataFrame API 进行相同的操作,如下所示:
In Python
Path to our US flight delays CSV file
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
Schema as defined in the preceding example
schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")
这两个语句最后都能在 learn_spark_db 数据库中创建 us_delay_flights_tbl 托管表。
创建非托管表
相比之下,你可以从自己支持 Spark 应用读取的数据源(例如 Parquet,CSV 或 JSON 文件)创建,要从数据源(例如 CSV 文件)创建非托管表,请使用如下 SQL:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH
'/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")
在 DataFrame API 中使用:
(flights_df
.write
.option("path", "/tmp/data/us_flights_delay")
.saveAsTable("us_delay_flights_tbl"))
为了使你能够浏览这些示例,我们创建了 Python 和 Scala 示例笔记,你可以在本书的 GitHub repo 中找到这些笔记。
创建视图
除了创建表之外,Spark 还可在现有表之上创建视图。视图可以是全局视图(SparkSession 在给定集群的所有节点上可见)或会话范围视图(仅对单个 SparkSession 可见),并且它们是临时的:会随着 Spark 应用程序终止而被回收。创建视图的语法与在数据库中创建表的语法相似。创建视图后,就可以像查询表一样对其进行查询。视图和表之间的区别在于,视图实际上并不保存数据。Spark 应用程序终止后,表仍然存在,但视图会被回收。你可以使用 SQL 从现有表创建视图。例如,如果你只希望使用纽约(JFK)和旧金山(SFO)的始发机场处理美国航班延误数据集的子集,则以下查询将创建仅由该切片组成的全局临时视图和临时视图表:
-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
origin = 'SFO';
CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE
origin = 'JFK'
你也可以使用 DataFrame API 完成相同的操作,如下所示:
In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM
us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM
us_delay_flights_tbl WHERE origin = 'JFK'")
Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
一旦创建了这些视图,就可以像对表一样对它们执行查询。请记住,访问全局临时视图时,必须使用前缀,如 global_temp.<view_name>,因为 Spark 在名为的全局临时数据库(global_temp)中创建全局临时视图。
-- In SQL
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view
相比之下,你可以访问不带 global_temp 前缀的普通临时视图。
-- In SQL
SELECT * FROM us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")
你也可以像删除表一样删除视图:
-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view
// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
临时视图与全局临时视图
临时视图与全局临时视图之间的差异很微妙,这可能是新加入 Spark 的开发人员可能有困惑的地方。临时视图绑定到 Spark 应用程序中的单个 Spark 会话。相比之下,在 Spark 应用程序中的多个 Spark 会话可以看到全局临时视图。是的,你可以在单个 Spark 应用程序中创建多个 SparkSession,这是很方便的,例如,当你要访问(并合并)来自两个不共享同一个 Hive MetaStore 配置的不同 Spark 会话的数据时可以这样做。
查看元数据
如前所述,Spark 管理与每个托管或非托管表关联的元数据。这是 Spark SQL 中用于存储元数据的高级抽象 Catalog 的功能。Catalog 是在 Spark 2.x 的扩展功能并使用了新的公共方法,使你能够检查与数据库、表和视图关联的元数据。Spark 3.0 将其扩展为使用外部 catalog(我们将在第 12 章中进行简要讨论)。例如,在 Spark 应用程序中,创建 SparkSession 之后赋值成变量 spark,你可以通过以下方法访问所有存储的元数据:
// In Scala/Python
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")
从本书的 GitHub 仓库中导入笔记,然后尝试一下。
缓存 SQL 表
尽管我们将在下一章讨论表缓存策略,但是值得一提的是,像 DataFrames 一样,你可以缓存 SQL 表和视图和释放 SQL 表和视图缓存。在 Spark 3.0 中,除了其他选项之外,你还可以将表指定为 LAZY,这意味着该表仅应在首次使用时进行缓存,而不是立即进行缓存:
-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
读取表写入 DataFrame
通常,数据工程师会在其常规数据提取和 ETL 流程中建立数据管道。它们使用清理后的数据填充 Spark SQL 数据库和表,以供下游应用程序使用。
假设你已经可以使用现有的数据库 learn_spark_db 和表
us_delay_flights_tbl。无需读取外部 JSON 文件,那么你只需使用 SQL 查询表并将返回的结果分配给 DataFrame 即可:
// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")
In python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")
现在,你已经从现有的 Spark SQL 表中生成了一个清洗过的 DataFrame 了。你还可以使用 Spark 的内置数据源读取其他格式的数据,从而使你可以灵活地与各种常见文件格式进行交互。
DataFrames 和 SQL 表的数据源
如图 4-1 所示,Spark SQL 提供了一个到各种数据源的接口。它还提供了一组使用 Data Sources API 在这些数据源之间读写数据的常用方法。
在本节中,我们将介绍一些内置数据源,可用的文件格式以及加载和写入数据的方式,以及与这些数据源有关的特定选项。但首先,让我们仔细研究两个高级数据源 API 结构,它们决定了你与不同数据源进行交互的方式:DataFrameReader 和 DataFrameWriter。
DataFrameReader
DataFrameReader 是用于将数据从数据源读取到 DataFrame 的核心构造。它具有定义的格式和推荐的使用模式:
DataFrameReader.format(args).option("key", "value").schema(args).load() 这种类型的将方法串联在一起的做法在 Spark 中很常见,并且易于阅读。在探索通用数据分析模式时,我们在第 3 章中已经看到了它。请注意,你只能通过 SparkSession 实例访问 DataFrameReader。也就是说,你无法创建 DataFrameReader 的实例。要获取实例句柄,请使用:
SparkSession.read
// or
SparkSession.readStream
当从静态数据源读取到 DataFrame 的句柄的同时,返回 DataFrameReader,读取流返回一个要从数据流源读取的实例。(我们将在书的后面介绍结构化流)。
指向 DataFrameReader 的每个公共方法的参数都采用不同的值。表 4-1 枚举了这些参数,以及支持的参数的一个子集。
虽然我们不会列举参数和选项的所有不同组合,Python,Scala,R 和 Java 的文档提供了建议和指导。不过,值得举几个例子:
// In Scala
// Use Parquet
val file = """/databricks-datasets/learning-spark-v2/flights/summary-
data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file)
// Use Parquet; you can omit format("parquet") if you wish as it's the default
val df2 = spark.read.load(file)
// Use CSV
val df3 = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")
// Use JSON
val df4 = spark.read.format("json")
.load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
通常,从静态 Parquet 数据源读取数据时不需要任何 schema——Parquet 元数据通常包含该 schema,因此可以对其进行推断。但是,对于流数据源,你将必须提供一个 schema。(我们将在第 8 章中介绍从流数据源进行读取。)Parquet 是 Spark 的默认数据源,因为它高效,使用列存储并采用快速压缩算法。当我们更深入地介绍 Catalyst 优化器时,你将在以后看到其他好处(例如,列式下推)。
DataFrameWriter
DataFrameWriter 进行与之相反的操作:将数据保存或写入到指定的内置数据源。不同于 DataFrameReader,你不是从 SparkSession 进行访问而是从要保存的 DataFrame 访问其实例。它有一些推荐的使用模式:
DataFrameWriter.format(args)
.option(args)
.bucketBy(args)
.partitionBy(args)
.save(path)
要获取实例句柄,请使用:
DataFrame.write
// or
DataFrame.writeStream
DataFrameWriter 中每个方法的参数也采用不同的值。我们在表 4-2 中列出了这些内容,并提供了一部分受支持的参数。
这是一个简短的示例代码段,用于说明方法和参数的使用:
// In Scala
// Use JSON
val location = ...
df.write.format("json").mode("overwrite").save(location)
Parquet
我们将从 Parquet 开始研究数据源,因为它是 Spark 中的默认数据源。Parquet 是许多大数据处理框架和平台所支持和广泛使用的一种开源列式文件格式,可提供许多 I/O 优化(例如压缩,可节省存储空间并允许快速访问数据列)。
由于其效率和这些优化,我们建议在转换和清洗数据后,将 DataFrame 以 Parquet 格式保存以供下游使用。(Parquet 也是 Delta Lake 的默认表打开格式,我们将在第 9 章中介绍。)
将 PARQUET 文件读入 DataFrame
Parquet 文件存储在目录结构中,该目录结构包含数据文件,元数据,许多压缩文件和某些状态文件。页脚中的元数据包含文件格式,数据结构和列数据(例如路径等)。
例如,Parquet 文件中的目录可能包含以下文件集:_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet
目录中可能存在大量的 part-XXXX 压缩文件(此处显示的名称已缩短,以适合页面显示)。
要将 Parquet 文件读入 DataFrame,只需指定格式和路径:
// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/
parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)
In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)
除非你从流数据源读取数据,否则无需提供 schema,因为 Parquet 会将其保存为元数据的一部分。
将 PARQUET 文件读入 SPARK SQL 表
除了将 Parquet 文件读入 Spark DataFrame 之外,你还可以创建 Spark SQL 非托管表或直接使用 SQL 查看。
// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/
parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)
In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/
2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)
创建表或视图后,你可以使用 SQL 将数据读入 DataFrame 中,如我们在前面的示例中所看到的:
// In Scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
In Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
这两个操作都返回相同的结果:
+-----------------+-------------------+-----+|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 DATAFRAMES 写入 PARQUET 文件
将 DataFrame 写入或保存为表或文件是 Spark 中的常见操作。要编写 DataFrame,你只需使用 DataFrameWriter 本章前面概述的方法和参数,并提供将 Parquet 文件要保存的位置。例如:
// In Scala
df.write.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/parquet/df_parquet")
In Python
(df.write.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/parquet/df_parquet"))
回想一下,Parquet 是默认文件格式。如果不包括该 format()方法,则 DataFrame 仍将另存为 Parquet 文件。这将在指定的路径上创建一组紧凑和压缩的 Parquet 文件。
由于我们在这里使用 snappy 作为压缩选项,因此我们将拥有 snappy 压缩文件。为简便起见,本示例仅生成一个文件;仅此而已。通常,可能会创建大约十二个文件:
将 DATAFRAMES 写入 SPARK SQL 表
将 DataFrame 写入 SQL 表就像写入文件一样容易,只需使用 saveAsTable()即可,而不是 save()。这将创建一个称为 us_delay_flights_tbl:的托管表:
// In Scala
df.write
.mode("overwrite")
.saveAsTable("us_delay_flights_tbl")
In Python
(df.write
.mode("overwrite")
.saveAsTable("us_delay_flights_tbl"))
综上所述,Parquet 是 Spark 中首选的默认内置数据源文件格式,并且已被许多其他框架采用。我们建议你在 ETL 和数据提取过程中使用此格式。
JSON 格式
JSON 也是一种流行的数据格式。与 XML 相比,它以易于阅读和易于解析的格式而著称。它具有两种表示形式:单行模式和多行模式。Spark 支持两种模式。
在单行模式下,每行表示一个 JSON 对象,而在多行模式下,整个多行对象构成一个 JSON 对象。要在此模式下阅读,请把 multiLine 在 option()方法中设置为 true 。
将 JSON 文件读入 DATAFRAME
你可以像使用 Parquet 一样,将 JSON 文件读入 DataFrame 中,只需"json"在 format()方法中指定即可:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)
In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)
将 JSON 文件读入 SPARK SQL 表
你也可以像使用 Parquet 一样从 JSON 文件创建 SQL 表:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING json
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
)
创建表后,你可以使用 SQL 将数据读取到 DataFrame 中:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |15 |
|United States |Croatia |1 |
|United States |Ireland |344 |
|Egypt |United States |15 |
|United States |India |62 |
|United States |Singapore |1 |
|United States |Grenada |62 |
|Costa Rica |United States |588 |
|Senegal |United States |40 |
|Moldova |United States |1 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 DATAFRAMES 写入 JSON 文件
将 DataFrame 保存为 JSON 文件很简单。指定适当的 DataFrameWriter 方法和参数,并提供将 JSON 文件保存到的位置:
// In Scala
df.write.format("json")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/json/df_json")
In Python
(df.write.format("json")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/json/df_json"))
这将在指定的路径处创建一个目录,该目录中填充了一组紧凑的 JSON 文件:
-rw-r--r-- 1 jules wheel 0 May 16 14:44 _SUCCESS
-rw-r--r-- 1 jules wheel 71 May 16 14:44 part-00000-<...>-c000.json
JSON 数据源选项
表 4-3 说明了 DataFrameReader 和 DataFrameWriter 的常用 JSON 选项。有关完整列表,请参考文档。
CSV
与普通文本文件一样,这种通用文本文件格式用逗号分隔的每个数据或字段。每行以逗号分隔的字段代表一条记录。即使逗号是默认的分隔符,如果逗号是数据的一部分,你也可以使用其他定界符来分隔字段。流行的电子表格可以生成 CSV 文件,因此它是数据和业务分析师中流行的格式。
将 CSV 文件读入 DATAFRAME
与其他内置数据源一样,你可以使用 DataFrameReader 方法和参数将 CSV 文件读入 DataFrame:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val df = spark.read.format("csv")
.schema(schema)
.option("header", "true")
.option("mode", "FAILFAST") // Exit if any errors
.option("nullValue", "") // Replace any null data with quotes
.load(file)
In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
.option("header", "true")
.schema(schema)
.option("mode", "FAILFAST") # Exit if any errors
.option("nullValue", "") # Replace any null data field with quotes
.load(file))
从 CSV 数据源创建 SQL 表与使用 Parquet 或 JSON 没什么不同:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING csv
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
header "true",
inferSchema "true",
mode "FAILFAST"
)
创建表后,你可以像以前一样使用 SQL 将数据读取到 DataFrame 中:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 DATAFRAMES 写入 CSV 文件
将 DataFrame 保存为 CSV 文件很简单。指定适当的 DataFrameWriter 方法和参数,并提供将 CSV 文件保存到的位置:
// In Scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
In Python
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
这将在指定位置生成一个文件夹,该文件夹中填充了一堆紧凑的压缩文件:
-rw-r--r-- 1 jules wheel 0 May 16 12:17 _SUCCESS
-rw-r--r-- 1 jules wheel 36 May 16 12:17 part-00000-251690eb-<...>-c000.csv
CSV 数据源选项
表 4-4 介绍了一些常见的 DataFrameReader 和 DataFrameWriter 的 CSV 选项。由于 CSV 文件可能很复杂,因此可以使用许多选项。有关完整列表,请参考文档。
Avro
作为内置数据源在 Spark 2.4 中引入,例如 Apache Kafka 使用 Avro 格式用于消息序列化和反序列化。它提供了许多好处,包括直接映射到 JSON,提高速度和效率以及绑定许多可用的编程语言。
将 AVRO 文件读入 DATAFRAME
使用 Avro 文件读取到 DataFrame 的 DataFrameReader 用法与我们在本节中讨论的其他数据源的用法是一致的:
// In Scala
val df = spark.read.format("avro")
.load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)
In Python
df = (spark.read.format("avro")
.load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 AVRO 文件读入 SPARK SQL 表
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
USING avro
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)
创建表后,可以使用 SQL 将数据读入 DataFrame 中:
// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)
In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 DATAFRAMES 写入 AVRO 文件
将 DataFrame 作为 Avro 文件编写很简单。与往常一样,指定适当的 DataFrameWriter 方法和参数,并提供将 Avro 文件保存到的位置:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
USING avro
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
)
创建表后,可以使用 SQL 将数据读入 DataFrame 中:
// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)
In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
这将在指定位置生成一个文件夹,该文件夹中填充了一堆压缩文件:
-rw-r--r-- 1 jules wheel 0 May 17 11:54 _SUCCESS
-rw-r--r-- 1 jules wheel 526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro
AVRO 数据源选项
表 4-5 说明了 DataFrameReader 和 DataFrameWriter 的常用选项。文档中提供了完整的选项列表。
ORC
作为一种额外的优化的列式文件格式,Spark 2.x 支持向量化 ORC reader( vectorized
ORC reader)。
两种 Spark 配置决定了要使用哪种 ORC 实现。当 spark.sql.orc.impl 设置为 native 和 spark.sql.orc.enableVectorizedReader 设置 true 为时,Spark 使用向量化 ORC reader。ORC reader 一次读取行块(通常每块 1024 行),而不是一行一行读取,在密集型操作(如扫描,筛选,聚合和联接)中可以简化操作并减少 CPU 使用率。
对于使用 SQL 命令 USING HIVE OPTIONS (fileFormat 'ORC')创建的 Hive ORC SerDe(序列化和反序列化)表,当 Spark 配置参数 spark.sql.hive.convertMetastoreOrc 设置为 true 时,将使用向量化 ORC reader。
将 ORC 文件读入 DATAFRAME
要使用 ORC 向量化 reader 读取 DataFrame,你可以使用常规 DataFrameReader 方法和选项:
// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)
In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 ORC 文件读取到 SPARK SQL 表中
使用 ORC 数据源创建 SQL 视图时,与 Parquet,JSON,CSV 或 Avro 没有什么区别:
-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING orc
OPTIONS (
path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
)
创建表后,你可以照常使用 SQL 将数据读取到 DataFrame 中:
// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
|United States |Singapore |25 |
|United States |Grenada |54 |
|Costa Rica |United States |477 |
|Senegal |United States |29 |
|United States |Marshall Islands |44 |
+-----------------+-------------------+-----+
only showing top 10 rows
将 DATAFRAME 写入 ORC 文件
使用以下 DataFrameWriter 方法,读取后写回转换后的 DataFrame 也同样简单:
// In Scala
df.write.format("orc")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/orc/df_orc")
In Python
(df.write.format("orc")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/orc/flights_orc"))
结果将是指定位置的文件夹,其中包含一些压缩的 ORC 文件:
-rw-r--r-- 1 jules wheel 0 May 16 17:23 _SUCCESS
-rw-r--r-- 1 jules wheel 547 May 16 17:23 part-00000-<...>-c000.snappy.orc
Images
在 Spark 2.4 中,社区引入了一个新的数据源图像文件,以支持深度学习和机器学习框架,例如 TensorFlow 和 PyTorch。对于基于计算机视觉的机器学习应用程序,加载和处理图像数据集非常重要。
将图像文件读入 DATAFRAME
与所有以前的文件格式一样,你可以使用 DataFrameReader 方法和选项来读取图像文件,如下所示:
// In Scala
import org.apache.spark.ml.source.image
val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)
imagesDF.printSchema
imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, false)
In Python
from pyspark.ml import image
image_dir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)
|-- label: integer (nullable = true)
images_df.select("image.height", "image.width", "image.nChannels", "image.mode",
"label").show(5, truncate=False)
+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |1 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
+------+-----+---------+----+-----+
only showing top 5 rows
二进制文件
Spark 3.0 添加了对二进制文件作为数据源的支持。DataFrameReader 将二进制文件转换为包含该文件的原始内容和元数据的单个 DataFrame Row(记录)。二进制文件源生成的 DataFrame 包含如下字段:1.path:StringType
2.modificationTime:TimestampType
3.length:LongTypecontent:BinaryType
将二进制文件读入 DATAFRAME
要读取二进制文件,请将数据源格式指定为 binaryFile。你可以使用与给定全局模式匹配的路径加载文件,同时保留分区信息,使用数据源选项 pathGlobFilter 去匹配。例如,以下代码从输入目录中读取所有带有分区目录的 JPG 文件:
// In Scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.load(path)
binaryFilesDF.show(5)
In Python
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------
| path| modificationTime|length| content|label|
+--------------------+-------------------+------+--------------------+-----+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...| 0|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...| 1|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...| 0|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...| 0|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...| 0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows
要忽略目录中的分区数据发现,可以设置 recursiveFileLookup 为"true":
// In Scala
val binaryFilesDF = spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.option("recursiveFileLookup", "true")
.load(path)
binaryFilesDF.show(5)
In Python
binary_files_df = (spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.option("recursiveFileLookup", "true")
.load(path))
binary_files_df.show(5)
+--------------------+-------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+-------------------+------+--------------------+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
请注意,当 recursiveFileLookup 选项设置为"true"时 label 列不存在。当前,二进制文件数据源不支持将 DataFrame 写回原始文件格式。在本节中,你将了解如何从一系列受支持的文件格式将数据读取到 DataFrame 中。我们还向你展示了如何从现有内置数据源创建临时视图和表。无论你使用的是 DataFrame API 还是 SQL,查询都会产生相同的结果。你可以在本书的 GitHub 存储库中的笔记中检查其中一些查询。
总结
回顾一下,本章探讨了 DataFrame API 和 Spark SQL 之间的互操作性。特别是,你了解了如何使用 Spark SQL 进行以下操作:
1.使用 Spark SQL 和 DataFrame API 创建托管表和非托管表。
2.读取和写入各种内置数据源和文件格式。
3.使用 spark.sql 编程接口对存储为 Spark SQL 表或视图的结构化数据执行 SQL 查询。
4.细读 Spark Catalog 并探索与表和视图关联的元数据。
5.使用 DataFrameWriter 和 DataFrameReader API。通过本章中的代码片段以及该书的 GitHub 仓库中的笔记,你可以了解如何使用 DataFrames 和 Spark SQL。继续这一思路,下一章将进一步探讨 Spark 如何与图 4-1 中所示的外部数据源进行交互。你将看到一些更深入的转换示例以及 DataFrame API 和 Spark SQL 之间的互操作性。
版权声明: 本文为 InfoQ 作者【数据与智能】的原创文章。
原文链接:【http://xie.infoq.cn/article/492fcc8b0d117a90e92e5227a】。文章转载请联系作者。
评论