写点什么

Flink 的批数据 SQL

发布于: 2021 年 05 月 27 日
Flink的批数据SQL

用法

1)   构建 Table 运行环境

2)   将 DataSet 注册为一张表

3)   使用 Table 运行环境的 sqlQuery 方法来执行 SQL 语句

 

示例

使用 Flink SQL 统计用户消费订单的总金额、最大金额、最小金额、订单总数。

测试数据(订单 ID、用户名、订单日期、订单金额)

      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),      Order(3, "lisi", "2018-10-20 16:30", 127.5),      Order(4, "lisi", "2018-10-20 16:30", 328.5),      Order(5, "lisi", "2018-10-20 16:30", 432.5),      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
复制代码

步骤

1)   获取一个批处理运行环境

2)   获取一个 Table 运行环境

3)   创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)

4)   基于本地 Order 集合创建一个 DataSet source

5)   使用 Table 运行环境将 DataSet 注册为一张表

6)   使用 SQL 语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)

7)   使用 TableEnv.toDataSet 将 Table 转换为 DataSet

8)   打印测试

 

参考代码

import org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.table.api.{Table, TableEnvironment}import org.apache.flink.table.api.scala.BatchTableEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.types.Row
/** * 使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。 */object BatchFlinkSqlDemo { //3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额) case class Order(id:Int, userName:String, createTime:String, money:Double)
def main(args: Array[String]): Unit = { /** * 实现思路: * 1. 获取一个批处理运行环境 * 2. 获取一个Table运行环境 * 3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额) * 4. 基于本地 Order 集合创建一个DataSet source * 5. 使用Table运行环境将DataSet注册为一张表 * 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数) * 7. 使用TableEnv.toDataSet将Table转换为DataSet * 8. 打印测试 */ //1. 获取一个批处理运行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2. 获取一个Table运行环境 val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
//4. 基于本地 Order 集合创建一个DataSet source val orderDataSet: DataSet[Order] = env.fromElements( Order(1, "zhangsan", "2018-10-20 15:30", 358.5), Order(2, "zhangsan", "2018-10-20 16:30", 131.5), Order(3, "lisi", "2018-10-20 16:30", 127.5), Order(4, "lisi", "2018-10-20 16:30", 328.5), Order(5, "lisi", "2018-10-20 16:30", 432.5), Order(6, "zhaoliu", "2018-10-20 22:30", 451.0), Order(7, "zhaoliu", "2018-10-20 22:30", 362.0), Order(8, "zhaoliu", "2018-10-20 22:30", 364.0), Order(9, "zhaoliu", "2018-10-20 22:30", 341.0) )
//5. 使用Table运行环境将DataSet注册为一张表 tabEnv.registerDataSet("t_order", orderDataSet)
//6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数) //用户消费订单的总金额、最大金额、最小金额、订单总数。 val sql = """ | select | userName, | sum(money) totalMoney, | max(money) maxMoney, | min(money) minMoney, | count(1) totalCount | from t_order | group by userName |""".stripMargin //在scala中stripMargin默认是“|”作为多行连接符
//7. 使用TableEnv.toDataSet将Table转换为DataSet val table: Table = tabEnv.sqlQuery(sql)
table.printSchema()
tabEnv.toDataSet[Row](table).print() }}
复制代码


发布于: 2021 年 05 月 27 日阅读数: 20
用户头像

专注于大数据技术研究 2020.11.10 加入

运营公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Flink的批数据SQL