写点什么

Spark SQL 和 DataFrames:与外部数据源进行交互 (五)

发布于: 2021 年 07 月 17 日
Spark SQL和DataFrames:与外部数据源进行交互(五)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在上一章中,我们介绍了与 Spark 中内置数据源的交互。我们还仔细研究了 DataFrame API 及其与 Spark SQL 的相互操作性。在本章中,我们将重点介绍 Spark SQL 与外部组件的接口。具体来说,我们讨论 Spark SQL 能够使你做什么:

1.使用用户自定义的函数对 Apache Hive 和 Apache Spark 进行操作。

2.与外部数据源连接,例如 JDBC 和 SQL 数据库,PostgreSQL,MySQL,Tableau,Azure Cosmos DB 和 MS SQL Server。

3.使用简单和复杂的类型,高阶函数以及常见的关系运算符。

我们还将介绍一些使用 Spark SQL 查询 Spark 的不同选项,例如 Spark SQL shell,Beeline 和 Tableau。


Spark SQL 和 Apache Hive

Spark SQL 是 Apache Spark 的基础组件,该组件将关系处理与 Spark 的功能编程 API 集成在一起。它的起源是在以前有关 Shark 的工作中。Shark 最初基于 Apache Spark 之上的 Hive 代码库构建,并成为 Hadoop 系统上最早的交互式 SQL 查询引擎之一。这说明了兼顾两全其美是有可能的: 速度与企业数据仓库一样快,并且可以像 Hive/MapReduce 一样进行扩展。

Spark SQL 使 Spark 程序员可以利用更快的性能和关系编程(例如,声明性查询和优化的存储)以及调用复杂的分析库(例如,机器学习)。如上一章所述,从 Apache Spark 2.x 开始,SparkSession 提供了一个统一的入口点来操作 Spark 中的数据。


用户自定义函数

尽管 Apache Spark 具有大量内置功能,但 Spark 的灵活性允许数据工程师和数据科学家定义自己的功能。这些被称为用户自定义函数(UDF)。


SPARK SQL UDF

创建自己的 PySpark 或 Scala UDF 的好处是,你(和其他人)将能够使你在 Spark SQL 中进行使用。例如,数据科学家可以将 ML 模型包装在 UDF 中,以便数据分析人员可以在 Spark SQL 中查询其预测结果,而不必了解模型的内部结构。

这是创建 Spark SQL UDF 的简单示例。请注意,UDF 在每个会话中运行,并且不会持久化在底层元存储中:

// In Scala

// Create cubed function

val cubed = (s: Long) => {

  s * s * s

}

 

// Register UDF

spark.udf.register("cubed", cubed)

 

// Create temporary view

spark.range(1, 9).createOrReplaceTempView("udf_test")

 

 In Python

from pyspark.sql.types import LongType

 

 Create cubed function

def cubed(s):

  return s * s * s

 

 Register UDF

spark.udf.register("cubed", cubed, LongType())

 

 Generate temporary view

spark.range(1, 9).createOrReplaceTempView("udf_test")

现在,你可以使用 Spark SQL 执行以下任意一个 cubed()函数:

// In Scala/Python

// Query the cubed UDF

spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

 

+---+--------+

| id|id_cubed|

+---+--------+

|  1|       1|

|  2|       8|

|  3|      27|

|  4|      64|

|  5|     125|

|  6|     216|

|  7|     343|

|  8|     512|

+---+--------+

SPARK SQL 中的赋值顺序和空值校验

Spark SQL(包括 SQL,DataFrame API 和 Dataset API)不保证子表达式的求值顺序。例如,以下查询不能保证“s is NOT NULL“子句在“strlen(s) > 1”子句之前执行:

spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")

因此,为了执行正确的 null 校验,建议你执行以下操作:

1. 使 UDF 本身意识到可能会存在 null 值,并在 UDF 内部进行 null 检查。

2. 使用 IF 或 CASE WHEN 表达式进行 null 检查,并在条件分支中调用 UDF。


使用 PANDAS UDF 加速和分发 PYSPARK UDF

使用 PySpark UDF 之前存在的主要问题之一是,它们的性能比 Scala UDF 慢。这是因为 PySpark UDF 需要在 JVM 和 Python 之间进行数据移动,这个过程比较耗费性能。为了解决此问题,Pandas UDF(也称为向量化 UDF)作为 Apache Spark 2.3 的一部分引入。Pandas UDF 使用 Apache Arrow 传输数据,使用 Pandas 处理数据。你可以使用关键字 pandas_udf 作为修饰符来定义 Pandas UDF ,或者包装函数本身。一旦数据采用 Apache Arrow 格式,不再需要对数据进行序列化处理,因为它已经是 Python 进程可使用的格式。你不是逐行操作单个输入源,而是在 Pandas Series 或 DataFrame 上进行操作(即向量化执行)。


从具有 Python 3.6 及更高版本的 Apache Spark 3.0 起,Pandas UDF 分为两个 API 类别:Pandas UDF 和 Pandas Function API。


Pandas UDF

用 Apache Spark 3.0,Pandas UDF 从 Pandas UDF 中的 Python 类型提示推断 Pandas  UDF 类型,如 pandas.Series,pandas.DataFrame,Tuple,和 Iterator。以前,你需要手动定义和指定每种 Pandas UDF 类型。现在,Pandas UDF 中支持的 Python 类型提示案例是 Series 到 Series,Series 迭代器到 Series 迭代器、多重 Series 迭代器到 Series 迭代器和 Series 到标量(单个值)。


Pandas Function API

Pandas 函数 API 允许你将本地 Python 函数直接应用于 PySpark DataFrame,其中输入和输出均为 Pandas 实例。对于 Spark 3.0,受支持的 Pandas Function API 为 grouped map, map, cogrouped map。

 

欲了解更多信息,请参阅第 12 章中的“利用 Python 类型提示重新设计 Pandas UDF”一节。

以下是用于 Spark 3.0 的标量 Pandas UDF 的示例:

 In Python

 Import pandas

import pandas as pd

 

 Import various pyspark SQL functions including pandas_udf

from pyspark.sql.functions import col, pandas_udf

from pyspark.sql.types import LongType

 

 Declare the cubed function

def cubed(a: pd.Series) -> pd.Series:

    return a * a * a

 

 Create the pandas UDF for the cubed function

