写点什么

大数据培训:构建 Flink SQL 流式计算平台

  • 2022 年 2 月 21 日
  • 本文字数:1209 字

    阅读完需:约 4 分钟

​一、背景


Flink 由于阿里在国内的助推,火爆程度可以想象,大数据培训且目前 Flink 有非常明显的趋势是往 SQL 方向进行的。很多大厂已经实现了 Flink SQL 化,那我们怎么去实现一个流式计算平台呢?


二、Flink SQL 初探以及代码实现


连接 kafka 对数据进行处理写入 mysql


​package org.example;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class SqlDemo {public static void main(String[] args) throws Exception {//创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();


TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);


//把 kafka 中的 topic 映射成一个输入临时表 tableEnv.executeSql("create table sensor_source (id string,name string) with (" +" 'connector' = 'kafka'," +" 'topic' = 'test_info_test'," +" 'properties.bootstrap.servers' = 'localhost:9092'," +" 'properties.group.id' = 'testGroup'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'format' = 'json')");//把 mysql 中的表映射成一个输出临时表


String sql = "CREATE TABLE print_table (\n" +" id STRING,\n" +" name STRING\n" +") WITH (\n" +" 'connector' = 'print'\n" +")";


String mysql_sql = "CREATE TABLE mysql_sink (\n" +" id string,\n" +" name string\n" +" ) WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC',\n" +" 'table-name' = 'test_info',\n" +" 'username' = 'kafka',\n" +" 'password' = 'Bonc@123'\n" +" )";


String kafka_sink_sql="create table kafka_sink (id string,name string) with (" +" 'connector' = 'kafka'," +" 'topic' = 'test_info_2'," +" 'properties.bootstrap.servers' = 'localhost:9092'," +" 'format' = 'json')";


    tableEnv.executeSql(mysql_sql);
复制代码


//tableEnv.executeSql(kafka_sink_sql);//tableEnv.executeSql(sql);//插入数据的 sql 语句//tableEnv.executeSql("insert into print_table select * from sensor_source");


    tableEnv.executeSql("insert into mysql_sink select * from sensor_source");
复制代码


//tableEnv.executeSql("insert into kafka_sink select * from sensor_source");


}
复制代码


}


运行之后 mysql 里面数据就有了


三、Flink 实时计算平台


依据上面的代码,我们可以抽象出一层 Flink 实时计算平台。



文章来源于诸葛子房


用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
大数据培训:构建Flink SQL流式计算平台