写点什么

Hive UDF,就这

  • 2022 年 1 月 10 日
  • 本文字数:9268 字

    阅读完需:约 30 分钟

摘要:Hive UDF 是什么?有什么用?怎么用?什么原理?本文从 UDF 使用入手,简要介绍相关源码,UDF 从零开始。


本文分享自华为云社区《Hive UDF,就这》,作者:汤忒撒。


Hive 中内置了很多函数,同时支持用户自行扩展,按规则添加后即可在 sql 执行过程中使用,目前支持 UDF、UDTF、UDAF 三种类型,一般 UDF 应用场景较多,本文主要介绍 UDF 使用,简要介绍相关源码。


UDF,(User Defined Function)用户自定义函数


UDTF,(User-defined Table Generating Function)自定义表生成函数,一行数据生成多行


UDAF,(User-defined Aggregation Function)用户自定义聚合函数,多行数据生成一行


1.UDF 简介


UDF 包含两种类型:1、临时函数仅当前会话中有效,退出后重新连接即无法使用;2、永久函数注册 UDF 信息到 MetaStore 元数据中,可永久使用。


实现 UDF 需要继承特定类 UDF 或 GenericUDF 二选一。


  • apache.hadoop.hive.ql.exec.UDF,处理并返回基本数据类型,int、string、boolean、double 等;

  • apache.hadoop.hive.ql.udf.generic.GenericUDF,可处理并返回复杂数据类型,如 Map、List、Array 等,同时支持嵌套;

2.UDF 相关语法


UDF 使用需要将编写的 UDF 类编译为 jar 包添加到 Hive 中,根据需要创建临时函数或永久函数。

2.1. resources 操作​


Hive 支持向会话中添加资源,支持文件、jar、存档,添加后即可在 sql 中直接引用,仅当前会话有效,默认读取本地路径,支持 hdfs 等,路径不加引号。例:add jar /opt/ht/AddUDF.jar;


