写点什么

大数据 -93 SparkSQL 全面解析:SQL + 分布式计算的完美结合

作者:武子康
  • 2025-09-10
    山东
  • 本文字数:4764 字

    阅读完需:约 16 分钟

大数据-93 SparkSQL 全面解析:SQL + 分布式计算的完美结合

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 08 日更新到:Java-118 深入浅出 MySQL ShardingSphere 分片剖析:SQL 支持范围、限制与优化实践 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成的内容如下:


  • Standalone 提交

  • SparkContext 相关概念

  • Shuffle 概念、历史、V1 和 V2 对比


SparkSQL 概述

简单介绍

SparkSQL 是 Apache Spark 中用于处理结构化数据的模块。它不仅支持 SQL 查询,还允许你将 SQL 查询与 Spark 的其他强大功能结合使用,如数据流处理和机器学习。SparkSQL 提供了对数据的高度优化的访问方式,可以处理大量的结构化和半结构化数据集。

核心功能

SQL 支持

SparkSQL 提供了完整的 ANSI SQL 兼容性,允许用户使用标准的 SQL 语句查询数据。与 HiveQL 类似但更强大,支持包括:


  • 基本查询操作:SELECT、WHERE、ORDER BY 等

  • 复杂操作:JOIN(包括内连接、外连接)、GROUP BY 聚合、子查询、窗口函数

  • 数据定义语言:CREATE TABLE、ALTER TABLE 等

  • 数据控制语言:GRANT、REVOKE 等权限管理


例如,可以执行如下复杂查询:


SELECT     dept.name,     AVG(salary) as avg_salary,    RANK() OVER (PARTITION BY dept.id ORDER BY salary DESC) as rankFROM employeesJOIN departments dept ON employees.dept_id = dept.idWHERE hire_date > '2020-01-01'GROUP BY dept.id, dept.name
复制代码

数据框 API

数据框(DataFrame)是 SparkSQL 的核心抽象,具有以下特点:


  • 分布式特性:数据自动分区存储在集群节点上

  • 强类型:支持多种数据类型(StringType、IntegerType 等)

  • 丰富的操作接口:

  • 转换操作:select()、filter()、groupBy()

  • 行动操作:count()、show()、collect()

  • 类 SQL 方法:where()、orderBy()


示例代码:


# 从JSON创建DataFramedf = spark.read.json("hdfs://path/to/data.json")
# 执行转换和过滤result = df.select("name", "age")\ .filter(df.age > 30)\ .groupBy("department")\ .agg({"salary": "avg"})
# 显示结果result.show()
复制代码

与其他 Spark 组件的集成

SparkSQL 与 Spark 生态深度集成:


  1. Spark Streaming:可以将实时流数据转换为 DataFrame 进行处理


   val streamDF = spark.readStream     .format("kafka")     .option("kafka.bootstrap.servers", "host1:port1")     .load()
复制代码


  1. Spark MLlib:DataFrame 可直接作为机器学习算法的输入


   from pyspark.ml import Pipeline   from pyspark.ml.classification import LogisticRegression      lr = LogisticRegression(featuresCol="features")   pipeline = Pipeline(stages=[lr])   model = pipeline.fit(trainDF)
复制代码

Catalyst 优化器

Catalyst 优化器的工作流程:


  1. 解析阶段:将 SQL/DataFrame 操作转换为逻辑计划

  2. 优化阶段:应用规则优化:

  3. 谓词下推(Pushdown Predicates)

  4. 列剪裁(Column Pruning)

  5. 常量折叠(Constant Folding)

  6. 分区裁剪(Partition Pruning)

  7. 物理计划生成:选择最优执行策略


例如,对于查询:


SELECT name FROM users WHERE age > 20
复制代码


优化器会自动将过滤条件"age > 20"下推到数据源读取阶段,减少数据传输量。

统一的数据访问

支持连接各种数据源:


  1. 内置连接器

  2. 文件格式:Parquet、JSON、CSV、ORC

  3. 数据库:JDBC(MySQL、PostgreSQL 等)

  4. 大数据存储:Hive、HDFS

  5. 扩展连接器

  6. 云存储:S3、Azure Blob Storage

  7. NoSQL:Cassandra、MongoDB

  8. 消息队列:Kafka


示例连接代码:


// 读取Parquet文件val parquetDF = spark.read.parquet("hdfs://path/to/file.parquet")
// 连接MySQLval jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/db") .option("dbtable", "table1") .load()
复制代码

