写点什么

Flink 从 0 到 1 学习 —— 如何使用 Side Output 来分流?

用户头像
zhisheng
关注
发布于: 2020 年 05 月 22 日
Flink 从0到1学习 —— 如何使用 Side Output 来分流?

前言


之前在 Flink 从0到1学习—— Flink 不可以连续 Split(分流)? 讲过 Flink 使用连续的 Split 会有问题,当时提供了几种解决方法,有一种方法就是使用 Side Output 来进行,当时留了个余念,那么就在这篇文章详细的讲一波,教大家如何使用 Side Output 来分流。


Side Output


通常我们在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流。比如我们在那篇文章里面的例子:需要将从 Kafka 过来的告警和恢复数据进行分类拆分,然后在对每种数据再分为告警数据和恢复数据。



如果是使用 filter 来进行拆分,也能满足我们的需求,但每次筛选过滤都要保留整个流,然后通过遍历整个流来获取相应的数据,显然很浪费性能。假如能够在一个流里面就进行多次输出就好了,恰好 Flink 的 Side Output 则提供了这样的功能。


如何使用?


要使用 Side Output 的话,你首先需要做的是定义一个 OutputTag 来标识 Side Output,代表这个 Tag 是要收集哪种类型的数据,如果是要收集多种不一样类型的数据,那么你就需要定义多种 OutputTag。例如:如果我要将告警/恢复的数据分为机器、容器、中间件等的数据,那么我们起码就得定义三个 OutputTag,如下:


private static final OutputTag<AlertEvent> middleware = new OutputTag<AlertEvent>("MIDDLEWARE") {};private static final OutputTag<AlertEvent> machine = new OutputTag<AlertEvent>("MACHINE") {};private static final OutputTag<AlertEvent> docker = new OutputTag<AlertEvent>("DOCKER") {};
复制代码


然后呢,你可以使用下面几种函数来处理数据,在处理数据的过程中,进行判断将不同种类型的数据存到不同的 OutputTag 中去。


  • ProcessFunction

  • KeyedProcessFunction

  • CoProcessFunction

  • ProcessWindowFunction

  • ProcessAllWindowFunction


比如:


//dataStream 是总的数据流SingleOutputStreamOperator<AlertEvent, AlertEvent> outputStream = dataStream.process(new ProcessFunction<AlertEvent, AlertEvent>() {    @Override    public void processElement(AlertEvent value, Context ctx, Collector<AlertEvent> out) throws Exception {        if ("MACHINE".equals(value.type)) {            ctx.output(machine, value);        } else if ("DOCKER".equals(value.type)) {            ctx.output(docker, value);        } else if ("MIDDLEWARE".equals(value.type)) {            ctx.output(middleware, value);        } else {            //其他的业务逻辑            out.collect(value);        }    }})
复制代码


好了,既然上面我们已经将不同类型的数据进行放到不同的 OutputTag 里面了,那么我们该如何去获取呢?你可以使用 getSideOutput 方法来获取不同 OutputTag 的数据,比如:


//机器相关的告警&恢复数据outputStream.getSideOutput(machine).print();
//容器相关的告警&恢复数据outputStream.getSideOutput(docker).print();
//中间件相关的告警&恢复数据outputStream.getSideOutput(middleware).print();
复制代码


这样你就可以获取到 Side Output 数据了。


另外你还可以看下我在 Github 放的一个完整 demo 代码: https://github.com/zhisheng17/flink-learning/blob/master/flink-learning-examples/src/main/java/com/zhisheng/examples/streaming/sideoutput/Main.java


总结


本文讲了如何使用 Side Output 来进行分流,比较简单,大家可以稍微阅读一下 demo 代码就可以很清楚了解。


本文地址是:http://www.54tianzhisheng.cn/2019/08/18/flink-side-output/


发布于: 2020 年 05 月 22 日阅读数: 67
用户头像

zhisheng

关注

坑要一个个填,路要一步步走! 2018.05.15 加入

GitChat《Flink 实战与性能优化》专栏作者,公众号(zhisheng) 负责人,http://www.54tianzhisheng.cn/ 博客博主,擅长大数据、Java。

评论

发布
暂无评论
Flink 从0到1学习 —— 如何使用 Side Output 来分流?