写点什么

大数据培训 Flink 之 Table API 与 SQL

作者:@零度
  • 2022 年 6 月 20 日
  • 本文字数:2834 字

    阅读完需:约 9 分钟

​Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 ApacheFlink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。


10.1 需要引入的 pom 依赖


<dependency>


<groupId>org.apache.flink</groupId>


<artifactId>flink-table-planner_2.12</artifactId>


<version>1.10.1</version>


</dependency>


<dependency>


<groupId>org.apache.flink</groupId>


<artifactId>flink-table-api-scala-bridge_2.12</artifactId>


<version>1.10.1</version>


</dependency>


10.2 简单了解 TableAPI


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setParallelism(1)


val inputStream = env.readTextFile("..\sensor.txt")


val dataStream = inputStream


.map( data => {


val dataArray = data.split(",")


SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,


dataArray(2).trim.toDouble)


}


)


// 基于 env 创建 tableEn


val settings: EnvironmentSettings =


EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()


val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,


settings)


// 从一条流创建一张表


val dataTable: Table = tableEnv.fromDataStream(dataStream)


// 从表里选取特定的数据


val selectedTable: Table = dataTable.select('id, 'temperature)


.filter("id = 'sensor_1'")


val selectedStream: DataStream[(String, Double)] = selectedTable


.toAppendStream[(String, Double)]


selectedStream.print()


env.execute("table test")


}


10.2.1 动态表


如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table_大数据培训


tableEnv.fromDataStream(dataStream)


或者根据字段顺序单独命名


tableEnv.fromDataStream(dataStream,’id,’timestamp .......)


最后的动态表可以转换为流进行输出


table.toAppendStream[(String,String)]


10.2.2 字段


用一个单引放到字段前面来标识字段名, 如 ‘name, ‘id ,’amount 等


10.3 TableAPI 的窗口聚合操作


10.3.1 通过一个例子了解 TableAPI


// 统计每 10 秒中每个传感器温度值的个数


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setParallelism(1)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


val inputStream = env.readTextFile("..\sensor.txt")


val dataStream = inputStream


.map( data => {


val dataArray = data.split(",")


SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,


dataArray(2).trim.toDouble)


}


)


.assignTimestampsAndWatermarks(new


BoundedOutOfOrdernessTimestampExtractorSensorReading {


override def extractTimestamp(element: SensorReading): Long =


element.timestamp * 1000L


})


// 基于 env 创建 tableEnv


val settings: EnvironmentSettings =


EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()


val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,


settings)


// 从一条流创建一张表,按照字段去定义,并指定事件时间的时间字段


val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id,


'temperature, 'ts.rowtime)


// 按照时间开窗聚合统计


val resultTable: Table = dataTable


.window( Tumble over 10.seconds on 'ts as 'tw )


.groupBy('id, 'tw)


.select('id, 'id.count)


val selectedStream: DataStream[(Boolean, (String, Long))] = resultTable


.toRetractStream[(String, Long)]


selectedStream.print()


env.execute("table window test")


}


10.3.2 关于 group by


  1. 如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream


val dataStream: DataStream[(Boolean, (String, Long))] = table


.toRetractStream[(String,Long)]


  1. toRetractDstream 得到的第一个 boolean 型字段标识 true 就是最新的数据


(Insert),false 表示过期老数据(Delete)


val dataStream: DataStream[(Boolean, (String, Long))] = table


.toRetractStream[(String,Long)]


dataStream.filter(_._1).print()


  1. 如果使用的 api 包括时间窗口,那么窗口的字段必须出现在 groupBy 中。


val resultTable: Table = dataTable


.window( Tumble over 10.seconds on 'ts as 'tw )


.groupBy('id, 'tw)


.select('id, 'id.count)


10.3.3 关于时间窗口


  1. 用到时间窗口,必须提前声明时间字段,如果是 processTime 直接在创建动态表时进行追加就可以_大数据视频


val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id,


'temperature, 'ps.proctime)


  1. 如果是 EventTime 要在创建动态表时声明


val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id,


'temperature, 'ts.rowtime)


  1. 滚动窗口可以使用 Tumble over 10000.millis on 来表示


val resultTable: Table = dataTable


.window( Tumble over 10.seconds on 'ts as 'tw )


.groupBy('id, 'tw)


.select('id, 'id.count)


10.4 SQL 如何编写


// 统计每 10 秒中每个传感器温度值的个数


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setParallelism(1)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


val inputStream = env.readTextFile("..\sensor.txt")


val dataStream = inputStream


.map( data => {


val dataArray = data.split(",")


SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,


dataArray(2).trim.toDouble)


}


)


.assignTimestampsAndWatermarks(new


BoundedOutOfOrdernessTimestampExtractorSensorReading {


override def extractTimestamp(element: SensorReading): Long =


element.timestamp * 1000L


})


// 基于 env 创建 tableEnv


val settings: EnvironmentSettings =


EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(


)


val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,


settings)


// 从一条流创建一张表,按照字段去定义,并指定事件时间的时间字段


val dataTable: Table = tableEnv.fromDataStream(dataStream, 'id,


'temperature, 'ts.rowtime)


// 直接写 sql 完成开窗统计


val resultSqlTable: Table = tableEnv.sqlQuery("select id, count(id) from "


  • dataTable + " group by id, tumble(ts, interval '15' second)")


val selectedStream: DataStream[(Boolean, (String, Long))] =


resultSqlTable.toRetractStream[(String, Long)]


selectedStream.print()


env.execute("table window test")


}


用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

IT培训 www.atguigu.com

评论

发布
暂无评论
大数据培训Flink之Table API 与 SQL_flink_@零度_InfoQ写作社区