写点什么

实践解析可视化开发平台 FlinkSever 优势

  • 2021 年 12 月 30 日
  • 本文字数:3865 字

    阅读完需:约 13 分钟

摘要:华为 Flink 可视化开发平台 FlinkServer 作为自研服务,能够提供比原生 flinksql 接口更强的企业级特性,比如任务的集中管理,可视化开发,多数据源配置等。

 

本文分享自华为云社区《华为FusionInsight MRS实战 -Flink增强特性之可视化开发平台FlinkSever开发学习》,作者:晋红轻。

背景说明


随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。


SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎自 1.7.2 版本开始引入 Flink SQL 的特性,并不断发展。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。


但是真正的要将 Flink SQL 开发工作投入到实际的生产场景中,如果使用原生的 API 接口进行作业的开发还是存在门槛较高,易用性低,SQL 代码可维护性差的问题。新需求由业务人员提交给 IT 人员,IT 人员排期开发。从需求到上线,周期长,导致错失新业务最佳市场时间窗口。同时,IT 人员工作繁重,大量相似 Flink 作业,成就感低。

华为 Flink 可视化开发平台 FlinkServer 优势:


  • 提供基于 Web 的可视化开发平台,只需要写 SQL 即可开发作业,极大降低作业开发门槛。

  • 通过作业平台能力开放,支持业务人员自行编写 SQL 开发作业,快速应对需求,并将 IT 人员从繁琐的 Flink 作业开发工作中解放出来;

  • 同时支持流作业和批作业;

  • 支持常见的 Connector,包括 Kafka、Redis、HDFS 等


下面将以 kafka 为例分别使用原生 API 接口以及 FlinkServer 进行作业开发,对比突出 FlinkServer 的优势

场景说明


参考已发论坛帖 《华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践》

需要使用 FlinkSQL 从一个源 kafka topic 接收 cdl 复杂嵌套 json 数据并进行解析,将解析后的数据发送到另一个 kafka topic 里

使用原生 API 接口方案开发 flink sql 操作步骤

前提条件


  • 完成 MRS Flink 客户端的安装以及配置

  • 完成 Flink SQL 原生接口相关配置

操作步骤


  • 使用如下命令首先启动 Flink 集群


source /opt/hadoopclient/bigdata_envkinit developusercd /opt/hadoopclient/Flink/flink./bin/yarn-session.sh -t ssl/
复制代码



  • 使用如下命令启动 Flink SQL Client


cd /opt/hadoopclient/Flink/flink/bin./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
复制代码



  • 使用如下 flink sql 创建源端 kafka 表,并提取需要的信息:


CREATE TABLE huditableout_source(  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,  uname VARCHAR(32),  age INT,  sex VARCHAR(30),  mostlike VARCHAR(30),  lastview VARCHAR(30),  totalcost INT> >,  type1 as `schema`.`fields`[1].type,  optional1 as `schema`.`fields`[1].optional,  field1 as `schema`.`fields`[1].field,  type2 as `schema`.`fields`[2].type,  optional2 as `schema`.`fields`[2].optional,  field2 as `schema`.`fields`[2].field,  ts as payload.`TIMESTAMP`,  uid as payload.`data`.uid,  uname as payload.`data`.uname,  age as payload.`data`.age,  sex as payload.`data`.sex,  mostlike as payload.`data`.mostlike,  lastview as payload.`data`.lastview,  totalcost as payload.`data`.totalcost,  localts as LOCALTIMESTAMP) WITH(  'connector' = 'kafka',  'topic' = 'huditableout',  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',  'properties.group.id' = 'example',  'scan.startup.mode' = 'latest-offset',  'format' = 'json',
'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com');
复制代码



  • 使用如下 flink sql 创建目标端 kafka 表:


CREATE TABLE huditableout(  type1 VARCHAR(32),  optional1 BOOLEAN,  field1 VARCHAR(32),  type2 VARCHAR(32),  optional2 BOOLEAN,  field2 VARCHAR(32),  ts BIGINT,  uid INT,  uname VARCHAR(32),  age INT,  sex VARCHAR(30),  mostlike VARCHAR(30),  lastview VARCHAR(30),  totalcost INT,  localts TIMESTAMP) WITH(  'connector' = 'kafka',  'topic' = 'huditableout2',  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',  'properties.group.id' = 'example',  'scan.startup.mode' = 'latest-offset',  'format' = 'json',
'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com');
复制代码



  • 使用如下 flink sql 将源端 kafka 流表写入到目标端 kafka 流表中


insert into  huditableout  select  type1,  optional1,  field1,  type2,  optional2,  field2,  ts,  uid,  uname,  age,  sex,  mostlike,  lastview,  totalcost,  localtsfrom  huditableout_source;
复制代码


  • 检查测试结果

消费生产源 kafka topic 的数据(由 cdl 生成)



​消费目标端 kafka topic 解析后的数据(flink sql 任务生成的结果)



​可以登录 flink 原生界面查看任务



  • 使用 flink sql client 方式查看结果

首先使用命令 set execution.result-mode=tableau; 可以让查询结果直接输出到终端



​使用 flink sql 查询上面已创建好的流表

select * from huditableout



​注意:因为是 kafka 流表,所以查询结果只会显示 select 任务启动之后写进该 topic 的数据

使用 FlinkServer 可视化开发平台方案开发 flink sql 操作步骤

前提条件


  • 参考产品文档 《基于用户和角色的鉴权》章节创建一个具有“FlinkServer 管理操作权限”的用户,使用该用户访问 Flink Server

操作步骤


  • 登录 FlinkServer 选择作业管理



  • 创建任务 cdl_kafka_json_test3 并输入 flink sql

说明: 可以看到开发 flink sql 任务时在 FlinkServer 界面可以自行设置 flink 集群规模



CREATE TABLE huditableout_source(  `schema` ROW < `fields` ARRAY< ROW<type STRING, optional BOOLEAN, field STRING>> >,  payload ROW < `TIMESTAMP` BIGINT, `data` ROW <  uid INT,  uname VARCHAR(32),  age INT,  sex VARCHAR(30),  mostlike VARCHAR(30),  lastview VARCHAR(30),  totalcost INT> >,  type1 as `schema`.`fields`[1].type,  optional1 as `schema`.`fields`[1].optional,  field1 as `schema`.`fields`[1].field,  type2 as `schema`.`fields`[2].type,  optional2 as `schema`.`fields`[2].optional,  field2 as `schema`.`fields`[2].field,  ts as payload.`TIMESTAMP`,  uid as payload.`data`.uid,  uname as payload.`data`.uname,  age as payload.`data`.age,  sex as payload.`data`.sex,  mostlike as payload.`data`.mostlike,  lastview as payload.`data`.lastview,  totalcost as payload.`data`.totalcost,  localts as LOCALTIMESTAMP) WITH(  'connector' = 'kafka',  'topic' = 'huditableout',  'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007',  'properties.group.id' = 'example',  'scan.startup.mode' = 'latest-offset',  'format' = 'json',
'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com');CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json',
'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true',
'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com');insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localtsfrom huditableout_source;
复制代码


  • 点击语义校验,确保语义校验通过



  • 点击提交并启动任务




  • 检查测试结果

消费生产源 kafka topic 的数据(由 cdl 生成)



​消费目标端 kafka topic 解析后的数据(flink sql 任务生成的结果)



点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
实践解析可视化开发平台FlinkSever优势