写点什么

Apache Spark 结构化 API(三)

发布于: 4 小时前
Apache Spark结构化API(三)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在本章中,我们将探讨 Apache Spark 添加结构化背后的主要动机,包括这些动机是如何引导高级 API(DataFrame 和 DataSet)的创建,以及它们在 Spark2.x 中不同组件之间的一致性介绍。我们还将研究支撑这些结构化高级 API 的 Spark SQL 引擎。


当 Spark SQL 首次在早期的 Spark1.x 中被引入,接着是 DataFrame 作为 Spark1.3 中 SchemaRDD 的继承者,我们第一次看到了 Spark 完整的结构。Spark SQL 引入了高级表达式操作函数,模拟了类似 SQL 的语法,DataFrame 为后续版本中的更多结构奠定了基础,为 Spark 计算查询中的性能操作铺平了道路。


但在我们讨论较新的结构化 API 之前,让我们先看一下简单的 RDD 编程 API 模型,以简要了解一下 Spark 中没有结构的感觉。


Spark:什么是 RDD?

RDD 是 Spark 最基本的抽象,与 RDD 相关的三个重要特性:

1.依赖关系:宽依赖和窄依赖

2.数据分区(Partitions):数据集组成单位,带有位置信息

3.计算函数: Partition => Iterator[T]


这三个特性都是 RDD 编程 API 模型最基本的组成部分,基于 RDD 模型构建所有更高级别的功能。首先,需要一个依赖关系列表,该依赖关系指示 Spark 如何使用其输入构造 RDD。必要时,Spark 可以根据这些依赖关系重新创建 RDD 并对其进行复制操作。这一特性使得 RDD 具有弹性。


其次,分区使得 Spark 能够对数据进行拆分,以便跨 Executor 的分区进行并行计算。在某些情况下,例如从 HDFS 读取,Spark 将使用位置信息将工作发送给接近数据的 Executor。这样,通过网络传输的数据就会更少,减少网络 IO。


最后,RDD 具有计算功能,它可以将存储在 RDD 中的数据生成一个 Iterator[T]。


简单而优雅!然而,这个原始的模型存在几个问题。首先,计算函数(或计算)对 Spark 是不透明的。也就是说,Spark 不知道你在计算函数中在做什么。无论是执行 connect、filter、select 还是 aggregate,Spark 都只将其视为 lambda 表达式。另一个问题是 Iterator[T]数据类型对于 Python RDD 来说也不透明;Spark 只知道它是 Python 中的通用对象。


此外,由于无法检查函数中的计算或表达式,因此 Spark 无法优化该表达式——无法理解其中的意图。最后,Spark 不了解 T 中的特定数据类型。Spark 是一个不透明的对象,它不知道你是否访问对象中特定类型的列。因此,Spark 所能做的就是将不透明对象序列化为一系列字节,而不使用任何数据压缩技术。


这种不透明性明显阻碍了 Spark 将计算重排为高效的查询计划的能力。那么解决方案是什么呢?


结构化 Spark

Spark2.x 引入了一些构建 Spark 的关键方案。一种是使用数据分析中常见的模式来表达计算。这些模式表示为高级操作,如过滤、选择、计数、聚合、平均和分组,这提供了更多的清晰度和简单性。


通过在 DSL 中使用一组通用运算符,可以进一步缩小了这种特异性。通过 DSL 中的一组操作(如 Spark 支持的 lan 参数(Java、Python、Spark、R、和 SQL)中的操作),这些运算符可以让你告诉 Spark 你希望对数据进行什么计算,因此,它可以构建一个可执行的有效的查询计划。


最终的顺序和结构方案是允许你以表格的形式排列数据,如 SQL 表或电子表格,并使用受支持的结构化数据类型(稍后将介绍)。但是,这种结构到底有什么好处呢?


主要的优点和优势

结构带来许多好处,包括跨 Spark 组件提供性能和空间效率。在简要讨论 DataFrame 和 Dataset API 的使用时,我们将进一步探讨这些优势,但现在我们将集中讨论其他优势:表达性、简单性、可组合性和统一性。


让我们先用一个简单的代码片段来演示可表达性和可组合性。在下面的示例中,我们要汇总每个名称的所有年龄,按名称分组,然后计算年龄平均值——这是数据分析和发现中的一种常见模式。如果我们使用低级 RDD API,代码如下:

In Python

Create an RDD of tuples (name, age)

dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),("TD", 35),

("Brooke", 25)])

Use map and reduceByKey transformations with their lambda

expressions to aggregate and then compute average

agesRDD = (dataRDD

.map(lambda x: (x[0], (x[1], 1)))

.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

.map(lambda x: (x[0], x[1][0]/x[1][1]))) 

没有人会质疑这段代码告诉 Spark 如何聚合键和用一串 lambda 函数计算平均值,该代码是神秘的且难以阅读的。换句话说,代码正在指示 Spark 如何计算查询,但对 Spark 完全不透明,因为它不能传达意图。此外,Scala 中的等效 RDD 代码看起来与这里显示的 Python 代码完全不同。相比之下,如果我们用高级 DSL 运算符和 DataFrame API 来表达相同的查询,从而指示 Spark 该怎么办?请看一看下面这段代码:  

In Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import avg

  Create a DataFrame using SparkSession

spark = (SparkSession

.builder

.appName("AuthorsAges")

.getOrCreate())

  Create a DataFrame

data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])

  Group the same names together, aggregate their ages, and compute an average

avg_df = data_df.groupBy("name").agg(avg("age"))

  Show the results of the final execution

avg_df.show()

+------+--------+

| name|avg(age)|

+------+--------+

|Brooke| 22.5|

| Jules| 30.0|

| TD| 35.0|

| Denny| 31.0|

+------+--------+

这个版本的代码比早期的版本更有表达力,也更简单,因为我们使用高级 DSL 运算符和 API 来告诉 Spark 该做什么。实际上,我们已经使用了这些运算符来构成我们的查询。而且由于 Spark 可以检查或解析这个查询并理解我们的意图,所以它可以优化或调整操作以高效执行。Spark 确切地知道了我们想做什么:按他们的名字分组,年龄汇总,然后计算所有同名的人的平均年龄。我们使用高级运算符作为一个简单的查询来构建一个完整的计算——它的表达能力如何呢?


有些人会认为,仅通过使用映射到通用或重复数据分析模式的高级表达 DSL 运算符来引入顺序和结构,我们就限制了开发人员指示编译器或控制了应该如何计算其查询的范围,实际上你不会受限于这些结构化模式;你可以随时切换回非结构化的低级 RDD API,尽管我们几乎没有必要这样做。


除了更容易阅读之外,Spark 的高级 API 的结构还引入了其组件和语言之间的统一性。例如,此处显示的 Scala 代码与以前的 Python 代码具有相同的作用,并且 API 看起来几乎相同:

// In Scala

import org.apache.spark.sql.functions.avg

import org.apache.spark.sql.SparkSession

// Create a DataFrame using SparkSession

val spark = SparkSession

.builder

.appName("AuthorsAges")

.getOrCreate()

// Create a DataFrame of names and ages

val dataDF = spark.createDataFrame(Seq(("Brooke", 20), ("Brooke", 25),

("Denny", 31), ("Jules", 30), ("TD", 35))).toDF("name", "age")

// Group the same names together, aggregate their ages, and compute an average

val avgDF = dataDF.groupBy("name").agg(avg("age"))

// Show the results of the final execution 

avgDF.show()

+------+--------+

| name|avg(age)|

+------+--------+

|Brooke| 22.5|

| Jules| 30.0|

| TD| 35.0|

| Denny| 31.0|

+------+--------+ 


