案例解析丨 Spark Hive 自定义函数应用
摘要:Spark 目前支持 UDF,UDTF,UDAF 三种类型的自定义函数。
1. 简介
Spark 目前支持 UDF,UDTF,UDAF 三种类型的自定义函数。UDF 使用场景:输入一行,返回一个结果,一对一,比如定义一个函数,功能是输入一个 IP 地址,返回一个对应的省份。UDTF 使用场景: 输入一行,返回多行(hive),一对多, 而 sparkSQL 中没有 UDTF, spark 中用 flatMap 即可实现该功能。UDAF: 输入多行,返回一行, aggregate(主要用于聚合功能,比如 groupBy,count,sum), 这些是 spark 自带的聚合函数,但是复杂相对复杂。
Spark 底层其实以 CatalogFunction 结构封装了一个函数,其中 FunctionIdentifier 描述了函数名字等基本信息,FunctionResource 描述了文件类型(jar 或者 file)和文件路径;Spark 的 SessionCatalog 提供了函数注册,删除,获取等一些列接口,Spark 的 Executor 在接收到函数执行 sql 请求时,通过缓存的 CatalogFunction 信息,找到 CatalogFunction 中对应的 jar 地址以及 ClassName, JVM 动态加载 jar,并通过 ClassName 反射执行对应的函数。

图 1. CatalogFunction 结构体

图 2. 注册加载函数逻辑
Hive 的 HiveSessionCatalog 是继承 Spark 的 SessionCatalog,对 Spark 的基本功能做了一层装饰以适配 Hive 的基本功能,其中包括函数功能。HiveSimpleUDF 对应 UDF,HiveGenericUDF 对应 GenericUDF,HiveUDAFFunction 对应 AbstractGenericUDAFResolve 以及 UDAF,HiveGenericUDTF 对应 GenericUDTF

图 3. Hive 装饰 spark 函数逻辑
2. UDF
UDF 是最常用的函数,使用起来相对比较简单,主要分为两类 UDF:简单数据类型,继承 UDF 接口;复杂数据类型,如 Map,List,Struct 等数据类型,继承 GenericUDF 接口。
简单类型实现 UDF 时,可自定义若干个名字 evaluate 为的方法,参数和返回类型根据需要自己设置。因为 UDF 接口默认使用 DefaultUDFMethodResolver 去方法解析器获取方法,解析器是根据用户输入参数和写死的名字 evaluate 去反射寻找方法元数据。当然用户也可以自定义解析器解析方法。

图 4. 自定义 UDF 简单示例

图 5.默认 UDF 方法解析器
3. UDAF
UDAF 是聚合函数,目前实现方式主要有三种:实现 UDAF 接口,比较老的简答实现方式,目前已经被废弃;实现 UserDefinedAggregateFunction,目前使用比较普遍方式,按阶段实现接口聚集数据;实现 AbstractGenericUDAFResolver,实现相对 UserDefinedAggregateFunction 方式稍微复杂点,还需要实现一个计算器 Evaluator(如通用计算器 GenericUDAFEvaluator),UDAF 的逻辑处理主要发生在 Evaluator。
UserDefinedAggregateFunction 定义输入输出数据结构,实现初始化缓冲区(initialize),聚合单条数据(update),聚合缓存区(merge)以及计算最终结果(evaluate)。


图 6.自定义 UDAF 简单示例
4. UDTF
UDTF 简单粗暴的理解是一行生成多行的自动函数,可以生成多行多列,又被称为表生成函数。目前实现方式是实现 GenericUDTF 接口,实现 2 个接口,initialize 接口参数校验,列的定义,process 接口接受一行数据,切割数据。


图 7.自定义 UDTF 简单示例
版权声明: 本文为 InfoQ 作者【华为云开发者社区】的原创文章。
原文链接:【http://xie.infoq.cn/article/5da554f4caa0f21e0f3d8c06b】。文章转载请联系作者。
评论