写点什么

Spark SQL 和 DataSet(六)

发布于: 2021 年 07 月 18 日
Spark SQL和DataSet(六)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与 AI 相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI 等相关文章,目前已经输出了 40 万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。


在前面两篇文章中,我们讨论了 Spark SQL 和 DataFrame API。我们研究了如何连接到内置和外部数据源,查看了 Spark SQL 引擎相关的内容,并探讨了诸如 SQL 和 DataFrames 之间的相互操作性,创建和管理视图和表,以及高级 DataFrame 和 SQL 转换等主题。

尽管我们在第 3 章中简要介绍了 DataSet API ,但还是概述了如何在 Spark 中创建,存储,序列化和反序列化 DataSet (强类型分布式集合)这些比较重要的方面。

在本章中,我们将深入了解 DataSet :我们将探索在 Java 和 Scala 中 DataSet 的有关用法 ,还有 Spark 如何管理内存以适应作为高级 API 一部分的 DataSet 结构,以及与使用 DataSet 相关的成本。


Java 和 Scala 的单一 API

正如你从第三章中(图 3-1 和表 3-6)所记得的那样,DataSet 为强类型对象提供了统一且单一的 API。在 Spark 支持的语言中,只有 Scala 和 Java 是强类型的。因此,Python 和 R 只支持无类型的 DataFrame API。

DataSet 是特定领域的类型对象,可以使用函数式编程或从 DataFrame API 熟悉的 DSL 运算符并行操作 DataSet 。

由于这个单一的 API,Java 开发人员不再有落后的风险。例如,Scala 未来的任何接口或行为的变化,如 groupBy(),flatMap(),map(),或 filter() 这些方法,Java API 也会是一样的,因为它是一个单一的接口,有统一的规范,这对这两种实现方式是类似的。


Scala 案例类和 JavaBeans 用于 DataSet

如果你还记得,从第 3 章(表 3-2)可以知道,Spark 本身有内部的数据类型,如 StringType,BinaryType,IntegerType,BooleanType 和 MapType,以便在 Spark 操作期间能够无缝地映射到 Scala 和 Java 语言特定的数据类型。这种映射是通过编码器完成的,我们将在本章后面讨论。

为了创建 Dataset[T],其中 T 是 Scala 中类型化对象,也就是我们常说的泛型,因此你需要一个定义该对象的 case 类。为了方便说明,在这里使用第 3 章(表 3-1)中的示例数据,我们有一个 JSON 文件,其中包含数以百万计的博客作者关于 Apache Spark 的条目,这些条目采用以下格式:

{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date:

"1/4/2016", hits: 4535, campaigns: {"twitter", "LinkedIn"}},

...

{id: 87, first: "Brooke", last: "Wenig", url: "https://tinyurl.2", date:

"5/5/2018", hits: 8908, campaigns: {"twitter", "LinkedIn"}}

要创建分一个布式 Dataset[Bloggers],我们必须首先定义一个 Scala case 类,该类定义包含 Scala 对象的每个单独字段。这个 case 类作为类型对象 Bloggers 的蓝图或数据结构:

// In Scala

case class Bloggers(id:Int, first:String, last:String, url:String, date:String,

hits: Int, campaigns:Array[String])

现在,我们可以从数据源读取文件:

val bloggers = "../data/bloggers.json"

val bloggersDS = spark

  .read

  .format("json")

  .option("path", bloggers)

  .load()

  .as[Bloggers]

生成的分布式 DataSet 中的每一行都是 Bloggers 类型。

同样,你也可以在 Java 创建 Bloggers 类型的 JavaBean 类,然后使用编码器创建一个 Dataset<Bloggers>,如下所示:

// In Java

import org.apache.spark.sql.Encoders;

import java.io.Serializable;

 

public class Bloggers implements Serializable {

    private int id;

    private String first;

    private String last;

    private String url;

    private String date;