如果了解 SQL 操作,其中一些 DSL 运算符会执行你将熟悉的类似关系的操作,如选择、筛选、分组和聚合。


我们开发人员所珍视的所有这些简单性和表达性都是可能的,因为构建了高级结构化 API 的 Spark SQL 引擎。正是因为这个支撑了所有的 Spark 组件的引擎,我们才能获得统一的 API。无论是在结构化流(Structured Streaming)还是 MLLib 中对 DataFrame 做查询,你始终都会将 DataFrame 作为结构化数据进行转换和操作。我们将在这一章后面详细介绍 Spark SQL 引擎,但现在我们探讨常见操作所用的 API 和 DSL,以及如何将它们用于数据分析。


DataFrame API 

受 Pandas DataFrame 结构、格式和一些特定操作的启发,Spark DataFrame 类似于具有命名列和模式的分布式内存表,其中每列都有一个特定的数据类型: integer, string, array, map, real, date, timestamp 等。看起来 Spark DataFrame 就像一个表格。如表 3-1 所示。



当数据可视化为结构化表时,它不仅易于理解,而且在涉及可能需要在行和列上执行的常见操作时也很容易使用。还记得,正如你在第 2 章中了解到的,DataFrame 是不可变的,并且 Spark 保留了所有转换的血缘关系。你可以添加或更改列的名称和数据类型,从而在保留先前版本的同时创建新的 DataFrame。可以在数据结构(schema)中声明 DataFrame 中的命名列及其关联的 Spark 数据类型。


在使用它们定义数据结构(schema)之前,让我们检查一下 Spark 中可用的通用和结构化数据类型。然后,我们将说明如何使用 schema 创建 DataFrame 并录入表 3-1 中的数据。


Spark 的基本数据类型

与其支持的编程语言相匹配,Spark 支持基本的内部数据类型。这些数据类型可以在 Spark 应用程序中声明,也可以在数据结构(schema)中定义。例如,在 Scala 中,你可以定义或声明一个特定的列名,类型可以是 String、Byte、Long 或 Map 等类型。在这里,我们定义与 Spark 数据类型绑定的变量名:$SPARK_HOME/bin/spark-shell

scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

scala> val nameTypes = StringType

nameTypes: org.apache.spark.sql.types.StringType.type = StringType

scala> val firstName = nameTypes

firstName: org.apache.spark.sql.types.StringType.type = StringType

scala> val lastName = nameTypes

lastName: org.apache.spark.sql.types.StringType.type = StringType


表 3-2 列出了 Spark 中支持的基本 Scala 数据类型。除了 DecimalType 以外,它们都是 DataTypes 的子类型,具体如下。

Spark 支持类似的基本 Python 数据类型,如表 3-3 中所列举的那样。




Spark 的结构化和复杂的数据类型

对于复杂的数据分析,你不会只处理简单或基本的数据类型。你的数据将很复杂,通常是结构化的或嵌套的,你将需要 Spark 来处理这些复杂的数据类型。它们有多种形式: maps, arrays, structs, dates, timestamps, fields,等。表 3-4 列出了 Spark 支持的 Scala 结构化数据类型。



表 3-5 中枚举了 Spark 支持的 Python 等效结构化数据类型。



虽然这些表显示了支持的各种类型,但在为数据定义数据结构(schema)时,查看这些类型如何组合更为重要。


数据结构(schema)和 DataFrame 创建

Spark 中的数据结构(schema)定义了 DataFrame 的列名和关联的数据类型。通常情况下,在从外部数据源读取结构化数据时开始定义数据结构(schema)(下一章将详细介绍)。相对于采用“读取模式”方法,预先定义数据结构(schema)具有三个好处:

1.你可以避免 Spark 的推断数据类型的工作。


2.你可以防止 Spark 创建一个单独的作业,只是为了读取文件的大部分内容来确定数据结构(schema),对于大型数据文件来说,这可能会很昂贵和耗时。


3.如果数据与数据结构(schema)不匹配,则可以提前检测到错误。因此,我们建议你在要从数据源读取大文件时始终预先定义数据结构(schema)。为了简短说明,让我们为表 3-1 中的数据定义一个数据结构(schema),并使用该模式来创建一个 DataFrame。


定义数据结构(schema)的两种方式

Spark 允许你通过两种方式定义模式。一种是通过编程的模式定义,另一种是使用数据定义语言(DDL),这样的定义出来的数据结构更简单、更容易阅读。要以编程方式为具有三个命名列:作者、标题和页的 DataFrame 定义数据结构(schema),可以使用 Spark DataFrame API。例如:

// In Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(StructField("author", StringType, false),

StructField("title", StringType, false),

StructField("pages", IntegerType, false)))

  In Python

from pyspark.sql.types import *

schema = StructType([StructField("author", StringType(), False),

StructField("title", StringType(), False),

StructField("pages", IntegerType(), False)])

使用 DDL 定义相同的 schema 要简单得多:

// In Scala

val schema = "author STRING, title STRING, pages INT"

  In Python

schema = "author STRING, title STRING, pages INT"

你可以选择任何你想要定义 schema 的方式。对于许多例子,我们将同时使用:

from pyspark.sql import SparkSession

  Define schema for our data using DDL

schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING,