cubed_udf = pandas_udf(cubed, returnType=LongType())

前面的代码片段声明了一个称为 cubed()执行立方操作的函数。这是常规的 Pandas 函数,带有额外的 cubed_udf = pandas_udf()调用来创建我们的 Pandas UDF。

让我们从一个简单的 Pandas 系列(为定义 x)开始,然后将本地函数 cubed()应用于立方计算:

Create a Pandas Series

x = pd.Series([1, 2, 3])

 

 The function for a pandas_udf executed with local Pandas data

print(cubed(x))

输出如下:

0 1

1 8

2 27

dtype:int64

现在,让我们切换到 Spark DataFrame。我们可以将其作为 Spark 向量化 UDF 执行,如下所示:

 Create a Spark DataFrame, 'spark' is an existing SparkSession

df = spark.range(1, 4)

 

 Execute function as a Spark vectorized UDF

df.select("id", cubed_udf(col("id"))).show()

这是输出:

+ --- + --------- +

| id | cubed(id)|

+ --- + --------- +

| 1 | 1 |

| 2 | 8 |

| 3 | 27 |

+ --- + --------- +

与局部函数相反,使用向量化的 UDF 将导致执行 Spark 作业。以前的本地函数是仅在 Spark 驱动程序上执行的 Pandas 函数。在此 pandas_udf 功能的一个阶段(图 5-1)中查看 Spark UI 时,这一点变得更加明显。

要更深入地了解 Pandas UDF,请参阅 pandas 用户自定义函数文档。

像许多 Spark 作业一样,该作业通过 parallelize()方法将本地数据(二进制批处理)发送给 Executor,并调用 mapPartitions()将二进制数据转换为 Spark 的内部数据格式,该格式可以分发给 Spark 的工作节点。有许多 WholeStageCodegen 步骤代表了性能上的根本提升(这要归功于 Tungsten 项目的整个阶段代码生成,显着提高了 CPU 效率和性能)。但这是由 ArrowEvalPython 的步骤确定(在本例中)正在执行 Pandas UDF 的步骤。


使用 Spark SQL Shell,Beeline 和 Tableau 查询

存在各种机制可以用于查询 Apache Spark,包括 Spark SQL Shell,Beeline CLI 实用程序以及诸如 Tableau 和 Power BI 之类的报表工具。在本节中,我们包含有关 Tableau 的说明;对于 Power BI,请参阅文档。


使用 Spark SQL Shell

spark-sql CLI 是执行 Spark SQL 查询的便捷工具。虽然此实用程序在本地模式下与 Hive Metastore 服务进行通信,但它不会与 Thrift JDBC/ODBC 服务(也称为 Spark Thrift Server STS)通信。STS 允许 JDBC/ODBC 客户端在 Apache Spark 上通过 JDBC 和 ODBC 协议执行 SQL 查询。

要启动 Spark SQL CLI,进入 $SPARK_HOME 文件夹中执行以下命令:

./bin/spark-sql

启动 Shell 后,可以进行交互方式执行 Spark SQL 查询。让我们看几个例子。


创建表

要创建新的 Spark SQL 表,请执行以下语句:

spark-sql> CREATE TABLE people (name STRING, age int);

你的输出应与此类似,并注意创建 Spark SQL 表 people 及其文件位置(/user/hive/warehouse/people):

20/01/11 22:42:16 WARN HiveMetaStore: Location: file:/user/hive/warehouse/people

specified for non-external table:people

Time taken: 0.63 seconds


将数据插入表中

你可以通过执行以下的 sql 语句将数据插入 Spark SQL 表:

INSERT INTO people SELECT name, age FROM ...

由于你不依赖于从预先存在的表或文件中加载数据,因此可以使用 INSERT...VALUES 语句将数据插入表中。这三个 sql 语句将三个人信息(包括姓名和年龄,如果知道)插入到 people 表中:

spark-sql> INSERT INTO people VALUES ("Michael", NULL);

Time taken: 1.696 seconds

spark-sql> INSERT INTO people VALUES ("Andy", 30);

Time taken: 0.744 seconds

spark-sql> INSERT INTO people VALUES ("Samantha", 19);

Time taken: 0.637 seconds

spark-sql>


执行 SPARK SQL 查询

现在,表中已经有数据了,你可以对它执行 Spark SQL 查询。让我们从查看元存储中存在的表开始:

spark-sql> SHOW TABLES;

default people false

Time taken: 0.016 seconds, Fetched 1 row(s)

接下来,让我们找出表中 20 岁以下的年轻人:

spark-sql> SELECT * FROM people WHERE age < 20;

Samantha 19

Time taken: 0.593 seconds, Fetched 1 row(s)

同样,让我们​​看看年龄为空的人是谁,一般有这种情况说明可能是脏数据:

spark-sql> SELECT name FROM people WHERE age IS NULL;

Michael

Time taken: 0.272 seconds, Fetched 1 row(s)


使用 Beeline

如果你使用过 Apache Hive,则你可能熟悉命令行工具 Beeline,这是一个用于对 HiveServer2 运行 HiveQL 查询的通用实用程序。Beeline 是基于 SQLLine CLI 的 JDBC 客户端。你可以使用同一实用程序对 Spark Thrift 服务执行 Spark SQL 查询。请注意,当前实现的 Thrift JDBC/ODBC 服务对应于 Hive 1.2.1 中的 HiveServer2。你可以使用 Spark 或 Hive 1.2.1 自身带有的 Beeline 脚本来测试 JDBC 服务。


启动 THRIFT 服务

要启动 Spark Thrift JDBC/ODBC 服务,进入 $SPARK_HOME 文件夹中执行以下命令:

./sbin/start-thriftserver.sh

 

如果尚未启动 Spark driver 和 worker,请在执行 start-thriftserver.sh 命令之前先执行以下命令:

./sbin/start-all.sh


通过 BEELINE 连接到 THRIFT 服务

要使用 Beeline 测试 Thrift JDBC/ODBC 服务,请执行以下命令:

./bin/beeline

然后将 Beeline 配置为连接到本地 Thrift 服务:

!connect jdbc:hive2://localhost:10000 

 

默认情况下,beeline 处于非安全模式。因此,用户名是你的登录名(例如 user@learningspark.org),密码是空的。


