写点什么

大数据 -108 Flink 流批一体化入门:概念解析与 WordCount 代码实践 批数据 + 流数据

作者:武子康
  • 2025-09-27
    山东
  • 本文字数:4690 字

    阅读完需:约 15 分钟

大数据-108 Flink 流批一体化入门:概念解析与WordCount代码实践 批数据+流数据

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 22 日更新到:Java-130 深入浅出 MySQL MyCat 深入解析 核心配置文件 server.xml 使用与优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成了如下的内容:


  • Flink 基本介绍

  • 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构



再次回到最初的起点,Hello Word Count!

Flink 流处理 (Stream Processing)

定义

流处理是指对持续不断的数据流进行实时处理。Flink 的流处理模式非常适合处理持续产生的数据,例如来自传感器、日志记录系统或金融交易的数据流。

核心概念

  • 无界数据流:流处理通常处理无界数据流,即数据流没有明确的结束点,持续不断地产生。

  • 事件时间:Flink 支持基于事件时间的处理,能够处理乱序和延迟数据,使得处理更加精确。事件时间指的是数据在其产生源头的时间。

  • 窗口操作:在流处理过程中,通常需要将数据按时间窗口(如滑动窗口、滚动窗口、会话窗口)进行分组,以便执行聚合或其他操作。

  • 状态管理:Flink 支持有状态的流处理,这意味着处理每条数据时,可以记住之前的数据状态。例如,在流中计算一个累积的总和或频率。

Flink 批处理 (Batch Processing)

定义

批处理是指对静态的、有界的数据集进行处理。这种处理通常用于一次性的大规模数据分析,如定期的业务报告生成、数据转换和加载任务。

核心概念

  • 有界数据集:批处理通常处理有界数据集,即数据集是固定大小的,有明确的开始和结束点。

  • 任务并行化:在批处理模式下,Flink 会将数据集划分为多个子任务,并行执行这些任务,以加快处理速度。

  • DataSet API:Flink 的 DataSet API 提供了一组高层次的操作符,用于对批数据集执行各种操作,如映射(map)、过滤(filter)、联接(join)和聚合(aggregate)。

单词统计(批数据)

需求说明

统计一个文件中各个单词出现的次数,把统计结果输出到文件


  • 读取数据源

  • 处理数据源

  • 将读取到的数据源文件中的每一行根据空格切分

  • 将切分好的每个单词拼接 1

  • 根据单词聚合(将相同的单词放到一起)

  • 累加相同的单词(单词后面的 1 进行累加)

  • 保存处理结果

导入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>flink-test</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.11.1</version> <scope>provided</scope> </dependency> </dependencies>
</project>
复制代码

编写代码

package icu.wzk;public class WordCount {
public static void main(String[] args) throws Exception { String inPath = "word-count/word-count.txt"; String outPath = "word-count/word-count-result.csv"; // 获取Flink批处理执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 读取文件中的内容 DataSet<String> text = env.readTextFile(inPath); // 对数据进行处理 DataSet<Tuple2<String, Integer>> dataSet = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : line.split(" ")) { collector.collect(new Tuple2<>(word, 1)); } } }) .groupBy(0) .sum(1); dataSet .writeAsCsv(outPath, "\n", " ", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); // 触发执行程序 env.execute("Word Count"); }
}
复制代码

测试数据

Stateful Computations over Data StreamsApache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.Correctness guaranteesExactly-once state consistencyEvent-time processingSophisticated late data handlingSQL on Stream & Batch DataDataStream API & DataSet APIProcessFunction (Time & State)Flexible deploymentHigh-availability setupSavepoints
复制代码

运行测试

结果数据

查看 word-count/word-count-result.csv 打开即可看到以下内容:


Stateful 1any 1common 1computations 2on 1setup 1state 1streams. 1unbounded 1& 3Data 2DataStream 1High-availability 1for 1perform 1run 1to 1Event-time 1Flexible 1Sophisticated 1framework 1is 1scale. 1Exactly-once 1ProcessFunction 1Stream 1a 1been 1handling 1in 1late 1processing 2Batch 1DataSet 1at 2bounded 1consistency 1deployment 1distributed 1engine 1has 1API 2Apache 1Flink 2SQL 1Streams 1all 1designed 1over 2Computations 1Savepoints 1and 3data 2environments, 1in-memory 1speed 1stateful 1(Time 1Correctness 1State) 1cluster 1guarantees 1
复制代码

单词统计(流数据)

需求说明

Socket 模拟实时发送单词,使用 Flink 实时接收数据,对指定时间窗口内(如 5 秒)的数据进行聚合统计,每隔 1 秒汇总计算一次,并且把时间窗口内计算结果打印出来。

编写代码

Server 部分

编写一个 Socket 服务,提供一定的数据流。


package icu.wzk;
public class WordCountServer {
public static void main(String[] args) throws IOException, InterruptedException { String ip = "localhost"; int port = 9999; Random random = new Random(); ServerSocket serverSocket = new ServerSocket(); InetSocketAddress address = new InetSocketAddress(ip, port); serverSocket.bind(address); Socket socket = serverSocket.accept(); OutputStream outputStream = socket.getOutputStream(); PrintWriter writer = new PrintWriter(outputStream, true); for (int i = 0; i < 1000; i ++) { int number = random.nextInt(100); System.out.println(number); writer.println(number); Thread.sleep((random.nextInt(900) + 100)); } socket.close(); serverSocket.close(); }
}
复制代码

Flink 部分

连接到上述的 Server 部分


package icu.wzk;
public class WordCount2 {
public static void main(String[] args) throws Exception { String ip = "localhost"; int port = 9999;
// 获取 Flink 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取 Socket 输入数据 DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n"); SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(new Tuple2<>(word, 1)); } } }); SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .timeWindow(Time.seconds(5), Time.seconds(1)) .sum(1);
// 输出并运行 word.print(); env.execute("Word Count"); }
}
复制代码

观察结果

Server 部分

351884722451151365985568228417
复制代码

Flink 部分

3> (35,1)4> (18,1)3> (35,1)5> (84,1)4> (18,1)6> (72,1)3> (35,1)5> (84,1)5> (24,1)3> (35,1)6> (72,1)4> (18,1)7> (51,1)5> (24,1)5> (84,1)4> (15,1)6> (72,1)7> (51,1)3> (35,1)4> (15,1)4> (18,1)
复制代码


运行结果过程截图如下所示:


过程总结

  • 获得一个执行环境

  • 加载、创建 初始化环境

  • 指定数据操作的算子

  • 指定结果数据存放位置

  • 调用 Execute 触发执行程序


注意:Flink 程序是延迟计算的,只有最后调用 execute()方法的时候才会真正的触发执行程序。

发布于: 刚刚阅读数: 6
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-108 Flink 流批一体化入门:概念解析与WordCount代码实践 批数据+流数据_Java_武子康_InfoQ写作社区