`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

  Create our static data

data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","LinkedIn"]],

[2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",

"LinkedIn"]],

[3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",

"twitter", "FB", "LinkedIn"]],

[4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,

["twitter", "FB"]],

[5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",

"twitter", "FB", "LinkedIn"]],

[6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,

["twitter", "LinkedIn"]]

]

  Main program

if __name__ == "__main__":

  Create a SparkSession

spark = (SparkSession

.builder

.appName("Example-3_6")

.getOrCreate())

  Create a DataFrame using the schema defined above

blogs_df = spark.createDataFrame(data, schema)

  Show the DataFrame; it should reflect our table above

blogs_df.show()

  Print the schema used by Spark to process the DataFrame

print(blogs_df.printSchema()) 


从控制台运行此程序将产生以下输出:


如果要在代码的其他地方使用此 schema,只需执行 blogs_df.schema 即可,

它将返回 schema 定义:

StructType(List(StructField("Id",IntegerType,false),

StructField("First",StringType,false),

StructField("Last",StringType,false),

StructField("Url",StringType,false),

StructField("Published",StringType,false),

StructField("Hits",IntegerType,false),

StructField("Campaigns",ArrayType(StringType,true),false)))


正如上面你所看到的,DataFrame 设计与表 3-1 的设计以及相应的数据类型和模式输出相匹配。

如果要从 JSON 文件中读取数据,而不是创建静态数据,则 schema 定义将完全相同。让我们用一个 Scala 实例来说明相同的代码,这次从 JSON 文件读取:

// In Scala

package main.scala.chapter3

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types._

object Example3_7 {

def main(args: Array[String]) {

val spark = SparkSession

.builder

.appName("Example-3_7")

.getOrCreate()

if (args.length <= 0) {

println("usage Example3_7 <file path to blogs.json>")

System.exit(1)

}

// Get the path to the JSON file

val jsonFile = args(0)

// Define our schema programmatically

val schema = StructType(Array(StructField("Id", IntegerType, false),

StructField("First", StringType, false),

StructField("Last", StringType, false),

StructField("Url", StringType, false),

StructField("Published", StringType, false),

StructField("Hits", IntegerType, false),

StructField("Campaigns", ArrayType(StringType), false)))

// Create a DataFrame by reading from the JSON file

// with a predefined schema

val blogsDF = spark.read.schema(schema).json(jsonFile)

// Show the DataFrame schema as output

blogsDF.show(false) 

// Print the schema

println(blogsDF.printSchema)

println(blogsDF.schema)

}

}

毫不奇怪,来自 Scala 程序的输出与来自 Python 程序的输出没有什么不同:


root

|-- Id: integer (nullable = true)

|-- First: string (nullable = true)

|-- Last: string (nullable = true)

|-- Url: string (nullable = true)

|-- Published: string (nullable = true)

|-- Hits: integer (nullable = true)

|-- Campaigns: array (nullable = true)

| |-- element: string (containsNull = true)

StructType(StructField("Id",IntegerType,true),

StructField("First",StringType,true),

StructField("Last",StringType,true),

StructField("Url",StringType,true),

StructField("Published",StringType,true),

StructField("Hits",IntegerType,true),

StructField("Campaigns",ArrayType(StringType,true),true)) 


现在你了解了如何在 DataFrame 中使用结构化数据和 schema,让我们重点关注 DataFrame 列和行以及使用 DataFrame API 对它们进行操作的含义。


列和表达式

如前所述,DataFrame 中的命名列在概念上类似于 Pandas 或 R DataFrame 或 RDBMS 表中的命名列:它们描述了字段的类型。你可以按列名列出所有列,也可以使用关系表达式或计算表达式对它们的值进行操作。在 Spark 支持的语言中,列是具有公共方法(由 Column 类型表示)的对象。


你也可以在列上使用逻辑或数学表达式。例如,可以使用 expr(“columnName * 5”)或创建一个简单的表达式(expr("columnName - 5") > col(anothercolumnName)),其中 columnName 是 Spark 类型(整型、字符串等)。expr()是 pyspark.sql.functions(Python)和 org.apache.spark.sql.functions(Scala)软件包的一部分。与这些包中的任何其他函数一样,expr()采用 Spark 解析为表达式的参数来计算结果。Scala、Java 和 Python 都有与 columns 关联的公共方法。你会注意到,Spark 文档同时引用了 col 和 column。column 是对象的名称,而 col()则是一个标准的内置函数返回 Column。让我们来看看 Spark 中使用列的示例。每个例子后都带有其结果的输出。

// In Scala

scala> import org.apache.spark.sql.functions._

scala> blogsDF.columns

res2: Array[String] = Array(Campaigns, First, Hits, Id, Last, Published, Url)

// Access a particular column with col and it returns a Column type

scala> blogsDF.col("Id")

res3: org.apache.spark.sql.Column = id

// Use an expression to compute a value

scala> blogsDF.select(expr("Hits * 2")).show(2)

// or use col to compute value

scala> blogsDF.select(col("Hits") * 2).show(2)

+----------+

|(Hits * 2)|

+----------+

| 9070|

| 17816|

+----------+

// Use an expression to compute big hitters for blogs

// This adds a new column, Big Hitters, based on the conditional expression

blogsDF.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

+---+---------+-------+---+---------+-----+--------------------+------

| Id| First| Last|Url|Published| Hits| Campaigns|Big Hitters|

+---+---------+-------+---+---------+-----+--------------------+------

| 1| Jules| Damji|...| 1/4/2016| 4535| [twitter, LinkedIn]| false|

| 2| Brooke| Wenig|...| 5/5/2018| 8908| [twitter, LinkedIn]| false|

| 3| Denny| Lee|...| 6/7/2019| 7659|[web, twitter, FB...| false|

| 4|Tathagata| Das|...|5/12/2018|10568| [twitter, FB]| true| 

| 5| Matei|Zaharia|...|5/14/2014|40578|[web, twitter, FB...| true|

| 6| Reynold| Xin|...| 3/2/2015|25568| [twitter, LinkedIn]| true|

+---+---------+-------+---+---------+-----+--------------------+---

// Concatenate three columns, create a new column, and show the

// newly created concatenated column

blogsDF

.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))

.select(col("AuthorsId"))

.show(4)

+-------------+

| AuthorsId|

+-------------+

| JulesDamji1|

| BrookeWenig2|

| DennyLee3|

|TathagataDas4|

+-------------+

// These statements return the same value, showing that

// expr is the same as a col method call

blogsDF.select(expr("Hits")).show(2)

blogsDF.select(col("Hits")).show(2)

blogsDF.select("Hits").show(2)

+-----+

| Hits|

+-----+

| 4535|

| 8908|

+-----+

// Sort by column "Id" in descending order

blogsDF.sort(col("Id").desc).show()

blogsDF.sort($"Id".desc).show()

+--------------------+---------+-----+---+-------+---------+----------

| Campaigns| First| Hits| Id| Last|Published| Url|

+--------------------+---------+-----+---+-------+---------+---------

| [twitter, LinkedIn]| Reynold|25568| 6| Xin| 3/2/2015|https://tinyurl.6|

|[web, twitter, FB...| Matei|40578| 5|Zaharia|5/14/2014|https://tinyurl.5|

| [twitter, FB]|Tathagata|10568| 4| Das|5/12/2018|https://tinyurl.4|

|[web, twitter, FB...| Denny| 7659| 3| Lee| 6/7/2019|https://tinyurl.3|

| [twitter, LinkedIn]| Brooke| 8908| 2| Wenig| 5/5/2018|https://tinyurl.2|

| [twitter, LinkedIn]| Jules| 4535| 1| Damji| 1/4/2016|https://tinyurl.1|

+--------------------+---------+-----+---+-------+---------+--------

在上一个示例中,表达式 blogs_df.sort(col("Id").desc)和 blogsDF.sort($"Id".desc)是相同的。它们都以 Id 降序对 DataFrame 列进行排序:一个使用显式函数 col("Id")来返回 Column 对象,而另一个使用 $在列的名称之前,后者是 Spark 中的一个函数,用于将命名 Id 为的列转换为 Column。


我们仅在此处进行了比较浅的使用,有关 Column 对象的所有公共方法的完整列表,请参考 Spark 文档。

DataFrame 中的列对象不能单独存在。在一条记录中,每一列都是一行的一部分,所有行一起构成 DataFrame,我们将在本章后面看到,它实际上是 Scala 中的 DataSet[Row]。


行(Row)

Spark 中的行是一个通用的 Row 对象,包含一个或多个列。每一列可以具有相同的数据类型(例如,整数或字符串),也可以具有不同的类型(整数、字符串、映射、数组等)。由于 Row 是 Spark 中的对象,并且是一个有序的字段集合,因此可以在每个 Spark 支持的语言中实例化 Row,并从 0 开始的索引访问其字段:

// In Scala

import org.apache.spark.sql.Row

// Create a Row

val blogRow = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",

Array("twitter", "LinkedIn"))

// Access using index for individual items

blogRow(1)

res62: Any = Reynold

  In Python

from pyspark.sql import Row

blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",["twitter", "LinkedIn"])

  access using index for individual items

blog_row[1]

'Reynold' 

Row 可以用于创建 DataFrame 以便于进行快速交互和测试,:

  In Python

rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]

authors_df = spark.createDataFrame(rows, ["Authors", "State"])

authors_df.show()

// In Scala

val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))

val authorsDF = rows.toDF("Author", "State")

authorsDF.show()

+-------------+-----+

| Author|State|

+-------------+-----+

|Matei Zaharia| CA|

| Reynold Xin| CA|

+-------------+-----+


但在实际操作中,通常需要从文件中读取 DataFrame。在大多数情况下,因为你的文件将会很大,所以定义 schema 并使用它是创建 DataFrame 的一种更快、更有效的方法。


在创建了一个大型的分布式 DataFrame 之后,你将要对其执行一些常见的数据操作。让我们研究一下你可以在结构化 API 中使用高级关系运算符执行的一些 Spark 操作。


常见的 DataFrame 操作

要在 DataFrame 上执行常见的数据操作,首先需要从保存结构化数据的数据源中加载 DataFrame。Spark 提供了一个接口,DataFrameReader 使你能够从多种数据源以 JSON,CSV,Parquet,Text,Avro,ORC 等格式将数据读取到 DataFrame 中。同样,可以将 DataFrame 按照特定格式写回到数据源中。Spark 使用 DataFrameWriter。


使用 DataFrameReader 和 DataFrameWriter 

在 Spark 中读写很简单,因为社区提供了这些高级抽象,可以连接到各种数据源,包括常见的 NoSQL 存储,RDBMS,Apache Kafka 和 Kinesis 等流引擎。

首先,让我们阅读一个大型 CSV 文件,其中包含有关旧金山消防部门呼叫的数据。如前所述,我们将为此文件定义一个架构,并使用 DataFrameReader 该类及其方法告诉 Spark 该怎么做。由于此文件包含 28 列和超过 4,380,660 条记录, 定义 schema 比使用 Spark 推断 schema 更有效。

如果不想指定 schema,Spark 可以以较低的成本从示例中推断 schema。例如,你可以使用采样比率 samplingRatio 选项:

// In Scala

val sampleDF = spark

.read

.option("samplingRatio", 0.001)

.option("header", true)

.csv("""/databricks-datasets/learning-spark-v2/

sf-fire/sf-fire-calls.csv""")

让我们来看看如何做到这一点:

In Python, define a schema

from pyspark.sql.types import *

  Programmatic way to define a schema

fire_schema = StructType([StructField('CallNumber', IntegerType(), True),

StructField('UnitID', StringType(), True),

StructField('IncidentNumber', IntegerType(), True),

StructField('CallType', StringType(), True),

StructField('CallDate', StringType(), True),

StructField('WatchDate', StringType(), True),

StructField('CallFinalDisposition', StringType(), True),

StructField('AvailableDtTm', StringType(), True),

StructField('Address', StringType(), True),

StructField('City', StringType(), True),

StructField('Zipcode', IntegerType(), True),

StructField('Battalion', StringType(), True),

StructField('StationArea', StringType(), True),

StructField('Box', StringType(), True),

StructField('OriginalPriority', StringType(), True),

StructField('Priority', StringType(), True),

StructField('FinalPriority', IntegerType(), True),

StructField('ALSUnit', BooleanType(), True),

StructField('CallTypeGroup', StringType(), True),

StructField('NumAlarms', IntegerType(), True),

StructField('UnitType', StringType(), True),

StructField('UnitSequenceInCallDispatch', IntegerType(), True),

StructField('FirePreventionDistrict', StringType(), True),

StructField('SupervisorDistrict', StringType(), True),

StructField('Neighborhood', StringType(), True),

StructField('Location', StringType(), True),

StructField('RowID', StringType(), True),

StructField('Delay', FloatType(), True)])

  Use the DataFrameReader interface to read a CSV file

sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

// In Scala it would be similar

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),

The DataFrame API | 59 StructField("UnitID", StringType, true),

StructField("IncidentNumber", IntegerType, true),

StructField("CallType", StringType, true),

StructField("Location", StringType, true),

...

...

StructField("Delay", FloatType, true)))

// Read the file using the CSV DataFrameReader

val sfFireFile="/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

val fireDF = spark.read.schema(fireSchema)

.option("header", "true")

.csv(sfFireFile)

spark.read.csv()函数读取 CSV 文件,并返回具有 schema 中指定类型的行和命名列的 DataFrame。


要以你选择的格式将 DataFrame 写入外部数据源,你可以使用“DataFrameWriter”接口。与 DataFrameReader 一样,它也支持多个数据源。默认格式是 Parquet,一种流行的列式格式,它使用 snappy 压缩来压缩数据。如果将 DataFrame 编写为 Parquet,则将 schema 保留为 Parquet 元数据的一部分。在这种情况下,后续读回 DataFrame 不需要你手动提供 schema。


常见的数据操作是查看和转换数据,然后将 DataFrame 另存为 Parquet 格式或 SQL 表。持久化转换后的 DataFrame 和读取它一样简单。例如,要在读取数据后持久保存我们刚刚使用的 DataFrame 作为文件,你可以执行以下操作:

// In Scala to save as a Parquet file

val parquetPath = ...

fireDF.write.format("parquet").save(parquetPath)

  In Python to save as a Parquet file

parquet_path = ...

fire_df.write.format("parquet").save(parquet_path) 

另外,你也可以将其另存为一个表,该表在 Hive Metastore 中注册元数据(我们将在下一章介绍 SQL 托管和非托管表,Metastore 和 DataFrame):

// In Scala to save as a table

val parquetTable = ... // name of the table

fireDF.write.format("parquet").saveAsTable(parquetTable)

  In Python

parquet_table = ...   name of the table

fire_df.write.format("parquet").saveAsTable(parquet_table)

读取数据后,让我们逐步介绍一些对 DataFrame 执行的常见操作。


转换和操作(Transformations and actions )

现在你有了一个存在内存中的由旧金山消防部门组成的分布式 DataFrame,作为一个开发人员,你要做的第一件事就是检查你的数据,看看这些列是什么样子的。它们的类型正确了吗?


是否需要转换成不同的类型呢?它们有 Null 值吗?在第 2 章的“转换、操作和惰性计算”中,你可以了解如何使用转换和操作对 DataFrame 进行处理,并看到了每个转换示例的一些常见示例。使用这些操作,我们可以从旧金山消防部门的电话中找到什么?


投影和滤镜

关系解析中的投影是一种通过使用过滤器只返回与某个关系条件匹配的列的方法。在 Spark 中,使用 select`() 方法进行投影,而过滤器可以使用 filter()或 where() 方法表示。我们可以使用此技术来检查旧金山消防部门数据集的具体方面:

In Python

few_fire_df = (fire_df

.select("IncidentNumber", "AvailableDtTm", "CallType")

.where(col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

// In Scala

val fewFireDF = fireDF

.select("IncidentNumber", "AvailableDtTm", "CallType")

.where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+

|IncidentNumber|AvailableDtTm |CallType |

+--------------+----------------------+--------------+

|2003235 |01/11/2002 01:47:00 AM|Structure Fire|

|2003235 |01/11/2002 01:51:54 AM|Structure Fire|

|2003235 |01/11/2002 01:47:00 AM|Structure Fire|

|2003235 |01/11/2002 01:47:00 AM|Structure Fire|

|2003235 |01/11/2002 01:51:17 AM|Structure Fire|

+--------------+----------------------+--------------+

only showing top 5 rows 

如果我们想知道有多少不同的 CallTypes 被记录为引发火灾的原因怎么办?这些简单而富有表现力的查询可以完成这项工作:

  In Python, return number of distinct types of calls using countDistinct()

from pyspark.sql.functions import *

(fire_df

The DataFrame API | 61 .select("CallType")

.where(col("CallType").isNotNull())

.agg(countDistinct("CallType").alias("DistinctCallTypes"))

.show())

// In Scala

import org.apache.spark.sql.functions._

fireDF

.select("CallType")

.where(col("CallType").isNotNull)

.agg(countDistinct('CallType) as 'DistinctCallTypes)

.show()

+-----------------+

|DistinctCallTypes|

+-----------------+

| 32|

+-----------------+

我们可以使用以下查询列出数据集中的不同调用类型:

  In Python, filter for only distinct non-null CallTypes from all the rows

(fire_df

.select("CallType")

.where(col("CallType").isNotNull())

.distinct()

.show(10, False))

// In Scala

fireDF

.select("CallType")

.where($"CallType".isNotNull())

.distinct()

.show(10, false)

Out[20]: 32

+-----------------------------------+

|CallType |

+-----------------------------------+

|Elevator / Escalator Rescue |

|Marine Fire |

|Aircraft Emergency |

|Confined Space / Structure Collapse|

|Administrative |

|Alarms |

|Odor (Strange / Unknown) |

|Lightning Strike (Investigation) |

|Citizen Assist / Service Call |

|HazMat |

+-----------------------------------+

only showing top 10 rows 


重命名、添加和删除列

有时因为样式或规范的原因而需要重命名特定的列,有时是为了可读性或简洁性需要重命名。消防部门数据集中的原始列名在其中有空格。例如,列名 IncidentNumber 为 Incident Number。列名中的空格可能会出现问题,特别是当你要将 DataFrame 写入或保存为 Parquet 文件时(禁止使用)。通过使用 StructField 指定 schema 中所需的列名,我们有效地更改了结果 DataFrame 中的所有名称。


或者,可以使用使用 withColumnRenamed()方法选择性地重命名列。例如,让我们将 Delay 列的名称更改为 ResponseDelayedinMins,并查看超过 5 分钟的响应时间:

 In Python

new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")

(new_fire_df

.select("ResponseDelayedinMins")

.where(col("ResponseDelayedinMins") > 5)

.show(5, False))

// In Scala

val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")

newFireDF

.select("ResponseDelayedinMins")

.where($"ResponseDelayedinMins" > 5)

.show(5, false)

这为我们提供了一个新的重新命名的列:

+---------------------+

|ResponseDelayedinMins|

+---------------------+

|5.233333 |

|6.9333334 |

|6.116667 |

|7.85 |

|77.333336 |

+---------------------+

only showing top 5 rows


由于 DataFrame 转换是不可变的,因此当我们使用 withColumnRenamed()列来重命名列时,我们会得到一个新的 DataFrame,同时保留具有旧列名的原始 DataFrame。


修改列或其类型是数据处理期间的常见操作。在某些情况下,数据是原始数据或脏数据,或者其类型不适合于作为参数提供给关系运算符。例如,在我们的旧金山消防部门数据集中,CallDate,WatchDate 和 AlarmDtTm 列都是字符串,而不是 Unix 时间戳或 SQL 日期,Spark 支持两种日期格式转换或操作(例如,在日期或时间相关的数据分析期间)。


那么,我们如何将它们转换为一种更可用的格式呢?这非常简单,归功于一些高级的 API 方法。spark.sql.functions 具有一组 to / from 日期/时间戳功能,例如 to_timestamp()和 to_date(),我们可以将其用于这些情况:

 In Python

fire_ts_df = (new_fire_df

.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))

.drop("CallDate")

.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))