    private int hits;

    private Array[String] campaigns;

 

// JavaBean getters and setters

int getID() { return id; }

void setID(int i) { id = i; }

String getFirst() { return first; }

void setFirst(String f) { first = f; }

String getLast() { return last; }

void setLast(String l) { last = l; }

String getURL() { return url; }

void setURL (String u) { url = u; }

String getDate() { return date; }

Void setDate(String d) { date = d; }

int getHits() { return hits; }

void setHits(int h) { hits = h; }

 

Array[String] getCampaigns() { return campaigns; }

void setCampaigns(Array[String] c) { campaigns = c; }

}

 

// Create Encoder

Encoder<Bloggers> BloggerEncoder = Encoders.bean(Bloggers.class);

String bloggers = "../bloggers.json"

Dataset<Bloggers> bloggersDS = spark

  .read

  .format("json")

  .option("path", bloggers)

  .load()

  .as(BloggerEncoder);

如你所见,在 Scala 和 Java 中创建 DataSet 需要一些深思熟虑,因为你必须要知道正在读取的行的所有字段的列名和类型。在这一点是与 DataFrames 不同,对于 DataFrame 你可以选择让 Spark 推断数据结构,但是 Dataset API 要求你提前定义好数据结构,并且 case 类或 JavaBean 类要与该数据结构匹配,否则会出现异常。

 

Scala 案例类或 Java 类定义中的字段名称必须与数据源中的顺序匹配。数据中每一行的列名会自动映射到类中的相应名称,并且类型会自动保留。

如果字段名称与输入数据匹配,则可以使用现有的 Scala case 类或 JavaBean 类。使用 Dataset API 与使用 DataFrames 一样容易,简洁和声明式。对于大多数 DataSet 的转换,你可以使用在上一章中了解到的相同关系运算符。

让我们使用样本来研究 DataSet 更多的一些方面。


处理 DataSet

创建样本 DataSet 的一种简单而动态的方法是使用 SparkSession 实例。在这种情况下,为了方便说明,我们动态创建了一个包含三个字段的 Scala 对象:uid(用户的唯一 ID),uname(随机生成的用户名字符串)和 usage(服务器或服务使用情况的分钟数)。


创建样本数据

首先,让我们生成一些样本数据:

// In Scala

import scala.util.Random._

// Our case class for the Dataset

case class Usage(uid:Int, uname:String, usage: Int)

val r = new scala.util.Random(42)

// Create 1000 instances of scala Usage class

// This generates data on the fly

val data = for (i <- 0 to 1000)

  yield (Usage(i, "user-" + r.alphanumeric.take(5).mkString(""),

  r.nextInt(1000)))

// Create a Dataset of Usage typed data

val dsUsage = spark.createDataset(data)

dsUsage.show(10)

 

+---+----------+-----+

|uid|     uname|usage|

+---+----------+-----+

|  0|user-Gpi2C|  525|

|  1|user-DgXDi|  502|

|  2|user-M66yO|  170|

|  3|user-xTOn6|  913|

|  4|user-3xGSz|  246|

|  5|user-2aWRN|  727|

|  6|user-EzZY1|   65|

|  7|user-ZlZMZ|  935|

|  8|user-VjxeG|  756|

|  9|user-iqf1P|    3|

+---+----------+-----+

only showing top 10 rows

在 Java 中,这个想法是相似的,但是我们必须使用显式 Encoders 编码器(在 Scala 中,Spark 会隐式处理):

// In Java

import org.apache.spark.sql.Encoders;

import org.apache.commons.lang3.RandomStringUtils;

import java.io.Serializable;

import java.util.Random;

import java.util.ArrayList;

import java.util.List;

 

// Create a Java class as a Bean

public class Usage implements Serializable {

   int uid;                // user id

   String uname;           // username

   int usage;              // usage

 

