写点什么

Flink SQL 如何实现列转行?

用户头像
JasonLee
关注
发布于: 2021 年 03 月 14 日

Flink SQL 如何实现列转行 ?

在 SQL 任务里面经常会遇到一列转多行的需求,下面就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例.

需求

原始数据格式如下:

namedataJasonLee[{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]

data 格式化

{    "name": "JasonLee",    "data": [{            "content_type": "flink",            "url": "111"        }, {            "content_type": "spark",            "url": "222"        },        {            "content_type": "hadoop",            "url": "333"        }    ]}

现在希望得到的数据格式是这样的:

namecontent_typeurlJasonLeeflink111JasonLeespark222JasonLeehadoop333

这是一个典型的列转行或者一行转多行的场景,需要将 data 列进行拆分成为多行多列,下面介绍两种实现方式.

  1. 使用 Flink 自带的 unnest 函数解析

  2. ##### 使用自定义 UDTF 函数解析

建表 DDL

CREATE TABLE kafka_table (name string,`data` ARRAY<ROW<content_type STRING,url STRING>>)WITH (    'connector' = 'kafka', -- 使用 kafka connector    'topic' = 'test',    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息    'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置    'format' = 'json',  -- 数据源格式为 json    'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败    'json.ignore-parse-errors' = 'true'  -- 解析失败跳过)

这里在定义 data 字段类型的时候直接定义为 ARRAY 类型,因为 unnest 函数需要一个数组类型的参数.

unnest 解析

select name,content_type,urlfrom kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)select name,content_type,urlfrom kafka_table, UNNEST(`data`) AS t (content_type,url)select name,content_type,urlfrom kafka_table left join UNNEST(`data`) AS t (content_type,url) on true

自定义 UDTF 解析

自定义表值函数(UDTF),自定义表值函数,将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法, 在使用 UDTF 时,需要带上 LATERAL TABLE 两个关键字.

@FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))public class ParserJsonArrayTest extends TableFunction<Row> {    private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);    public void eval(String value) {        try {            JSONArray snapshots = JSONArray.parseArray(value);            Iterator<Object> iterator = snapshots.iterator();            while (iterator.hasNext()) {                JSONObject jsonObject = (JSONObject) iterator.next();                String content_type = jsonObject.getString("content_type");                String url = jsonObject.getString("url");                collect(Row.of(content_type,url));            }        } catch (Exception e) {            log.error("parser json failed :" + e.getMessage());        }    }}

自定义 UDTF 解析的时候,就不需要把 data 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.

Flink SQL 使用 UDTF

select name,content_type,urlfrom kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)select name,content_type,urlfrom kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)select name,content_type,urlfrom kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true

注意:

unnest 和 自定义 UDTF 函数在使用的时候都有 3 种写法,前面两种写法的效果其实是一样的,第三种写法相当于 left join 的用法.区别在于 CROSS JOIN/INNER JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行不输出.LEFT JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行会输出,右侧 UDTF 字段为 null

打印的结果

2> JasonLee,flink,1112> JasonLee,spark,2222> JasonLee,hadoop,333

总结

在实际使用的时候如果 unnest 可以满足需求就直接用 unnest 不需要带来额外的开发,如果 unnest 函数满足不了需求,那么就自定义 UDTF 去完成.


发布于: 2021 年 03 月 14 日阅读数: 13
用户头像

JasonLee

关注

还未添加个人签名 2019.09.20 加入

还未添加个人简介

评论

发布
暂无评论
Flink SQL 如何实现列转行?