添加资源
ADD { FILE[S] | JAR[S] | ARCHIVE[S] } <filepath1> [<filepath2>]*
查看资源
LIST { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
删除资源
DELETE { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
复制代码


2.2. 临时函数​


仅当前会话有效,不支持指定数据库,USING 路径需加引号。


CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
复制代码


2.3. 永久函数​


函数信息入库,永久有效,USING 路径需加引号。临时函数与永久函数均可使用 USING 语句,Hive 会自动添加指定文件到当前环境中,效果与 add 语句相同,执行后即可 list 查看已添加的文件或 jar 包。


CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];DROP FUNCTION [IF EXISTS] function_name;RELOAD (FUNCTIONS|FUNCTION);
复制代码


2.4. 查看函数​


查看所有函数,不区分临时函数与永久函数show functions;函数模糊查询,此处为查询x开头的函数show functions like 'x*';查看函数描述desc function function_name;查看函数详细描述desc function extended function_name;
复制代码


3.Description 注解​


Hive 已定义注解类型 org.apache.hadoop.hive.ql.exec.Description,用于执行 desc function [extended] function_name 时介绍函数功能,内置函数与自定义函数用法相同。


【备注】若 Description 注解名称与创建 UDF 时指定名称不同,以创建 UDF 时指定名称为准。


public @interface Description {//函数简单介绍String value() default "FUNC is undocumented";//函数详细使用说明String extended() default "";//函数名称String name() default "";}
复制代码


例:Hive 内置 ceil 函数 GenericUDFCeil 代码定义如下


desc function ceil;


desc function extended ceil;


4.UDF​


继承 UDF 类必须实现 evaluate 方法,支持定义多个 evaluate 方法不同参数列表用于处理不同类型数据,如下


public Text evaluate(Text s)public int evaluate(Integer s)
复制代码


4.1. UDF 示例​


实现 UDF 函数,若字符串执行拼接,int 类型执行加法运算。


@Description(    name="my_plus",    value="my_plus() - if string, do concat; if integer, do plus",    extended = "Example : \n    >select my_plus('a', 'b');\n    >ab\n    >select my_plus(3, 5);\n    >8")public class AddUDF extends UDF {    public String evaluate(String... parameters) {        if (parameters == null || parameters.length == 0) {            return null;        }        StringBuilder sb = new StringBuilder();        for (String param : parameters) {            sb.append(param);        }        return sb.toString();    }    public int evaluate(IntWritable... parameters) {        if (parameters == null || parameters.length == 0) {            return 0;        }        long sum = 0;        for (IntWritable currentNum : parameters) {            sum = Math.addExact(sum, currentNum.get());        }        return (int) sum;    }}
复制代码


hdfs dfs -put AddUDF.jar /tmp/ht/


create function my_plus as 'com.huawei.ht.test.AddUDF' using jar 'hdfs:///tmp/ht/AddUDF.jar';


desc function my_plus;


desc function extended my_plus;


UDF 添加后记录在元数据表 FUNCS、FUNC_RU 表中


4.2. 源码浅析​


UDF 类调用入口为方法解析器,默认方法解析器 DefaultUDFMethodResolver,执行时由解析器反射获取 UDF 类的 evaluate 方法执行,类代码如下:


UDF

public class UDF {  //udf方法解析器  private UDFMethodResolver rslv;  //默认构造器DefaultUDFMethodResolver  public UDF() {    rslv = new DefaultUDFMethodResolver(this.getClass());  }  protected UDF(UDFMethodResolver rslv) {    this.rslv = rslv;  }  public void setResolver(UDFMethodResolver rslv) {    this.rslv = rslv;  }  public UDFMethodResolver getResolver() {    return rslv;  }  public String[] getRequiredJars() {    return null;  }  public String[] getRequiredFiles() {    return null;  }}
复制代码


DefaultUDFMethodResolver

public class DefaultUDFMethodResolver implements UDFMethodResolver {  //The class of the UDF.  private final Class<? extends UDF> udfClass;  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {    this.udfClass = udfClass;  }  @Override  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {    return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false, argClasses);  }}
复制代码


5.GenericUDF​


GenericUDF 相比与 UDF 功能更丰富,支持所有参数类型,参数类型由 ObjectInspector 封装;参数 Writable 类由 DeferredObject 封装,使用时简单类型可直接从 Writable 获取,复杂类型可由 ObjectInspector 解析。


继承 GenericUDF 必须实现如下 3 个接口:


//初始化,ObjectInspector为数据类型封装类,无实际参数值,返回结果类型public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {    return null;}//DeferredObject封装实际参数的对应Writable类public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {    return null;}//函数信息public String getDisplayString(String[] strings) {    return null;}
复制代码


5.1. GenericUDF 示例​


自定义函数实现 count 函数,支持 int 与 long 类型,Hive 中无 long 类型,对应类型为 bigint,create function 与数据库保存与 UDF 一致,此处不再赘述。


initialize,遍历 ObjectInspector[]检查每个参数类型,根据参数类型构造 ObjectInspectorConverters.Converter,用于将 Hive 传递的参数类型转换为对应的 Writable 封装对象 ObjectInspector,供后续统一处理。


evaluate,初始化时已记录每个参数具体类型,从 DeferredObject 中获取对象,根据类型使用对应 Converter 对象转换为 Writable 执行计算。


例:处理 int 类型,


UDF 查询常量时,DeferredObject 中封装类型为 IntWritable;


UDF 查询表字段时,DeferredObject 中封装类型为 LazyInteger。


@Description(    name="my_count",    value="my_count(...) - count int or long type numbers",    extended = "Example :\n    >select my_count(3, 5);\n    >8\n    >select my_count(3, 5, 25);\n    >33")public class MyCountUDF extends GenericUDF {    private PrimitiveObjectInspector.PrimitiveCategory[] inputType;    private transient ObjectInspectorConverters.Converter intConverter;    private transient ObjectInspectorConverters.Converter longConverter;    @Override    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {        int length = objectInspectors.length;        inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];        for (int i = 0; i < length; i++) {            ObjectInspector currentOI = objectInspectors[i];            ObjectInspector.Category type = currentOI.getCategory();            if (type != ObjectInspector.Category.PRIMITIVE) {                throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);            }            PrimitiveObjectInspector.PrimitiveCategory primitiveType =                ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();            inputType[i] = primitiveType;            switch (primitiveType) {                case INT:                    if (intConverter == null) {                        ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);                        intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);                    }                    break;                case LONG:                    if (longConverter == null) {                        ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);                        longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);                    }                    break;                default:                    throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);            }        }        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;    }    @Override    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {        LongWritable out = new LongWritable();        for (int i = 0; i < deferredObjects.length; i++) {            PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];            Object param = deferredObjects[i].get();            switch (type) {                case INT:                    Object intObject = intConverter.convert(param);                    out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));                    break;                case LONG:                    Object longObject = longConverter.convert(param);                    out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));                    break;                default:                    throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);            }        }        return out;    }    @Override    public String getDisplayString(String[] strings) {        return "my_count(" + Joiner.on(", ").join(strings) + ")";    }}
复制代码


create function my_count as 'com.huawei.ht.test.MyCountUDF' using jar 'hdfs:///tmp/countUDF.jar';create table test_numeric(i1 int, b1 bigint, b2 bigint, i2 int, i3 int);insert into table test_numeric values(0, -10, 25, 300, 15), (11, 22, 33, 44, 55);select *, my_count(*) from test_numeric;
复制代码



