大数据开发之 Flink sql 的基础用法
Flink sql 是什么
sql 的诞生就是为了简化我们对数据开发,可以使用少量的 sql 代码,帮助我完成对数据的查询,分析等功能
声明式 & 易于理解
对于用户只需要表达我想要什么,具体处理逻辑交给框架,系统处理,用户无需关心,对于一些非专业的开发人员有了解 sql,并且 sql 相对我们学习 java,c 等语言更简单,大数据培训学习成本更低,如果跨团队,或者非大数据开发人员,也可以通过 sql 来进行 flink 任务的开发
自动调优
查询优化器,会对我们编写的 sql 进行优化,生成效率更好的执行计划,所以用户不需要了解底层细节,即高效的获取结果
稳定
sql 语义发展几十年是一个很稳定的语言,少有变动,当我们引擎的升级,甚至替换成另一个引擎,都可以做到兼容地,平滑地升级,无需更改我们的已经编写好的 sql 代码
流批统一的基础
对于 flink 通过 sql 的表达式,来完成流批的统一,一套 sql 代码,既可以跑流任务,也可以跑批任务,减少我们开发的成本
Flink sql 使用
数据类型
系统函数 & 自定义函数
/*下面是 1.12 版本的系统内置的函数,具体我们可以到官网查看,根据需求使用即可https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html*/
// TODO 主要介绍自定义函数/*udf 和 udaf 需要定义 eval 方法,实现自己的逻辑,具体系统会调用对应的方法
udf : 传入一个值/多个/或者不传入,返回一个新的值,可以重载该方法,具体会根据传入的参数调用对应 eval 烦恼歌发 类似map
算子,作用于 sqludaf : 自定义聚合函数,根据自己的逻辑定义累加器 udtf : 用作与表中,可返回一个或多个值,*/
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.functions.AggregateFunction;import org.apache.flink.table.functions.ScalarFunction;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.types.Row;
import java.sql.SQLException;
public class UDFDemo {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance().build());
}
简单案例
代码
flink sql 中时间机制本质与 dataStream api 相同,只不过使用少于区别,稍加注意即可,注意指定 watermark 需要使用 sql 中 timestamp(3)类型(具体对应 java 类型可根据上面类型自行判断),设置 watermark 后可使用 ROWTIEM 字段(具体看 sql 代码),没有设置可直接使用 PROCTIME 字段
注意 : 不同的时间语义要严格对应环境配置的时间语义,否则可能出现异常
❝
时间字段为两种,属于非用户指定字段,设置完时间语义后,根据需求使用具体的时间字段
❞
ROWTIME : 事件时间
PROCTIME : 处理时间字段
场景 :
join : 场景与双流 join 或者 维表 join,目前 flink 支持的不是很好
topN & 去重 : 语法基本相同,row_num > 1 即 topN , 当=1 则是去重操作
topN 场景一些热搜,排名等内容
去重顾名思义,就是为了去重,去重会涉及到 retract 流(以后会详细讲)内容,会更新之前已经存在的结果
❞
// TODO 下面代码仅供参考,具体测试根据自己时间环境来
// 以下只是一些简单的案例,后面会逐步深入复杂 sql 和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().
// useBlinkPlanner().
build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (\n" +
" id
int,\n" +
" name
varchar.\n" +
" ts
timestamp(3),\n" +
// 指定 watermark 允许延迟 5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test1',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
String source2 = "CREATE TABLE source2 (\n" +
" id
int,\n" +
" name
varchar,\n" +
" ts
timestamp(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test2',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
// clickhouse sink 由我自己定义,后面会对 sql 自定义 source 和 sink 进行讲解
String sink = "CREATE TABLE sink (\n" +
" id
INT,\n" +
" name
VARCHAR\n" +
") WITH (\n" +
// 需要自定义接信息参数 -- option
" 'connector' = 'xzw_ck',\n" +
" 'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
" 'table-name' = 'test',\n" +
" 'username' = 'default',\n" +
" 'password' = '123456'\n" +
" )";
// 执行 source sink sql
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
由于是简单使用,没有在场景应用,简单介绍一下区别,北京大数据培训可以根据们不同的区别在自己项目中使用
left json : 无论是否 join 上都返回左表的数据
inner join : 只有 join 上才会返回匹配后的结果
full outer join : 两边的数据都会返回,无论是否 join 上,没有的则为 null
interval join : 基于时间范围内的 join,在指定的时间范围内返回 join 上的数据
*/
String joinSql = "select * from source1 s1" +
"left join source2 s2" +
// 内连接
// "inner join source2" || "join source2"
// 全连接
// "full outer join source2"
// 时间范围 join
// "s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' MINUTE" +
" on s1.id =s2.id "
;
Table joinTable = tEnv.sqlQuery(joinSql);
// 分组排序,取 topN, 如果要是去重 rnum=1 即可实现去重操作
String insertSql = "insert into sink select id,name from(" +
"select *," +
"row_number() over(partition by id order by ts) as rnum " +
"from "+joinTable+" where rnum < 5 " +
")";
// add insert sql
TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
// 随意使用
// Optional<JobClient> jobClient = tableResult.getJobClient();
}
// 添加多个 sql 具体执行
private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
StatementSet statementSet = tEnv.createStatementSet();
for (String sql : sqls) {
if ("*".equals(sql) || sql.length()>=27) {
continue;
}
statementSet.addInsertSql(sql);
}
return statementSet.execute();
}
}
maven 依赖
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.12.2</flink.version><scala.version>2.11</scala.version></properties>
文章来源 857Hub
评论