Spark 窗口函数 I

用户头像
马小宝
关注
发布于: 2020 年 07 月 26 日
Spark  窗口函数 I

在大数据分析和挖掘过程中,经常会计算些指标,有些指标可能需要进行很复杂的计算。在这过程中,经常会用到窗口分析函数来操作数据并计算。

Spark 中的窗口函数,跟其他(例如:Hive)支持窗口函数的都很类似。在 Spark 中支持两种方式使用窗口函数:





本文将首先介绍窗口函数的概念,然后讨论如何在 Spark SQL 和 Spark DataFrame API 中使用它们。本文涉及的 DataFrame API ,都使用 pyspark 的代码进行展示。



- 窗口函数(Window Functions) -



窗口函数是一种特殊的聚合函数。当您应用它们时,原表的行不会改变(普通的聚合函数聚合后,行数一般都会小于原表),新值将作为新列添加到其中。



import pyspark.sql.functions as F
from pyspark.sql import Window
product_revenue_data = [
("Thin", "Cell Phone", 6000),
("Normal", "Cell Phone", 1500),
("Mini", "Tablet", 5500),
("Ultra thin", "Cell Phone", 5000),
("Very thin", "Cell Phone", 6000),
("Big", "Tablet", 2500),
("Bendable", "Cell Phone", 3000),
("Foldable", "Cell Phone", 3000),
("Pro", "Tablet", 4500),
("Pro2", "Tablet", 6500)
]
product_revenue = spark.createDataFrame(product_revenue_data,
schema=["product", "category", "revenue"])
product_revenue.show()



+----------+----------+-------+
| product| category|revenue|
+----------+----------+-------+
| Thin|Cell Phone| 6000|
| Normal|Cell Phone| 1500|
| Mini| Tablet| 5500|
|Ultra thin|Cell Phone| 5000|
| Very thin|Cell Phone| 6000|
| Big| Tablet| 2500|
| Bendable|Cell Phone| 3000|
| Foldable|Cell Phone| 3000|
| Pro| Tablet| 4500|
| Pro2| Tablet| 6500|
+----------+----------+-------+




然后,通过 select 方法内部定义了一个典型的窗口函数:

product_revenue.select(
'product',
'category',
'revenue',
F.sum("revenue").over(Window.partitionBy('category').orderBy('revenue')).alias("cn")
).show()



+----------+----------+-------+-----+
| product| category|revenue| cn|
+----------+----------+-------+-----+
| Big| Tablet| 2500| 2500|
| Pro| Tablet| 4500| 7000|
| Mini| Tablet| 5500|12500|
| Pro2| Tablet| 6500|19000|
| Normal|Cell Phone| 1500| 1500|
| Bendable|Cell Phone| 3000| 7500|
| Foldable|Cell Phone| 3000| 7500|
|Ultra thin|Cell Phone| 5000|12500|
| Thin|Cell Phone| 6000|24500|
| Very thin|Cell Phone| 6000|24500|
+----------+----------+-------+-----+



在结果中,cn 是使用窗口函数生成的新列。可以发现,虽然是聚合统计,但返回新表的行数和原表的行数是相等的。

不过与聚合一样,一个完整的窗口分析语句也由两个部分组成:分区条件和窗口函数。例如上面语句:





- 分区条件 -



在 SQL 中,PARTITION BY 关键字用于指定分区,ORDER BY 关键字用于指定排序。SQL语法如下所示:

OVER (PARTITION BY ... ORDER BY ...)

在 DataFrame API 中,Spark 提供了相应的函数。可以如下指定分区和排序:

from pyspark.sql.window import Window
windowSpec = Window.partitionBy(...).orderBy(...)

除了排序和分区外,还可以定义数据的开始边界、结束边界和类型。Spark中定义了5种类型的边界:





<value> PRECEDING和<value> FOLLOWING 表示距离当前行(也就是 CURRENT ROW 行)位置的偏移。对于偏移的方式,SPARK中支持了两种类型的方式,分别是 ROW 和 RANGE。在不同的偏移方式中,<value>和后面的 PRECEDING、FOLLOWING 表示不同的含义。



ROW



ROW 表示的是以行的方式进行偏移,然后确定边界范围。

在是ROW方式中,<value> PRECEDING和<value> FOLLOWING 描述分别在当前输入行之前和之后出现的行数,用<value> 指定具体偏移行数。 

下图展示了以 1 PRECEDING 作为开始边界和 1 FOLLOWING 作为结束边界的 ROW 方式偏移后的边界范围:





RANGE



RANGE 表示的以逻辑的方式进行偏移,然后再确定边界范围。逻辑偏移是排序后当前输入行的值与边界 <value> 指定值之间的差,来指定偏移。单纯看文字解释 RANGE,会很让人迷惑。现在举一个例子,来更好的说明 RANGE 的逻辑偏移。

在此示例中,排序列是 revenue,起始边界定位为 2000 PRECEDING,结束边界定为 1000 FOLLOWING。下面五个图说明了如何使用当前输入行RANGE偏移和确定边界范围:











基本上,对于每个当前输入行的 revenue 值,最终得到边界范围是[当前行的 revenue 值-2000,当前行的 revenue 值+ 1000]。revenue的值在此范围内的所有行都在当前的边界范围内。

总之,要定义窗口规范,用户可以在SQL中使用以下语法。

OVER (PARTITION BY ... ORDER BY ... [frame_type] BETWEEN [start] AND [end])



在实际使用中,[frame_type] 用ROWS或RANGE替换。

[start] 可以是UNBOUNDED PRECEDING、CURRENT ROW、<value> PRECEDING 和 <value> FOLLOWING 中的任何一个。

[end]可以是UNBOUNDED FOLLOWING、CURRENT ROW、<value> PRECEDING 和 <value> FOLLOWING 中的任何一个。

在 Spark DataFrame API 中,使用方法如下:

from pyspark.sql.window import Window
windowSpec = Window.partitionBy(...).orderBy(...)
windowSpec.rowsBetween(start, end)
# OR
windowSpec.rangeBetween(start, end)

上面主要总结了分区条件。分区条件要与具体的窗口函数一起结合使用才能构成一个完成的窗口分析函数语句。

后面的文章中会详细总结Spark中的窗口函数的作用和使用方法。



- End -



参考:

http://spark.apache.org/

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html



用户头像

马小宝

关注

还未添加个人签名 2018.01.03 加入

高级数据挖掘

评论

发布
暂无评论
Spark  窗口函数 I