   public Usage(int uid, String uname, int usage) {

       this.uid = uid;

       this.uname = uname;

       this.usage = usage;

   }

   // JavaBean getters and setters

   public int getUid() { return this.uid; }

   public void setUid(int uid) { this.uid = uid; }

   public String getUname() { return this.uname; }

   public void setUname(String uname) { this.uname = uname; }

   public int getUsage() { return this.usage; }

   public void setUsage(int usage) { this.usage = usage; }

 

   public Usage() {

   }

 

   public String toString() {

       return "uid: '" + this.uid + "', uame: '" + this.uname + "',

       usage: '" + this.usage + "'";

   }

}

 

// Create an explicit Encoder

Encoder<Usage> usageEncoder = Encoders.bean(Usage.class);

Random rand = new Random();

rand.setSeed(42);

List<Usage> data = new ArrayList<Usage>()

 

// Create 1000 instances of Java Usage class

for (int i = 0; i < 1000; i++) {

  data.add(new Usage(i, "user" +

  RandomStringUtils.randomAlphanumeric(5),

  rand.nextInt(1000));

  

// Create a Dataset of Usage typed data

Dataset<Usage> dsUsage = spark.createDataset(data, usageEncoder);

 

Scala 和 Java 之间生成的 DataSet 将有所不同,因为随机种子算法可能有所不同。因此,你的 Scala 和 Java 的查询结果将有所不同。

现在我们有了生成的 DataSet --- dsUsage,让我们执行在上一章中完成的一些常见转换操作。


样本数据转换

回想一下,DataSet 是特定领域的对象的强类型集合。这些对象可以使用功能或关系操作进行并行转换操作。这些转换的例子包括 map(),reduce(),filter(),select()和 aggregate()。作为高阶函数的示例,这些方法可以通过 lambda,闭包或函数作为参数并返回结果。因此,它们非常适合函数式编程。

Scala 是一种函数式编程语言,最近,lambda,函数自变量和闭包也已添加到 Java 中。让我们尝试一些 Spark 中的高阶函数,并对之前创建的示例数据使用函数式编程结构。


高阶函数和函数式编程

举一个简单的例子,让我们使用它 filter()来返回 dsUsage DataSet 中所有使用时间超过 900 分钟的用户。一种实现方法是使用函数表达式作为 filter()方法的参数:

// In Scala

import org.apache.spark.sql.functions._

dsUsage

  .filter(d => d.usage > 900)

  .orderBy(desc("usage"))

  .show(5, false)

另一种方法是定义一个函数并将该函数作为参数提供给 filter():

def filterWithUsage(u: Usage) = u.usage > 900

dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5) 

 

+---+----------+-----+

|uid| uname|usage|

+---+----------+-----+

|561|user-5n2xY| 999|

|113|user-nnAXr| 999|

|605|user-NL6c4| 999|

|634|user-L0wci| 999|

|805|user-LX27o| 996|

+---+----------+-----+

only showing top 5 rows

在第一种情况下,我们使用 lambda 表达式{d.usage > 900}作为 filter()方法的参数,而在第二种情况下,我们定义了 Scala 函数 def filterWithUsage(u: Usage) = u.usage > 900。在这两种情况下,该 filter()方法都会在分布式 Dataset 中的对象的每一行上进行迭代 Usage 对象,并应用表达式或执行该函数,Usage 为表达式或函数值作为一行,当布尔值为 true 时,返回一个具有 Usage 类型的新的 Dataset。(有关方法签名的详细信息,请参见 Scala 文档。)

在 Java 中,用于 filter()参数的类型为 FilterFunction<T>。可以匿名内联定义或使用命名函数定义。在此示例中,我们将通过名称定义函数并将其分配给变量 f。在 filter()中应用此函数将返回一个新的 Dataset,其中包含我们的过滤条件为 true 的所有行:

// In Java

// Define a Java filter function

FilterFunction<Usage> f = new FilterFunction<Usage>() {

   public boolean call(Usage u) {

       return (u.usage > 900);

   }

};

 

// Use filter with our function and order the results in descending order

dsUsage.filter(f).orderBy(col("usage").desc()).show(5);

 

+---+----------+-----+

|uid|uname     |usage|

+---+----------+-----+

|67 |user-qCGvZ|997  |

|878|user-J2HUU|994  |

|668|user-pz2Lk|992  |

|750|user-0zWqR|991  |

|242|user-g0kF6|989  |

+---+----------+-----+

only showing top 5 rows

并非所有的 lambda 或函数参数都必须求 Boolean 值。他们也可以返回计算值。考虑使用高阶函数的示例 map(),我们的目标是找出 usage 中价值超过特定阈值的每个用户的使用成本,以便我们为每位用户提供每分钟的特殊价格。

// In Scala

// Use an if-then-else lambda expression and compute a value

dsUsage.map(u => {if (u.usage > 750) u.usage * .15 else u.usage * .50 })

  .show(5, false)

// Define a function to compute the usage

def computeCostUsage(usage: Int): Double = {

  if (usage > 750) usage * 0.15 else usage * 0.50

}

// Use the function as an argument to map()

dsUsage.map(u => {computeCostUsage(u.usage)}).show(5, false)

+------+

|value |

+------+

|262.5 |

|251.0 |

|85.0  |

|136.95|

|123.0 |

+------+

only showing top 5 rows

要在 Java 中使用 map(),必须定义一个 MapFunction<T>。可以是匿名类,也可以是 extends 的已定义类 MapFunction<T>。在此示例中,我们将其内联使用,即在方法中调用自己:

// In Java

// Define an inline MapFunction

dsUsage.map((MapFunction<Usage, Double>) u -> {

   if (u.usage > 750)

       return u.usage * 0.15;

   else

       return u.usage * 0.50;

}, Encoders.DOUBLE()).show(5); // We need to explicitly specify the Encoder

+------+

|value |

+------+

|65.0  |

|114.45|

|124.0 |

|132.6 |

|145.5 |

+------+

only showing top 5 rows

尽管我们已经计算了使用成本的值,但是我们不知道计算的值与哪些用户相关联。我们如何获得这些信息?

步骤很简单:

1. 创建一个 Scala case 样例类或 JavaBean 类 UsageCost,并添加一个名为 cost 的其他字段或列。

2. 定义一个函数来计算 cost 然后在 map()方法中使用它。

这是 Scala 中的样子:

// In Scala

// Create a new case class with an additional field, cost

case class UsageCost(uid: Int, uname:String, usage: Int, cost: Double)

 

// Compute the usage cost with Usage as a parameter

// Return a new object, UsageCost

def computeUserCostUsage(u: Usage): UsageCost = {

  val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50

    UsageCost(u.uid, u.uname, u.usage, v)

}

 

// Use map() on our original Dataset

dsUsage.map(u => {computeUserCostUsage(u)}).show(5)

 

+---+----------+-----+------+

|uid|     uname|usage|  cost|

+---+----------+-----+------+

|  0|user-Gpi2C|  525| 262.5|

|  1|user-DgXDi|  502| 251.0|

|  2|user-M66yO|  170|  85.0|

|  3|user-xTOn6|  913|136.95|

|  4|user-3xGSz|  246| 123.0|

+---+----------+-----+------+

only showing top 5 rows

现在,我们有了一个转换后的 DataSet ,其中有一个新的列 cost ,由我们 map()转换中的函数以及所有其他列计算得出。

同样,在 Java 中,如果我们想要与每个用户相关的成本,则需要定义一个 JavaBean 类 UsageCost 和 MapFunction<T>。有关完整的 JavaBean 示例,请参见本书的 GitHub repo;为了简洁起见,我们仅在此处显示内联 MapFunction<T>:

// In Java

// Get the Encoder for the JavaBean class

Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);

 