.drop("WatchDate")

.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),

"MM/dd/yyyy hh:mm:ss a"))

.drop("AvailableDtTm"))

  Select the converted columns

(fire_ts_df

.select("IncidentDate", "OnWatchDate", "AvailableDtTS")

.show(5, False))

// In Scala

val fireTsDF = newFireDF

.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))

.drop("CallDate")

.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))

.drop("WatchDate")

.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),

"MM/dd/yyyy hh:mm:ss a"))

.drop("AvailableDtTm")

// Select the converted columns

fireTsDF

.select("IncidentDate", "OnWatchDate", "AvailableDtTS")

.show(5, false) 

这些查询带来了很大的麻烦,许多事情正在发生。让我们解开他们所做的事情:

1.将现有列的数据类型从字符串转换为受 Spark 支持的时间戳。

2.使用字符串的格式“MM/dd/yyyy”或“MM/dd/yyyyhh:mm:ssa”中指定的新格式。

3.转换为新数据类型后,drop()将旧列添加到第一个参数中,并将新列追加到 withColumn()方法中。

4.将新修改的 DataFrame 分配给 fire_ts_df。查询将产生三个新列:

+-------------------+-------------------+-----------------

|IncidentDate       |OnWatchDate        |AvailableDtTS   |

+-------------------+-------------------+-------------------+

|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:58:43|

|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:10:17|

|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|

|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:54|

|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|

+-------------------+-------------------+-------------------+

only showing top 5 rows 


现在我们已经修改了日期,我们可以使用 spark.sql.functionslike dayofmonth()中的函数进行查询 dayofyear(),还可以利用 daofweej 进一步分析我们的数据。我们可以找出过去七天内记录了多少个电话,也可以看到此查询的数据集中包含了多少年的消防部门的电话:

  In Python

(fire_ts_df

.select(year('IncidentDate'))

.distinct()

.orderBy(year('IncidentDate'))

.show())

// In Scala

fireTsDF

.select(year($"IncidentDate"))

.distinct()

.orderBy(year($"IncidentDate"))

.show()

+------------------+

|year(IncidentDate)|

+------------------+

| 2000|

| 2001|

| 2002|

| 2003|

| 2004|

| 2005|

| 2006|

| 2007|

| 2008|

| 2009|

| 2010|

| 2011|

| 2012|

| 2013|

| 2014|

| 2015|

| 2016|

| 2017|

| 2018|

+------------------+ 


到目前为止,在本节中,我们已经使用了许多常见的数据操作:读取和写入 DataFrame;定义 schema 并在读取 DataFrame 时使用它;将 DataFrame 保存为 Parquet 文件或表;投影和过滤现有 DataFrame 中选定的列;以及修改、重命名和删除列。


最后一个常见的操作是按列中的值对数据进行分组,并以某种方式聚合数据,比如简单地计数,这种分组和计数的模式和投影和过滤一样常见。我们试试。


聚合

如果我们想知道最常见的火灾电话类型是什么,或者什么邮政编码占大多数怎么办?这些问题在数据分析和使用中很常见。


在 DataFrame 上的一些转换和操作,如 groupBy()、orderBy()和 count()操作,通过提供列名,然后进行各种聚合计算。对于计划对其执行频繁或重复查询的大型 DataFrame,你可以从缓存中获益。我们将在后面的章节中介绍 DataFrame 缓存策略及其好处。让我们回答第一个问题:最常见的火灾呼叫类型是什么?

  In Python

(fire_ts_df

.select("CallType")

.where(col("CallType").isNotNull())

.groupBy("CallType")

.count()

.orderBy("count", ascending=False)

.show(n=10, truncate=False)) 

// In Scala

fireTsDF

.select("CallType")

.where(col("CallType").isNotNull)

.groupBy("CallType")

.count()

.orderBy(desc("count"))

.show(10, false)

+-------------------------------+-------+

|CallType |count |

+-------------------------------+-------+

|Medical Incident |2843475|

|Structure Fire |578998 |

|Alarms |483518 |

|Traffic Collision |175507 |

|Citizen Assist / Service Call |65360 |

|Other |56961 |

|Outside Fire |51603 |

|Vehicle Fire |20939 |

|Water Rescue |20037 |

|Gas Leak (Natural and LP Gases)|17284 |

+-------------------------------+-------+

从这个输出中,我们可以得出结论,最常见的呼叫类型是“医疗事故”。

DataFrameAPI 还提供了 collect()方法,但对于非常大的 DataFrame,这会占用大量的资源,因为它可能会导致内存不足(OOM)异常。与 count()向驱动程序返回单个数字的方法不同,collect()返回整个 DataFrame 或 Dataset 中的所有 Row 对象的集合。如果你想查看一些行记录,你最好使用 take(n),它将只返回 DataFrame Row 对象的前 n 条记录。


其他常见的 DataFrame 操作

