本文继续介绍 Flink DataStream API 先关内容,重点:数据源、数据转换、数据输出。
1、Source 数据源
1.1、Flink 基本数据源
// 2. 读取数据源
DataStream<String> fileDataStreamSource =
env.readTextFile("/Users/yclxiao/Project/bigdata/flink-blog/doc/words.txt");
复制代码
// 2. 读取数据源
DataStream<String> textStream = env.socketTextStream("localhost", 9999, "\n");
复制代码
DataStreamSource<String> textStream = env.fromCollection(Arrays.asList(
"java,c++,php,java,spring",
"hadoop,scala",
"c++,jvm,html,php"
));
复制代码
1.2、高级数据源
Flink 可以从 Kafka、Mysql-CDC 等数据源读取数据,使用时需要引入第三方依赖库。
对接 Kafka 数据源:
在 Maven 中引入 Flink 针对 Kafka 的 API 依赖库,pom 代码如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
复制代码
Java 代码如下:
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取数据源
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("10.20.1.26:9092")
.setGroupId("group-flinkdemo")
.setTopics("topic-flinkdemo")
// 从最末尾位点开始消费
.setStartingOffsets(OffsetsInitializer.latest())
// 从上次消费者提交的地方开始消费,应该采用这种方式,防止服务重启的期间丢失数据
// .setStartingOffsets(OffsetsInitializer.committedOffsets())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStreamSource<String> dataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");
// 3. 数据转换
DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : value.split("\\,")) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.keyBy(value -> value.f0)
.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
dataStream.print("BlogDemoStream=======")
.setParallelism(1);
// 5. 启动任务
env.execute(KafkaDataStreamSourceDemo.class.getSimpleName());
复制代码
对接 Myql-CDC 数据源
详细的过程可以查看我的这边文章:一次打通FlinkCDC同步Mysql数据,
在 Maven 中引入 Flink 针对 Mysql-CDC 的 API 依赖库,pom 代码如下:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
复制代码
java 代码如下:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(MYSQL_HOST)
.port(MYSQL_PORT)
.databaseList(SYNC_DB) // set captured database
.tableList(String.join(",", SYNC_TABLES)) // set captured table
.username(MYSQL_USER)
.password(MYSQL_PASSWD)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(5000);
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + LeagueOcSettleProfit2DwsHdjProfitRecordAPI.class.getName());
复制代码
1.3、自定义的数据源方式
Flink 中可以很方便的使用自定义数据源,只需要实现 SourceFunction 接口即可。
比如实现一个随机产生某10个学生的N此考试分数
的自定义数据源,对每个学生的份数相加,代码如下:
private static class RandomStudentSource implements SourceFunction<Student> {
private Random rnd = new Random();
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Student> ctx) throws Exception {
while (isRunning) {
Student student = new Student();
student.setName("name-" + rnd.nextInt(5));
student.setScore(rnd.nextInt(20));
ctx.collect(student);
Thread.sleep(100L);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
private static class Student {
private String name;
private Integer score;
public Student() {
}
public Student(String name, Integer score) {
this.name = name;
this.score = score;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getScore() {
return score;
}
public void setScore(Integer score) {
this.score = score;
}
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", score=" + score +
'}';
}
}
复制代码
2、Transformation 数据转换
常用的数据转换函数如下:
map:map()算子接收一个函数作为参数,并把这个函数应用于 DataStream 的每个元素,最后将函数的返回结果作为结果 DataStream 中对应元素的值,即将 DataStream 的每个元素转换成新的元素。
flatMap:与 map()算子类似,但是每个传入该函数 func 的 DataStream 元素会返回 0 到多个元素,最终会将返回的所有元素合并到一个 DataStream。
filter:通过函数 filter 对源 DataStream 的每个元素进行过滤,并返回一个新的 DataStream。
keyBy:keyBy()算子主要作用于元素类型是元组或数组的 DataStream 上。使用该算子可以将 DataStream 中的元素按照指定的 key(指定的字段)进行分组,具有相同 key 的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。
reduce:reduce()算子主要作用于 KeyedStream 上,对 KeyedStream 数据流进行滚动聚合,即将当前元素与上一个聚合值进行合并,并且发射出新值。该算子的原理与 MapReduce 中的 Reduce 类似,聚合前后的元素类型保持一致。
aggregation:aggregation 是聚合算子,类似的还有:reduce、sum、max、min。Aggregation 算子作用于 KeyedStream 上,并且进行滚动聚合。与 keyBy()算子类似,可以使用数字或字段名称指定需要聚合的字段。keyBy()算子会将 DataStream 转换为 KeyedStream,而 Aggregation 算子会将 KeyedStream 转换为 DataStream,类似下图:
union:union()算子用于将两个或多个数据流进行合并,创建一个包含所有数据流所有元素的新流(不会去除重复元素)。
connect:connect()算子可以连接两个数据流,并保持各自元素的数据类型不变,允许在两个流之间共享状态数据。connect 与 union 有几点区别:
union()要求多个数据流的数据类型必须相同,connect()允许多个数据流中的元素类型可以不同。
union()可以合并多个数据流,但 connect()只能连接两个数据流
union()的执行结果是 DataStream,而 connect()的执行结果是 ConnectedStreams;ConnectedStreams 表示两个(可能)不同数据类型的连接流,可以对两个流的数据应用不同的处理方法,当一个流上的操作直接影响另一个流上的操作时,连接流非常有用。可以通过流之间的共享状态对两个流进行操作。
与流的转换:
3、Sink 数据输出
3.1、Sink 简介
Sink 这个词很形象,中文意思“水槽”,寓意:数据流像水一样,源源不断的,经过水槽流向各种目的地。
Flink 可以使用 DataStream API 将数据流输出到文件、Socket、外部系统等。Flink 自带了各种内置的输出格式,比如 writeAsText()、writeAsCsv()等,但是已经过时,如下:
官方鼓励使用 addSink()方法,调用自定义接收函数,如下:
3.2、自定义 Sink
Flink 也可以与其他系统(如 Apache Kafka、doris 等)的 Sink 集成在一起,这些系统已经实现了自定义 Sink 函数。也可以完全自己自定义 Sink。并且,通过 addSink()方法可以参与到 Flink 的检查点(Checkpoint)中,以实现“精确的一次”语义。
自定义 Sink 只需要实现 SinkFunction,例如上面的例子中(随机生成学生分数),现在是直接 print 出来信息,改造成自定义 Sink 的方式之后,代码如下:
public class AlertSink implements SinkFunction<SourceSourceDemo.Student> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
@Override
public void invoke(SourceSourceDemo.Student value, Context context) {
LOG.info("自定义sink" + value.toString());
}
}
// transformedStream.print("result =======").setParallelism(1);
transformedStream.addSink(new AlertSink());
复制代码
截图如下:
4、代码地址
https://github.com/yclxiao/flink-blog/blob/main/src/main/java/top/mangod/flinkblog/demo002/KafkaDataStreamSourceDemo.java
评论