写点什么

大数据 -96 SparkSQL 语句详解:从 DataFrame 到 SQL 查询与 Hive 集成全解析

作者:武子康
  • 2025-09-13
    山东
  • 本文字数:4633 字

    阅读完需:约 15 分钟

大数据-96 SparkSQL 语句详解:从 DataFrame 到 SQL 查询与 Hive 集成全解析

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 08 日更新到:Java-118 深入浅出 MySQL ShardingSphere 分片剖析:SQL 支持范围、限制与优化实践 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • SparkSQL 核心操作

  • Action 操作 详细解释+测试案例

  • Transformation 操作 详细解释+测试案例


SQL 语句详解

SparkSQL 概述

SparkSQL 是 Apache Spark 框架中的一个核心模块,专门用于处理结构化和半结构化数据。它提供了对数据进行查询、处理和分析的高级接口,既支持命令式编程风格,又支持声明式查询方式。

兼容性特点

总体而言:SparkSQL 与 HQL 高度兼容,但在语法上比 HQL 更加简洁高效。这种兼容性主要体现在:


  1. 语法结构相似性达到 90%以上

  2. 支持 Hive 的大部分 UDF 函数

  3. 可以直接读取 Hive 表数据

核心特性深入解析

1. DataFrame API

DataFrame 是 SparkSQL 中的核心抽象,它本质上是一个分布式数据集,具有以下特点:


  • 数据结构:以命名列的形式组织数据,类似于关系数据库中的表

  • 数据源支持

  • 关系型数据库:通过 JDBC 连接

  • 文件系统:Parquet、JSON、CSV、ORC 等格式

  • Hive 表:直接读取 Hive 元数据

  • 其他:Avro、Cassandra 等


使用示例:


# 创建DataFramedf = spark.read.json("examples/src/main/resources/people.json")
# 显示数据df.show()
复制代码

2. SQL 查询能力

SparkSQL 提供了完整的 SQL 支持,包括:


  • 标准 SQL 语法:支持 SELECT、JOIN、GROUP BY 等常规操作

  • 高级功能:窗口函数、子查询、CTE 等

  • 执行过程:SQL 查询会被自动转换为逻辑计划,经过优化后生成物理执行计划,最终转换为 RDD 操作


查询示例:


-- 注册临时视图CREATE TEMPORARY VIEW people USING org.apache.spark.sql.json OPTIONS (path "people.json")
-- 执行SQL查询SELECT name, age FROM people WHERE age > 20
复制代码

3. Hive 集成机制

SparkSQL 与 Hive 的集成主要体现在:


  • 元数据共享:可以直接读取 Hive Metastore 中的表定义

  • 语法兼容:支持绝大多数 HiveQL 语法

  • 数据互通:可以读写 Hive 表数据


集成配置步骤:


  1. 将 hive-site.xml 复制到 Spark 配置目录

  2. 设置 Hive Metastore 连接参数

  3. 启动 SparkSession 时启用 Hive 支持

4. 性能优化体系

SparkSQL 的优化技术包括:

Catalyst 优化器

  • 逻辑优化:常量折叠、谓词下推、列裁剪等

  • 物理优化:选择最优的连接算法、确定分区策略等

Tungsten 执行引擎

  • 内存管理:使用 sun.misc.Unsafe 直接操作堆外内存

  • 代码生成:运行时生成优化后的字节码

  • CPU 缓存优化:改进数据布局以提升缓存命中率

典型应用场景

  1. 数据仓库分析:替代传统 Hive 进行 ETL 处理

  2. 交互式查询:通过 JDBC/ODBC 接口支持 BI 工具连接

  3. 流批一体处理:结合 Structured Streaming 实现实时分析

  4. 机器学习数据准备:为 MLlib 提供特征工程支持

与 HiveQL 的主要区别


通过以上对比可以看出,SparkSQL 在保留 HQL 兼容性的同时,通过内存计算和优化技术显著提升了查询性能。

数据样例

// 数据1 1,2,32 2,33 1,2
// 需要实现如下的效果1 11 21 32 22 33 13 2
复制代码