使用场景

  • 数据仓库和商业智能:利用 SparkSQL 可以对大规模数据集进行复杂的查询和分析,非常适合用于数据仓库和商业智能场景。

  • 数据集成:SparkSQL 可以集成多个不同的数据源,将它们整合成一个统一的视图进行处理。

  • 数据管道:在数据管道中,SparkSQL 常用于数据的清洗、转换和聚合。

Hive

  • Hive 的诞生,主要是因为 MapReduce 程序对 Java 的要求比较高,为了他们能够操作 HDFS 上的数据,推出了 Hive。

  • Hive 和 RDBMS 的 SQL 模型比较类似,容易掌握。

  • Hive 的主要缺陷在于它的底层是基于 MapReduce 的,执行比较慢。

Spark 0.x: Shark

在 Spark 0.x 版的时候推出了 Shark,Shark 与 Hive 是紧密关联的,Shark 底层很多东西还是依赖于 Hive,修改了内存管理、物理计划、执行三个模块,底层使用 Spark 基于内存的计算模型,性能上比 Hive 提升了很多倍。Shark 更多的是对 Hive 的改造,替换了 Hive 的物理执行引擎,提高了执行速度。但 Shark 继承了大量的 Hive 代码,因为给优化和维护带来了大量的麻烦。

Spark 1.x: Shark

在 Spark1.x 版本时 Shark 被淘汰,在 2014 年 7 月 1 日的 SparkSummit 上,Databricks 宣布终止对于 Shark 的开发,将重点放到了 SparkSQL 上。Shark 终止后,产生了两个分支:


  • Hive On Spark:Hive 社区的,源码在 Hive 中

  • Spark SQL (Spark On Hive):Spark 社区的,源码在 Spark 中

Spark 3.0

Spark3.0 超过 3400 个 Jira 问题被解决,下面是各个核心组件中分布情况:


Spark SQL 特点

Spark SQL 作为 Apache Spark 生态系统中的核心组件,自 2014 年发布以来成功接替了 Shark 项目,为大数据处理领域提供了高性能的 SQL-on-Hadoop 解决方案。它不仅延续了 Shark 的优势,还通过技术创新显著提升了处理能力。

核心优势

  1. 开发效率提升

  2. 通过统一的数据抽象(DataFrame/Dataset)减少了代码量

  3. 示例:相比 RDD API,同样的聚合操作可减少 30-50%的代码

  4. 支持多种语言接口(Scala、Java、Python、R)

  5. 内存优化存储

  6. 采用列式存储格式(Tungsten 格式)而非传统 JVM 对象存储

  7. 内存使用效率提升 4-5 倍

  8. 支持压缩(如字典编码、位打包等)

  9. 自动选择最优的数据布局(行存/列存)

  10. 性能优化技术

  11. Catalyst 优化器

  12. 基于规则的查询优化

  13. 成本模型驱动的执行计划选择

  14. 谓词下推等优化策略

  15. Tungsten 引擎

  16. 内存管理优化

  17. 缓存感知计算

  18. 代码生成技术

  19. 基准测试显示比 Hive 快 10-100 倍

应用场景

  1. 交互式查询:支持亚秒级响应的即席查询

  2. ETL 处理:高效处理结构化数据转换

  3. 流批一体:与 Structured Streaming 集成实现实时分析

  4. 机器学习:为 MLlib 提供结构化数据支持

架构创新

  • 统一的数据处理 API(DataFrame 作为抽象)

  • 与 Spark 核心引擎深度集成

  • 支持多种数据源(Hive、Parquet、JSON 等)

  • 提供 JDBC/ODBC 接口便于 BI 工具集成


通过上述技术创新,Spark SQL 成功实现了"写更少代码、读更少数据、获得更好性能"的设计目标,成为大数据时代处理结构化数据的首选工具之一。

Spark SQL 数据抽象

Spark SQL 提供了两个新的抽象,分别是:


  • DataFrame

  • DataSet

DataFrame

DataFrame 的前身是 SchemaRDD,Spark 1.3 更名为 DataFrame,不继承 RDD,自己实现了 RDD 的大部分功能。与 RDD 类似,DataFrame 也是一个分布式的数据集:


  • DataFrame 可以看做分布式 Row 对象的集合,提供了由列组成的详细模式信息,使其可以得到优化-

  • DataFrame 不仅有比 RDD 更多的算子,还可以进行执行计划的优化

  • DataFrame 更像传统数据的二维表格,除了数据以外,还记录数据的结构信息,即 Schema

  • DataFrame 支持嵌套数据类型(struct、array、map)

  • DataFrame API 提供了一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。

  • DataFrame 的劣势在于编译期间缺少安全检查,导致运行时出错。