除了我们看到的其他 DataFrame API 提供了描述性统计方法,如 min()、max()、sum()和 avg()。让我们看一些示例,这些示例说明如何使用旧金山消防部门的数据集进行计算。在这里,我们计算警报的总和,平均响应时间,以及数据集中所有火灾呼叫的最小和最大响应时间,以 Pythonic 的方式导入 PySpark 函数,以避免与内置的 Python 函数冲突:

  In Python

import pyspark.sql.functions as F

(fire_ts_df

.select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),

The DataFrame API | 67 F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))

.show())

// In Scala

import org.apache.spark.sql.{functions => F}

fireTsDF

.select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),

F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))

.show()


对于数据科学工作场景中常见的更高级的统计需求,请阅读 API 文档,如 stat(),describe(),correlation(),covariance(), sampleBy(),approxQuantile()frequentItems()等。


如你所见,可以使用 DataFrames 的高级 API 和 DSL 运算符来轻松编写和链接表达式查询。如果我们要尝试使用 RDD 执行相同的操作,我们将无法想象代码的不透明度和相对不可读性!


端到端的 DataFrame 示例

在旧金山消防局的公共数据集中,还有许多可以进行的探索性数据分析、ETL 和通用数据操作。

为了简洁起见,我们不会在这里包含所有的示例代码,但是书中的 GitHub 仓库为你提供了 Python 和 Scala 手册,以便你尝试使用此数据集完成端到端 DataFrame 示例。手册使用 DataFrame API 和 DSL 关系运算符测试并回答你以下可能会问的常见问题:

2018 年,所有不同类型的火灾电话?

2018 年,哪些月的火灾电话次数最高?

2018 年,旧金山的哪个社区引发的火灾电话最多?

2018 年,哪个社区对火警电话的反应时间最糟糕?

2018 年哪一周的火灾电话最多?

社区、邮政编码和火灾电话的数量之间是否有相关性?

我们如何使用 Parquet 文件或 SQL 表来存储这些数据并读取它?到目前为止,我们已经广泛讨论了 DataFrame API,它是一个跨越 Spark 的 Mllib 和结构化流组件的结构化 API 之一,我们将在书中介绍它。


接下来,我们将把重点转移到 Dataset API 上,并探讨这两个 API 如何为 Spark 的开发者提供统一的结构化接口从而进行编程工作。然后,我们将比较 RDD、DataFrame 和 Dataset API 之间的关系,并帮助你确定何时使用哪个 API 以及原因。


Dataset API

如本章前面所述,Spark2.0 将 DataFrame 和 Dataset API 统一为具有类似接口的结构化 API,因此开发人员只需要学习一组 API。数据集具有两个特性:类型化的和非类型化的 API,如图 3-1 所示。


从概念上讲,你可以将 Scala 中的 DataFrame 视为通用对象集合 Dataset[Row]的别名,其中 Row 是通用非类型 JVM 对象,可能包含不同类型的字段。相比之下,Dataset 是 Scala 中或 Java 中的强类型 JVM 对象的集合。或者,如 Dataset 文档所说,Dataset 是:

特定域对象的强类型集合,可以使用函数或关系操作进行转换。Scala 中的每个 Dataset 都有一个称为 DataFrame 的无类型视图,它是一个 Dataset Row。


有类型的对象、非类型的对象和通用行

在 Spark 支持的语言中,数据集只在 Java 和 Scala 中有意义,而在 Python 和 R 中,只有 DataFrame 有意义。这是因为 Python 和 R 不是编译时类型安全的;类型是在执行期间动态推断或分配的,而不是在编译时动态分配的。在 Scala 和 Java 中,情况正好相反:类型在编译时绑定到变量和对象。然而,在 Scala 中,DataFrame 只是非类型 Dataset[Row]的别名。表 3-6 简而言之。


Row 是 Spark 中的通用对象类型,它包含可以使用索引访问的混合类型的集合。在内部,Spark 会操作 Row 对象,并将它们转换为表 3-2 和表 3-3 中所涵盖的等效类型。例如,对于 Scala 或 Java 和 Python,Row 中的一个整数字段将分别映射或转换为整数类型:

// In Scala

import org.apache.spark.sql.Row

val row = Row(350, true, "Learning Spark 2E", null)

  In Python

from pyspark.sql import Row

row = Row(350, True, "Learning Spark 2E", None) 

使用行对象的索引,可以使用公共 getter 方法访问各个字段:

// In Scala

row.getInt(0)

res23: Int = 350

row.getBoolean(1)

res24: Boolean = true

row.getString(2)

res25: String = Learning Spark 2E

  In Python

row[0]

Out[13]: 350

row[1]

Out[14]: True

row[2]

Out[15]: 'Learning Spark 2E'

相比之下,类型化对象是 JVM 中实际的 Java 或 Scala 类对象。数据集中的每个元素都映射到一个 JVM 对象。


创建 DataSet

与从数据源创建 DataFrame 一样,在创建数据集时,你必须知道 schema。换句话说,你需要了解数据类型。尽管使用 JSON 和 CSV 数据可以推断出 schema,但对于大型数据集,这是资源密集型的(成本昂贵),非常消耗资源。在 Scala 中创建数据集时,为结果数据集指定 schema 最简单的方法是使用样例类(Case classes)。在 Java 中,使用 JavaBean 类(我们在第 6 章中进一步讨论 JavaBean 和 Scala 样例类)。


Scala: 样例类(Case classes) 

当你希望将自己的域中特定的对象实例化为数据集时,你可以通过在 Scala 中定义一个样例类来实例化。作为一个例子,让我们查看 JSON 文件中从物联网设备读取的集合(我们在本节后面的端到端示例中使用此文件)。我们的文件有几行 JSON 字符串,外观如下:

{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip":

"80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude":

53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21,

"humidity": 65, "battery_level": 8, "c02_level": 1408,"lcd": "red",

"timestamp" :1458081226051} 


要将每个 JSON 条目表示为 DeviceIoTData,一种特定领域的对象,我们可以定义一个 Scala 样例类:

case class DeviceIoTData (battery_level: Long, c02_level: Long,

cca2: String, cca3: String, cn: String, device_id: Long,

device_name: String, humidity: Long, ip: String, latitude: Double,

lcd: String, longitude: Double, scale:String, temp: Long,

timestamp: Long) 

 一旦定义,我们可以使用它读取文件并将返回的内容 Dataset[Row]转换为 Dataset[DeviceIoTData](输出被截断以适合页面):

// In Scala

val ds = spark.read

.json("/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json")

.as[DeviceIoTData]

ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]

ds.show(5, false)

+-------------|---------|----|----|-------------|---------|---+

|battery_level|c02_level|cca2|cca3|cn |device_id|...|

+-------------|---------|----|----|-------------|---------|---+

|8 |868 |US |USA |United States|1 |...|

|7 |1473 |NO |NOR |Norway |2 |...|

|2 |1556 |IT |ITA |Italy |3 |...|

|6 |1080 |US |USA |United States|4 |...|

|4 |931 |PH |PHL |Philippines |5 |...|

+-------------|---------|----|----|-------------|---------|---+

only showing top 5 rows


Dataset 操作

就像你可以在 DataFrame 上执行转换和操作一样,你也可以使用数据集。根据操作类型的不同,操作结果将会有所不同:

// In Scala

val filterTempDS = ds.filter({d => {d.temp > 30 && d.humidity > 70})

filterTempDS: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]

filterTempDS.show(5, false)

+-------------|---------|----|----|-------------|---------|---+

|battery_level|c02_level|cca2|cca3|cn |device_id|...|

+-------------|---------|----|----|-------------|---------|---+

|0 |1466 |US |USA |United States|17 |...|

|9 |986 |FR |FRA |France |48 |...|

|8 |1436 |US |USA |United States|54 |...|

|4 |1090 |US |USA |United States|63 |...|

