写点什么

Flink DataStream API- 数据源、数据转换、数据输出

  • 2023-08-05
    江苏
  • 本文字数:4246 字

    阅读完需:约 14 分钟

Flink DataStream API-数据源、数据转换、数据输出

本文继续介绍 Flink DataStream API 先关内容,重点:数据源、数据转换、数据输出。

1、Source 数据源

1.1、Flink 基本数据源

  • 文件数据源


// 2. 读取数据源DataStream<String> fileDataStreamSource =        env.readTextFile("/Users/yclxiao/Project/bigdata/flink-blog/doc/words.txt");
复制代码


  • Socket 数据源


// 2. 读取数据源DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");
复制代码


  • 集合数据源


DataStreamSource<String> textStream = env.fromCollection(Arrays.asList(                "java,c++,php,java,spring",                "hadoop,scala",                "c++,jvm,html,php"        ));
复制代码

1.2、高级数据源

Flink 可以从 Kafka、Mysql-CDC 等数据源读取数据,使用时需要引入第三方依赖库。


对接 Kafka 数据源


在 Maven 中引入 Flink 针对 Kafka 的 API 依赖库,pom 代码如下:



<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency>
复制代码


Java 代码如下:


// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据源KafkaSource<String> kafkaSource = KafkaSource.<String>builder()        .setBootstrapServers("10.20.1.26:9092")        .setGroupId("group-flinkdemo")        .setTopics("topic-flinkdemo")        // 从最末尾位点开始消费        .setStartingOffsets(OffsetsInitializer.latest())        // 从上次消费者提交的地方开始消费,应该采用这种方式,防止服务重启的期间丢失数据//                .setStartingOffsets(OffsetsInitializer.committedOffsets())        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))        .build();
DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");

// 3. 数据转换DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word : value.split("\\,")) { out.collect(new Tuple2<>(word, 1)); } } }) .keyBy(value -> value.f0) .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
dataStream.print("BlogDemoStream=======") .setParallelism(1);
// 5. 启动任务env.execute(KafkaDataStreamSourceDemo.class.getSimpleName());
复制代码


对接 Myql-CDC 数据源


详细的过程可以查看我的这边文章:一次打通FlinkCDC同步Mysql数据


在 Maven 中引入 Flink 针对 Mysql-CDC 的 API 依赖库,pom 代码如下:


<dependency>    <groupId>com.ververica</groupId>    <artifactId>flink-connector-mysql-cdc</artifactId>    <version>2.3.0</version>    <exclusions>        <exclusion>            <artifactId>flink-shaded-guava</artifactId>            <groupId>org.apache.flink</groupId>        </exclusion>    </exclusions></dependency>
复制代码


java 代码如下:


MySqlSource<String> mySqlSource = MySqlSource.<String>builder()                .hostname(MYSQL_HOST)                .port(MYSQL_PORT)                .databaseList(SYNC_DB) // set captured database                .tableList(String.join(",", SYNC_TABLES)) // set captured table                .username(MYSQL_USER)                .password(MYSQL_PASSWD)                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String                .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); env.enableCheckpointing(5000);
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + LeagueOcSettleProfit2DwsHdjProfitRecordAPI.class.getName());
复制代码

1.3、自定义的数据源方式

Flink 中可以很方便的使用自定义数据源,只需要实现 SourceFunction 接口即可。


比如实现一个随机产生某10个学生的N此考试分数的自定义数据源,对每个学生的份数相加,代码如下:



