Hive UDF/UDAF 总结
概述
在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能,这类函数分为三大类:
UDF(User-Defined-Function)
- 特点:一进一出;
- 继承UDF类(org.apache.hadoop.hive.ql.exec.UDF)
UDAF(User-Defined Aggregation Function)
- 特点:多进一出
- 继承UDAF类(org.apache.hadoop.hive.ql.exec.UDAF)
UDTF(User-Defined Table-Generating Functions)
- 特点:一进多出
- 继承UDTF类( org.apache.hadoop.hive.ql.udf.generic.GenericUDTF)
UDF(User-Defined-Function)
内置的UDF,一般分为两类,UDF、 GenericUDF.
相比于UDF,GenericUDF有两个优势
可以接受复杂的参数类型,返回复杂类型
可以接受变长参数个数(参数数组)
extends UDF
UDF类型的编写相对比较简单,父类源码github位置,简易示例如下
可以看出UDF子类只需要实现 evaluate
方法
从官方注释可以看出, 支持但不限于如下类型,
public int evaluate();
public int evaluate(int a);
public double evaluate(int a, double b);
public String evaluate(String a, int b, Text c);
public Text evaluate(String a);
public String evaluate(List<Integer> a);
从官方注释可以看出主要是要满足 evaluate
方法的要求
输入为JAVA 原语(Hive Array 会被转为 List, 如
ARRAY<int>
转为List<Integer>
})输出为JAVA 原语或
org.apache.hadoop.io.Writable Writable
虽然简单,但是仔细分析一下源码,如何使用 evaluate
方法,从UDF父类中可以看到主要操作了 UDFMethodResolver
.
其 UDFMethodResolver
如果没有指明则由 DefaultUDFMethodResolver
来生成,其源码如下,可以看出通过 getMethodInternal
获取到 evaluate
方法.
extends GenericUDF
GenericUDF
相对于 UDF
写法上更加复杂,需要自己定义三个函数,虽然有上述的两个优点,但是 Hive
官方并不推荐使用该方法,如果能够使用 UDF
实现尽量不使用 GenericUDF
.父类源码github位置
从上述代码可以看出 evaluate
主要操作 DeferredObject
类型,该类型其实就是一个接口,该类内部实现一个类继承该接口 DeferredJavaObject
. 该类仅仅只是封装了一个 JAVA 的 Object
对象.
而 initialize
方法则是用以检测输入的数据是否合法.
上述代码很长,整体流程为
检查传入的参数个数与每个参数的数据类型是正确的;
保存 converters (ObjectInspector) 用以供 evaluate() 使用;
返回 ListObjectInspector,让 Hive 能够读取该函数的返回结果;
关注 ObjectInspector
是个什么类型, 源码见ObjectInspector.java,简述如下,主要用以解耦数据类型.
如上的 ObjectInspector.Category.PRIMITIVE
支持如下类型,源码见rimitiveObjectInspector.java
getDisplayString
用于当实现的GenericUDF出错的时候,打印出提示信息.
UDAF(User-Defined Aggregation Function)
UDAF 是 Hive 中用户自定义的聚合函数,内置的 UDAF 有 max()
等.
UDAF 是需要 hive sql 语句和 group by 联合使用的. 聚合函数常常需要对大量数组进行操作,所以在编写程序时,一定要注意内存溢出问题.
Simple: 即继承
org.apache.hadoop.hive.ql.exec.UDAF
类,并在派生类中以静态内部类的方式实现org.apache.hadoop.hive.ql.exec.UDAFEvaluator
接口.
- 这种方式简单直接,但是在使用过程中需要依赖JAVA反射机制,因此性能相对较低.
- 在Hive源码包org.apache.hadoop.hive.contrib.udaf.example中包含几个示例, 但是这些接口已经被注解为Deprecated
,建议不要使用这种方式开发新的UDAF函数.
Generic: 这是Hive社区推荐的新的写法,以抽象类代替原有的接口.新的抽象类
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
替代老的UDAF接口,新的抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
替代老的UDAFEvaluator接口.
简单 UDAF
总结:
UDAF要继承于UDAF父类
org.apache.hadoop.hive.ql.exec.UDAF
.内部类要实现
org.apache.hadoop.hive.ql.exec.UDAFEvaluator
接口.UDAFExampleAvgEvaluator
类里需要实现init、iterate、terminatePartial、merge、terminate
这几个函数,是必不可少的init()
方法用来进行全局初始化的.iterate()
中实现累加逻辑.terminatePartial
是Hive部分聚集时调用的,类似于MapReduce里的Combiner,这里 能保证能得到各个部分的状态累加.merge
是多个部分合并时调用的,得到了参与合并的最大值.terminate
是最终Reduce合并时调用的,得到最大值.
通用UDAF
通用UDAF
的编写主要如下两步:
编写
resolver
类,resolver
负责类型检查,操作符重载.类继承org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
,AbstractGenericUDAFResolver
实现了org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2
,方便统一接口.编写
evaluator
类.evaluator
真正实现UDAF的逻辑.通常来说,实现org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
,包括几个必须实现的抽象方法,这几个方法负责完成UDAF所需要处理的逻辑.
UDAF的运行流程简介
抽象类GenericUDAFEvaluator中,包含一个静态内部枚举类,和一系列抽象方法.这个枚举类的注释中,解释了各个枚举值的运行阶段和运行内容.按照时间先后顺序,分别有:
PARTIAL1:原始数据到部分聚合,调用iterate和terminatePartial –> map阶段
PARTIAL2: 部分聚合到部分聚合,调用merge和terminatePartial –> combine阶段
FINAL: 部分聚合到完全聚合,调用merge和terminate –> reduce阶段
COMPLETE: 从原始数据直接到完全聚合 –> map阶段,并且没有reduce
UDAF方法
init(Mode m, ObjectInspector[] parameters)
: 这个是非必须的,但是一般是需要的.实例化Evaluator类的时候调用的,在不同的阶段需要返回不同的OI.需要注意的是,在不同的模式下parameters的含义是不同的,比如m为 PARTIAL1 和 COMPLETE 时,parameters为原始数据;m为 PARTIAL2 和 FINAL 时,parameters仅为部分聚合数据(只有一个元素).在PARTIAL1
和PARTIAL2
模式下,ObjectInspector 用于terminatePartial方法的返回值,在FINAL和COMPLETE模式下ObjectInspector 用于terminate方法的返回值. 其入参和返回值,以及Mode阶段的关系如下表:
| mode | 入参 | 返回值的使用者 |
| :-: | :-: | :-: |
| PARTIAL1 | 原始数据 | terminatePartial |
| PARTIAL2 | 部分聚合数据 | terminatePartial |
| FINAL | 部分聚合数据 | terminate |
| COMPLETE | 原始数据 | terminate |
getNewAggregationBuffer()
: 返回存储临时聚合结果的AggregationBuffer对象reset(AggregationBuffer agg)
: 重置聚合结果对象,以支持mapper和reducer的重用.iterate(AggregationBuffer agg, Object[] parameters)
:迭代处理原始数据parameters并保存到agg中terminatePartial(AggregationBuffer agg)
:返回部分聚合数据的持久化对象.因为调用这个方法时,说明已经是map或者combine的结束了,必须将数据持久化以后交给reduce进行处理.只支持JAVA原始数据类型及其封装类型、HADOOP Writable类型、List、Map,不能返回自定义的类,即使实现了Serializable也不行,否则会出现问题或者错误的结果.merge(AggregationBuffer agg, Object partial)
:将terminatePartial返回的部分聚合数据进行合并,需要使用到对应的OI.terminate(AggregationBuffer agg)
:返回最终结果.
参考资料
版权声明: 本文为 InfoQ 作者【windism】的原创文章。
原文链接:【http://xie.infoq.cn/article/5941298a8126ec43f73fc8758】。文章转载请联系作者。
评论