5.2. 源码浅析​


GenericUDF 内部定义了方法调用顺序,子类实现相应功能即可,调用时根据函数名称从 FunctionRegistry 中获取 UDF 对象,返回执行结果。


Hive 中数据类型均使用 ObjectInspector 封装,为区分普通类型与负责结构类型,定义枚举 Category,共包含 PRIMITIVE,LIST,MAP,STRUCT,UNION 这 5 种类型,其中 PRIMITIVE 表示普通类型(int、long、double 等)。


ObjectInspector


public interface ObjectInspector extends Cloneable {//用于类型名称    String getTypeName();//用于获取ObjectInspector封装的字段类型    ObjectInspector.Category getCategory();    public static enum Category {        PRIMITIVE,        LIST,        MAP,        STRUCT,        UNION;        private Category() {        }    }}
复制代码


PrimitiveObjectInspector.PrimitiveCategory,基本类型


public static enum PrimitiveCategory {    VOID,    BOOLEAN,    BYTE,    SHORT,    INT,    LONG,}
复制代码


GenericUDF. initializeAndFoldConstants

调用 initialize 获取输出 ObjectInspector,若为常量类型,直接 evaluate 计算结果值。

此方法编译阶段通过 AST 构造 Operator 遍历 sql 节点时,常量直接计算结果值,其他类型仅执行 initialize。

计算表字段时,在 MR 等任务中,Operator 执行时调用 initialize、evaluate 计算结果值(例:SelectOperator)。


public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments) throws UDFArgumentException {    ObjectInspector oi = this.initialize(arguments);    if (this.getRequiredFiles() == null && this.getRequiredJars() == null) {        boolean allConstant = true;        for(int ii = 0; ii < arguments.length; ++ii) {            if (!ObjectInspectorUtils.isConstantObjectInspector(arguments[ii])) {                allConstant = false;                break;            }        }        if (allConstant && !ObjectInspectorUtils.isConstantObjectInspector((ObjectInspector)oi) && FunctionRegistry.isConsistentWithinQuery(this) && ObjectInspectorUtils.supportsConstantObjectInspector((ObjectInspector)oi)) {            GenericUDF.DeferredObject[] argumentValues = new GenericUDF.DeferredJavaObject[arguments.length];            for(int ii = 0; ii < arguments.length; ++ii) {                argumentValues[ii] = new GenericUDF.DeferredJavaObject(((ConstantObjectInspector)arguments[ii]).getWritableConstantValue());            }            try {                Object constantValue = this.evaluate(argumentValues);                oi = ObjectInspectorUtils.getConstantObjectInspector((ObjectInspector)oi, constantValue);            } catch (HiveException var6) {                throw new UDFArgumentException(var6);            }        }        return (ObjectInspector)oi;    } else {        return (ObjectInspector)oi;    }}
复制代码

6.UDF 相关源码​

6.1. 运算符​


Hive SQL 中,“+、-、*、/、=”等运算符都是是 UDF 函数,在 FunctionRegistry 中声明,所有 UDF 均在编译阶段由 AST 生成 Operator 树时解析,常量直接计算结果值,其他类型仅初始化,获取输出类型用于生成 Operator 树,后续在 Operator 真正执行时计算结果值。


static {  HIVE_OPERATORS.addAll(Arrays.asList(      "+", "-", "*", "/", "%", "div", "&", "|", "^", "~",      "and", "or", "not", "!",      "=", "==", "<=>", "!=", "<>", "<", "<=", ">", ">=",      "index"));}
复制代码


6.2. 函数类型​


Hive 中包含 BUILTIN, PERSISTENT, TEMPORARY 三种函数;


public static enum FunctionType {  BUILTIN, PERSISTENT, TEMPORARY;}
复制代码

6.3. FunctionRegistry​


Hive 的所有 UDF 均由 FunctionRegistry 管理,FunctionRegistry 仅管理内存中的 UDF,不操作数据库。


内置函数都在 FunctionRegistry 静态块中初始化,不在数据库中记录;用户自定义 UDF 添加、删除都在 HiveServer 本地执行,临时函数在 SessionState 中处理,永久函数由 FunctionTask 调用 FunctionRegistry 对应方法处理,加载后 FunctionTask 负责写库。


public final class FunctionRegistry {private static final Registry system = new Registry(true);static {  system.registerGenericUDF("concat", GenericUDFConcat.class);  system.registerUDF("substr", UDFSubstr.class, false);}public static void registerTemporaryMacro(    String macroName, ExprNodeDesc body, List<String> colNames, List<TypeInfo> colTypes) {  SessionState.getRegistryForWrite().registerMacro(macroName, body, colNames, colTypes);}public static FunctionInfo registerPermanentFunction(String functionName,    String className, boolean registerToSession, FunctionResource[] resources) {  return system.registerPermanentFunction(functionName, className, registerToSession, resources);}}
复制代码


