大数据开发之 SparkSQL 面试篇
1.谈谈你对 Spark SQL 的理解
Spark SQL 是一个用来处理结构化数据的 Spark 组件,前身是 shark,但是 shark 过多的依赖于 hive 如采用 hive 的语法解析器、查询优化器等,制约了 Spark 各个组件之间的相互集成,因此 Spark SQL 应运而生大数据培训。
Spark SQL 在汲取了 shark 诸多优势如内存列存储、兼容 hive 等基础上,做了重新的构造,因此也摆脱了对 hive 的依赖,但同时兼容 hive。除了采取内存列存储优化性能,还引入了字节码生成技术、CBO 和 RBO 对查询等进行动态评估获取最优逻辑计划、物理计划执行等。基于这些优化,使得 Spark SQL 相对于原有的 SQL on Hadoop 技术在性能方面得到有效提升。
同时,Spark SQL 支持多种数据源,如 JDBC、HDFS、HBase。它的内部组件,如 SQL 的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与 Spark Core 无缝集成,提供了 DataSet/DataFrame 的可编程抽象数据模型,并且可被视为一个分布式的 SQL 查询引擎。
2.谈谈你对 DataSet/DataFrame 的理解
DataSet/DataFrame 都是 Spark SQL 提供的分布式数据集,相对于 RDD 而言,除了记录数据以外,还记录表的 schema 信息。
DataSet 是自 Spark1.6 开始提供的一个分布式数据集,具有 RDD 的特性比如强类型、可以使用强大的 lambda 表达式,并且使用 Spark SQL 的优化执行引擎。DataSet API 支持 Scala 和 Java 语言,不支持 Python。但是鉴于 Python 的动态特性,它仍然能够受益于 DataSet API(如,你可以通过一个列名从 Row 里获取这个字段 row.columnName),类似的还有 R 语言。
DataFrame 是 DataSet 以命名列方式组织的分布式数据集,类似于 RDBMS 中的表,或者 R 和 Python 中的 data frame。DataFrame API 支持 Scala、Java、Python、R。在 Scala API 中,DataFrame 变成类型为 Row 的 Dataset:type DataFrame = Dataset[Row]。
DataFrame 在编译期不进行数据中字段的类型检查,在运行期进行检查。但 DataSet 则与之相反,因为它是强类型的。此外,二者都是使用 catalyst 进行 sql 的解析和优化。为了方便,以下统一使用 DataSet 统称。
DataSet 创建
DataSet 通常通过加载外部数据或通过 RDD 转化创建。
1.加载外部数据 以加载 json 和 mysql 为例:val ds = sparkSession.read.json("/路径/people.json")
val ds = sparkSession.read.format("jdbc").options(Map("url" -> "jdbc:mysql://ip:port/db","driver" -> "com.mysql.jdbc.Driver","dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()
2.RDD 转换为 DataSet 通过 RDD 转化创建 DataSet,关键在于为 RDD 指定 schema,通常有两种方式(伪代码):1.定义一个 case class,利用反射机制来推断
从 HDFS 中加载文件为普通 RDDval lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))
定义 case class(相当于表的 schema)case class Person(id:Int, name:String, age:Int)
将 RDD 和 case class 关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
将 RDD 转换成 DataFrameval ds= personRDD.toDF
2.手动定义一个 schema StructType,直接指定在 RDD 上
val schemaString ="name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))
val ds = sparkSession.createDataFrame(rowRdd,schema)
操作 DataSet 的两种风格语法
DSL 语法
1.查询 DataSet 部分列中的内容
personDS.select(col("name"))personDS.select(col("name"), col("age"))
2.查询所有的 name 和 age 和 salary,并将 salary 加 1000
personDS.select(col("name"), col("age"), col("salary") + 1000)personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)
3.过滤 age 大于 18 的
personDS.filter(col("age") > 18)
4.按年龄进行分组并统计相同年龄的人数
personDS.groupBy("age").count()
注意:直接使用 col 方法需要 import org.apache.spark.sql.functions._
SQL 语法
如果想使用 SQL 风格的语法,需要将 DataSet 注册成表
personDS.registerTempTable("person")
//查询年龄最大的前两名 val result = sparkSession.sql("select * from person order by age desc limit 2")//保存结果为 json 文件。注意:如果不指定存储格式,则默认存储为 parquetresult.write.format("json").save("hdfs://ip:port/res2")
3.说说 Spark SQL 的几种使用方式 1.sparksql-shell 交互式查询就是利用 Spark 提供的 shell 命令行执行 SQL
2.编程首先要获取 Spark SQL 编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是 SQLContext,如果是操作 hive 则为 HiveContext)。这里以读取 parquet 为例:
val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate();val df = sparkSession.read.format("parquet").load("/路径/parquet 文件")
然后就可以针对 df 进行业务处理了。
3.Thriftserverbeeline 客户端连接操作
启动 spark-sql 的 thrift 服务,sbin/start-thriftserver.sh,大数据培训启动脚本中配置好 Spark 集群服务资源、地址等信息。然后通过 beeline 连接 thrift 服务进行数据处理。hive-jdbc 驱动包来访问 spark-sql 的 thrift 服务 在项目 pom 文件中引入相关驱动包,跟访问 mysql 等 jdbc 数据源类似。示例:
Class.forName("org.apache.hive.jdbc.HiveDriver")val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");try {val stat = conn.createStatement()val res = stat.executeQuery("select * from people limit 1")while (res.next()) {println(res.getString("name"))}} catch {case e: Exception => e.printStackTrace()} finally{if(conn!=null) conn.close()}
4.说说 Spark SQL 获取 Hive 数据的方式
Spark SQL 读取 hive 数据的关键在于将 hive 的元数据作为服务暴露给 Spark。除了通过上面 thriftserver jdbc 连接 hive 的方式,也可以通过下面这种方式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:
<property><name>hive.metastore.uris</name><value>thrift://ip:port</value></property>
然后,启动 hive metastore
最后,将 hive-site.xml 复制或者软链到 $SPARK_HOME/conf/。如果 hive 的元数据存储在 mysql 中,那么需要将 mysql 的连接驱动 jar 包如 mysql-connector-java-5.1.12.jar 放到 $SPARK_HOME/lib/下,启动 spark-sql 即可操作 hive 中的库和表。而此时使用 hive 元数据获取 SparkSession 的方式为:
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
5.分别说明 UDF、UDAF、Aggregator
UDF UDF 是最基础的用户自定义函数,以自定义一个求字符串长度的 udf 为例:val udf_str_length = udf{(str:String) => str.length}spark.udf.register("str_length",udf_str_length)val ds =sparkSession.read.json("路径/people.json")ds.createOrReplaceTempView("people")sparkSession.sql("select str_length(address) from people")UDAF 定义 UDAF,需要继承抽象类 UserDefinedAggregateFunction,它是弱类型的,下面的 aggregator 是强类型的。以求平均数为例:import org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.expressions.MutableAggregationBufferimport org.apache.spark.sql.expressions.UserDefinedAggregateFunctionimport org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {// Data types of input arguments of this aggregate functiondef inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)// Data types of values in the aggregation bufferdef bufferSchema: StructType = {StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)}// The data type of the returned valuedef dataType: DataType = DoubleType// Whether this function always returns the same output on the identical inputdef deterministic: Boolean = true// Initializes the given aggregation buffer. The buffer itself is a Row
that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// Updates the given aggregation buffer buffer
with new input data from input
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}}// Merges two aggregation buffers and stores the updated buffer values back to buffer1
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// Calculates the final resultdef evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)}
// Register the function to access itspark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()
Aggregatorimport org.apache.spark.sql.{Encoder, Encoders, SparkSession}import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {// A zero value for this aggregation. Should satisfy the property that any b + zero = bdef zero: Average = Average(0L, 0L)// Combine two values to produce a new value. For performance, the function may modify buffer
// and return it instead of constructing a new objectdef reduce(buffer: Average, employee: Employee): Average = {buffer.sum += employee.salarybuffer.count += 1buffer}// Merge two intermediate valuesdef merge(b1: Average, b2: Average): Average = {b1.sum += b2.sumb1.count += b2.countb1}// Transform the output of the reductiondef finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count// Specifies the Encoder for the intermediate value typedef bufferEncoder: Encoder[Average] = Encoders.product// Specifies the Encoder for the final output value typedef outputEncoder: Encoder[Double] = Encoders.scalaDouble}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// Convert the function to a TypedColumn
and give it a nameval averageSalary = MyAverage.toColumn.name("average_salary")val result = ds.select(averageSalary)result.show()
6.对比一下 Spark SQL 与 HiveSQL
7.说说 Spark SQL 解析查询 parquet 格式 Hive 表如何获取分区字段和查询条件
问题现象
sparksql 加载指定 Hive 分区表路径,生成的 DataSet 没有分区字段。如,sparkSession.read.format("parquet").load(s"${hive_path}"),hive_path 为 Hive 分区表在 HDFS 上的存储路径。
hive_path 的几种指定方式会导致这种情况的发生(test_partition 是一个 Hive 外部分区表,dt 是它的分区字段,分区数据有 dt 为 20200101 和 20200102):
1.hive_path 为"/spark/dw/test.db/test_partition/dt=20200101"2.hive_path 为"/spark/dw/test.db/test_partition/*"
因为牵涉到的源码比较多,这里仅以示例的程序中涉及到的源码中的 class、object 和方法,绘制成 xmind 图如下,想细心研究的可以参考该图到 spark 源码中进行分析。
问题分析
这里主要给出几个源码段,结合上述 xmind 图理解:
在没有指定参数 basePath 的情况下:
1.hive_path 为/spark/dw/test.db/test_partition/dt=20200101
sparksql 底层处理后得到的 basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【伪代码】
leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【伪代码】
2.hive_path 为/spark/dw/test.db/test_partition/*
sparksql 底层处理后得到的 basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【伪代码】
leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【伪代码】
这两种情况导致源码 if(basePaths.contains(currentPath))为 true,还没有解析分区就重置变量 finished 为 true 跳出循环,因此最终生成的结果也就没有分区字段:
解决方案
1.在 Spark SQL 加载 Hive 表数据路径时,指定参数 basePath,如 sparkSession.read.option("basePath","/spark/dw/test.db/test_partition")2.主要重写 basePaths 方法和 parsePartition 方法中的处理逻辑,同时需要修改其他涉及的代码。由于涉及需要改写的代码比较多,可以封装成工具
8.说说你对 Spark SQL 小文件问题处理的理解
在生产中,无论是通过 SQL 语句或者 Scala/Java 等代码的方式使用 Spark SQL 处理数据,在 Spark SQL 写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响 Hadoop 集群管理或者 Spark 在处理数据时的稳定性:
1.Spark SQL 写 Hive 或者直接写入 HDFS,过多的小文件会对 NameNode 内存管理等产生巨大的压力,会影响整个集群的稳定运行
2.容易导致 task 数过多,如果超过参数 spark.driver.maxResultSize 的配置(默认 1g),会抛出类似如下的异常,影响任务的处理
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大 spark.driver.maxResultSize 的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。此外,Spark 在处理任务时,一个分区分配一个 task 进行处理,多个分区并行处理,虽然并行处理能够提高处理效率,但不是意味着 task 数越多越好。如果数据量不大,过多的 task 运行反而会影响效率。最后,Spark 中一个 task 处理一个分区从而也会影响最终生成的文件数。
在数仓建设中,产生小文件过多的原因有很多种,比如:
1.流式处理中,每个批次的处理执行保存操作也会产生很多小文件 2.为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多
那么如何解决这种小文件的问题呢?
1.通过 repartition 或 coalesce 算子控制最后的 DataSet 的分区数 注意 repartition 和 coalesce 的区别 2.将 Hive 风格的 Coalesce and Repartition Hint 应用到 Spark SQL 需要注意这种方式对 Spark 的版本有要求,建议在 Spark2.4.X 及以上版本使用,示例:INSERT ... SELECT /*+ COALESCE(numPartitions) / ...INSERT ... SELECT /+ REPARTITION(numPartitions) */ ...3.小文件定期合并可以定时通过异步的方式针对 Hive 分区表的每一个分区中的小文件进行合并操作上述只是给出 3 种常见的解决办法,并且要结合实际用到的技术和场景去具体处理,比如对于 HDFS 小文件过多,也可以通过生成 HAR 文件或者 Sequence File 来解决。
9.SparkSQL 读写 Hive metastore Parquet 遇到过什么问题吗?
Spark SQL 为了更好的性能,在读写 Hive metastore parquet 格式的表时,会默认使用自己的 Parquet SerDe,而不是采用 Hive 的 SerDe 进行序列化和反序列化。该行为可以通过配置参数 spark.sql.hive.convertMetastoreParquet 进行控制,默认 true。
这里从表 schema 的处理角度而言,就必须注意 Hive 和 Parquet 兼容性,主要有两个区别:1.Hive 是大小写敏感的,但 Parquet 相反 2.Hive 会将所有列视为 nullable,但是 nullability 在 parquet 里有独特的意义
由于上面的原因,在将 Hive metastore parquet 转化为 Spark SQL parquet 时,需要兼容处理一下 Hive 和 Parquet 的 schema,即需要对二者的结构进行一致化。主要处理规则是:
1.有相同名字的字段必须要有相同的数据类型,忽略 nullability。兼容处理的字段应该保持 Parquet 侧的数据类型,这样就可以处理到 nullability 类型了(空值问题) 2.兼容处理的 schema 应只包含在 Hive 元数据里的 schema 信息,主要体现在以下两个方面:(1)只出现在 Parquet schema 的字段会被忽略 (2)只出现在 Hive 元数据里的字段将会被视为 nullable,并处理到兼容后的 schema 中
关于 schema(或者说元数据 metastore),Spark SQL 在处理 Parquet 表时,同样为了更好的性能,会缓存 Parquet 的元数据信息。此时,如果直接通过 Hive 或者其他工具对该 Parquet 表进行修改导致了元数据的变化,那么 Spark SQL 缓存的元数据并不能同步更新,此时需要手动刷新 Spark SQL 缓存的元数据,来确保元数据的一致性,方式如下:
// 第一种方式应用的比较多
sparkSession.catalog.refreshTable(s"${dbName.tableName}")
sparkSession.catalog.refreshByPath(s"${path}")
10.说说 Spark SQL 如何选择 join 策略在了解 join 策略选择之前,首先看几个先决条件:
build table 的选择 Hash Join 的第一步就是根据两表之中较小的那一个构建哈希表,这个小表就叫做 build table,大表则称为 probe table,因为需要拿小表形成的哈希表来"探测"它。源码如下:
/* 左表作为 build table 的条件,join 类型需满足:
InnerLike:实现目前包括 inner join 和 cross join
RightOuter:right outer join*/
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {case _: InnerLike | RightOuter => truecase _ => false}
/* 右表作为 build table 的条件,join 类型需满足(第 1 种是在业务开发中写的 SQL 主要适配的):
InnerLike、LeftOuter(left outer join)、LeftSemi(left semi join)、LeftAnti(left anti join)
ExistenceJoin:only used in the end of optimizer and physical plans, we will not generate SQL for this join type*/private def canBuildRight(joinType: JoinType): Boolean = joinType match {case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => truecase _ => false}
满足什么条件的表才能被广播如果一个表的大小小于或等于参数 spark.sql.autoBroadcastJoinThreshold(默认 10M)配置的值,那么就可以广播该表。源码如下:
private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {val buildLeft = canBuildLeft(joinType) && canBroadcast(left)val buildRight = canBuildRight(joinType) && canBroadcast(right)buildLeft || buildRight}
private def canBroadcast(plan: LogicalPlan): Boolean = {plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold}
private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide = {val buildLeft = canBuildLeft(joinType) && canBroadcast(left)val buildRight = canBuildRight(joinType) && canBroadcast(right)
// 最终会调用 broadcastSidebroadcastSide(buildLeft, buildRight, left, right)}
除了通过上述表的大小满足一定条件之外,我们也可以通过直接在 Spark SQL 中显示使用 hint 方式(/+ BROADCAST(small_table) /),直接指定要广播的表,源码如下:
private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcastval buildRight = canBuildRight(joinType) && right.stats.hints.broadcastbuildLeft || buildRight}
private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide = {val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcastval buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
// 最终会调用 broadcastSidebroadcastSide(buildLeft, buildRight, left, right)}
无论是通过表大小进行广播还是根据是否指定 hint 进行表广播,最终都会调用 broadcastSide,来决定应该广播哪个表:
private def broadcastSide(canBuildLeft: Boolean,canBuildRight: Boolean,left: LogicalPlan,right: LogicalPlan): BuildSide = {
def smallerSide =if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
if (canBuildRight && canBuildLeft) {// 如果左表和右表都能作为 build table,则将根据表的统计信息,确定 physical size 较小的表作为 build table(即使两个表都被指定了 hint)smallerSide} else if (canBuildRight) {// 上述条件不满足,优先判断右表是否满足 build 条件,满足则广播右表。否则,接着判断左表是否满足 build 条件 BuildRight} else if (canBuildLeft) {BuildLeft} else {// 如果左表和右表都不能作为 build table,则将根据表的统计信息,确定 physical size 较小的表作为 build table。目前主要用于 broadcast nested loop joinsmallerSide}}
从上述源码可知,即使用户指定了广播 hint,实际执行时,不一定按照 hint 的表进行广播。
是否可构造本地 HashMap 应用于 Shuffle Hash Join 中,源码如下:
// 逻辑计划的单个分区足够小到构建一个 hash 表// 注意:要求分区数是固定的。如果分区数是动态的,还需满足其他条件 private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {// 逻辑计划的 physical size 小于 spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默认 200)时,即可构造本地 HashMapplan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions}
SparkSQL 目前主要实现了 3 种 join:Broadcast Hash Join、ShuffledHashJoin、Sort Merge Join。那么 Catalyst 在处理 SQL 语句时,是依据什么规则进行 join 策略选择的呢?
Broadcast Hash Join
主要根据 hint 和 size 进行判断是否满足条件。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if canBroadcastByHints(joinType, left, right) =>val buildSide = broadcastSideByHints(joinType, left, right)Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if canBroadcastBySizes(joinType, left, right) =>val buildSide = broadcastSideBySizes(joinType, left, right)Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
Shuffle Hash Join
选择 Shuffle Hash Join 需要同时满足以下条件:
spark.sql.join.preferSortMergeJoin 为 false,即 Shuffle Hash Join 优先于 Sort Merge Join 右表或左表是否能够作为 build table 是否能构建本地 HashMap 以右表为例,它的逻辑计划大小要远小于左表大小(默认 3 倍)上述条件优先检查右表。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)&& muchSmaller(right, left) ||!RowOrdering.isOrderable(leftKeys) =>Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if !conf.preferSortMergeJoin && canBuildLeft(joinType) && uildLocalHashMap(left)&& muchSmaller(left, right) ||!RowOrdering.isOrderable(leftKeys) =>Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes}
如果不满足上述条件,但是如果参与 join 的表的 key 无法被排序,即无法使用 Sort Merge Join,最终也会选择 Shuffle Hash Join。
!RowOrdering.isOrderable(leftKeys)
def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
Sort Merge Join
如果上面两种 join 策略(Broadcast Hash Join 和 Shuffle Hash Join)都不符合条件,并且参与 join 的 key 是可排序的,就会选择 Sort Merge Join。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if RowOrdering.isOrderable(leftKeys) =>joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
Without joining keys
Broadcast Hash Join、Shuffle Hash Join 和 Sort Merge Join 都属于经典的 ExtractEquiJoinKeys(等值连接条件)。
对于非 ExtractEquiJoinKeys,则会优先检查表是否可以被广播(hint 或者 size)。如果可以,则会使用 BroadcastNestedLoopJoin(简称 BNLJ),熟悉 Nested Loop Join 则不难理解 BNLJ,主要却别在于 BNLJ 加上了广播表。
源码如下:
// Pick BroadcastNestedLoopJoin if one side could be broadcastcase j @ logical.Join(left, right, joinType, condition)if canBroadcastByHints(joinType, left, right) =>val buildSide = broadcastSideByHints(joinType, left, right)joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)if canBroadcastBySizes(joinType, left, right) =>val buildSide = broadcastSideBySizes(joinType, left, right)joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
如果表不能被广播,又细分为两种情况:
若 join 类型 InnerLike(关于 InnerLike 上面已有介绍)对量表直接进行笛卡尔积处理若上述情况都不满足,最终方案是选择两个表中 physical size 较小的表进行广播,join 策略仍为 BNLJ 源码如下:
// Pick CartesianProduct for InnerJoincase logical.Join(left, right, _: InnerLike, condition) =>joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>val buildSide = broadcastSide(left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)// This join could be very slow or OOMjoins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
11.讲讲 Spark SQL 中 Not in Subquery 为何低效以及如何规避
首先看个 Not in Subquery 的 SQL:
// test_partition1 和 test_partition2 为 Hive 外部分区表 select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
对应的完整的逻辑计划和物理计划为:
== Parsed Logical Plan =='Project [*]+- 'Filter NOT 't1.id IN (list#3 []): +- 'Project ['id]: +- 'UnresolvedRelation test_partition2
+- 'SubqueryAlias t1
+- 'UnresolvedRelation test_partition1
== Analyzed Logical Plan ==id: string, name: string, dt: stringProject [id#4, name#5, dt#6]+- Filter NOT id#4 IN (list#3 []): +- Project [id#7]: +- SubqueryAlias default
.test_partition2
: +- HiveTableRelation default
.test_partition2
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]+- SubqueryAlias t1
+- SubqueryAlias default
.test_partition1
+- HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
== Optimized Logical Plan ==Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))):- HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]+- Project [id#7]+- HiveTableRelation default
.test_partition2
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
== Physical Plan ==BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))):- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]+- BroadcastExchange IdentityBroadcastMode+- Scan hive default.test_partition2 [id#7], HiveTableRelation `default
通过上述逻辑计划和物理计划可以看出,Spark SQL 在对 not in subquery 处理,从逻辑计划转换为物理计划时,会最终选择 BroadcastNestedLoopJoin(对应到 Spark 源码中 BroadcastNestedLoopJoinExec.scala)策略。
提起 BroadcastNestedLoopJoin,不得不提 Nested Loop Join,它在很多 RDBMS 中得到应用,比如 mysql。它的工作方式是循环从一张表(outer table)中读取数据,然后访问另一张表(inner table,通常有索引),将 outer 表中的每一条数据与 inner 表中的数据进行 join,类似一个嵌套的循环并且在循环的过程中进行数据的比对校验是否满足一定条件。
对于被连接的数据集较小的情况下,Nested Loop Join 是个较好的选择。但是当数据集非常大时,从它的执行原理可知,效率会很低甚至可能影响整个服务的稳定性。
而 Spark SQL 中的 BroadcastNestedLoopJoin 就类似于 Nested Loop Join,只不过加上了广播表(build table)而已。
BroadcastNestedLoopJoin 是一个低效的物理执行计划,内部实现将子查询(select id from test_partition2)进行广播,然后 test_partition1 每一条记录通过 loop 遍历广播的数据去匹配是否满足一定条件。
private def leftExistenceJoin(// 广播的数据 relation: Broadcast[Array[InternalRow]],exists: Boolean): RDD[InternalRow] = {assert(buildSide == BuildRight)
/* streamed 对应物理计划中:Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]*/streamed.execute().mapPartitionsInternal { streamedIter =>val buildRows = relation.valueval joinedRow = new JoinedRow
// 条件是否定义。此处为 Some(((id#4 = id#7) || isnull((id#4 = id#7))))if (condition.isDefined) {streamedIter.filter(l =>// exists 主要是为了根据 joinType 来进一步条件判断数据的返回与否,此处 joinType 为 LeftAntibuildRows.exists(r => boundCondition(joinedRow(l, r))) == exists)
}}
由于 BroadcastNestedLoopJoin 的低效率执行,可能导致长时间占用 executor 资源,影响集群性能。同时,因为子查询的结果集要进行广播,如果数据量特别大,对 driver 端也是一个严峻的考验,极有可能带来 OOM 的风险。因此,在实际生产中,要尽可能利用其他效率相对高的 SQL 来避免使用 Not in Subquery。
虽然通过改写 Not in Subquery 的 SQL,进行低效率的 SQL 到高效率的 SQL 过渡,能够避免上面所说的问题。但是这往往建立在我们发现任务执行慢甚至失败,然后排查任务中的 SQL,发现"问题"SQL 的前提下。那么如何在任务执行前,就"检查"出这样的 SQL,从而进行提前预警呢?
这里给出一个思路,就是解析 Spark SQL 计划,根据 Spark SQL 的 join 策略匹配条件等,来判断任务中是否使用了低效的 Not in Subquery 进行预警,然后通知业务方进行修改。同时,我们在实际完成数据的 ETL 处理等分析时,也要事前避免类似的低性能 SQL。
12.说说 SparkSQL 中产生笛卡尔积的几种典型场景以及处理策略
Spark SQL 几种产生笛卡尔积的典型场景
首先来看一下在 Spark SQL 中产生笛卡尔积的几种典型 SQL:
join 语句中不指定 on 条件 select * from test_partition1 join test_partition2;join 语句中指定不等值连接 select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name;join 语句 on 中用 or 指定连接条件 select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name;join 语句 on 中用||指定连接条件 select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name;除了上述举的几个典型例子,实际业务开发中产生笛卡尔积的原因多种多样。同时需要注意,在一些 SQL 中即使满足了上述 4 种规则之一也不一定产生笛卡尔积。比如,对于 join 语句中指定不等值连接条件的下述 SQL 不会产生笛卡尔积:
--在 Spark SQL 内部优化过程中针对 join 策略的选择,最终会通过 SortMergeJoin 进行处理。select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.i
此外,对于直接在 SQL 中使用 cross join 的方式,也不一定产生笛卡尔积。比如下述 SQL:
-- Spark SQL 内部优化过程中选择了 SortMergeJoin 方式进行处理 select * from test_partition1 t1 cross join test_partition2 t2 on t1.id = t2.id;
但是如果 cross join 没有指定 on 条件同样会产生笛卡尔积。那么如何判断一个 SQL 是否产生了笛卡尔积呢?
Spark SQL 是否产生了笛卡尔积
以 join 语句不指定 on 条件产生笛卡尔积的 SQL 为例:
-- test_partition1 和 test_partition2 是 Hive 分区表 select * from test_partition1 join test_partition2;
通过 Spark UI 上 SQL 一栏查看上述 SQL 执行图,如下:
可以看出,因为该 join 语句中没有指定 on 连接查询条件,导致了 CartesianProduct 即笛卡尔积。
再来看一下该 join 语句的逻辑计划和物理计划:
可以看出,因为该 join 语句中没有指定 on 连接查询条件,导致了 CartesianProduct 即笛卡尔积。
再来看一下该 join 语句的逻辑计划和物理计划:
== Parsed Logical Plan =='GlobalLimit 1000+- 'LocalLimit 1000+- 'Project [*]+- 'UnresolvedRelation t
== Analyzed Logical Plan ==id: string, name: string, dt: string, id: string, name: string, dt: stringGlobalLimit 1000+- LocalLimit 1000+- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]+- SubqueryAlias t
+- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]+- Join Inner:- SubqueryAlias default
.test_partition1
: +- HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]+- SubqueryAlias default
.test_partition2
+- HiveTableRelation default
.test_partition2
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
== Optimized Logical Plan ==GlobalLimit 1000+- LocalLimit 1000+- Join Inner:- HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]+- HiveTableRelation default
.test_partition2
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
== Physical Plan ==CollectLimit 1000+- CartesianProduct:- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation default
.test_partition1
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]+- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation default
.test_partition2
, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
通过逻辑计划到物理计划,以及最终的物理计划选择 CartesianProduct,可以分析得出该 SQL 最终确实产生了笛卡尔积。
Spark SQL 中产生笛卡尔积的处理策略
Spark SQL 中主要有 ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,这 3 种是我们比较熟知的 Spark SQL join)和 Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join 策略。
那么,如何判断 SQL 是否产生了笛卡尔积就迎刃而解。
在利用 Spark SQL 执行 SQL 任务时,通过查看 SQL 的执行图来分析是否产生了笛卡尔积。如果产生笛卡尔积,则将任务杀死,进行任务优化避免笛卡尔积。【不推荐。用户需要到 Spark UI 上查看执行图,并且需要对 Spark UI 界面功能等要了解,需要一定的专业性。(注意:这里之所以这样说,是因为 Spark SQL 是计算引擎,面向的用户角色不同,用户不一定对 Spark 本身了解透彻,但熟悉 SQL。对于做平台的小伙伴儿,想必深有感触)】分析 Spark SQL 的逻辑计划和物理计划,通过程序解析计划推断 SQL 最终是否选择了笛卡尔积执行策略。如果是,及时提示风险。具体可以参考 Spark SQL join 策略选择的源码:def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {// --- BroadcastHashJoin --------------------------------------------------------------------// broadcast hints were specifiedcase ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if canBroadcastByHints(joinType, left, right) =>val buildSide = broadcastSideByHints(joinType, left, right)Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))// broadcast hints were not specified, so need to infer it from size and configuration.case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if canBroadcastBySizes(joinType, left, right) =>val buildSide = broadcastSideBySizes(joinType, left, right)Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))// --- ShuffledHashJoin ---------------------------------------------------------------------case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)&& muchSmaller(right, left) ||!RowOrdering.isOrderable(leftKeys) =>Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)&& muchSmaller(left, right) ||!RowOrdering.isOrderable(leftKeys) =>Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))// --- SortMergeJoin ------------------------------------------------------------case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)if RowOrdering.isOrderable(leftKeys) =>joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil// --- Without joining keys ------------------------------------------------------------// Pick BroadcastNestedLoopJoin if one side could be broadcastcase j @ logical.Join(left, right, joinType, condition)if canBroadcastByHints(joinType, left, right) =>val buildSide = broadcastSideByHints(joinType, left, right)joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nilcase j @ logical.Join(left, right, joinType, condition)if canBroadcastBySizes(joinType, left, right) =>val buildSide = broadcastSideBySizes(joinType, left, right)joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil// Pick CartesianProduct for InnerJoincase logical.Join(left, right, _: InnerLike, condition) =>joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nilcase logical.Join(left, right, joinType, condition) =>val buildSide = broadcastSide(left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)// This join could be very slow or OOMjoins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil// --- Cases where this strategy does not apply ---------------------------------------------case _ => Nil}
13.具体讲讲 Spark SQL/Hive 中的一些实用函数字符串函数
concat 对字符串进行拼接:concat(str1, str2, ..., strN) ,参数:str1、str2...是要进行拼接的字符串。
-- return the concatenation of str1、str2、..., strN-- SparkSQLselect concat('Spark', 'SQL');
concat_ws 在拼接的字符串中间添加某种分隔符:concat_ws(sep, [str | array(str)]+)。参数 1:分隔符,如 - ;参数 2:要拼接的字符串(可多个)
-- return the concatenation of the strings separated by sep-- Spark-SQLselect concat_ws("-", "Spark", "SQL");
encode 设置编码格式:encode(str, charset)。参数 1:要进行编码的字符串 ;参数 2:使用的编码格式,如 UTF-8
-- encode the first argument using the second argument character setselect encode("HIVE", "UTF-8");
decode 转码:decode(bin, charset)。参数 1:进行转码的 binary ;参数 2:使用的转码格式,如 UTF-8
-- decode the first argument using the second argument character setselect decode(encode("HIVE", "UTF-8"), "UTF-8");
format_string / printf 格式化字符串:format_string(strfmt, obj, ...)
-- returns a formatted string from printf-style format stringsselect format_string("Spark SQL %d %s", 100, "days");
initcap / lower / upper initcap:将每个单词的首字母转为大写,其他字母小写。单词之间以空白分隔。upper:全部转为大写。lower:全部转为小写。
-- Spark Sqlselect initcap("spaRk sql");
-- SPARK SQLselect upper("sPark sql");
-- spark sqlselect lower("Spark Sql");
length
返回字符串的长度。
-- 返回 4select length("Hive");
lpad / rpad
返回固定长度的字符串,如果长度不够,用某种字符进行补全。lpad(str, len, pad):左补全 rpad(str, len, pad):右补全 注意:如果参数 str 的长度大于参数 len,则返回的结果长度会被截取为长度为 len 的字符串
-- vehiselect lpad("hi", 4, "ve");
-- hiveselect rpad("hi", 4, "ve");
-- sparselect lpad("spark", 4, "ve");
trim / ltrim / rtrim
去除空格或者某种字符。trim(str) / trim(trimStr, str):首尾去除。ltrim(str) / ltrim(trimStr, str):左去除。rtrim(str) / rtrim(trimStr, str):右去除。
-- hiveselect trim(" hive ");
-- arkSQLSSELECT ltrim("Sp", "SSparkSQLS") as tmp;
regexp_extract
正则提取某些字符串
-- 2000select regexp_extract("1000-2000", "(\d+)-(\d+)", 2);
regexp_replace
正则替换
-- r-rselect regexp_replace("100-200", "(\d+)", "r");
repeat
repeat(str, n):复制给定的字符串 n 次
-- aaselect repeat("a", 2);
instr / locate
返回截取字符串的位置。如果匹配的字符串不存在,则返回 0
-- returns the (1-based) index of the first occurrence of substr in str.
-- 6select instr("SparkSQL", "SQL");
-- 0select locate("A", "fruit");
space 在字符串前面加 n 个空格
select concat(space(2), "A");
split split(str, regex):以某字符拆分字符串 split(str, regex)
-- ["one","two"]select split("one two", " ");
substr / substring_index
-- k SQLselect substr("Spark SQL", 5);
-- 从后面开始截取,返回 SQLselect substr("Spark SQL", -3);
-- kselect substr("Spark SQL", 5, 1);
-- org.apache。注意:如果参数 3 为负值,则从右边取值 select substring_index("org.apache.spark", ".", 2);
translate
替换某些字符为指定字符
-- The translate will happen when any character in the string matches the character in the matchingString
-- A1B2C3select translate("AaBbCc", "abc", "123");
JSON 函数
get_json_object-- v2select get_json_object('{"k1": "v1", "k2": "v2"}', '$.k2');
from_jsonselect tmp.k from (select from_json('{"k": "fruit", "v": "apple"}','k STRING, v STRING', map("","")) as tmp);
to_json-- 可以把所有字段转化为 json 字符串,然后表示成 value 字段 select to_json(struct(*)) AS value;
时间函数
current_date / current_timestamp 获取当前时间 select current_date;
select current_timestamp;
从日期时间中提取字段/格式化时间 1)year、month、day、dayofmonth、hour、minute、second-- 20select day("2020-12-20");
2)dayofweek(1 = Sunday, 2 = Monday, ..., 7 = Saturday)、dayofye
-- 7select dayofweek("2020-12-12");
3)weekofyear(date)
/**
Extracts the week number as an integer from a given date/timestamp/string.
A week is considered to start on a Monday and week 1 is the first week with more than 3 days,
as defined by ISO 8601
@return An integer, or null if the input was a string that could not be cast to a date
@group datetime_funcs
@since 1.5.0*/def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }
-- 50select weekofyear("2020-12-12");
4)trunc 截取某部分的日期,其他部分默认为 01。第二个参数: YEAR、YYYY、YY、MON、MONTH、MM
-- 2020-01-01select trunc("2020-12-12", "YEAR");
-- 2020-12-01select trunc("2020-12-12", "MM");
5)date_trunc 参数:YEAR、YYYY、YY、MON、MONTH、MM、DAY、DD、HOUR、MINUTE、SECOND、WEEK、QUARTER
-- 2012-12-12 09:00:00select date_trunc("HOUR" ,"2012-12-12T09:32:05.359");
6)date_format 按照某种格式格式化时间
-- 2020-12-12select date_format("2020-12-12 12:12:12", "yyyy-MM-dd");
日期时间转换
1)unix_timestamp 返回当前时间的 unix 时间戳。
select unix_timestamp();
-- 1609257600select unix_timestamp("2020-12-30", "yyyy-MM-dd");
2)from_unixtime 将 unix epoch(1970-01-01 00:00:00 UTC)中的秒数转换为以给定格式表示当前系统时区中该时刻的时间戳的字符串。
select from_unixtime(1609257600, "yyyy-MM-dd HH:mm:ss");
3)to_unix_timestamp 将时间转化为时间戳。
-- 1609257600select to_unix_timestamp("2020-12-30", "yyyy-MM-dd");
4)to_date / date 将时间字符串转化为 date。
-- 2020-12-30select to_date("2020-12-30 12:30:00");select date("2020-12-30");
5)to_timestamp 将时间字符串转化为 timestamp。
select to_timestamp("2020-12-30 12:30:00");
6)quarter 从给定的日期/时间戳/字符串中提取季度。
-- 4select quarter("2020-12-30");
日期、时间计算
1)months_between(end, start) 返回两个日期之间的月数。参数 1 为截止时间,参数 2 为开始时间
-- 3.94959677select months_between("1997-02-28 10:30:00", "1996-10-30");
2)add_months 返回某日期后 n 个月后的日期。
-- 2020-12-28select add_months("2020-11-28", 1);
3)last_day(date) 返回某个时间的当月最后一天
-- 2020-12-31select last_day("2020-12-01");
4)next_day(start_date, day_of_week) 返回某时间后 the first date 基于 specified day of the week。参数 1:开始时间。参数 2:Mon、Tue、Wed、Thu、Fri、Sat、Sun。
-- 2020-12-07select next_day("2020-12-01", "Mon");
5)date_add(start_date, num_days)
返回指定时间增加 num_days 天后的时间
-- 2020-12-02select date_add("2020-12-01", 1);
6)datediff(endDate, startDate) 两个日期相差的天数
-- 3select datediff("2020-12-01", "2020-11-28");
7)关于 UTC 时间
-- to_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' would yield '2017-07-14 01:40:00.0'.
select to_utc_timestamp("2020-12-01", "Asia/Seoul") ;
-- from_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' would yield '2017-07-14 03:40:00.0'.
select from_utc_timestamp("2020-12-01", "Asia/Seoul");
常用的开窗函数
开窗函数格式通常满足:
function_name([argument_list]) OVER ( [PARTITION BY partition_expression,…] [ORDER BY sort_expression, … [ASC|DESC]])
function_name: 函数名称,比如 SUM()、AVG()
partition_expression:分区列
sort_expression:排序列
注意:以下举例涉及的表 employee 中字段含义:name(员工姓名)、dept_no(部门编号)、salary(工资)
cume_dist 如果按升序排列,则统计:小于等于当前值的行数/总行数(number of rows ≤ current row)/(total number of rows)。如果是降序排列,则统计:大于等于当前值的行数/总行数。用于累计统计。
lead(value_expr[,offset[,default]])用于统计窗口内往下第 n 行值。第一个参数为列名,第二个参数为往下第 n 行(可选,默认为 1),第三个参数为默认值(当往下第 n 行为 NULL 时候,取默认值,如不指定,则为 NULL)。
lag(value_expr[,offset[,default]])与 lead 相反,用于统计窗口内往上第 n 行值。第一个参数为列名,第二个参数为往上第 n 行(可选,默认为 1),第三个参数为默认值(当往上第 n 行为 NULL 时候,取默认值,如不指定,则为 NULL)。
first_value 取分组内排序后,截止到当前行,第一个值。
last_value 取分组内排序后,截止到当前行,最后一个值。
rank 对组中的数据进行排名,如果名次相同,则排名也相同,但是下一个名次的排名序号会出现不连续。比如查找具体条件的 topN 行。RANK() 排序为 (1,2,2,4)。
dense_rankdense_rank 函数的功能与 rank 函数类似,dense_rank 函数在生成序号时是连续的,而 rank 函数生成的序号有可能不连续。当出现名次相同时,则排名序号也相同。而下一个排名的序号与上一个排名序号是连续的。DENSE_RANK() 排序为 (1,2,2,3)。
SUM/AVG/MIN/MAX 数据:
id time pv1 2015-04-10 11 2015-04-11 31 2015-04-12 61 2015-04-13 31 2015-04-14 22 2015-05-15 82 2015-05-16 6
结果:
SELECT id,time,pv,SUM(pv) OVER(PARTITION BY id ORDER BY time) AS pv1, -- 默认为从起点到当前行 SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pv2, --从起点到当前行,结果同 pv1SUM(pv) OVER(PARTITION BY id) AS pv3, --分组内所有行 SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS pv4, --当前行+往前 3 行 SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS pv5, --当前行+往前 3 行+往后 1 行 SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS pv6 ---当前行+往后所有行
FROM data;
NTILENTILE(n),用于将分组数据按照顺序切分成 n 片,返回当前切片值。
NTILE 不支持 ROWS BETWEEN,比如 NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)。
如果切片不均匀,默认增加第一个切片的分布。
ROW_NUMBER 从 1 开始,按照顺序,生成分组内记录的序列。
比如,按照 pv 降序排列,生成分组内每天的 pv 名次 ROW_NUMBER() 的应用场景非常多,比如获取分组内排序第一的记录。
文章来源:大数据真好玩
评论