大数据 -95 Spark 集群 SparkSQL Action 与 Transformation 操作 详细解释与测试案例

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 08 日更新到:Java-118 深入浅出 MySQL ShardingSphere 分片剖析:SQL 支持范围、限制与优化实践 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节完成的内容如下:
SparkSession
RDD、DataFrame、DataSet
三者之间互相转换 详细解释
核心操作
Transformation(转换操作)
定义详解
Transformation 是 Spark 中一类重要的操作类型,其核心特点是"懒执行"(Lazy Evaluation)。这意味着当我们在代码中调用 Transformation 操作时,Spark 不会立即执行实际的计算任务,而是会:
记录下操作逻辑和依赖关系
构建一个逻辑执行计划(DAG,有向无环图)
返回一个新的数据集(RDD/DataFrame)表示转换后的数据形态
这种延迟执行机制带来了多个优势:
允许 Spark 优化整个执行计划
减少不必要的中间结果存储
提高整体执行效率
只有当遇到 Action 操作(如 count()、collect()等)时,Spark 才会触发整个执行计划的运算。
常见操作详解
select()
功能:从 DataFrame 中选择特定的列
参数说明:
可以接受列名的字符串列表
也可以通过 Column 对象指定
示例场景:
filter()
功能:根据条件过滤行数据
参数说明:
接受一个布尔表达式作为过滤条件
执行特点:
会生成一个新的 DataFrame
不会改变原 DataFrame
示例场景:
join()
功能:合并两个 DataFrame
常见 join 类型:
inner join(内连接)
outer join(外连接)
left join(左连接)
right join(右连接)
cross join(交叉连接)
关键参数:
连接条件(on)
连接类型(how)
示例场景:
groupBy()
功能:按照指定列对数据进行分组
典型使用方式:
通常与聚合函数配合使用
可以指定一个或多个分组列
执行特点:
返回 GroupedData 对象
需要配合聚合操作使用
示例场景:
agg()
功能:执行聚合操作
常见聚合函数:
count()
sum()
avg()
max()
min()
first()
last()
参数形式:
可以接受字典形式指定各列聚合方式
也可以使用表达式形式
示例场景:
性能优化提示
尽量避免在 Transformation 中执行数据收集操作
合理选择 join 策略(广播 join 等)
注意数据倾斜问题,特别是在 groupBy 操作时
合理使用缓存策略(cache/persist)减少重复计算
Action(行动操作)
定义:Action 操作会触发 Spark 的计算并返回结果。与 Transformation 不同,Action 操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动程序。
常见操作:
show(): 显示 DataFrame 的内容。
collect(): 将 DataFrame 的数据收集到驱动程序上,作为本地集合返回。
count(): 计算 DataFrame 中的行数。
write(): 将 DataFrame 的数据写入外部存储(如 HDFS、S3、数据库等)。
take(): 返回 DataFrame 的前 n 行数据。
Action 操作
与 RDD 类似的操作
show
collect
collectAsList
head
first
count
take
takeAsList
reduce
与结构相关
printSchema
explain
columns
dtypes
col
生成数据
保存并上传到服务器上
写入内容如下图所示:

测试运行
我们进入 spark-shell 进行测试
执行结果如下图所示:

继续进行测试:
运行结果如下图所示:

继续进行测试:
运行结果如下图所示:

继续进行测试:
运行结果如下图所示:

Transformation 操作
RDD 类似的操作
持久化/缓存 与 checkpoint
select
where
group by / 聚合
order by
join
集合操作
空值操作(函数)
函数
与 RDD 类似的操作
map
filter
flatMap
mapPartitions
sample
randomSplit
limt
distinct
dropDuplicates
describe
我们进行测试:
测试结果如下图:

我们继续进行测试:
执行结果如下图:

存储相关
cacheTable
persist
checkpoint
unpersist
cache
备注:Dataset 默认的存储级别是 MEMEORY_AND_DISK
执行结果如下图所示:

select 相关
列的多种表示
select
selectExpr
启动 Spark-Shell 继续进行测试
执行结果如下图所示:

继续进行测试
运行结果如下图所示:

where 相关
接着对上述内容进行测试:
运行测试结果如下图:

groupBy 相关
groupBy
agg
max
min
avg
sum
count
进行测试:
orderBy 相关
orderBy == sort
运行测试的结果如下图所示:

继续进行测试:
测试结果如下图所示:

JOIN 相关
运行的测试结果如下图所示:

这里编写两个 case:
运行测试的结果如下图所示:

接下来我们进行连表操作:
测试的运行结果如下图所示:


集合相关
运行结果如下图所示:

空值处理
版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/9ac77f7b8e6d9514a4fd3b75c】。文章转载请联系作者。
评论