写点什么

开发第一个 Flink 应用

作者:程序员欣宸
  • 2022 年 7 月 17 日
  • 本文字数:2549 字

    阅读完需:约 8 分钟

开发第一个Flink应用

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


步骤列表

  • 本次实战经历以下步骤:


  1. 创建应用;

  2. 编码;

  3. 构建;

  4. 提交任务到 Flink,验证功能;

环境信息

  1. Flink:1.7;

  2. Flink 所在机器的操作系统:CentOS Linux release 7.5.1804;

  3. 开发环境 JDK:1.8.0_181;

  4. 开发环境 Maven:3.5.0;

应用功能简介

  • 《Flink1.7从安装到体验》一文中,我们在 Flink 运行 SocketWindowWordCount.jar,实现的功能是从 socket 读取字符串,将其中的每个单词的数量统计出来,今天我们就来编码开发这个应用,实现此功能;

创建应用

  • 应用基本代码是通过 mvn 命令创建的,在命令行输入以下命令:


mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
复制代码


  • 按控制台的提示输入 groupId、artifactId、version、package 等信息,一路回车确认后,会生成一个和你输入的 artifactId 同名的文件夹,里面是个 maven 工程:


Define value for property 'groupId': com.bolingcavalryDefine value for property 'artifactId': socketwordcountdemoDefine value for property 'version' 1.0-SNAPSHOT: :Define value for property 'package' com.bolingcavalry: :Confirm properties configuration:groupId: com.bolingcavalryartifactId: socketwordcountdemoversion: 1.0-SNAPSHOTpackage: com.bolingcavalry
复制代码


  • 用 IEDA 导入这个 maven 工程,如下图,已经有了两个类:BatchJob 和 StreamingJob,BatchJob 是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的 StreamingJob:


  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从 GitHub 下载这个工程的源码,地址和链接信息如下表所示:



  • 这个 git 项目中有多个文件夹,本章源码在 socketwordcountdemo 这个文件夹下,如下图红框所示:


  • 接下来开始编码:

  • 在 StreamingJob 类中添加静态内部类 WordWithCount,这是个 PoJo,用来保存一个具体的单词及其出现频率:


   /**   * 记录单词及其出现频率的Pojo   */  public static class WordWithCount {    /**     * 单词内容     */    public String word;
/** * 出现频率 */ public long count;
public WordWithCount() { super(); }
public WordWithCount(String word, long count) { this.word = word; this.count = count; }
/** * 将单词内容和频率展示出来 * @return */ @Override public String toString() { return word + " : " + count; } }
复制代码


  • 把所有业务逻辑写在 StreamJob 类的 main 方法中,如下所示,关键位置都加了中文注释:


public static void main(String[] args) throws Exception {
//环境信息 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据来源是本机9999端口,换行符分隔,您也可以考虑将hostname和port参数通过main方法的入参传入 DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
//通过text对象转换得到新的DataStream对象, //转换逻辑是分隔每个字符串,取得的所有单词都创建一个WordWithCount对象 DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String s, Collector<WordWithCount> collector) throws Exception { for(String word : s.split("\\s")){ collector.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word")//key为word字段 .timeWindow(Time.seconds(5)) //五秒一次的翻滚时间窗口 .reduce(new ReduceFunction<WordWithCount>() { //reduce策略 @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count+b.count); } });

//单线程输出结果 windowCounts.print().setParallelism(1);
// 执行 env.execute("Flink Streaming Java API Skeleton"); }
复制代码

构建

  • 在 pom.xml 文件所在目录下执行命令:


mvn clean package -U
复制代码


  • 命令执行完毕后,在 target 目录下的 socketwordcountdemo-1.0-SNAPSHOT.jar 文件就是构建成功的 jar 包;

在 Flink 验证


nc -l 9999
复制代码


  • 我这边 Flink 所在机器的 IP 地址是 192.168.1.103,因此用浏览器访问的 Flink 的 web 地址为:http://192.168.1.103:8081;

  • 选择刚刚生成的 jar 文件作为一个新的任务,如下图:


  • 点击下图红框中的"upload",将文件提交:


  • 目前还只是将 jar 文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框 2 中填写的前面编写的 StreamingJob 类的完整名称:


  • 提交后的页面效果如下图所示,可见一个 job 已经在运行中了:


  • 回到 Flink 所在机器的控制台,在之前输入了 nc -l 9999 的窗口输入一些英文句子,然后按下回车键,例如:


[root@vostro flink-1.7.0]# ./bin/start-cluster.shStarting cluster.Starting standalonesession daemon on host vostro.Starting taskexecutor daemon on host vostro.[root@vostro flink-1.7.0]# nc -l 9999Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
复制代码


  • 接下来看看我们的 job 的执行效果,如下图,点击左侧的"Task Managers",在右边的列表中只有一个 Task,点击它:


  • 出现的页面有三个 tab 页,点击"Stdout"这个 tab,就能见到我们的任务对之前句子中的单词的统计结果,如下图:


  • 至此,第一个最简单 Flink 就完成了。

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 3
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
开发第一个Flink应用_Java_程序员欣宸_InfoQ写作社区