使用 BEELINE 执行 SPARK SQL 查询

从这里,你可以运行 Spark SQL 查询,类似于使用 Beeline 进行 Hive 查询的方式。以下是一些查询示例及其输出:

0: jdbc:hive2://localhost:10000> SHOW tables;

+-----------+------------+--------------+

| database | tableName | isTemporary |

+-----------+------------+--------------+

| default | people | false |

+-----------+------------+--------------+

1 row selected (0.417 seconds)

0: jdbc:hive2://localhost:10000> SELECT * FROM people;

+-----------+-------+

| name | age |

+-----------+-------+

| Samantha | 19 |

| Andy | 30 |

| Michael | NULL |

+-----------+-------+

3 rows selected (1.512 seconds)

0: jdbc:hive2://localhost:10000>


停止 THRIFT 服务

完成 spark sql 操作后,可以使用以下命令停止 Thrift 服务:

./sbin/stop-thriftserver.sh


使用 Tableau

与通过 Beeline 或 Spark SQL CLI 运行查询类似,你可以通过 Thrift JDBC/ODBC 服务将自己喜欢的 BI 工具连接到 Spark SQL。在本节中,我们将向你展示如何将 Tableau Desktop(2019.2 版)连接到本地 Apache Spark 实例。

你需要已经安装 Tableau 的 Spark ODBC 驱动程序版本 1.2.0 或更高版本。如果你已经安装(或升级到)Tableau 2018.1 或更高版本,则该驱动程序应该已经预先安装。


启动 THRIFT 服务

要启动 Spark Thrift JDBC/ODBC 服务,进入到 $SPARK_HOME 文件夹中执行以下命令:

./sbin/start-thriftserver.sh

 

如果尚未启动 Spark 驱动程序和 worker 服务,请在执行以下 start-thriftserver.sh 命令之前优先执行下面的命令启动相关服务:

 

./sbin/start-all.sh


启动 TABLEAU

如果是第一次启动 Tableau,将看到一个“连接”对话框,该对话框允许你连接到大量数据源。默认情况下,Spark SQL 选项不会包含在左侧的“To a Server”菜单中(请参见图 5-2)。

要访问 Spark SQL 选项,请单击该列表底部的“更多…”,然后从出现在主面板中的列表中选择 Spark SQL,如图 5-3 所示。


这将弹出 Spark SQL 对话框(图 5-4)。连接到本地 Apache Spark 实例时,可以使用以下参数使用非安全用户名身份验证模式:

 服务器:localhost

 端口:10000(默认)

 类型:SparkThriftServer(默认)

 身份验证:用户名

 用户名:你的登录名,例如,user@learningspark.org

 需要 SSL:不选中


图 5-4。Spark SQL 对话框


一旦成功连接到 Spark SQL 数据源后,你将看到类似于图 5-5 的数据源连接视图。

从左侧的“Select schema”下拉菜单中,选择“default”。然后输入要查询的表的名称(请参见图 5-6)。请注意,你可以单击放大镜图标以获取可用表的完整列表。



有关使用 Tableau 连接到 Spark SQL 数据库的更多信息,请参考 Tableau 的 Spark SQL 文档和 Databricks Tableau 文档。

输入 people 作为表名,然后将表从左侧拖放到主对话框中(在标记为“Drag tables here”的空间中)。你应该看到如图 5-7 所示的内容。

单击“立即更新(Update Now)”,然后 Tableau 将查询 Spark SQL 数据源(图 5-8)。

现在,你可以对 Spark 数据源、关联表等执行查询,就像对任何其他 Tableau 数据源一样。


停止 THRIFT 服务

完成后,你可以使用以下命令停止 Thrift 服务:

./sbin/stop-thriftserver.sh


外部数据源

在本节中,我们将从 JDBC 和 SQL 数据库开始,着重介绍如何使用 Spark SQL 连接到外部数据源。


JDBC 和 SQL 数据库

Spark SQL 包含一个数据源 API,可以使用 JDBC 从其他数据库读取数据。Spark SQL 将结果返回为 DataFrame,从而简化了查询这些数据源,进而提供了 Spark SQL 的所有优点(包括性能和与其他数据源关联的能力)。

首先,你需要为 JDBC 数据源指定 JDBC 驱动程序,并且该驱动程序必须位于 Spark 类路径上。从 $SPARK_HOME 文件夹中,你可以发出如下命令:

./bin/spark-shell --driver-class-path $database.jar --jars $database.jar 

使用数据源 API,可以将远程数据库中的表作为 DataFrame 或 Spark SQL 临时视图加载。用户可以在数据源选项中指定 JDBC 连接属性。表 5-1 包含 Spark 支持的一些更常见的连接属性(不区分大小写)。



表 5-1:通用连接属性


有关连接属性的完整列表,请参见 Spark SQL 文档。


分区的重要性

在 Spark SQL 和 JDBC 外部源之间传输大量数据时,对数据源进行分区很重要。你的所有数据都通过一个驱动程序进行连接,这可能导致饱和并显著降低提取性能,并有可能使源系统的资源饱和。虽然这些 JDBC 属性是可选的,但对于任何大规模操作,强烈建议使用表 5-2 中显示的属性。

表 5-2:分区连接属性


让我们看一个示例,以帮助你了解这些属性的工作方式。假设我们使用以下设置:

l numPartitions: 10

l lowerBound: 1000

l upperBound: 10000

然后步长等于 1000,将创建 10 个分区。这等效于执行以下 10 个查询(每个分区一个):

SELECT * FROM table WHERE partitionColumn BETWEEN 1000 and 2000


SELECT * FROM table WHERE partitionColumn BETWEEN 2000 and 3000

...


SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000

虽然没有所有内容,但在使用这些属性时,请牢记以下提示:

一个好的起点是 numPartitions 最好是 Spark worker 的倍数。例如,如果你有四个 Spark worker 节点,则可能从 4 个或 8 个分区开始。但是一定需要注意的是,源系统可以很好地处理读取请求也很重要。对于具有处理窗口的系统,你可以最大程度地增加对源系统的并发请求数。对于缺少处理窗口的系统(例如,OLTP 系统连续处理数据),应减少并发请求的数量,以防止源系统负载过高。


