Flink 的 DataSource 三部曲之一:直接 API,nginx 文件服务器原理

DataSource 类型
对于常见的文本读入、kafka、RabbitMQ 等数据来源,可以直接使用 Flink 提供的 API 或者 connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:

环境和版本
熟练掌握内置 DataSource 的最好办法就是实战,本次实战的环境和版本如下:
JDK:1.8.0_211
Flink:1.9.2
Maven:3.6.0
操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
IDEA:2018.3.5 (Ultimate Edition)
源码下载
如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
| 名称 | 链接 | 备注 |
| :-- | :-- | :-- |
| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在 GitHub 上的主页 |
| git 仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https 协议 |
| git 仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh 协议 |
这个 git 项目中有多个文件夹,本章的应用在 flinkdatasourcedemo 文件夹下,如下图红框所示:

环境和版本
本次实战的环境和版本如下:
JDK:1.8.0_211
Flink:1.9.2
Maven:3.6.0
操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
IDEA:2018.3.5 (Ultimate Edition)
创建工程
在控制台执行以下命令就会进入创建 flink 应用的交互模式,按提示输入 gourpId 和 artifactId,就会创建一个 flink 应用(我输入的 groupId 是 com.bolingcavalry,artifactId 是 flinkdatasourcedemo):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
现在 maven 工程已生成,用 IDEA 导入这个工程,如下图:

以 maven 的类型导入:

导入成功的样子:

项目创建成功,可以开始写代码实战了;
辅助类 Splitter
实战中有个功能常用到:将字符串用空格分割,转成 Tuple2 类型的集合,这里将此算子做成一个公共类 Splitter.java,代码如下:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
准备完毕,可以开始实战了,先从最简单的 Socket 开始。
Socket DataSource
Socket DataSource 的功能是监听指定 IP 的指定端口,读取网络数据;
在刚才新建的工程中创建一个类 Socket.java:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Socket {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//监听本地 9999 端口,读取字符串
DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
//每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来
socketDataStream
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute("API DataSource demo : socket");
}
}
从上述代码可见,StreamExecutionEnvironment.socketTextStream 就可以创建 Socket 类型的 DataSource,在控制台执行命令 nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机 9999 端口;
在 IDEA 上运行 Socket 类,启动成功后再回到刚才执行 nc -lk 9999 的控制台,输入一些字符串再回车,可见 Socket 的功能已经生效:

集合 DataSource(generateSequence)
基于集合的 DataSource,API 如下图所示:

先试试最简单的 generateSequence,创建指定范围内的数字型的 DataSource:
package com.bolingcavalry.api;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class GenerateSequence {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为 1
env.setParallelism(1);
//通过 generateSequence 得到 Long 类型的 DataSource
DataStream<Long> dataStream = env.generateSequence(1, 10);
//做一次过滤,只保留偶数,然后打印
dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long aLong) throws Exception {
return 0L==aLong.longValue()%2L;
}
}).print();
env.execute("API DataSource demo : collection");
}
}
运行时会打印偶数:

集合 DataSource(fromElements+fromCollection)
fromElements 和 fromCollection 就在一个类中试了吧,创建 FromCollection 类,里面是这两个 API 的用法:
package com.bolingcavalry.api;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class FromCollection {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为 1
env.setParallelism(1);
//创建一个 List,里面有两个 Tuple2 元素
List<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(new Tuple2("aaa", 1));
list.add(new Tuple2("bbb", 1));
//通过 List 创建 DataStream
DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);
//通过多个 Tuple2 元素创建 DataStream
DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
new Tuple2("ccc", 1),
new Tuple2("ddd", 1),
new Tuple2("aaa", 1)
);
//通过 union 将两个 DataStream 合成一个
DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);
//统计每个单词的数量
unionDataStream
.keyBy(0)
.sum(1)
.print();
env.execute("API DataSource demo : collection");
复习的面试资料
这些面试全部出自大厂面试真题和面试合集当中,小编已经为大家整理完毕(PDF 版)
第一部分:Java 基础-中级-高级

第二部分:开源框架(SSM:Spring+SpringMVC+MyBatis)

第三部分:性能调优(JVM+MySQL+Tomcat)

第四部分:分布式(限流:ZK+Nginx;缓存:Redis+MongoDB+Memcached;通讯:MQ+kafka)

第五部分:微服务(SpringBoot+SpringCloud+Dubbo)

第六部分:其他:并发编程+设计模式+数据结构与算法+网络

进阶学习笔记 pdf
Java 架构进阶之架构筑基篇(Java 基础+并发编程+JVM+MySQL+Tomcat+网络+数据结构与算法)

Java 架构进阶之开源框架篇(设计模式+Spring+SpringMVC+MyBatis)



Java 架构进阶之分布式架构篇 (限流(ZK/Nginx)+缓存(Redis/MongoDB/Memcached)+通讯(MQ/kafka))



Java 架构进阶之微服务架构篇(RPC+SpringBoot+SpringCloud+Dubbo+K8s)


评论