// Apply map() function to our data

dsUsage.map( (MapFunction<Usage, UsageCost>) u -> {

       double v = 0.0;

       if (u.usage > 750) v = u.usage * 0.15; else v = u.usage * 0.50;

       return new UsageCost(u.uid, u.uname,u.usage, v); },

         usageCostEncoder).show(5);

 

+------+---+----------+-----+

|  cost|uid|     uname|usage|

+------+---+----------+-----+

|  65.0|  0|user-xSyzf|  130|

|114.45|  1|user-iOI72|  763|

| 124.0|  2|user-QHRUk|  248|

| 132.6|  3|user-8GTjo|  884|

| 145.5|  4|user-U4cU1|  970|

+------+---+----------+-----+

only showing top 5 rows

关于使用高阶函数和 DataSet ,需要注意以下几点:

1.我们使用类型化的 JVM 对象作为函数的参数。

2.我们使用点表示法(来自面向对象的编程)来访问类型化的 JVM 对象内的各个字段,从而使其更易于阅读。

3.我们的某些函数和 lambda 签名可以是类型安全的,从而确保编译时错误检测并指示 Spark 处理哪些数据类型,执行哪些操作等。

4.使用 lambda 表达式中的 Java 或 Scala 语言功能,我们的代码可读,表达和简洁。

