写点什么

FlinkSQL 平台

用户头像
shengjk1
关注
发布于: 2021 年 03 月 23 日

1.背景


由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x


2.效果


传一个 sql 文件给 jar 包,然后 sql 文件内的 sql 将自动执行


3. jar 包 vs web 界面


调研了基于 web 的 zeppline

  1. zeppline 设计的初衷其实是为了交互式分析

  2. 基于 zeppline rest api 与现有的监控不兼容,需要修改现有监控的代码

  3. 虽然带有 web 界面的对用户很是友好,对于分析人员来说,是一个不错的选择,但对于开发人员来说,真正的线上长时间的运行程序,开发成 HA 的 server 还是有必要的


基于以上 3 点最终选择 jar 作为最终的方式


4. 使用


  1. 将 sql 写入 xxx.sql 文件中,如

```sql

CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;
-- ExecutionCheckpointingOptions
set execution.checkpointing.mode=EXACTLY_ONCE;
set execution.checkpointing.timeout=30 min;-- 30min
set execution.checkpointing.interval=1 min ; -- 1min
set execution.checkpointing.externalized-checkpoint-retention=RETAINONCANCELLATION;
-- ExecutionConfigOptions
set table.exec.state.ttl=1 day; -- 1 day
set table.exec.mini-batch.enabled=true; -- enable mini-batch optimization
set table.exec.mini-batch.allow-latency=1 s; -- 1s
set table.exec.mini-batch.size=1000;
set table.exec.sink.not-null-enforcer=drop;
-- -- dadadadadada
CREATE TABLE orders
(
status int,
courier_id bigint,
id bigint,
finish_time BIGINT
)
WITH (
'connector' = 'kafka','topic' = 'canalmonitororder',
'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');
-- flink.partition-discovery.interval-millis;
CREATE TABLE infos
(
info_index int,
order_id bigint
)
WITH (
'connector' = 'kafka','topic' = 'canalmonitororder',
'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');
CREATE TABLE redisCache
(
finishOrders BIGINT,
courier_id BIGINT,
dayStr String
)
WITH (
'connector' = 'redis',
'hostPort'='localhost:6400',
'keyType'='hash',
'keyTemplate'='test2${courierid}',
'fieldTemplate'='${dayStr}',
'valueNames'='finishOrders',
'expireTime'='259200');
create view temp as
select o.courier_id,
(CASE
WHEN sum(infosMaxIndex.info_index) is null then 0
else sum(infosMaxIndex.info_index) end) finishOrders,
o.status,
dayStr
from ((select courier_id,
id,
last_value(status) status,
MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr
from orders
where status = 60
group by courierid, id, MillisecondsToDateStr(finishtime, 'yyyyMMdd'))) o
left join (select max(infoindex) infoindex, order_id
from infos
group by orderid) infosMaxIndex on o.id = infosMaxIndex.orderid
group by o.courier_id, o.status, dayStr;
INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
复制代码


  1. 将 flinksql-platform 打包并上传至服务器

  2. 将必要的 connector jar 放入到相应的目录下

  3. 执行,如

flink-1.12.0/bin/flink  run -p 3 -yt ./flinkjar/  -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar  -m yarn-cluster -ynm sqlDemo  -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
复制代码

其中

-C 添加 udfJar 等第三方 jar 包 -C 参数 apply 到了 client 端生成的 JobGraph 里,然后提交 JobGraph 来运行的

-yt 目录 将 udfJar 等第三方 jar 包提交到 TaskManager 上


5. 总括


更详细的内容,请移步 flinksql-platform


发布于: 2021 年 03 月 23 日阅读数: 16
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
FlinkSQL 平台