下图是 RDD 存储和 DataFrame 存储的对比图:


Dataset

Dataset 是在 Spark1.6 中添加的新的接口


  • 与 RDD 相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表

  • 与 DataFrame 相比,保存了类型信息,是强类型的,提供了编译时类型检查

  • 调用 Dataset 的方法先会生成逻辑计划,然后 Spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行

  • Dataset 包含了 DataFrame 的功能,在 Spark2.0 中两者得到了统一,DataFrame 表示为 Dataset[Row],即 Dataset 的子集


Row & Schema

DataFrame = RDD[Row] + Schema。DataFrame 的前身是 SchemaRDDRow 是一个泛化、无类型的 JVM Object


我们可以启动 spark-shell 进行直观的体验:


spark-shell --master local[*]
复制代码


尝试运行下面的代码:


import org.apache.spark.sql.Row
val row1 = Row(1, "abc", 1.2)
row1(0)row1(1)row1(2)
row1.getInt(0)row1.getString(1)row1.getDouble(2)
row1.getAs[Int](0)row1.getAs[String](1)row1.getAs[Double](2)
复制代码


运行过程如下所示:


三者共性

  • RDD、DataFrame、Dataset 是 Spark 平台下的分布式弹性数据集,为处理海量数据提供便利

  • 三者都有许多相同的概念,如分区、持久化、容错等,有许多共同的函数,如 Map、Filter、SortBy

  • 三者都有惰性机制,只有在遇到 Action 算子时,才会开始真正的计算。

  • 对 DataFrame 和 Dataset 进行操作许多操作都需要这个包进行支持(import spark.implicits._)

三者区别

DataFrame(DataFrame = RDD[Row] + Schema)


  • 与 RDD 和 DataSet 不同,DataFrame 每一行的类型固定为 Row,只有通过解析才能获取各个字段的值

  • DataFrame 与 Dataset 均支持 SparkSQL 的操作


Dataset(Dataset = RDD[case class].toDS)


  • Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同

  • DataFrame 定义为 Dataset[Row],每一行的类型是 Row,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用前面的提到的 getAs 的方式来拿出字段

  • Dataset 每一行的类型都是一个 case class,在自定义了 case class 之后可以很自由的获得每一行的信息

数据类型

SparkSQL 支持多种数据类型,这些数据类型可以表示不同种类的结构化数据。理解这些数据类型有助于你在使用 SparkSQL 进行数据处理时,正确地定义和操作数据。

基本类型

  • StringType:表示字符串类型的数据。用于存储文本数据。

  • BinaryType:表示二进制数据类型,用于存储字节数组。

  • BooleanType:表示布尔类型的数据,只有两个可能的值:true 和 false。

  • DateType:表示日期类型的数据,不包含时间部分。格式通常为 YYYY-MM-DD。

  • TimestampType:表示时间戳类型的数据,包含日期和时间。格式通常为 YYYY-MM-DD HH:MM:SS.SSSSSS。

  • DoubleType:表示双精度浮点数类型的数据。用于存储高精度的数值。

  • FloatType:表示单精度浮点数类型的数据。比 DoubleType 占用更少的存储空间,但精度较低。

  • ByteType:表示一个 8 位有符号整数的数据类型。取值范围为 -128 到 127。

  • ShortType:表示一个 16 位有符号整数的数据类型。取值范围为 -32768 到 32767。

  • IntegerType:表示一个 32 位有符号整数的数据类型。取值范围为 -2147483648 到 2147483647。

  • LongType:表示一个 64 位有符号整数的数据类型。用于存储长整型数据。

  • DecimalType:表示精确的小数类型的数据。通常用于存储货币或需要精确计算的小数。

复杂类型

  • ArrayType:表示一个数组类型,可以存储相同数据类型的多个值。它的元素类型可以是任何数据类型(包括嵌套的复杂类型)。

  • MapType:表示键值对(key-value pairs)的集合,类似于哈希表或字典。键和值都可以是任意数据类型。

  • StructType:表示一个结构体类型,类似于关系数据库中的行。它由一组字段组成,每个字段都有一个名称和类型。StructType 是用来定义表的模式的主要方式。

特殊类型

  • NullType:表示空值的类型,通常在处理空数据或缺失数据时使用。

  • CalendarIntervalType:表示一个时间间隔,用于存储时间差异,例如几年几个月几天。



发布于: 刚刚阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-93 SparkSQL 全面解析:SQL + 分布式计算的完美结合_Java_武子康_InfoQ写作社区