5.Spark 在 Java 和 Scala 中都提供了等效的 map()和 filter(),没有高级函数的构造,因此你不必被迫对 Datasets 或 DataFrames 使用函数式编程。相反,你可以简单地使用条件 DSL 运算符或 SQL 表达式:例如 dsUsage.filter("usage > 900")或 dsUsage($"usage" > 900)。(有关此内容的更多信息,请参见“使用 DataSet 的成本”。)

6.对于 DataSet ,我们使用编码器,这是一种在 JVM 和 Spark 内部二进制格式之间针对其数据类型有效地转换数据的机制(有关更多信息,请参见“DataSet 编码器”中的内容)。

 

函数和函数式编程并非 Spark DataSet 独有。你也可以将它们与 DataFrames 一起使用。回想一下,DataFrame 是一个 Dataset[Row],其中 Row 是一个通用的无类型 JVM 对象,可以容纳不同类型的字段。方法签名采用在其上进行操作的表达式或函数 Row,这意味着每个 Row 数据类型都可以作为表达式或函数的输入值。


将 DataFrame 转换为 DataSet

为了对查询和构造进行强类型检查,可以将 DataFrames 转换为 Datasets。要将现有的 DataFrame df 转换为 SomeCaseClass 类型的 Dataset ,只需使用 df.as[SomeCaseClass]表示即可。我们之前看到了一个示例:

// In Scala

val bloggersDS = spark

  .read

  .format("json")

  .option("path", "/data/bloggers/bloggers.json")

  .load()

  .as[Bloggers]

spark.read.format("json")返回一个 DataFrame<Row>,在 Scala 中是的类型别名 Dataset[Row]。Using.as[Bloggers]指示 Spark 使用编码器(将在本章稍后讨论)将对象从 Spark 的内部内存表示形式序列化/反序列化为 JVM Bloggers 对象。

DataSet 和 DataFrame 的内存管理

Spark 是一个内存密集型的分布式大数据引擎,因此其有效使用内存对其执行速度至关重要。在其整个发行历史中,Spark 的内存使用已发生了显着的变化:

1.Spark 1.0 使用基于 RDD 的 Java 对象进行内存存储,序列化和反序列化,这在资源和速度方面付出的代价都非常高。而且,存储空间是在 Java 堆上分配的,因此对于大型数据集你只能依靠 JVM 的垃圾回收(GC)。

