写点什么

Flink 和 StreamPark 自定义 UDF 函数的使用

  • 2025-09-08
    北京
  • 本文字数:3477 字

    阅读完需:约 11 分钟

本文分享自天翼云开发者社区《Flink和StreamPark自定义UDF函数的使用》,作者:王****帅

1、什么是函数

在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是“函数”(functions)。Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String 对象的 upperCase()方法:

str.upperCase();
复制代码


而 SQL 中的写法就是直接引用 UPPER()函数,将 str 作为参数传入:

UPPER(str)
复制代码


由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较少使用。下面我们主要介绍 Flink SQL 中函数的使用。Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。


2、什么是自定义 UDF 函数

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。事实上,系统内置函数仍然在不断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工具 JIRA 上向 Flink 开发团队提出“议题”(issue),请求将新的函数添加到系统函数中。

2.1 编写自定义 UDF 函数

自定义一个 ScalarFunction,传入一个 String 类型的参数,输出这个参数的 hashCode

public class HashScalarFunction extends ScalarFunction {    public String eval(String str){        return String.valueOf(str.hashCode());    }}
复制代码


2.2 在代码中以 SQL 方式使用 UDF 函数

2.2.1 读取 mysql 数据使用 UDF 函数转换并输出到控制台


package cn.ctyun.demo.flinksql;
import cn.ctyun.demo.flinksql.udf.HashScalarFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/** * @Date 2023/4/14 14:38 * @Description 读取mysql数据使用UDF函数转换并输出到控制台 */public class FlinkSqlUdfMysql2Print {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1. 创建读取表,使用mysql进行 String source_ddl = "CREATE TABLE UserSource (" + " id INT, " + " name VARCHAR, " + " phone VARCHAR, " + " sex INT " + ") WITH (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " + " 'connector.table' = 'test_user_table', " + " 'connector.username' = 'root', " + " 'connector.password' = '******'" + ")"; tableEnv.executeSql(source_ddl); // 3. 注册自定义标量函数 tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class); // 4. 调用UDF查询转换 Table resultTable = tableEnv.sqlQuery("select id, name, phone, sex, MyHash(name) as name_hash from UserSource");
// 5. 输出到控制台 tableEnv.executeSql("create table output (" + "id INT, " + "name STRING, " + "phone STRING, " + "sex INT, " + "name_hash STRING ) " + "WITH (" + "'connector' = 'print')"); resultTable.executeInsert("output"); }}
复制代码
  • 2.2.2 读取 mysql 数据使用 UDF 函数转换并输出到 mysql


package cn.ctyun.demo.flinksql;
import cn.ctyun.demo.flinksql.udf.HashScalarFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/** * @Date 2023/4/14 14:50 * @Description 读取mysql数据使用UDF函数转换并输出到mysql */public class FlinkSqlUdfMysql2Mysql {
public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1. 创建读取表,使用mysql进行 String source_ddl = "CREATE TABLE UserSource (" + " id INT, " + " name VARCHAR, " + " phone VARCHAR, " + " sex INT " + ") WITH (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " + " 'connector.table' = 'test_user_table', " + " 'connector.username' = 'root', " + " 'connector.password' = '*******'" + ")"; tableEnv.executeSql(source_ddl);
// 2. 创建写出表,使用mysql进行 String sink_ddl = "CREATE TABLE UserSink (" + "id INT, " + "name STRING, " + "phone STRING, " + "sex INT, " + "name_hash STRING " + ") WITH (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', " + " 'connector.table' = 'test_user_table_udf', " + " 'connector.username' = 'root', " + " 'connector.password' = '********'" + ")"; tableEnv.executeSql(sink_ddl);
// 3. 注册自定义标量函数 tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);
// 4. 使用insert语句进行数据输出,在这里进行UDF查询转换 String insertSql = "INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource";
tableEnv.executeSql(insertSql); }}
复制代码

2.3 在 StreamPark 中以 SQL 方式使用 UDF 函数在 StreamPark 创建作业,导入作业依赖:

flink-connector-jdbc_2.12-1.14.3.jar

flink-demo-jar-job-1.0-SNAPSHOT.jar

mysql-connector-java-8.0.21.jar

FlinkSQL 为:

CREATE FUNCTION MyHash AS 'cn.ctyun.demo.flinksql.udf.HashScalarFunction';CREATE TABLE UserSource ( id INT, name VARCHAR,phone VARCHAR, sex INT ) WITH ('connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://********:3306/flink_test_source?useSSL=false', 'connector.table' = 'test_user_table', 'connector.username' = 'root', 'connector.password' = '*********');CREATE TABLE UserSink (id INT, name STRING, phone STRING, sex INT, name_hash STRING ) WITH ('connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', 'connector.table' = 'test_user_table_udf', 'connector.username' = 'root', 'connector.password' = '**********');INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource;
复制代码

运行作业后 mysql 可正常插入数据

用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
Flink和StreamPark自定义UDF函数的使用_MySQL_天翼云开发者社区_InfoQ写作社区