最初,根据最小和最大实际值计算 lowerBound。例如,如果你选择{numPartitions:10, lowerBound: 1000, upperBound: 10000},但所有值都在 2000 和 4000 之间,那么 10 个查询中只有 2 个(每个分区一个)将完成所有工作。在这种情况下,更好的配置将是{numPartitions:10, lowerBound: 2000, upperBound: 4000}


 选择可以均匀分布的 partitionColumn,以避免数据倾斜。例如,如果你的大多数 partitionColumn 的值都是 2500,则{numPartitions:10, lowerBound: 1000, upperBound: 10000}大多数工作将由请求在 2000 和 3000 之间的任务来执行。相反,请选择其他 partitionColumn 分区,或者在可能的情况下生成一个新分区(可能是多个列的哈希),以更均匀地分配你的分区。


PostgreSQL

要连接到 PostgreSQL 数据库,请从 Maven 构建或下载 JDBC jar 并将其添加到你的类路径中。然后启动一个 Spark shell(spark-shell 或 pyspark),并指定该 jar:

bin / spark-shell --jars postgresql-42.2.6.jar

以下示例显示了如何使用 Scala 中的 Spark SQL 数据源 API 和 JDBC 从 PostgreSQL 数据库加载并保存到 PostgreSQL 数据库:

// In Scala

// Read Option 1: Loading data from a JDBC source using load method

val jdbcDF1 = spark

  .read

  .format("jdbc")

  .option("url", "jdbc:postgresql:[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load()

 

// Read Option 2: Loading data from a JDBC source using jdbc method

// Create connection properties

import java.util.Properties

val cxnProp = new Properties()

cxnProp.put("user", "[USERNAME]")

cxnProp.put("password", "[PASSWORD]")

 

// Load data using the connection properties

val jdbcDF2 = spark

  .read

  .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)

 

// Write Option 1: Saving data to a JDBC source using save method