2.Spark 1.x 引入了 Tungsten 项目。它的突出特点之一是一种新的基于行的内部格式,该格式使用偏移量和指针在堆外存储器中布局 DataSet 和 DataFrame 。Spark 使用一种称为编码器的高效机制在 JVM 及其内部 Tungsten 格式之间进行序列化和反序列化。堆外分配内存意味着减少了 GC 对 Spark 的影响。

3.Spark 2.x 引入了第二代 Tungsten 引擎,具有整个阶段的代码生成和基于列的矢量化内存布局的功能。该新版本以现代编译器的思想和技术为基础,还利用现代 CPU 和缓存体系结构,以“单条指令,多个数据”(SIMD)方法进行快速并行数据访问。


DataSet 编码器

编码器将堆外内存中的数据从 Spark 的内部 Tungsten 格式转换为 JVM Java 对象。换句话说,它们将 Dataset 对象从 Spark 的内部格式序列化和反序列化为 JVM 对象,包括原始数据类型。例如,Encoder[T]将会从 Spark 的内部 Tungsten 格式转换为 Dataset[T]。

Spark 内置支持为原始类型(例如,字符串,整数,长整数)、Scala case 类和 JavaBeans 自动生成生成编码器。与 Java 和 Kryo​​的序列化和反序列化相比,Spark 编码器明显更快。

在我们先前的 Java 示例中,我们显式创建了一个编码器:

Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);

但是,对于 Scala,Spark 会自动为这些高效的转换器生成字节码。让我们看一下 Spark 内部基于 Tungsten 行的格式。


Spark 的内部格式与 Java 对象格式

Java 对象的开销很大,包括标头信息,哈希码,Unicode 信息等。即使是简单的 Java 字符串(如“ abcd”)也需要 48 字节的存储空间,而不是你想象的 4 字节。可以想象一下创建一个 MyClass(Int, String, String)对象的开销。

Spark 不会为 DataSet 或 DataFrame 创建基于 JVM 的对象,而是分配堆外 Java 内存来布局其数据,并使用编码器将数据从内存中表示形式转换为 JVM 对象。例如,图 6-1 显示了如何在 MyClass(Int, String, String)内部存储 JVM 对象。

当数据以这种连续方式存储并且可以通过指针算术和 offets 访问时,编码器可以快速序列化或反序列化该数据。这意味着什么?


序列化和反序列化(SerDe)

在分布式计算中,这并不是一个新概念,在分布式计算中,数据经常通过网络在群集中的计算机节点之间传播,序列化和反序列化是将类型化对象编码(序列化)为发送方的二进制表示或格式,接收方从二进制格式转换为重新规范的数据类型对象的过程。

例如,如果 JVM 对象 MyClass 在图 6-1 有在 Spark 集群节点之间共享,发送者将序列成字节数组,并且接收器将它反序列化为 MyClass 类型的 JVM 对象。

JVM 拥有自己的内置 Java 序列化器和反序列化器,但是效率低下,因为(如上一节所述)JVM 在堆内存中创建的 Java 对象会膨胀。因此,这个过程很缓慢。

出于以下几个原因,这是 Dataset 编码器可以解决的地方:

1.Spark 的内部 Tungsten 二进制格式(见图 6-1 和 6-2)将对象存储在 Java 堆内存之外,并且结构紧凑,因此这些对象占用的空间更少。

2.编码器可以使用带有内存地址和偏移量的简单指针算法遍历整个存储器,从而快速进行序列化(图 6-2)。

3.在接收端,编码器可以将二进制表示形式快速反序列化为 Spark 的内部表示形式。编码器不受 JVM 的垃圾收集暂停的阻碍。

但是,正如我们接下来讨论的那样,生活中大多数美好的事物都是有代价的。


使用 DataSet 的成本

在第 3 章的“DataFrame 与 DataSet ”中,我们概述了使用 DataSet 的一些好处——但是这些好处是有代价的。正如前面所指出的那样,当 DataSet 被传递到高阶函数如,filter()、map()和 flatMap(),或作为参数传递给 lambdas 方法时,存在与从 Spark 的内部 Tungsten 格式反序列化到 JVM 对象的成本。

