写点什么

Flink 的 DataSource 三部曲之一:直接 API,nginx 文件服务器原理

作者:MySQL神话
  • 2021 年 11 月 27 日
  • 本文字数:3373 字

    阅读完需:约 11 分钟

DataSource 类型

对于常见的文本读入、kafka、RabbitMQ 等数据来源,可以直接使用 Flink 提供的 API 或者 connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:


环境和版本

熟练掌握内置 DataSource 的最好办法就是实战,本次实战的环境和版本如下:


  1. JDK:1.8.0_211

  2. Flink:1.9.2

  3. Maven:3.6.0

  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

  5. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos)


| 名称 | 链接 | 备注 |


| :-- | :-- | :-- |


| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在 GitHub 上的主页 |


| git 仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https 协议 |


| git 仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh 协议 |


这个 git 项目中有多个文件夹,本章的应用在 flinkdatasourcedemo 文件夹下,如下图红框所示:


环境和版本

本次实战的环境和版本如下:


  1. JDK:1.8.0_211

  2. Flink:1.9.2

  3. Maven:3.6.0

  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

  5. IDEA:2018.3.5 (Ultimate Edition)

创建工程

  1. 在控制台执行以下命令就会进入创建 flink 应用的交互模式,按提示输入 gourpId 和 artifactId,就会创建一个 flink 应用(我输入的 groupId 是 com.bolingcavalry,artifactId 是 flinkdatasourcedemo):


mvn \


archetype:generate \


-DarchetypeGroupId=org.apache.flink \


-DarchetypeArtifactId=flink-quickstart-java \


-DarchetypeVersion=1.9.2


  1. 现在 maven 工程已生成,用 IDEA 导入这个工程,如下图:



  1. 以 maven 的类型导入:



  1. 导入成功的样子:



  1. 项目创建成功,可以开始写代码实战了;

辅助类 Splitter

实战中有个功能常用到:将字符串用空格分割,转成 Tuple2 类型的集合,这里将此算子做成一个公共类 Splitter.java,代码如下:


package com.bolingcavalry;


import org.apache.flink.api.common.functions.FlatMapFunction;


import org.apache.flink.api.java.tuple.Tuple2;


import org.apache.flink.util.Collector;


import org.apache.flink.util.StringUtils;


public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {


@Override


public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {


if(StringUtils.isNullOrWhitespaceOnly(s)) {


System.out.println("invalid line");


return;


}


for(String word : s.split(" ")) {


collector.collect(new Tuple2<String, Integer>(word, 1));


}


}


}


准备完毕,可以开始实战了,先从最简单的 Socket 开始。

Socket DataSource

Socket DataSource 的功能是监听指定 IP 的指定端口,读取网络数据;


  1. 在刚才新建的工程中创建一个类 Socket.java:


package com.bolingcavalry.api;


import com.bolingcavalry.Splitter;


import org.apache.flink.streaming.api.datastream.DataStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import org.apache.flink.streaming.api.windowing.time.Time;


public class Socket {


public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


//监听本地 9999 端口,读取字符串


DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);


//每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来


socketDataStream


.flatMap(new Splitter())


.keyBy(0)


.timeWindow(Time.seconds(5))


.sum(1)


.print();


env.execute("API DataSource demo : socket");


}


}


从上述代码可见,StreamExecutionEnvironment.socketTextStream 就可以创建 Socket 类型的 DataSource,在控制台执行命令 nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机 9999 端口;


  1. 在 IDEA 上运行 Socket 类,启动成功后再回到刚才执行 nc -lk 9999 的控制台,输入一些字符串再回车,可见 Socket 的功能已经生效:


集合 DataSource(generateSequence)

  1. 基于集合的 DataSource,API 如下图所示:



  1. 先试试最简单的 generateSequence,创建指定范围内的数字型的 DataSource:


package com.bolingcavalry.api;


import org.apache.flink.api.common.functions.FilterFunction;


import org.apache.flink.streaming.api.datastream.DataStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class GenerateSequence {


public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


//并行度为 1


env.setParallelism(1);


//通过 generateSequence 得到 Long 类型的 DataSource


DataStream<Long> dataStream = env.generateSequence(1, 10);


//做一次过滤,只保留偶数,然后打印


dataStream.filter(new FilterFunction<Long>() {


@Override


public boolean filter(Long aLong) throws Exception {


return 0L==aLong.longValue()%2L;


}


}).print();


env.execute("API DataSource demo : collection");


}


}


  1. 运行时会打印偶数:


集合 DataSource(fromElements+fromCollection)

  1. fromElements 和 fromCollection 就在一个类中试了吧,创建 FromCollection 类,里面是这两个 API 的用法:


package com.bolingcavalry.api;


import org.apache.flink.api.java.tuple.Tuple2;


import org.apache.flink.streaming.api.datastream.DataStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import java.util.ArrayList;


import java.util.List;


public class FromCollection {


public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


//并行度为 1


env.setParallelism(1);


//创建一个 List,里面有两个 Tuple2 元素


List<Tuple2<String, Integer>> list = new ArrayList<>();


list.add(new Tuple2("aaa", 1));


list.add(new Tuple2("bbb", 1));


//通过 List 创建 DataStream


DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);


//通过多个 Tuple2 元素创建 DataStream


DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(


new Tuple2("ccc", 1),


new Tuple2("ddd", 1),


new Tuple2("aaa", 1)


);


//通过 union 将两个 DataStream 合成一个


DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);


//统计每个单词的数量


unionDataStream


.keyBy(0)


.sum(1)


.print();


env.execute("API DataSource demo : collection");

复习的面试资料

这些面试全部出自大厂面试真题和面试合集当中,小编已经为大家整理完毕(PDF 版)


  • 第一部分:Java 基础-中级-高级



  • 第二部分:开源框架(SSM:Spring+SpringMVC+MyBatis)



  • 第三部分:性能调优(JVM+MySQL+Tomcat)



  • 第四部分:分布式(限流:ZK+Nginx;缓存:Redis+MongoDB+Memcached;通讯:MQ+kafka)



  • 第五部分:微服务(SpringBoot+SpringCloud+Dubbo)



  • 第六部分:其他:并发编程+设计模式+数据结构与算法+网络


进阶学习笔记 pdf

  • Java 架构进阶之架构筑基篇(Java 基础+并发编程+JVM+MySQL+Tomcat+网络+数据结构与算法



  • Java 架构进阶之开源框架篇(设计模式+Spring+SpringMVC+MyBatis





  • Java 架构进阶之分布式架构篇 (限流(ZK/Nginx)+缓存(Redis/MongoDB/Memcached)+通讯(MQ/kafka)



![image](https://upl


《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享


oad-images.jianshu.io/upload_images/24613101-c825ae7d85579e2c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)



  • Java 架构进阶之微服务架构篇(RPC+SpringBoot+SpringCloud+Dubbo+K8s)




本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

用户头像

MySQL神话

关注

还未添加个人签名 2021.11.12 加入

还未添加个人简介

评论

发布
暂无评论
Flink的DataSource三部曲之一:直接API,nginx文件服务器原理