写点什么

[1.2.0 新功能系列:三]Apache doris 1.2.0 Java UDF 函数开发及使用

作者:张家锋
  • 2022-12-14
    陕西
  • 本文字数:5583 字

    阅读完需:约 18 分钟

概述我们在使用各个 SQL 引擎时,会有纷繁复杂的查询需求。一部分可以通过引擎自带的内置函数去解决,但内置函数不可能解决所有人的问题,所以一般 SQL 引擎会提供 UDF 功能,方便用户通过自己写逻辑来满足特定的需求,Doris 也不例外。


在 java UDF 之前,Doris 提供了两种用户可以自己实现 UDF 的方式:


远程 UDF,其优缺点如下:


支持通过 RPC 的方式访问用户提供的 UDF Service,以实现用户自定义函数的执行只要支持 Protobuf 的各类语言都能使用,有足够的安全和灵活性额外的网络开销和基于 protobuf 的开发模式让该使用方式的用户望而却步原生 UDF,其优缺点如下:


支持使用 C++编写 UDF,执行效率高、速度快跟 Doris 代码耦合度高,需要自己打包编译 Doris 源码只支持 C++语言并且容易造成 BE 挂掉熟悉大数据组件(Hive Spark 等)的用户有一定的门槛看起来上述 UDF 的两种方式实现起来有点复杂。有没有相对简单,门槛较低,跟 Doris 代码耦合度低,对 Java 友好的 UDF 方式呢?


在 Doris 1.2.0 版本我们正式支持 Java UDF 函数,你可以像之前写 Hive udf 函数一样去写自己的 Doris udf 函数来处理自己复杂的业务逻辑。


SinceVersion 1.2.0


Java UDF 为用户提供 UDF 编写的 Java 接口,以方便用户使用 Java 语言进行自定义函数的执行。相比于 Native 的 UDF 实现,Java UDF 有如下优势和限制:


优势兼容性:使用 Java UDF 可以兼容不同的 Doris 版本,所以在进行 Doris 版本升级时,Java UDF 不需要进行额外的迁移操作。与此同时,Java UDF 同样遵循了和 Hive/Spark 等引擎同样的编程规范,使得用户可以直接将 Hive/Spark 的 UDF jar 包迁移至 Doris 使用。安全:Java UDF 执行失败或崩溃仅会导致 JVM 报错,而不会导致 Doris 进程崩溃。灵活:Java UDF 中用户通过把第三方依赖打进用户 jar 包,而不需要额外处理引入的三方库。使用限制性能:相比于 Native UDF,Java UDF 会带来额外的 JNI 开销,不过通过批式执行的方式,我们已经尽可能的将 JNI 开销降到最低。向量化引擎:Java UDF 当前只支持向量化引擎。doris 提供


UDF:用户自定义函数,user defined function。一对一的输入输出,(最常用的)。UDAF:用户自定义聚合函数。user defined aggregate function,多对一的输入输出,类似 count sum max 等统计函数怎么实现 Doris Java UDF 函数下面我们来开始讲解怎么编写和使用 doris java udf 函数。


Doris java udf 函数是基于 Hive udf 框架来实现的


继承 org.apache.hadoop.hive.ql.exec.UDF 重写 evaluate(),特殊说明:evaluate()方法不是由接口定义的,因为它可接受的参数个数,数据类型都是不确定的。Doris 会检查 UDF, 看能否找到和函数调用相匹配的 evaluate()方法这里演示的是我们怎么实现一个 AES 加解密的函数


函数开发我们创建一个普通的 java maven 工程


pom.xml 依赖如下:


<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>org.apache.doris</groupId> <artifactId>doris.java.udf.demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging>
<name>doris.java.udf.demo</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.3.5</version> </dependency> </dependencies>
<build> <finalName>java-udf-demo</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.2.2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build></project>
复制代码

加解密工具类:


package org.apache.doris.udf.demo;
import javax.crypto.*;import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.lang3.StringUtils;
import java.security.SecureRandom;