编写代码

package icu.wzkcase class Info(id: String, tags: String)object SparkSql01 {
def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("SparkSQLDemo") .master("local[*]") .getOrCreate()
val sc = sparkSession.sparkContext sc.setLogLevel("WARN")
val arr = Array("1 1,2,3", "2 2,3", "3 1,2") val rdd: RDD[Info] = sc .makeRDD(arr) .map{ line => val fields: Array[String] = line.split("\\s+") Info(fields(0), fields(1)) }
import sparkSession.implicits._ implicit val infoEncoder = Encoders.product[Info]
val ds: Dataset[Info] = sparkSession.createDataset(rdd) ds.createOrReplaceTempView("t1")
sparkSession.sql( """ | select id, tag | from t1 | lateral view explode(split(tags, ",")) t2 as tag |""".stripMargin ).show sparkSession.sql( """ | select id, explode(split(tags, ",")) | from t1 |""".stripMargin ).show
sparkSession.close() }
}
复制代码

运行测试

控制台输出结果为:


+---+---+| id|tag|+---+---+|  1|  1||  1|  2||  1|  3||  2|  2||  2|  3||  3|  1||  3|  2|+---+---+
+---+---+| id|col|+---+---+| 1| 1|| 1| 2|| 1| 3|| 2| 2|| 2| 3|| 3| 1|| 3| 2|+---+---+
复制代码

运行结果

运行结果如下图所示:


输入与输出

SparkSQL 内建支持的数据源包括:


  • Parquet (默认数据源)

  • JSON

  • CSV

  • Avro

  • Images

  • BinaryFiles(Spark 3.0)


简单介绍一下,Parquet 是一种列式存储格式,专门为大数据处理和分析而设计。


  • 列式存储:Parquet 采用列式存储格式,这意味着同一列的数据存储在一起。这样可以极大地提高查询性能,尤其是当查询只涉及少量列时。

  • 高效压缩:由于同一列的数据具有相似性,Parquet 能够更高效地进行压缩,节省存储空间。

  • 支持复杂数据类型:Parquet 支持嵌套的数据结构,包括嵌套列表、映射和结构体,这使得它非常适合处理复杂的、半结构化的数据。

  • 跨平台:Parquet 是一种开放标准,支持多种编程语言和数据处理引擎,包括 Apache Spark、Hadoop、Impala 等。


Parquet

特点:Parquet 是一种列式存储格式,特别适合大规模数据的存储和处理。它支持压缩和嵌套数据结构,因此在存储效率和读取性能方面表现优异。


使用方式:spark.read.parquet("path/to/data") 读取 Parquet 文件;df.write.parquet("path/to/output") 将 DataFrame 保存为 Parquet 格式。

JSON

特点:JSON 是一种轻量级的数据交换格式,广泛用于 Web 应用程序和 NoSQL 数据库中。SparkSQL 能够解析和生成 JSON 格式的数据,并支持嵌套结构。


使用方式:spark.read.json("path/to/data") 读取 JSON 文件;df.write.json("path/to/output") 将 DataFrame 保存为 JSON 格式。

CSV

特点:CSV(逗号分隔值)是最常见的平面文本格式之一,简单易用,但不支持嵌套结构。SparkSQL 支持读取和写入 CSV 文件,并提供了处理缺失值、指定分隔符等功能。


使用方式:spark.read.csv("path/to/data") 读取 CSV 文件;df.write.csv("path/to/output") 将 DataFrame 保存为 CSV 格式。

Avro

特点:Avro 是一种行式存储格式,适合大规模数据的序列化。它支持丰富的数据结构和模式演化,通常用于 Hadoop 生态系统中的数据存储和传输。


使用方式:spark.read.format("avro").load("path/to/data") 读取 Avro 文件;df.write.format("avro").save("path/to/output") 将 DataFrame 保存为 Avro 格式。

ORC

特点:ORC(Optimized Row Columnar)是一种高效的列式存储格式,专为大数据处理而设计,支持高压缩率和快速读取性能。它在存储空间和 I/O 性能方面表现优越。


