写点什么

大数据 -112 Flink DataStream API :数据源、转换与输出 文件、Socket 到 Kafka 的完整流程

作者:武子康
  • 2025-10-01
    山东
  • 本文字数:4298 字

    阅读完需:约 14 分钟

大数据-112 Flink DataStream API :数据源、转换与输出 文件、Socket 到 Kafka 的完整流程

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 09 月 29 日更新到:Java-136 深入浅出 MySQL Spring Boot @Transactional 使用指南:事务传播、隔离级别与异常回滚策略 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节完成了如下的内容:


  • Flink YARN 模式

  • YARN 模式下申请资源

  • YARN 模式下提交任务


DataStream API

Flink 程序的数据处理流程主要分为 3 个核心组成部分:


  1. DataSource(数据源)


  • 这是程序的输入来源,负责为 Flink 作业提供原始数据流

  • 可以通过 StreamExecutionEnvironment.addSource()方法添加数据源

  • 支持多种数据源类型:

  • 消息队列:如 Kafka、RabbitMQ 等

  • 文件系统:如 HDFS、本地文件等

  • 数据库:如 MySQL、PostgreSQL 等

  • 自定义数据源:通过实现 SourceFunction leveraged 来实现


  1. Transformation(数据转换)


  • 这是数据处理的核心环节,对数据源进行各种计算和转换操作

  • 常见转换操作包括:

  • Map:对每个元素进行转换(1:1 映射)

  • FlatMap:将一个元素转换为零个或多个元素(1:N 映射)

  • Filter:根据条件过滤数据

  • KeySetup:按 key 分组处理

  • Window:基于时间或数量的窗口操作

  • 支持多个数据流的合并、拆分等复杂操作

  • 转换操作可以链式调用,形成处理流水线


  1. Sink(数据输出)


  • 负责将处理后的数据输出到外部系统

  • 支持多种输出目标:

  • 消息系统:如 Kafka、RabbitMQ 等

  • 数据库系统:如 MySQL、Elasticsearch 等

  • 文件系统:如 HDFS、本地文件等

  • 自定义输出:通过实现 SinkFunction 来实现

  • 可以配置输出格式、写入策略等参数

  • 支持批处理和流式输出的不同模式


Flink 针对 DataStream 提供了大量已经实现的 DataSource(数据源接口)。下面来进行分析。

基于文件

readTextFile(path):读取本地文件,文件遵循 TextInputFormat 逐行读取规则并返回如果你是本地 IDEA 要读取 HDFS,那你需要额外的依赖:


<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-hadoop-compatibility_2.11</artifactId>  <version>1.11.1</version></dependency><dependency>  <groupId>org.apache.hadoop</groupId>  <artifactId>hadoop-common</artifactId>  <version>2.9.2</version></dependency><dependency>  <groupId>org.apache.hadoop</groupId>  <artifactId>hadoop-hdfs</artifactId>  <version>2.9.2</version></dependency><dependency>  <groupId>org.apache.hadoop</groupId>  <artifactId>hadoop-client</artifactId>  <version>2.9.2</version></dependency>
复制代码

基于 Socket

socketTextStream:从 Socket 中读取数据,元素可以通过一个分割符号分开。

基于集合

fromCollection 方法详解

fromCollection是 Apache Flink DataStream API 中的一个方法,用于从 Java 的 Collection 集合创建数据流。使用该方法时需要注意以下几点:

基本要求

  • 输入的 Collection 集合中的所有元素必须是相同类型的

  • 该方法通常用于本地测试和小规模数据集处理

POJO 类型识别条件

Flink 会将满足以下条件的类识别为 POJO 类型(允许"按名称"字段引用):


  1. 类定义要求

  2. 必须是 public 类且是独立的(不能是非静态内部类)

  3. 必须有 public 的无参构造方法

  4. 字段访问要求

  5. 类及其父类中所有不被statictransient修饰的属性需要满足以下条件之一:

  6. 是 public 的且不被 final 修饰

  7. 包含遵循 JavaBean 命名规范的 Getter 和 Setter 方法

