写点什么

Flink 的流数据 SQL

发布于: 2021 年 05 月 26 日
Flink的流数据SQL

流处理中也可以支持 SQL。但是需要注意以下几点:

 

1)   要使用流处理的 SQL,必须要添加水印时间

2)   使用 registerDataStream 注册表的时候,使用 ' 来指定字段

3)   注册表的时候,必须要指定一个 rowtime,否则无法在 SQL 中使用窗口

4)   必须要导入 import org.apache.flink.table.api.scala._ 隐式参数

5)   SQL 中使用 trumble(时间列名, interval '时间' sencond) 来进行定义窗口

 

示例

使用 Flink SQL 来统计 5 秒内 用户的 订单总数、订单的最大金额、订单的最小金额。

 

步骤

 

1)   获取流处理运行环境

2)   获取 Table 运行环境

3)   设置处理时间为 EventTime

4)   创建一个订单样例类 Order ,包含四个字段(订单 ID、用户 ID、订单金额、时间戳)

5)   创建一个自定义数据源

a.   使用 for 循环生成 1000 个订单

b.   随机生成订单 ID(UUID)

c.   随机生成用户 ID(0-2)

d.   随机生成订单金额(0-100)

e.   时间戳为当前系统时间

f.   每隔 1 秒生成一个订单

6)   添加水印,允许延迟 2 秒

7)   导入 import org.apache.flink.table.api.scala._ 隐式参数

8)   使用 registerDataStream 注册表,并分别指定字段,还要指定 rowtime 字段

9)   编写 SQL 语句统计用户订单总数、最大金额、最小金额

分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口

10)  使用 tableEnv.sqlQuery 执行 sql 语句

11)  将 SQL 的执行结果转换成 DataStream 再打印出来

12)  启动流处理程序


参考代码

import java.util.UUIDimport java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.table.api.{Table, TableEnvironment}import org.apache.flink.api.scala._import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.watermark.Watermarkimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.types.Row
import scala.util.Random/** * 需求: * 使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额 * * timestamp是关键字不能作为字段的名字(关键字不能作为字段名字) */object StreamFlinkSqlDemo { /** * 1. 获取流处理运行环境 * 2. 获取Table运行环境 * 3. 设置处理时间为 EventTime * 4. 创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳) * 5. 创建一个自定义数据源 * 使用for循环生成1000个订单 * 随机生成订单ID(UUID) * 随机生成用户ID(0-2) * 随机生成订单金额(0-100) * 时间戳为当前系统时间 * 每隔1秒生成一个订单 * 6. 添加水印,允许延迟2秒 * 7. 导入 import org.apache.flink.table.api.scala._ 隐式参数 * 8. 使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段 * 9. 编写SQL语句统计用户订单总数、最大金额、最小金额 * 分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口 * 10. 使用 tableEnv.sqlQuery 执行sql语句 * 11. 将SQL的执行结果转换成DataStream再打印出来 * 12. 启动流处理程序 */ // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳) case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
def main(args: Array[String]): Unit = {
// 1. 创建流处理运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 设置处理时间为`EventTime` env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取table的运行环境 val tableEnv = TableEnvironment.getTableEnvironment(env)
// 4. 创建一个自定义数据源 val orderDataStream = env.addSource(new RichSourceFunction[Order] { var isRunning = true override def run(ctx: SourceFunction.SourceContext[Order]): Unit = { // - 随机生成订单ID(UUID) // - 随机生成用户ID(0-2) // - 随机生成订单金额(0-100) // - 时间戳为当前系统时间 // - 每隔1秒生成一个订单 for (i <- 0 until 1000 if isRunning) { val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101), System.currentTimeMillis()) TimeUnit.SECONDS.sleep(1) ctx.collect(order) } } override def cancel(): Unit = { isRunning = false } })
// 5. 添加水印,允许延迟2秒 val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) { override def extractTimestamp(element: Order): Long = { val eventTime = element.createTime eventTime } } )
// 6. 导入`import org.apache.flink.table.api.scala._`隐式参数 // 7. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段 import org.apache.flink.table.api.scala._ tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
// 8. 编写SQL语句统计用户订单总数、最大金额、最小金额 // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口 val sql = """ |select | userId, | count(1) as totalCount, | max(money) as maxMoney, | min(money) as minMoney | from | t_order | group by | tumble(createTime, interval '5' second), | userId """.stripMargin // 9. 使用`tableEnv.sqlQuery`执行sql语句 val table: Table = tableEnv.sqlQuery(sql)
// 10. 将SQL的执行结果转换成DataStream再打印出来 table.toRetractStream[Row].print() env.execute("StreamSQLApp") }}
复制代码


发布于: 2021 年 05 月 26 日阅读数: 12
用户头像

专注于大数据技术研究 2020.11.10 加入

运营公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Flink的流数据SQL