6.4. GenericUDFBridge​


Hive 中 UDF 与 GenericUDF 实际均以 GenericUDF 方式处理,通过 GenericUDFBridge 适配,GenericUDFBridge 继承 GenericUDF。


添加 UDF 时,FunctionRegistry 调用 Registry 对象添加 UDF,Registry 将 UDF 封装为 GenericUDFBridge 保存到内置中。


Registry


private FunctionInfo registerUDF(String functionName, FunctionType functionType,    Class<? extends UDF> UDFClass, boolean isOperator, String displayName,    FunctionResource... resources) {  validateClass(UDFClass, UDF.class);  FunctionInfo fI = new FunctionInfo(functionType, displayName,      new GenericUDFBridge(displayName, isOperator, UDFClass.getName()), resources);  addFunction(functionName, fI);  return fI;}
复制代码


GenericUDFBridge


内部根据参数反射获取 UDF 类 evaluate 方法并适配参数,自动转化为相应类型,故 UDF 不需要感知函数本地执行与 yarn 运行时的具体类型是否一致。


部分代码如下:


public GenericUDFBridge(String udfName, boolean isOperator,    String udfClassName) {  this.udfName = udfName;  this.isOperator = isOperator;  this.udfClassName = udfClassName;}@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {//初始化UDF对象  try {    udf = (UDF)getUdfClassInternal().newInstance();  } catch (Exception e) {    throw new UDFArgumentException(        "Unable to instantiate UDF implementation class " + udfClassName + ": " + e);  }  // Resolve for the method based on argument types  ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>(      arguments.length);  for (ObjectInspector argument : arguments) {    argumentTypeInfos.add(TypeInfoUtils        .getTypeInfoFromObjectInspector(argument));  }  udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos);  udfMethod.setAccessible(true);  // Create parameter converters  conversionHelper = new ConversionHelper(udfMethod, arguments);  // Create the non-deferred realArgument  realArguments = new Object[arguments.length];  // Get the return ObjectInspector.  ObjectInspector returnOI = ObjectInspectorFactory      .getReflectionObjectInspector(udfMethod.getGenericReturnType(),      ObjectInspectorOptions.JAVA);  return returnOI;}@Overridepublic Object evaluate(DeferredObject[] arguments) throws HiveException {  assert (arguments.length == realArguments.length);  // Calculate all the arguments  for (int i = 0; i < realArguments.length; i++) {    realArguments[i] = arguments[i].get();  }  // Call the function,反射执行UDF类evaluate方法  Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper      .convertIfNecessary(realArguments));  // For non-generic UDF, type info isn't available. This poses a problem for Hive Decimal.  // If the returned value is HiveDecimal, we assume maximum precision/scale.  if (result != null && result instanceof HiveDecimalWritable) {    result = HiveDecimalWritable.enforcePrecisionScale        ((HiveDecimalWritable) result,            HiveDecimal.SYSTEM_DEFAULT_PRECISION,            HiveDecimal.SYSTEM_DEFAULT_SCALE);  }  return result;}
复制代码


6.5. 函数调用入口​


sql 中使用函数时,可能有 3 处调用,不同版本代码行数可能不一致,流程类似。


  1. 编译时遍历语法树转换 Operator。


TypeCheckProcFactory.getXpathOrFuncExprNodeDesc 中根据 sql 中运算符或 UDF 名称生成表达式对象 ExprNodeGenericFuncDesc,内部调用 GenericUDF 方法。



  1. 启用常量传播优化器优化时,ConstantPropagate 中遍历树过程调用;


此优化器默认开启,可参数控制"hive.optimize.constant.propagation"。


ConstantPropagate 优化时遍历节点,尝试提前计算常量表达式,由 ConstantPropagateProcFactory.evaluateFunction 计算 UDF。



  1. UDF 参数不是常量,SQL 按计划执行过程中 Operator 真正执行时;


Operator 真正执行时,由 ExprNodeGenericFuncEvaluator. _evaluate 处理每行数据,计算 UDF 结果值。


@Overrideprotected Object _evaluate(Object row, int version) throws HiveException {  if (isConstant) {    // The output of this UDF is constant, so don't even bother evaluating.    return ((ConstantObjectInspector) outputOI).getWritableConstantValue();  }  rowObject = row;  for (GenericUDF.DeferredObject deferredObject : childrenNeedingPrepare) {    deferredObject.prepare(version);  }  return genericUDF.evaluate(deferredChildren);}
复制代码


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
Hive UDF,就这