JavaBean 命名规范示例

对于名为value的字段,其访问方法应为:


  • Getter 方法:public DataType getValue()

  • Setter 方法:public void setValue(DataType value)

应用场景示例

// 定义符合要求的POJO类public class SensorReading {    public String sensorId;  // public字段    private double temperature;  // 私有字段但有getter/setter        public SensorReading() {}  // 无参构造        // Getter和Setter方法    public double getTemperature() {        return temperature;    }        public void setTemperature(double temperature) {        this.temperature = temperature;    }}
// 使用fromCollection创建数据流List<SensorReading> readings = Arrays.asList( new SensorReading("sensor1", 25.0), new SensorReading("sensor2", 28.5));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> stream = env.fromCollection(readings);
复制代码

注意事项

  • 不符合 POJO 要求的类将被 Flink 视为通用类型,只能通过字段索引访问

  • 对于嵌套集合或复杂类型,可能需要额外配置类型信息

  • 在生产环境中,通常使用addSource或连接器而非fromCollection

编写代码

编写的代码如下:


package icu.wzk;public class StreamFromCollection {
public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<People> peopleList = new ArrayList<>(); peopleList.add(new People("wzk", 18)); peopleList.add(new People("icu", 15)); peopleList.add(new People("wzkicu", 10));
DataStreamSource<People> data = env.getJavaEnv().fromCollection(peopleList); SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() { @Override public boolean filter(People value) throws Exception { return value.getAge() > 15; } }); filtered.print(); env.execute("StreamFromCollection"); }
public static class People {
private String name; private Integer age;
public People() {
}
public People(String name, Integer age) { this.name = name; this.age = age; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; } }
}
复制代码

运行结果

运行结果如下图所示:


toString

我们可以通过重写 People 的 toString() 方法,来打印内容:


@Overridepublic String toString() {    return "name: " + this.name + ", age: " + this.age;}
复制代码

重新运行

重新运行可以看到:


自定义输入

可以使用 StreamExecutionEnvironment.addSource()将一个数据源添加到程序中。Flink 提供了许多预先实现的源函数,但是也可以编写自己的自定义源,方法是非并行源:implements SourceFunction,或者为并行源 implements ParallelSourceFuction 接口,或者 extends RichParallelSourceFunctionFlink 也提供了一些内置的 Connector(连接器),如下表列了几个主要的:


Kafka 连接器

添加依赖

我们需要继续添加依赖:


<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.11</artifactId>  <version>1.11.1</version></dependency>
复制代码

编写代码

package icu.wzk;public class StreamFromKafka {
public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
// Kafka FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( "flink_test", new SimpleStringSchema(), properties ); DataStreamSource<String> data = env.getJavaEnv().addSource(consumer);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.split(" "); for (String word: words) { out.collect(new Tuple2<>(word, 1)); } } }); SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .sum(1); result.print(); env.execute("StreamFromKafka"); }
}
复制代码

启动 Kafka

我们需要启动 Kafka 的服务来进行测试,之前章节我们已经配置和启动过 Kafka 了,这里就是直接启动了。


cd /opt/servers/kafka_2.12-2.7.2/bin./kafka-server-start.sh ../config/server.properties
复制代码


启动结果如下图所示:


创建主题

cd /opt/servers/kafka_2.12-2.7.2/bin/./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partition 1 --topic flink_test
复制代码

生产消息

cd /opt/servers/kafka_2.12-2.7.2/bin/./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test# 我们等Java程序启动后,产生几条消息
复制代码

运行代码

观察控制台可以看到:


3> (hello,1)5> (world,1)3> (hello,2)5> (world,2)3> (hello,3)3> (hello,4)2> (hello!,1)2> (hello!,2)...
复制代码


运行的截图如下所示:



发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-112 Flink DataStream API :数据源、转换与输出 文件、Socket 到 Kafka 的完整流程_Java_武子康_InfoQ写作社区