|4 |1072 |PH |PHL |Philippines |81 |...|

+-------------|---------|----|----|-------------|---------|---+

only showing top 5 rows 

在此查询中,我们使用一个函数作为数据集方法 filter()的参数。这是一个具有很多签名的重载方法。我们使用的版本采用 filter(func: (T) > Boolean): Dataset[T] lambda 函数 func: (T) > Boolean 作为参数。lambda 函数的参数是类型为 DeviceIoTData 的 JVM 对象。这样,我们可以使用点(.)表示法访问其各个数据字段,就像在 Scala 类或 JavaBean 中一样。


另一件需要注意的事情是,对于 DataFrame,你将 filter()条件表示为类似 SQL 的 DSL 操作,这些操作是与语言无关的(正如我们前面在 fire call 示例中看到的)。对于数据集,我们利用原生语言的表达式作为 Scala 或 Java 代码。下面是另一个较小数据集的例子:

// In Scala

case class DeviceTempByCountry(temp: Long, device_name: String, device_id: Long,

cca3: String)

val dsTemp = ds

.filter(d => {d.temp > 25})

.map(d => (d.temp, d.device_name, d.device_id, d.cca3))

.toDF("temp", "device_name", "device_id", "cca3")

 .as[DeviceTempByCountry]

dsTemp.show(5, false)

+----+---------------------+---------+----+

|temp|device_name |device_id|cca3|

+----+---------------------+---------+----+

|34 |meter-gauge-1xbYRYcj |1 |USA |

|28 |sensor-pad-4mzWkz |4 |USA |

|27 |sensor-pad-6al7RTAobR|6 |USA |

|27 |sensor-pad-8xUD6pzsQI|8 |JPN |

|26 |sensor-pad-10BsywSYUF|10 |USA |

+----+---------------------+---------+----+

only showing top 5 rows


或者,你只能检查数据集的第一行:

val device = dsTemp.first()

println(device)

device: DeviceTempByCountry =

DeviceTempByCountry(34,meter-gauge-1xbYRYcj,1,USA) 

另外,你可以使用列名表达相同的查询,然后强制转换为

Dataset[DeviceTempByCountry]:

// In Scala

val dsTemp2 = ds

.select($"temp", $"device_name", $"device_id", $"device_id", $"cca3")

.where("temp > 25")

.as[DeviceTempByCountry]

在语义上,select()类似于上一个查询中的 map(),这两个查询都会选择字段并生成等效的结果。


总的来说,我们可以在数据集上执行 filter(),map(),groupBy(),select(),take()这些操作,与 DataFrame 上的操作相似。在某种程度上,数据集与 RDD 相似,因为它们提供了与上述方法类似的接口以及编译时安全性,但具有更容易读取和面向对象的编程接口。


当我们使用数据集时,底层的 Spark SQL 引擎会处理 JVM 对象的创建、控制版本、序列化和反序列化。它还借助数据集编码器来处理 Java 外堆内存管理。(我们将在第 6 章中讨论更多关于数据集和内存管理的内容。)


端到端的数据集示例

在此端到端数据集示例中,你将使用物联网数据集进行类似 DataFrame 示例中的使用的数据分析、ETL(提取、转换和加载)和数据操作。这个数据集很小而且还是伪造的,但我们这里的主要目的是说明使用数据集表达查询的清晰度以及这些查询的可读性,就像我们用 DataFrame 一样。

同样,为了简洁起见,我们不会在这里涵盖所有的示例代码;但是,我们已经在 GitHub 仓库中添加了这个手册。手册中介绍了你可能使用此数据集执行的常见操作。使用数据集 API,我们将尝试执行以下操作:

1.检测电池电量低于阈值的故障设备。

2.确定二氧化碳排放量较高水平的违规国家。

3.计算温度、电池电量、二氧化碳和湿度的最小值和最大值。

4.按平均温度、二氧化碳、湿度和国家进行排序和分组。


DataFrames Vs Datasets 

到目前为止,你可能知道为什么以及何时应该使用 DataFrame 或 Dataset。在许多情况下,两者都可以做到,这取决于你使用的语言,但在某些情况下,一种语言比另一种语言更加可取。下面有几个例子:

1.如果要告诉 Spark 要做什么,而不是如何做,请使用 DataFrame 或 Dataset。

2.如果你需要丰富的语义、高级抽象和 DSL 运算符,请使用 DataFrame 或 Dataset。

3.如果你希望实现严格的编译时类型安全性,并且不介意为特定的 Dataset[T]创建多个样例类,请使用“Dataset”。

4.如果处理需要高级表达式、过滤器、映射、聚合、计算平均值或和、SQL 查询、列访问或在半结构化数据上使用关系运算符,请使用 DataFrame 或 Dataset。

5.如果你的处理要求类似于 SQL 的查询的关系转换,请使用 DataFrame。

6.如果你想利用 Tungsten 的高效编码器序列化,并从中受益,请使用“Dataset”。

7.如果希望跨 Spark 组件进行统一、代码优化和 API 简化,请使用 DataFrame。

8.如果你是 R 用户,请使用 DataFrame。

9.如果你是 Python 用户,请使用 DataFrame,如果需要更多的控制,请使用 RDD。

10.如果你需要空间和速度效率,请使用 DataFrame。

11.如果希望在编译期间而不是在运行时捕获错误,请选择相应的 API,如图 3-2 所示。


什么时候使用 RDD

你可能会问:RDD 是否被降级为二等公民?他们已经被召回了吗?答案是非常明确的,“没有”!RDD API 将继续得到支持,尽管 Spark2.x 和 Spark3.x 中进行的所有开发工作都继续保持 DataFrame 接口和语义,而不是使用 RDD,但仍将继续支持 RDD API。

在某些情况下,你需要考虑使用 RDD,例如:

1.是否使用了使用 RDD 编写的第三方软件包。

2.可以放弃代码优化、有效利用的空间以及 DataFrame 和 Dataset 所提供的性能优势。

3.想要精确地指示 Spark 如何执行查询。


此外,你还可以使用简单的 API 方法调用 df.rdd 在 DataFrame、Dataset 和 RDD 之间进行无缝切换。(但是,请注意,切换是需要成本的,除非有必要,否则应该避免。)毕竟,DataFrame 和 Dataset 是建立在 RDD 之上,它们在整个测试阶段的代码生成过程中被分解为紧凑的 RDD 代码,我们将在下一节中讨论。


最后,前面部分提供了关于 Spark 中的结构化 API 如何使开发人员能够使用简单、友好的 API 来编写对结构化数据进行丰富的查询的洞察。换句话说,你使用高级操作告诉 Spark 该做什么,而不是怎么做,并且它确定了为你构建查询和生成紧凑代码的最有效的方法。


构建高效查询和生成紧凑代码的过程是 Spark SQL 引擎的工作。这是我们一直在研究的构建结构化 API 基础。现在让我们来看看该引擎的内幕。


Spark SQL 及基础引擎

在编程级别上,Spark SQL 允许开发人员对具有模式的结构化数据发出与 ANSI SQL:2003 兼容的查询。自从在 Spark1.3 中引入以来,Spark SQL 已经发展成为一个强大的引擎,在此基础上建立了许多高级的结构化功能。除了允许你对数据发出类似 SQL 的查询外,Spark SQL 引擎还包括:

1.统一 Spark 组件,并允许抽象为 Java、Scala、Python 和 R 中的 DataFrame/Dataset,这简化了对结构化数据集的工作。

2.连接到 Apache Hive 元存储库和表。

3.从结构化文件(JSON、CSV、文本、CSV、拼花、ORC 等)读写具有特定 schema 的结构化数据。并将数据转换为临时表。