与在 Spark 中引入编码器之前使用的其他串行器相比,此开销较小且可以接受。但是,在较大的 DataSet 和密集查询中,这些成本会累积并可能影响性能。

降低成本的策略

减轻过度序列化和反序列化的一种策略是在查询中使用 DSL 表达式,并避免过度使用 lambda 作为匿名函数作为高阶函数的参数。因为 lambda 在运行前一直是匿名的,并且对 Catalyst 优化器是不透明的,所以当你使用它们时,它不能有效地识别你在做什么(你没有告诉 Spark 该做什么),因此无法优化查询(请参阅第三章中“ Catalyst Optimizer”这一节))。

第二种策略是将查询链接在一起,从而尽量减少序列化和反序列化。将查询链接在一起是 Spark 中的一种常见做法。

让我们用一个简单的例子来说明。假设我们有一个类型为 Person 的 DataSet ,其中 Person 定义为 Scala 案例类:

// In Scala

Person(id: Integer, firstName: String, middleName: String, lastName: String,

gender: String, birthDate: String, ssn: String, salary: String)

我们想使用函数式编程对此 DataSet 发出一组查询。

让我们检查一个我们无效地编写查询的情况,以便我们无意地承担重复序列化和反序列化的代价:

import java.util.Calendar

val earliestYear = Calendar.getInstance.get(Calendar.YEAR) - 40

 

personDS

 

  // Everyone above 40: lambda-1

  .filter(x => x.birthDate.split("-")(0).toInt > earliestYear)

  

  // Everyone earning more than 80K

  .filter($"salary" > 80000)

  

  // Last name starts with J: lambda-2

  .filter(x => x.lastName.startsWith("J"))

  

  // First name starts with D

  .filter($"firstName".startsWith("D"))

  .count()

如你在图 6-3 中所观察到的,每次我们从 lambda 迁移到 DSL(filter($"salary" > 8000))时,都会产生序列化和反序列化 Person JVM 对象的成本。

图 6-3。用 lambda 和 DSL 链接查询的低效方式

相比之下,以下查询仅使用 DSL,不使用 lambda。结果,它的效率更高,整个组合查询和链接查询都不需要序列化/反序列化:

personDS

  .filter(year($"birthDate") > earliestYear) // Everyone above 40

  .filter($"salary" > 80000) // Everyone earning more than 80K

  .filter($"lastName".startsWith("J")) // Last name starts with J

  .filter($"firstName".startsWith("D")) // First name starts with D

  .count()

出于好奇心里,你可以在本书的 GitHub 仓库中查看本章笔记中两次运行之间的时间差异。


总结

在本章中,我们详细介绍了如何在 Java 和 Scala 中使用 DataSet 。我们探索了 Spark 如何管理内存以适应 DataSet 构造(作为其统一和高级 API 的一部分),并且我们考虑了与使用 DataSet 相关的一些成本以及如何减少这些成本。我们还向你展示了如何在 Spark 中使用 Java 和 Scala 的函数式编程构造。

最后,我们了解了编码器如何从 Spark 的内部 Tungsten 二进制格式到 JVM 对象进行序列化和反序列化。

在下一章中,我们将介绍如何通过检查高效的 I/O 策略、优化和调整 Spark 配置以及在调试 Spark 应用程序时要查找的属性和筛选值来优化 Spark。

 

发布于: 2021 年 07 月 18 日阅读数: 27
用户头像

还未添加个人签名 2018.05.14 加入

公众号【数据与智能】主理人,个人微信:liuq4360 12 年大数据与 AI相关项目经验, 10 年推荐系统研究及实践经验,目前已经输出了40万字的推荐系统系列精品文章,并有新书即将出版。

评论

发布
暂无评论
Spark SQL和DataSet(六)