写点什么

大数据 -115 - Flink DataStream Transformation Map、FlatMap、Filter 到 Window 的全面讲解

作者:武子康
  • 2025-10-04
    山东
  • 本文字数:4182 字

    阅读完需:约 14 分钟

大数据-115 - Flink DataStream Transformation Map、FlatMap、Filter 到 Window 的全面讲解

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 29 日更新到:Java-136 深入浅出 MySQL Spring Boot @Transactional 使用指南:事务传播、隔离级别与异常回滚策略 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Flink DataStream API

  • Rich 并行源 并行源

Flink 针对 DataStream 提供了大量的已经实现的算子

Map

DataStream -> DataStream,获取一个元素并产生一个元素。以下是将输入流中的值*2 的映射函数:


DataStream<Integer> dataStream = //...dataStream.map(new MapFunction<Integer, Integer>() {    @Override    public Integer map(Integer value) throws Exception {        return 2 * value;    }});
复制代码

FlatMap

DataStream -> DataStream 获取一个元素,产生 0 个、1 个、多个元素。以下是一个 FlapMap 函数切分句子:


dataStream.flatMap(new FlatMapFunction<String, String>() {    @Override    public void flatMap(String value, Collector<String> out) throws Exception {        for(String word: value.split(" ")){            out.collect(word);        }    }});
复制代码

Filter

DataStream -> DataStream 返回一个布尔值,如果满足条件就筛选出来。以下是去掉 0 的值:


dataStream.filter(new FilterFunction<Integer>() {    @Override    public boolean filter(Integer value) throws Exception {        return value != 0;    }});
复制代码

KeyBy

DataStream -> KeyedStream 将流逻辑上划分为不相交的分区。所有具有相同键的记录被分配到相同的分区。在函数的内部,keyBy() 使用哈希分区实现。有不同的方法来指定键。 此转换返回一个 KeyedStream。


dataStream.keyBy(value -> value.getSomeKey())dataStream.keyBy(value -> value.f0)
复制代码

Reduce

KeyedStream -> DataStream"滚动"归约是一种在数据流处理中常用的操作。简单来说,它是一个累积的过程,逐个处理流中的元素并将结果逐步更新。


  • 键控数据流:这是指数据流中的元素根据某个键(如 ID 或类别)进行分组。

  • 当前元素:这是指数据流中正在处理的当前记录。

  • 上一个归约值:这是指之前已经处理过并累积的结果。


keyedStream.reduce(new ReduceFunction<Integer>() {    @Override    public Integer reduce(Integer value1, Integer value2) throws Exception {        return value1 + value2;    }});
复制代码

举一个例子:

部分和是一种典型的归约操作,它逐步计算出流中各元素的累积和。例如,对于一个输入数据流 [1, 2, 3, 4, 5],部分和的输出将是 [1, 3, 6, 10, 15]。也就是说:


  • 第一个元素 1 的部分和是 1。

  • 第二个元素 2 的部分和是 1+2=3。

  • 第三个元素 3 的部分和是 1+2+3=6。

  • 以此类推。通过这样的处理方式,你可以在数据流中实时地看到每一步累积的结果。

Fold

KeyedStream -> DataStream 对于带有初始值的键控数据流进行滚动折叠,将当前元素与上一个折叠值相互结合,并发出新值。一个折叠函数,应用序列是:(1,2,3,4,5),会发出:start-1、start-1-2、start-1-2-3...


DataStream<String> result =    keyedStream.fold("start", new FoldFunction<Integer, String>() {        @Override        public String fold(String current, Integer value) {            return current + "-" + value;        }});
复制代码

Aggregations

KeyedStream -> DataStream 对 KeyedStream 进行滚动聚合,min 和 minBy 的区别在于,min 返回最小值,而 minBy 返回该字段中具有最小元素值(max 和 maxBy 同理)。


keyedStream.sum(0);keyedStream.sum("key");keyedStream.min(0);keyedStream.min("key");keyedStream.max(0);keyedStream.max("key");keyedStream.minBy(0);keyedStream.minBy("key");keyedStream.maxBy(0);keyedStream.maxBy("key");
复制代码