4.提供交互式 Spark SQL Shell 支持快速数据浏览。

5.通过标准数据库 JDBC/ODBC 连接器提供与外部工具之间的桥梁。

6.为 JVM 生成优化的查询计划和紧凑的代码,以便最终执行。图 3-3 显示了与 Spark SQL 交互以实现所有这些目标的组件。


Spark SQL 引擎的核心是 Catalyst 优化器和 Project Tungsten。它们一起支持高级 DataFrame、Dataset API 和 SQL 查询。我们将在第六章中讨论 Tungsten,现在,让我们仔细看看优化器。


优化器

Catalyst 优化器接受计算查询,并将其转换为一个执行计划。它经历了四个转换阶段,如图 3-4 所示:

1.分析

2.辑优化

3.物理规划

4.代码生成


例如,考虑第 2 章中 M&M 示例中的一个查询。以下两个示例代码块将经历相同的过程,最终会执行类似的查询计划和相同的字节码。也就是说,无论你使用什么语言,你的计算都会经历相同的过程,而得到的字节码很可能是相同的:

 In Python

count_mnm_df = (mnm_df

.select("State", "Color", "Count")

.groupBy("State", "Color")

.agg(count("Count")

.alias("Total"))

.orderBy("Total", ascending=False))

-- In SQL

SELECT State, Color, Count, sum(Count) AS Total

FROM MNM_TABLE_NAME

GROUP BY State, Color, Count

ORDER BY Total DESC

要查看 Python 代码所经历的不同阶段,可以在 DataFrame 上使用 count_mnm_df.explain(True)方法。或者,要查看不同的逻辑和物理计划,在 Scala 中可以调用 df.queryExecution.logical 或 df.queryExecution.optimizedPlan。(在第 7 章中,我们将讨论关于调整和调试 Spark 以及如何读取查询计划。)这给我们提供了以下输出:

count_mnm_df.explain(True)

== Parsed Logical Plan ==

'Sort ['Total DESC NULLS LAST], true

+- Aggregate [State 10, Color 11], [State 10, Color 11, count(Count 12) AS...]

+- Project [State 10, Color 11, Count 12]

+- Relation[State 10,Color 11,Count 12] csv

== Analyzed Logical Plan ==

State: string, Color: string, Total: bigint

Sort [Total 24L DESC NULLS LAST], true

+- Aggregate [State 10, Color 11], [State 10, Color 11, count(Count 12) AS...]

+- Project [State 10, Color 11, Count 12]

+- Relation[State 10,Color 11,Count 12] csv

== Optimized Logical Plan ==

Sort [Total 24L DESC NULLS LAST], true

+- Aggregate [State 10, Color 11], [State 10, Color 11, count(Count 12) AS...]

+- Relation[State 10,Color 11,Count 12] csv

== Physical Plan ==

*(3) Sort [Total 24L DESC NULLS LAST], true, 0

+- Exchange rangepartitioning(Total 24L DESC NULLS LAST, 200)

+- *(2) HashAggregate(keys=[State 10, Color 11], functions=[count(Count 12)],

output=[State 10, Color 11, Total 24L])

+- Exchange hashpartitioning(State 10, Color 11, 200)

+- *(1) HashAggregate(keys=[State 10, Color 11],

functions=[partial_count(Count 12)], output=[State 10, Color 11, count 29L])

+- *(1) FileScan csv [State 10,Color 11,Count 12] Batched: false,

Format: CSV, Location:

InMemoryFileIndex[file:/Users/jules/gits/LearningSpark2.0/chapter2/py/src/...

dataset.csv], PartitionFilters: [], PushedFilters: [], Read 数据结构(schema):

struct<State:string,Color:string,Count:int>

让我们考虑另一个 DataFrame 计算示例。随着底层引擎优化其逻辑和物理计划,以下 Scala 代码也经历了类似的过程:

// In Scala

// Users DataFrame read from a Parquet table

val usersDF = ...

// Events DataFrame read from a Parquet table

val eventsDF = ...

// Join two DataFrames

val joinedDF = users

.join(events, users("id") === events("uid"))

.filter(events("date") > "2015-01-01") 


经过初始分析阶段后,查询计划由 Catalyst 优化器进行转换和重新排列,如图 3-5 所示。


让我们先分析这四个查询优化阶段。


阶段 1:分析

Spark SQL 引擎首先会为 SQL 或 DataFrame 查询生成一个抽象语法树(AST)。在此初始阶段,任何列或表名都将会被解析为内部的 Catalog,catalog 是一个指向 Spark SQL 的编程接口,该接口包含列、数据类型、函数、表、数据库、列名等等的列表。一旦全部成功解决,查询将继续进入下一阶段


阶段 2:逻辑优化

如图 3-4 所示,该阶段包括两个内部阶段。应用基于标准化的优化方法,Catalyst 优化器将首先构建一组多个计划,然后使用其基于成本的优化器(CBO)将成本分配给每个计划。这些计划展示为算子树的形式(如图 3-5);例如,它们可能包括常数折叠、谓词下推、投影计算、布尔表达式简化等过程。这个逻辑计划是对物理计划的输入。


阶段 3:物理执行计划

在此阶段,Spark SQL 使用与 Spark 执行引擎相匹配的物理运算符,为所选的逻辑计划生成最佳的物理计划。


阶段 4:代码生成

查询优化的最后阶段涉及生成在每台机器上运行的高效 Java 字节码。因为 Spark SQL 可以对内存中加载的数据集进行操作,所以 Spark 可以使用最先进的编译器技术来生成代码以加快执行速度。换句话说,它充当了编译器。Tungsten 项目在这里发挥了重要作用,是整个阶段代码生成的利器。整个阶段的代码生成是什么呢?这是一个物理查询优化阶段,它将整个查询分解成一个函数,摆脱虚拟函数调用,并使用 CPU 寄存器存储中间数据。Spark2.0 中引入的第二代 Tungsten 引擎使用此方法生成紧凑的 RDD 代码以便最终执行。这种精简的策略显著提高了 CPU 的效率和性能。

我们已经在概念层面上讨论了 Spark SQL 引擎的工作原理,其中包括其两个主要组件:Catalyst 优化器和 Tungsten 项目。内部的技术工作不在本书的讨论范围之内;然而,对于好奇的读者,我们建议你查看文本中的参考资料,以进行深入的技术讨论。


总结

在本章中,我们深入研究了 Spark 的结构化 API,了解了 Spark 的结构化历史和优点。通过说明性的常见数据操作和代码示例,我们证明了高级的 DataFrame 和 Dataset API 比低级的 RDD API 更具表达力和直观性。结构化 API 旨在简化大型数据集的处理,为通用数据操作提供了特定领域的运算符,提高了代码的清晰度和表达性我们根据你使用的用例场景,探讨了何时使用 RDD、DataFrame 和 Dataset。最后,我们深入了解了 Spark SQL 引擎的主要组件(Catalyst 优化器和 Project Tungesten)如何支持结构化高级 API 和 DSL 运算符。正如你所看到的,无论你使用哪种 Spark 支持的语言,Spark 查询都会经历相同的优化过程,从逻辑和物理计划构建到最终紧凑代码的生成。本章中的概念和代码示例为接下来的两章奠定了基础,其中我们将进一步说明 DataFrame、Dataset 和 Spark SQL 之间的无缝互操作性。

1.可以从https://oreil.ly/iDzQK获得此公共数据。

2.原始数据集有 60 列以上。我们删除了一些不必要的列,删除了具有空值或无效值的记录,并添加了一个额外的 Delay 列。

发布于: 4 小时前阅读数: 11
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
Apache Spark结构化API(三)