使用方式:spark.read.orc("path/to/data") 读取 ORC 文件;df.write.orc("path/to/output") 将 DataFrame 保存为 ORC 格式。

Hive Tables

特点:SparkSQL 能够无缝集成 Hive,直接访问 Hive 元数据,并对 Hive 表进行查询。它支持 HiveQL 语法,并能够利用 Hive 的存储格式和结构。


使用方式:通过 spark.sql("SELECT * FROM hive_table")查询 Hive 表;也可以使用 saveAsTable 将 DataFrame 写入 Hive 表。

JDBC/ODBC

特点:SparkSQL 支持通过 JDBC/ODBC 接口连接关系型数据库,如 MySQL、PostgreSQL、Oracle 等。它允许从数据库读取数据并将结果写回数据库。


使用方式:spark.read.format("jdbc").option("url", "jdbc:mysql://host/db").option("dbtable", "table").option("user", "username").option("password", "password").load() 读取数据库表;df.write.format("jdbc").option("url", "jdbc:mysql://host/db").option("dbtable", "table").option("user", "username").option("password", "password").save() 将 DataFrame 写入数据库。

Text Files

特点:SparkSQL 可以处理简单的文本文件,每一行被读取为一个字符串。适合用于处理纯文本数据。


使用方式:spark.read.text("path/to/data") 读取文本文件;df.write.text("path/to/output") 将 DataFrame 保存为文本格式。

Delta Lake (外部插件)

特点:Delta Lake 是一种开源存储层,构建在 Parquet 格式之上,支持 ACID 事务、可扩展元数据处理和流批一体的实时数据处理。尽管不是内建的数据源,但它在 Spark 生态系统中得到了广泛支持。


使用方式:spark.read.format("delta").load("path/to/delta-table") 读取 Delta 表;df.write.format("delta").save("path/to/delta-table") 将 DataFrame 保存为 Delta 格式。

测试案例

val df1 =spark.read.format("parquet").load("data/users.parquet")// Use Parquet; you can omit format("parquet") if you wish asit's the defaultval df2 = spark.read.load("data/users.parquet")
// Use CSVval df3 = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("data/people1.csv")
// Use JSONval df4 = spark.read.format("json").load("data/emp.json")
复制代码


此外还支持 JDBC 的方式:


val jdbcDF = sparkSession  .read  .format("jdbc")  .option("url", "jdbc:mysql://h122.wzk.icu/spark_test?useSSL=false")  .option("driver", "com.mysql.jdbc.Driver")  .option("user", "hive")  .option("password", "hive@wzk.icu")  .load()jdbcDF.show()
复制代码

访问 Hive

导入依赖

<dependency>  <groupId>org.apache.spark</groupId>  <artifactId>spark-hive_2.12</artifactId>  <version>${spark.version}</version></dependency>
复制代码

hive-site

需要在项目的 Resource 目录下,新增一个 hive-site.xml 备注:最好使用 metastore service 连接 Hive,使用直接 metastore 的方式时,SparkSQL 程序会修改 Hive 的版本信息


<configuration>    <property>        <name>hive.metastore.uris</name>        <value>thrift://h122.wzk.icu:9083</value>    </property></configuration>
复制代码

编写代码

object AccessHive {  def main(args: Array[String]): Unit = {    val spark = SparkSession      .builder()      .appName("Demo1")      .master("local[*]")      .enableHiveSupport()      // 设为true时,Spark使用与Hive相同的约定来编写Parquet数据      .config("spark.sql.parquet.writeLegacyFormat", true)      .getOrCreate()
val sc = spark.sparkContext sc.setLogLevel("warn")
spark.sql("show databases").show spark.sql("select * from ods.ods_trade_product_info").show
val df: DataFrame = spark.table("ods.ods_trade_product_info") df.show()
df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back") spark.table("ods.ods_trade_product_info_back").show
spark.close() }}
复制代码


发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-96 SparkSQL 语句详解:从 DataFrame 到 SQL 查询与 Hive 集成全解析_Java_武子康_InfoQ写作社区