/*** AES encryption and decryption tool class** @author zhangfeng*/public class AESUtil { private static final String defaultCharset = "UTF-8"; private static final String KEY_AES = "AES";
/** * AES encryption function method * * @param content * @param secret * @return */ public static String encrypt(String content, String secret) { return doAES(content, secret, Cipher.ENCRYPT_MODE); }
/** * AES decryption function method * * @param content * @param secret * @return */ public static String decrypt(String content, String secret) { return doAES(content, secret, Cipher.DECRYPT_MODE); }
/** * encryption and decryption * * @param content * @param secret * @param mode * @return */ private static String doAES(String content, String secret, int mode) { try { if (StringUtils.isBlank(content) || StringUtils.isBlank(secret)) { return null; } //Determine whether to encrypt or decrypt boolean encrypt = mode == Cipher.ENCRYPT_MODE; byte[] data;
//1.Construct a key generator, specified as the AES algorithm, case-insensitive KeyGenerator kgen = KeyGenerator.getInstance(KEY_AES); SecureRandom secureRandom = SecureRandom.getInstance("SHA1PRNG"); //2. Initialize the key generator according to the ecnodeRules rules //Generate a 128-bit random source, based on the incoming byte array secureRandom.setSeed(secret.getBytes()); //Generate a 128-bit random source, based on the incoming byte array kgen.init(128, secureRandom); //3.generate the original symmetric key SecretKey secretKey = kgen.generateKey(); //4.Get the byte array of the original symmetric key byte[] enCodeFormat = secretKey.getEncoded(); //5.Generate AES key from byte array SecretKeySpec keySpec = new SecretKeySpec(enCodeFormat, KEY_AES); //6.According to the specified algorithm AES self-generated cipher Cipher cipher = Cipher.getInstance(KEY_AES); //7.Initialize the cipher, the first parameter is encryption (Encrypt_mode) or decryption (Decrypt_mode) operation, // the second parameter is the KEY used cipher.init(mode, keySpec);
if (encrypt) { data = content.getBytes(defaultCharset); } else { data = parseHexStr2Byte(content); } byte[] result = cipher.doFinal(data); if (encrypt) { //convert binary to hexadecimal return parseByte2HexStr(result); } else { return new String(result, defaultCharset); } } catch (Exception e) { System.out.println(e.getMessage()); } return null; }
/** * convert binary to hexadecimal * * @param buf * @return */ public static String parseByte2HexStr(byte buf[]) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < buf.length; i++) { String hex = Integer.toHexString(buf[i] & 0xFF); if (hex.length() == 1) { hex = '0' + hex; } sb.append(hex.toUpperCase()); } return sb.toString(); }
/** * Convert hexadecimal to binary * * @param hexStr * @return */ public static byte[] parseHexStr2Byte(String hexStr) { if (hexStr.length() < 1) { return null; } byte[] result = new byte[hexStr.length() / 2]; for (int i = 0; i < hexStr.length() / 2; i++) { int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16); int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16); result[i] = (byte) (high * 16 + low); } return result; }
}
复制代码


加密函数


package org.apache.doris.udf.demo;
import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.commons.lang3.StringUtils;
public class AESEncrypt extends UDF {
public String evaluate(String content, String secret) throws Exception { if (StringUtils.isBlank(content)) { throw new Exception("content not is null"); } if (StringUtils.isBlank(secret)) { throw new Exception("Secret not is null"); } return AESUtil.encrypt(content, secret); }}
复制代码

解密函数


package org.apache.doris.udf.demo;
import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.commons.lang3.StringUtils;
public class AESDecrypt extends UDF {
public String evaluate(String content, String secret) throws Exception { if (StringUtils.isBlank(content)) { throw new Exception("content not is null"); } if (StringUtils.isBlank(secret)) { throw new Exception("Secret not is null"); } return AESUtil.decrypt(content, secret); }
}
复制代码

函数打包

mvn clean package
复制代码

这个时候我们可以得到一个 java-udf-demo.jar


注册函数注册加密函数


这里有两个参数,一个是加密内容,一个是秘钥,返回值是一个字符串


CREATE FUNCTION ase_encryp(string,string) RETURNS string PROPERTIES (   "file"="file:///Users/zhangfeng/work/doris.java.udf.demo/target/java-udf-demo.jar",   "symbol"="org.apache.doris.udf.demo.AESEncrypt",   "always_nullable"="true",   "type"="JAVA_UDF");
复制代码

注意:这里我是单机测试,使用的是本地文件方式,如果你也是要本地文件方式需要再所有的 FE 及 BE 上相同目录下都要有这个文件我们也可以使用 http 方式,让每个节点自己下载这个文件,我们更推荐这种方式,下面也给出这种方式的示例


Http 方式示例:


CREATE FUNCTION ase_encryp(string,string) RETURNS string PROPERTIES (   "file"="http://192.168.31.54/work/doris.java.udf.demo/target/java-udf-demo.jar",   "symbol"="org.apache.doris.udf.demo.AESEncrypt",   "always_nullable"="true",   "type"="JAVA_UDF");
复制代码

然后我们执行我们刚才创建的函数


要加密的内容是:zhangfeng,秘钥是: java_udf_function


select ase_encryp('zhangfeng','java_udf_function');
复制代码


从下图可以看到我们得到了加密后的结果


注册解密函数


CREATE FUNCTION ase_decryp(string,string) RETURNS string PROPERTIES (  "file"="file:///Users/zhangfeng/work/doris.java.udf.demo/target/java-udf-demo.jar",  "symbol"="org.apache.doris.udf.demo.AESDecrypt",  "always_nullable"="true",  "type"="JAVA_UDF");
复制代码


http 方式:

CREATE FUNCTION ase_decryp(string,string) RETURNS string PROPERTIES (  "file"="http://192.168.63.32/work/doris.java.udf.demo/target/java-udf-demo.jar",  "symbol"="org.apache.doris.udf.demo.AESDecrypt",  "always_nullable"="true",  "type"="JAVA_UDF");
复制代码


验证函数


我们对上面解密的结果进行解密操作


select ase_decryp('4442106BB8C98E74D19CEC0413467810','java_udf_function');
复制代码

可以看到我们得到了正确的解密结果


总结这样看来 Doris Java UDF 函数是不是非常简单呢,可以大大加速我们业务的开发,降低业务系统开发复杂度,而且使用大家都非常熟悉的 Java 语言来开发 UDF,基本每个会 Java 语言的人都可以非常轻松的完成,避免的学习和开发 C++ UDF 函数的难度,还不赶快行动起来。

发布于: 刚刚阅读数: 4
用户头像

张家锋

关注

Apache Doris PMC 2007-07-01 加入

还未添加个人简介

评论

发布
暂无评论
[1.2.0新功能系列:三]Apache doris 1.2.0 Java UDF 函数开发及使用_张家锋_InfoQ写作社区