private static class RandomStudentSource implements SourceFunction<Student> {
private Random rnd = new Random(); private volatile boolean isRunning = true;
@Override public void run(SourceContext<Student> ctx) throws Exception { while (isRunning) { Student student = new Student(); student.setName("name-" + rnd.nextInt(5)); student.setScore(rnd.nextInt(20)); ctx.collect(student); Thread.sleep(100L); } }
@Override public void cancel() { isRunning = false; } }
private static class Student { private String name; private Integer score;
public Student() { }
public Student(String name, Integer score) { this.name = name; this.score = score; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Integer getScore() { return score; }
public void setScore(Integer score) { this.score = score; }
@Override public String toString() { return "Student{" + "name='" + name + '\'' + ", score=" + score + '}'; } }
复制代码

2、Transformation 数据转换

常用的数据转换函数如下:


map:map()算子接收一个函数作为参数,并把这个函数应用于 DataStream 的每个元素,最后将函数的返回结果作为结果 DataStream 中对应元素的值,即将 DataStream 的每个元素转换成新的元素。


flatMap:与 map()算子类似,但是每个传入该函数 func 的 DataStream 元素会返回 0 到多个元素,最终会将返回的所有元素合并到一个 DataStream。



filter:通过函数 filter 对源 DataStream 的每个元素进行过滤,并返回一个新的 DataStream。


keyBy:keyBy()算子主要作用于元素类型是元组或数组的 DataStream 上。使用该算子可以将 DataStream 中的元素按照指定的 key(指定的字段)进行分组,具有相同 key 的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。




reduce:reduce()算子主要作用于 KeyedStream 上,对 KeyedStream 数据流进行滚动聚合,即将当前元素与上一个聚合值进行合并,并且发射出新值。该算子的原理与 MapReduce 中的 Reduce 类似,聚合前后的元素类型保持一致。



aggregation:aggregation 是聚合算子,类似的还有:reduce、sum、max、min。Aggregation 算子作用于 KeyedStream 上,并且进行滚动聚合。与 keyBy()算子类似,可以使用数字或字段名称指定需要聚合的字段。keyBy()算子会将 DataStream 转换为 KeyedStream,而 Aggregation 算子会将 KeyedStream 转换为 DataStream,类似下图:



union:union()算子用于将两个或多个数据流进行合并,创建一个包含所有数据流所有元素的新流(不会去除重复元素)。



connect:connect()算子可以连接两个数据流,并保持各自元素的数据类型不变,允许在两个流之间共享状态数据。connect 与 union 有几点区别:


  1. union()要求多个数据流的数据类型必须相同,connect()允许多个数据流中的元素类型可以不同。

  2. union()可以合并多个数据流,但 connect()只能连接两个数据流

  3. union()的执行结果是 DataStream,而 connect()的执行结果是 ConnectedStreams;ConnectedStreams 表示两个(可能)不同数据类型的连接流,可以对两个流的数据应用不同的处理方法,当一个流上的操作直接影响另一个流上的操作时,连接流非常有用。可以通过流之间的共享状态对两个流进行操作。

  4. 与流的转换:


3、Sink 数据输出

3.1、Sink 简介

Sink 这个词很形象,中文意思“水槽”,寓意:数据流像水一样,源源不断的,经过水槽流向各种目的地。


Flink 可以使用 DataStream API 将数据流输出到文件、Socket、外部系统等。Flink 自带了各种内置的输出格式,比如 writeAsText()、writeAsCsv()等,但是已经过时,如下:



官方鼓励使用 addSink()方法,调用自定义接收函数,如下:


3.2、自定义 Sink

Flink 也可以与其他系统(如 Apache Kafka、doris 等)的 Sink 集成在一起,这些系统已经实现了自定义 Sink 函数。也可以完全自己自定义 Sink。并且,通过 addSink()方法可以参与到 Flink 的检查点(Checkpoint)中,以实现“精确的一次”语义。


自定义 Sink 只需要实现 SinkFunction,例如上面的例子中(随机生成学生分数),现在是直接 print 出来信息,改造成自定义 Sink 的方式之后,代码如下:


public class AlertSink implements SinkFunction<SourceSourceDemo.Student> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
@Override public void invoke(SourceSourceDemo.Student value, Context context) { LOG.info("自定义sink" + value.toString()); }}
// transformedStream.print("result =======").setParallelism(1); transformedStream.addSink(new AlertSink());
复制代码


截图如下:


4、代码地址

https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo002/KafkaDataStreamSourceDemo.java



用户头像

这里可以找到我 mangod.top 2018-09-11 加入

13年IT行业经验,做过架构创过业,一起交流学习。专注于软件开发、云原生、大数据领域。关注我职业发展不焦虑。

评论

发布
暂无评论
Flink DataStream API-数据源、数据转换、数据输出_不焦躁的程序员_InfoQ写作社区