jdbcDF1

  .write

  .format("jdbc")

  .option("url", "jdbc:postgresql:[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save()

 

// Write Option 2: Saving data to a JDBC source using jdbc method

jdbcDF2.write

  .jdbc(s"jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)

这是在 PySpark 中的操作方法:

 In Python

 Read Option 1: Loading data from a JDBC source using load method

jdbcDF1 = (spark

  .read

  .format("jdbc")

  .option("url", "jdbc:postgresql://[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Read Option 2: Loading data from a JDBC source using jdbc method

jdbcDF2 = (spark

  .read

  .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",

          properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

 

 Write Option 1: Saving data to a JDBC source using save method

(jdbcDF1

  .write

  .format("jdbc")

  .option("url", "jdbc:postgresql://[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())

 

 Write Option 2: Saving data to a JDBC source using jdbc method

(jdbcDF2

  .write

  .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",

          properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))


MySQL

要连接到 MySQL 数据库,请从 Maven 或 MySQL 中构建或下载 JDBC jar (后者更简单!)并将其添加到你的类路径中。然后启动一个 Spark shell(spark-shell 或 pyspark),并指定该 jar:

bin / spark-shell --jars mysql-connector-java_8.0.16-bin.jar

以下示例显示了如何使用 Scala 中的 Spark SQL 数据源 API 和 JDBC 从 MySQL 数据库加载数据并将其保存到 MySQL 数据库:

// In Scala

// Loading data from a JDBC source using load

val jdbcDF = spark

  .read

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load()

 

// Saving data to a JDBC source using save

jdbcDF

  .write

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save()

以下是在 Python 中执行此操作的方法:

 In Python

 Loading data from a JDBC source using load

jdbcDF = (spark

  .read

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Saving data to a JDBC source using save

(jdbcDF

  .write

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())


Azure Cosmos 数据库

若要连接到 Azure Cosmos DB 数据库,请从 Maven 或 GitHub 构建或下载 JDBC jar ,并将其添加到你的类路径中。然后启动一个 Scala 或 PySpark shell,并指定此 jar(请注意,此示例使用的是 Spark 2.4):

bin / spark-shell --jars azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar

你还可以选择使用其 Maven 选项--packages 从 Spark Packages 中获得连接器:

export PKG="com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"

bin/spark-shell --packages $PKG 

以下示例显示如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 Azure Cosmos DB 数据库加载数据并将其保存到 Azure Cosmos DB 数据库。请注意,通常使用 query_custom 配置来利用 Cosmos DB 中的各种索引:

// In Scala

// Import necessary libraries

import com.microsoft.azure.cosmosdb.spark.schema._

import com.microsoft.azure.cosmosdb.spark._

import com.microsoft.azure.cosmosdb.spark.config.Config

 

// Loading data from Azure Cosmos DB

// Configure connection to your collection

val query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"

val readConfig = Config(Map(

  "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" -> "[MASTER KEY]",

  "Database" -> "[DATABASE]",

  "PreferredRegions" -> "Central US;East US2;",

  "Collection" -> "[COLLECTION]",

  "SamplingRatio" -> "1.0",

  "query_custom" -> query

))

 

// Connect via azure-cosmosdb-spark to create Spark DataFrame

val df = spark.read.cosmosDB(readConfig)

df.count

 

// Saving data to Azure Cosmos DB

// Configure connection to the sink collection

val writeConfig = Config(Map(

  "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" -> "[MASTER KEY]",

  "Database" -> "[DATABASE]",

  "PreferredRegions" -> "Central US;East US2;",

  "Collection" -> "[COLLECTION]",

  "WritingBatchSize" -> "100"

))

 

// Upsert the DataFrame to Azure Cosmos DB

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

 

 In Python

 Loading data from Azure Cosmos DB

 Read configuration

query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"

readConfig = {

  "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" : "[MASTER KEY]",

  "Database" : "[DATABASE]",

  "preferredRegions" : "Central US;East US2",

  "Collection" : "[COLLECTION]",

  "SamplingRatio" : "1.0",

  "schema_samplesize" : "1000",

  "query_pagesize" : "2147483647",

  "query_custom" : query

}

 

 Connect via azure-cosmosdb-spark to create Spark DataFrame

df = (spark

  .read

  .format("com.microsoft.azure.cosmosdb.spark")

  .options(**readConfig)

  .load())

 

 Count the number of flights

df.count()

 

 Saving data to Azure Cosmos DB

 Write configuration

writeConfig = {

 "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",

 "Masterkey" : "[MASTER KEY]",

 "Database" : "[DATABASE]",

 "Collection" : "[COLLECTION]",

 "Upsert" : "true"

}

 

 Upsert the DataFrame to Azure Cosmos DB

(df.write

  .format("com.microsoft.azure.cosmosdb.spark")

  .options(**writeConfig)

  .save())

有关更多信息,请参考 Azure Cosmos DB 文档。


MS SQL 服务

要连接到 MS SQL Server 数据库,请下载 JDBC jar 并将其添加到你的类路径中。然后启动 Scala 或 PySpark shell,并指定以下 jar:

bin / spark-shell --jars mssql-jdbc-7.2.2.jre8.jar

以下示例显示了如何使用 Scala 和 PySpark 中的 Spark SQL 数据源 API 和 JDBC 从 MS SQL Server 数据库加载数据并将其保存到 MS SQL Server 数据库:

// In Scala

// Loading data from a JDBC source

// Configure jdbcUrl

val jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"

 

// Create a Properties() object to hold the parameters.

// Note, you can create the JDBC URL without passing in the

// user/password parameters directly.

val cxnProp = new Properties()

cxnProp.put("user", "[USERNAME]")

cxnProp.put("password", "[PASSWORD]")

cxnProp.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

 

// Load data using the connection properties

val jdbcDF = spark.read.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)

 

// Saving data to a JDBC source

jdbcDF.write.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)

 

 In Python

 Configure jdbcUrl

jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"

 

 Loading data from a JDBC source

jdbcDF = (spark

  .read

  .format("jdbc")

  .option("url", jdbcUrl)

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Saving data to a JDBC source

(jdbcDF

  .write

  .format("jdbc")

  .option("url", jdbcUrl)

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())


其他外部来源

Apache Spark 可以连接许多外部数据源。其他流行的数据源包括:

Apache Cassandra

Snowflake

MongoDB


DataFrames 和 Spark SQL 中的高阶函数

由于复杂数据类型是简单数据类型的组合,因此很容易直接对其进行操作。有两种用于处理复杂数据类型的典型解决方案:

将嵌套结构分解为单独的元素,并进行相应的处理,然后重新创建嵌套结构

构建用户自定义函数

这些方法的好处是可以让你以扁平化的格式考虑问题。它们通常涉及(但不限于)使用实用程序函数,例如 get_json_object(),from_json(),to_json(),explode(),和 selectExpr()。

让我们仔细看看这两种选项。


选项 1:Explode 和 Collect

在此嵌套的 SQL 语句中,我们首先 explode(values),它为每个元素(value)创建新的一行(带有 id)的值:

-- In SQL

SELECT id, collect_list(value + 1) AS values

FROM  (SELECT id, EXPLODE(values) AS value

        FROM table) x

GROUP BY id

虽然 collect_list()返回具有重复项的对象列表,但该 GROUP BY 语句需要进行洗牌操作,这意味着重新收集的数组的顺序不一定与原始数组的顺序相同。由于 values 可以是任意数量的维度(非常宽和/或非常长的数组),而我们正在做 GROUP BY,这种方法的使用成本可能会非常昂贵。


选项 2:用户自定义函数

为了执行相同的任务(添加 1 到中的每个元素 values),我们还可以创建 map()用于遍历每个元素(value)并执行添加操作的 UDF :

// In Scala

def addOne(values: Seq[Int]): Seq[Int] = {

    values.map(value => value + 1)

}

val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])

然后,我们可以在 Spark SQL 中使用此 UDF,如下所示:

spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show() 

尽管这比使用 explode()并且 collect_list()更好,因为不会有任何排序问题,但是序列化和反序列化过程本身可能会很昂贵。但是,还必须注意,这 collect_list()可能会导致 Executor 遇到大数据集时出现内存不足问题,而使用 UDF 可以减轻这些问题。


复杂数据类型的内置函数

你可以使用 Apache Spark 2.4 及更高版本中包含的内置函数来处理复杂数据类型,而不必使用前面介绍的潜在的昂贵技术。表 5-3(数组类型)和表 5-4(映射类型)中列出了一些较常见的类型。有关完整列表,请参阅 Databricks 文档中的笔记。






表 5-3 数组类型的函数



表 5-4 映射类型的函数


高阶函数

除了前面提到的内置函数外,还有一些将匿名 lambda 函数作为参数的高阶函数。下面是一个高阶函数的示例:

-- In SQL

transform(values, value -> lambda expression)

该 transform()函数以数组(values)和匿名函数(lambda expression)作为输入。通过将匿名函数应用于每个元素,然后将结果分配给输出数组,该函数可以明确地创建一个新数组(类似于 UDF 方法,但效率更高)。

让我们创建一个样本数据集,以便我们可以运行一些示例:

 In Python

from pyspark.sql.types import *

schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

 

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]

t_c = spark.createDataFrame(t_list, schema)

t_c.createOrReplaceTempView("tC")

 

# Show the DataFrame

t_c.show()

 

// In Scala

// Create DataFrame with two rows of two arrays (tempc1, tempc2)

val t1 = Array(35, 36, 32, 30, 40, 42, 38)

val t2 = Array(31, 32, 34, 55, 56)

val tC = Seq(t1, t2).toDF("celsius")

tC.createOrReplaceTempView("tC")

 

// Show the DataFrame

tC.show()

这里是输出:

+ -------------------- +

|               celsius|

+ -------------------- +

| [35、36、32、30,...  |

| [31,32,34,55,56]  |

+ -------------------- +

使用前面的 DataFrame,你可以运行以下更高阶的函数查询。


transform()

transform(array <T>,function <T,U>):array <U>

该 transform()函数通过将一个函数应用于输入数组的每个元素来生成一个数组(类似于一个 map()函数):

// In Scala/Python

// Calculate Fahrenheit from Celsius for an array of temperatures

spark.sql("""

SELECT celsius,

 transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit

  FROM tC

""").show()

 

+--------------------+--------------------+

|             celsius|          fahrenheit|

+--------------------+--------------------+

|[35,36,32,30,... |[95,96,89,86,... |

|[31,32,34,55,56] |[87,89,93,131,...|

+--------------------+--------------------+

filter()

filter(array <T>,function <T,Boolean>):array <T>

该 filter()函数产生一个数组,该数组仅由输入数组的布尔函数值为 true 的元素组成:

// In Scala/Python

// Filter temperatures > 38C for array of temperatures

spark.sql("""

SELECT celsius,

 filter(celsius, t -> t > 38) as high

  FROM tC

""").show()

 

+--------------------+--------+

|             celsius|    high|

+--------------------+--------+

|[35,36,32,30,...|[40,42]|

|[31,32,34,55,56]|[55,56]|

+--------------------+--------+

exists()

exists(array<T>, function<T, V, Boolean>): Boolean

如果布尔函数对于输入数组中的某一个元素成立,则该 exists()函数返回 true:

// In Scala/Python

// Is there a temperature of 38C in the array of temperatures

spark.sql("""

SELECT celsius,

       exists(celsius, t -> t = 38) as threshold

  FROM tC

""").show()

 

+--------------------+---------+

|             celsius|threshold|

+--------------------+---------+

|[35,36,32,30,...|     true|

|[31,32,34,55,56]|    false|

+--------------------+---------+

reduce()

reduce(array <T>,B,function <B,T,B>,function <B,R>)

该 reduce()函数通过函数 function<B, T, B>将元素合并到缓冲区 B 中,并在最终缓冲区上应用函数 function<B, R>,从而将数组的元素减少为单个值:

// In Scala/Python

// Calculate average temperature and convert to F

spark.sql("""

SELECT celsius,

       reduce(

          celsius,

          0,

          (t, acc) -> t + acc,

          acc -> (acc div size(celsius) * 9 div 5) + 32

        ) as avgFahrenheit

  FROM tC

""").show()

 

+--------------------+-------------+

|             celsius|avgFahrenheit|

+--------------------+-------------+

|[35,36,32,30,...|           96|

|[31,32,34,55,56]|          105|

+--------------------+-------------+


通用 DataFrame 和 Spark SQL 操作

Spark SQL 的部分功能来自其支持的各种 DataFrame 操作(也称为无类型的 Dataset 操作)。操作列表非常广泛,包括:

汇总功能

收集功能

日期时间功能

数学函数

混合功能

非集合函数

排序功能

字符串函数

UDF 功能

窗口函数

有关完整列表,请参见 Spark SQL 文档。

在本章中,我们将重点介绍以下常见的关系操作:

union 和 join

窗口

修改

为了执行这些 DataFrame 操作,我们首先准备一些数据。在以下代码段中,我们:

1. 导入两个文件并创建两个 DataFrame,一个用于机场(airportsna)信息,一个用于美国航班延误(departureDelays)。

2. 使用 expr(),将 delay 和 distance 列从 STRING 转换为 INT。

3. 创建一个较小的表 foo,我们可以将其作为演示示例的重点;它仅包含有关在短时间内从西雅图(SEA)到旧金山(SFO)的三班航班的信息。

让我们开始吧:

// In Scala

import org.apache.spark.sql.functions._

 

// Set file paths

val delaysPath =

  "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

val airportsPath =

  "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

 

// Obtain airports data set

val airports = spark.read

  .option("header", "true")

  .option("inferschema", "true")

  .option("delimiter", "\t")

  .csv(airportsPath)

airports.createOrReplaceTempView("airports_na")

 

// Obtain departure Delays data set

val delays = spark.read

  .option("header","true")

  .csv(delaysPath)

  .withColumn("delay", expr("CAST(delay as INT) as delay"))

  .withColumn("distance", expr("CAST(distance as INT) as distance"))

delays.createOrReplaceTempView("departureDelays")

 

// Create temporary small table

val foo = delays.filter(

  expr("""origin == 'SEA' AND destination == 'SFO' AND

      date like '01010%' AND delay > 0"""))

foo.createOrReplaceTempView("foo")

 

 In Python

 Set file paths

from pyspark.sql.functions import expr

tripdelaysFilePath =

  "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

airportsnaFilePath =

  "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

  

 Obtain airports data set

airportsna = (spark.read

  .format("csv")

  .options(header="true", inferSchema="true", sep="\t")

  .load(airportsnaFilePath))

 

airportsna.createOrReplaceTempView("airports_na")

 

 Obtain departure delays data set

departureDelays = (spark.read

  .format("csv")

  .options(header="true")

  .load(tripdelaysFilePath))

 

departureDelays = (departureDelays

  .withColumn("delay", expr("CAST(delay as INT) as delay"))

  .withColumn("distance", expr("CAST(distance as INT) as distance")))

 

departureDelays.createOrReplaceTempView("departureDelays")

 

 Create temporary small table

foo = (departureDelays

  .filter(expr("""origin == 'SEA' and destination == 'SFO' and

    date like '01010%' and delay > 0""")))

foo.createOrReplaceTempView("foo")

所述 departureDelays DataFrame 包含> 1.3M 航班数据,而 foo DataFrame 包含从 SEA 到 SFO 的航班在特定时间范围内只有三条信息,见下面的输出:

// Scala/Python

spark.sql("SELECT * FROM airports_na LIMIT 10").show()

 

+-----------+-----+-------+----+

|       City|State|Country|IATA|

+-----------+-----+-------+----+

| Abbotsford|   BC| Canada| YXX|

|   Aberdeen|   SD|    USA| ABR|

|    Abilene|   TX|    USA| ABI|

|      Akron|   OH|    USA| CAK|

|    Alamosa|   CO|    USA| ALS|

|     Albany|   GA|    USA| ABY|

|     Albany|   NY|    USA| ALB|

|Albuquerque|   NM|    USA| ABQ|

| Alexandria|   LA|    USA| AEX|

|  Allentown|   PA|    USA| ABE|

+-----------+-----+-------+----+

 

spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01011245|    6|     602|   ABE|        ATL|

|01020600|   -8|     369|   ABE|        DTW|

|01021245|   -2|     602|   ABE|        ATL|

|01020605|   -4|     602|   ABE|        ATL|

|01031245|   -4|     602|   ABE|        ATL|

|01030605|    0|     602|   ABE|        ATL|

|01041243|   10|     602|   ABE|        ATL|

|01040605|   28|     602|   ABE|        ATL|

|01051245|   88|     602|   ABE|        ATL|

|01050605|    9|     602|   ABE|        ATL|

+--------+-----+--------+------+-----------+

 

spark.sql("SELECT * FROM foo").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+

在以下各节中,我们将使用此数据执行 union,join 和窗口化示例。


Unions

Apache Spark 中的一种常见模式是将具有相同模式的两个不同的 DataFrame 联合在一起。可以使用以下 union()方法实现:

// Scala

// Union two tables

val bar = delays.union(foo)

bar.createOrReplaceTempView("bar")

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'

AND date LIKE '01010%' AND delay > 0""")).show()

 

 In Python

# Union two tables

bar = departureDelays.union(foo)

bar.createOrReplaceTempView("bar")

 

 Show the union (filtering for SEA and SFO in a specific time range)

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'

AND date LIKE '01010%' AND delay > 0""")).show()

该 bar DataFrame 是 foo 和 delays 两者 union 之后得到的。使用相同的过滤条件在 bar DataFrame 中得到 foo 结果,正如预期的那样,我们看到了重复的数据:

-- In SQL

spark.sql("""

SELECT *

  FROM bar

 WHERE origin = 'SEA'

   AND destination = 'SFO'

   AND date LIKE '01010%'

   AND delay > 0

""").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+


Joins

常见的 DataFrame 操作是将两个 DataFrame(或表)连接在一起。默认情况下,Spark SQL 默认连接是 inner join,其中连接选项包括 inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi 和 left_anti 这些连接方式。文档中提供了更多信息(适用于 Scala 和 Python)。

以下代码示例执行 airports 和 foo DataFrame 之间的 inner join:

 

// In Scala

foo.join(

  airports.as('air),

  $"air.IATA" === $"origin"

).select("City", "State", "date", "delay", "distance", "destination").show()

 

 In Python

 Join departure delays data (foo) with airport info

foo.join(

  airports,

  airports.IATA == foo.origin

).select("City", "State", "date", "delay", "distance", "destination").show()

 

-- In SQL

spark.sql("""

SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination

  FROM foo f

  JOIN airports_na a

    ON a.IATA = f.origin

""").show()

前面的代码可以使你通过查看 foo DataFrame 和 airports DataFrame 两者的关联信息,得到进入城市的日期,延迟,距离和目的地信息:

+-------+-----+--------+-----+--------+-----------+

| City|State| date|delay|distance|destination|

+-------+-----+--------+-----+--------+-----------+

|Seattle| WA|01010710| 31| 590| SFO|

|Seattle| WA|01010955| 104| 590| SFO|

|Seattle| WA|01010730| 5| 590| SFO|

+-------+-----+--------+-----+--------+-----------+


windowing

窗口函数使用窗口(输入的行范围)中行的值返回一组值,通常是以另一行的形式。使用窗口函数,可以在一组行上进行操作,同时仍为每个输入行返回一个值。在本节中,我们将展示如何使用 dense_rank()窗口功能。如表 5-5 所示,还有许多其他功能。

让我们首先回顾一下从西雅图(SEA),旧金山(SFO)和纽约(JFK)出发并前往一组特定的目的地位置的航班所经历的 TotalDelays(计算得出 sum(Delay)),如以下查询所示:

-- In SQL

DROP TABLE IF EXISTS departureDelaysWindow;

 

CREATE TABLE departureDelaysWindow AS

SELECT origin, destination, SUM(delay) AS TotalDelays

  FROM departureDelays

 WHERE origin IN ('SEA', 'SFO', 'JFK')

   AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')

 GROUP BY origin, destination;

 

SELECT * FROM departureDelaysWindow

 

+------+-----------+-----------+

|origin|destination|TotalDelays|

+------+-----------+-----------+

|   JFK|        ORD|       5608|

|   SEA|        LAX|       9359|

|   JFK|        SFO|      35619|

|   SFO|        ORD|      27412|

|   JFK|        DEN|       4315|

|   SFO|        DEN|      18688|

|   SFO|        SEA|      17080|

|   SEA|        SFO|      22293|

|   JFK|        ATL|      12141|

|   SFO|        ATL|       5091|

|   SEA|        DEN|      13645|

|   SEA|        ATL|       4535|

|   SEA|        ORD|      10041|

|   JFK|        SEA|       7856|

|   JFK|        LAX|      35755|

|   SFO|        JFK|      24100|

|   SFO|        LAX|      40798|

|   SEA|        JFK|       4667|

+------+-----------+-----------+

如果你想为每个这些始发机场找到三个延误最严重的目的地,该怎么办?你可以通过针对每个起点运行三个不同的查询,然后将结果合并在一起,来实现此目的,如下所示:

-- In SQL

SELECT origin, destination, SUM(TotalDelays) AS TotalDelays

 FROM departureDelaysWindow

WHERE origin = '[ORIGIN]'

GROUP BY origin, destination

ORDER BY SUM(TotalDelays) DESC

LIMIT 3

这里[ORIGIN]是三个不同产地的值:JFK,SEA 和 SFO。

但是更好的方法是使用窗口函数 dense_rank()来执行以下计算:

-- In SQL

spark.sql("""

SELECT origin, destination, TotalDelays, rank

  FROM (

     SELECT origin, destination, TotalDelays, dense_rank()

       OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank

       FROM departureDelaysWindow

  ) t

 WHERE rank <= 3

""").show()

 

+------+-----------+-----------+----+

|origin|destination|TotalDelays|rank|

+------+-----------+-----------+----+

|   SEA|        SFO|      22293|   1|

|   SEA|        DEN|      13645|   2|

|   SEA|        ORD|      10041|   3|

|   SFO|        LAX|      40798|   1|

|   SFO|        ORD|      27412|   2|

|   SFO|        JFK|      24100|   3|

|   JFK|        LAX|      35755|   1|

|   JFK|        SFO|      35619|   2|

|   JFK|        ATL|      12141|   3|

+------+-----------+-----------+----+

通过使用 dense_rank()窗口功能,我们可以快速确定三个出发城市的延误最严重的目的地是:

l 西雅图(SEA):旧金山(SFO),丹佛(DEN)和芝加哥(ORD)

l 旧金山(SFO):洛杉矶(LAX),芝加哥(ORD)和纽约(JFK)

l 纽约(JFK):洛杉矶(LAX),旧金山(SFO)和亚特兰大(ATL)

重要的是要注意,每个窗口分组都需要适合一个执行程序,并且在执行过程中将组成一个分区。因此,你需要确保你的查询不是无界的(即限制窗口的大小)。


修改

另一个常见的操作是对 DataFrame 进行修改。尽管 DataFrames 本身是不可变的,但是你可以通过创建具有不同列的新的、不同的 DataFrames 的操作来修改它们。(从前面的章节中回想起,底层的 RDD 是不可变的,也就是说它们不能更改,以确保 Spark 操作具有数据血缘。)让我们从前面的小 DataFrame 示例开始:

// In Scala/Python

foo.show()

 

--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+

添加新列

要将新列添加到 foo DataFrame,请使用以下 withColumn()方法:

// In Scala

import org.apache.spark.sql.functions.expr

val foo2 = foo.withColumn(

              "status",

              expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")

           )

# In Python

from pyspark.sql.functions import expr

foo2 = (foo.withColumn(

          "status",

          expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")

        ))

新创建的 foo2 DataFrame 具有原始 foo DataFrame 的内容以及 status 列,该列是由 CASE 语句定义的:

// In Scala/Python

foo2.show()

 

+--------+-----+--------+------+-----------+-------+

|    date|delay|distance|origin|destination| status|

+--------+-----+--------+------+-----------+-------+

|01010710|   31|     590|   SEA|        SFO|Delayed|

|01010955|  104|     590|   SEA|        SFO|Delayed|

|01010730|    5|     590|   SEA|        SFO|On-time|

+--------+-----+--------+------+-----------+-------+

删除列

要删除列,请使用 drop()方法。例如,让我们删除该 delay 列,因为我们 status 在上一节中添加了一个列:

// In Scala

val foo3 = foo2.drop("delay")

foo3.show()

 

 In Python

foo3 = foo2.drop("delay")

foo3.show()

 

+--------+--------+------+-----------+-------+

|    date|distance|origin|destination| status|

+--------+--------+------+-----------+-------+

|01010710|     590|   SEA|        SFO|Delayed|

|01010955|     590|   SEA|        SFO|Delayed|

|01010730|     590|   SEA|        SFO|On-time|

+--------+--------+------+-----------+-------+

重命名列

你可以使用以下 withColumnRenamed()方法重命名列:

// In Scala

val foo4 = foo3.withColumnRenamed("status", "flight_status")

foo4.show()

 

 In Python

foo4 = foo3.withColumnRenamed("status", "flight_status")

foo4.show()

 

+--------+--------+------+-----------+-------------+

|    date|distance|origin|destination|flight_status|

+--------+--------+------+-----------+-------------+

|01010710|     590|   SEA|        SFO|      Delayed|

|01010955|     590|   SEA|        SFO|      Delayed|

|01010730|     590|   SEA|        SFO|      On-time|

+--------+--------+------+-----------+-------------+

旋转

在处理数据时,有时你需要将列交换为行----旋转数据。让我们获取一些数据来说明这个概念:

-- In SQL

SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay

  FROM departureDelays

 WHERE origin = 'SEA'

 

+-----------+-----+-----+

|destination|month|delay|

+-----------+-----+-----+

|        ORD|    1|   92|

|        JFK|    1|   -7|

|        DFW|    1|   -5|

|        MIA|    1|   -3|

|        DFW|    1|   -3|

|        DFW|    1|    1|

|        ORD|    1|  -10|

|        DFW|    1|   -6|

|        DFW|    1|   -2|

|        ORD|    1|   -3|

+-----------+-----+-----+

only showing top 10 rows

通过数据透视,你可以将名称放置在 month 列中(而不是 1 和 2,可以分别显示 Jan 和 Feb),并且可以按目的地和月份对延迟执行汇总计算(在这种情况下,平均值和最大值):

-- In SQL

SELECT * FROM (

SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay

  FROM departureDelays WHERE origin = 'SEA'

)

PIVOT (

  CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay

  FOR month IN (1 JAN, 2 FEB)

)

ORDER BY destination

 

+-----------+------------+------------+------------+------------+

|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|

+-----------+------------+------------+------------+------------+

|        ABQ|       19.86|         316|       11.42|          69|

|        ANC|        4.44|         149|        7.90|         141|

|        ATL|       11.98|         397|        7.73|         145|

|        AUS|        3.48|          50|       -0.21|          18|

|        BOS|        7.84|         110|       14.58|         152|

|        BUR|       -2.03|          56|       -1.89|          78|

|        CLE|       16.00|          27|        null|        null|

|        CLT|        2.53|          41|       12.96|         228|

|        COS|        5.32|          82|       12.18|         203|

|        CVG|       -0.50|           4|        null|        null|

|        DCA|       -1.15|          50|        0.07|          34|

|        DEN|       13.13|         425|       12.95|         625|

|        DFW|        7.95|         247|       12.57|         356|

|        DTW|        9.18|         107|        3.47|          77|

|        EWR|        9.63|         236|        5.20|         212|

|        FAI|        1.84|         160|        4.21|          60|

|        FAT|        1.36|         119|        5.22|         232|

|        FLL|        2.94|          54|        3.50|          40|

|        GEG|        2.28|          63|        2.87|          60|

|        HDN|       -0.44|          27|       -6.50|           0|

+-----------+------------+------------+------------+------------+

only showing top 20 rows

总结

本章探讨了 Spark SQL 如何与外部组件进行交互的接口。我们讨论了创建用户自定义函数(包括 Pandas UDF)的过程,并介绍了一些用于执行 Spark SQL 查询的选项(包括 Spark SQL Shell,Beeline 和 Tableau)。然后,我们提供了有关如何使用 Spark SQL 连接各种外部数据源的示例,例如 SQL 数据库,PostgreSQL,MySQL,Tableau,Azure Cosmos DB,MS SQL Server 等。

我们探索了 Spark 用于复杂数据类型的内置函数,并提供了一些使用高阶函数的示例。最后,我们讨论了一些常见的关系运算符,并展示了如何选择执行 DataFrame 的操作。

在下一章中,我们将探讨如何使用数据集,强类型操作的好处以及何时以及为何使用它们。

发布于: 2021 年 07 月 17 日阅读数: 25
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
Spark SQL和DataFrames:与外部数据源进行交互(五)