Window

KeyedStream -> WindowedStream 可以在已经分区的 KeyedStream 上定义窗口,窗口根据某些特性(例如:最近 5 秒内到达的数据),对每个键中的数据进行分组。


 // Last 5 seconds of datadataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
复制代码

WindowAll

DataStream -> AllWindowStream 可以在常规的 DataStream 上定义窗口,窗口根据某些特性(例如:最近 5 秒内到达的数据)对所有流事件进行分组。


// Last 5 seconds of datadataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); 
复制代码

Window Apply

WindowedStream -> DataStreamAllWindowedStream -> DataStream 对整个窗口应用一个通用函数:



windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple,Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); }});
// applying an AllWindowFunction on non-keyed window streamallWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer,Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); }});
复制代码

Window Reduce

WindowedStream -> DataStream 对于一个功能性 Reduce 函数应用于窗口,并返回简化的值。


windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,    Tuple2<String, Integer> value2) throws Exception {        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);    }});
复制代码

Window Fold

WindowedStream -> DataStream 应用一个函数式的折叠函数到窗口,并返回折叠后的值。示例函数在序列(1,2,3,4,5)上应用时,将折叠成字符:start-1-2-3-4-5


windowedStream.fold("start", new FoldFunction<Integer, String>() {    public String fold(String current, Integer value) {        return current + "-" + value;    }});
复制代码

Aggregations on windows

WindowedStream -> DataStream 聚合窗口中的内容,min 和 minBy 的区别于,min 返回最小值,而 minBy 返回在此字段中具有最小值的元素(max 和 maxBy 同理)


windowedStream.sum(0);windowedStream.sum("key");windowedStream.min(0);windowedStream.min("key");windowedStream.max(0);windowedStream.max("key");windowedStream.minBy(0);windowedStream.minBy("key");windowedStream.maxBy(0);windowedStream.maxBy("key");
复制代码

Union

DataStream -> DataStream 将两个或者多个数据流合并,创建一个包含所有数据流中所有元素的新数据流。注意:如果一个数据流于自身合并,结果数据流中每个元素将出现两次。


dataStream.union(otherStream1, otherStream2, ...);
复制代码

Window Join

DataStream, DataStream -> DataStream 将两个数据流按给定的键和一个公共窗口进行连接。


dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new JoinFunction () {...});
复制代码

Interval Join

KeyedStream, KeyedStream -> DataStream 将两个具有相同键的键控流中的元素 e1 和 e2 在给定时间间隔内连接起来,使得 e2 的时间戳满足 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound


// this will join the two streams so that// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upperbound.upperBoundExclusive(true) // optional.lowerBoundExclusive(true) // optional.process(new IntervalJoinFunction() {...});
复制代码

Window CoGroup

DataStream, DataStream -> DataStream 将两个数据流在给定键和公共窗口上进行合并。


dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new CoGroupFunction () {...});
复制代码

Connect

DataStrean, DataStream -> ConnectedStreams 将两个数据流连接起来,保留他们的类型,连接允许在两个数据流之间共享。


DataStream<Integer> someStream = //...DataStream<String> otherStream = //...ConnectedStreams<Integer, String> connectedStreams =someStream.connect(otherStream);
复制代码

CoMap, CoFlatMap

ConnetedStreams -> DataStream 类似 Map 和 FlatMap,只不过是连接流。


connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {    @Override    public Boolean map1(Integer value) {        return true;    }    @Override    public Boolean map2(String value) {        return false;    }});connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {    @Override    public void flatMap1(Integer value, Collector<String> out) {        out.collect(value.toString());    }    @Override    public void flatMap2(String value, Collector<String> out) {        for (String word: value.split(" ")) {            out.collect(word);        }    }});
复制代码


发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-115 - Flink DataStream Transformation Map、FlatMap、Filter 到 Window 的全面讲解_Java_武子康_InfoQ写作社区