写点什么

Flink 实战:消费 Wikipedia 实时消息

作者:程序员欣宸
  • 2022 年 7 月 20 日
  • 本文字数:3383 字

    阅读完需:约 11 分钟

Flink实战:消费Wikipedia实时消息

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

关于 Wikipedia Edit Stream

  • Wikipedia Edit Stream 是 Flink 官网提供的一个经典 demo,该应用消费的消息来自维基百科,消息中包含了用户名对 wiki 的编辑情况,demo 的官方资料地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

消息来源

  • 消息的 DataSource 是个名为 WikipediaEditsSource 的类,这里面建立了到 irc.wikimedia.org 的 Socker 连接,再通过 Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个 while 循环不停的从队列取出数据,再调用 SourceContext 的 collect 方法,就在 Flink 中将这条数据生产出来了;

  • IRC 是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat

  • 关于 WikipediaEditsSource 类的深入分析,请参考《Flink 数据源拆解分析(WikipediaEditsSource)》

实战简介

  • 本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网 demo 的不同之处

  • 和官网的 demo 略有不同,官网用的是 Tuple2 来处理数据,但我这里用了 Tuple3,多保存了一个 StringBuilder 对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

  • Flink:1.7;

  • 运行模式:单机(官网称之为 Local Flink Cluster);

  • Flink 所在机器的操作系统:CentOS Linux release 7.5.1804;

  • 开发环境 JDK:1.8.0_181;

  • 开发环境 Maven:3.5.0;

操作步骤简介

  • 今天的实战分为以下步骤:


  1. 创建应用;

  2. 编码;

  3. 构建;

  4. 部署运行;

创建应用

  • 应用基本代码是通过 mvn 命令创建的,在命令行输入以下命令:


mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
复制代码


  • 按控制台的提示输入 groupId、artifactId、version、package 等信息,一路回车确认后,会生成一个和你输入的 artifactId 同名的文件夹(我这里是 wikipediaeditstreamdemo),里面是个 maven 工程:


Define value for property 'groupId': com.bolingcavalryDefine value for property 'artifactId': wikipediaeditstreamdemoDefine value for property 'version' 1.0-SNAPSHOT: :Define value for property 'package' com.bolingcavalry: :Confirm properties configuration:groupId: com.bolingcavalryartifactId: wikipediaeditstreamdemoversion: 1.0-SNAPSHOTpackage: com.bolingcavalry Y: :
复制代码


  • 用 IEDA 导入这个 maven 工程,如下图,已经有了两个类:BatchJob 和 StreamingJob,BatchJob 是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的 StreamingJob:



  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从 GitHub 下载这个工程的源码,地址和链接信息如下表所示:



  • 这个 git 项目中有多个文件夹,本章源码在 wikipediaeditstreamdemo 这个文件夹下,如下图红框所示:



  • 接下来开始编码:

  • 在 pom.mxl 文件中增加 wikipedia 相关的库依赖:


<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-wikiedits_2.11</artifactId>  <version>${flink.version}</version></dependency>
复制代码


  • 在类中增加代码,如下所示,源码中已加详细注释:


package com.bolingcavalry;
import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class StreamingJob {
public static void main(String[] args) throws Exception { // 环境信息 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new WikipediaEditsSource()) //以用户名为key分组 .keyBy((KeySelector<WikipediaEditEvent, String>) wikipediaEditEvent -> wikipediaEditEvent.getUser()) //时间窗口为5秒 .timeWindow(Time.seconds(15)) //在时间窗口内按照key将所有数据做聚合 .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() { @Override public Tuple3<String, Integer, StringBuilder> createAccumulator() { //创建ACC return new Tuple3<>("", 0, new StringBuilder()); }
@Override public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> tuple3) {
StringBuilder sbud = tuple3.f2;
//如果是第一条记录,就加个"Details :"作为前缀, //如果不是第一条记录,就用空格作为分隔符 if(StringUtils.isBlank(sbud.toString())){ sbud.append("Details : "); }else { sbud.append(" "); }
//聚合逻辑是将改动的字节数累加 return new Tuple3<>(wikipediaEditEvent.getUser(), wikipediaEditEvent.getByteDiff() + tuple3.f1, sbud.append(wikipediaEditEvent.getByteDiff())); }
@Override public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> tuple3) { return tuple3; }
@Override public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> tuple3, Tuple3<String, Integer, StringBuilder> acc1) { //合并窗口的场景才会用到 return new Tuple3<>(tuple3.f0, tuple3.f1 + acc1.f1, tuple3.f2.append(acc1.f2)); } }) //聚合操作后,将每个key的聚合结果单独转为字符串 .map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) tuple3 -> tuple3.toString()) //输出方式是STDOUT .print();
// 执行 env.execute("Flink Streaming Java API Skeleton"); }}
复制代码


  • 至此编码结束;

构建

  • 在 pom.xml 文件所在目录下执行命令:


mvn clean package -U
复制代码


  • 命令执行完毕后,在 target 目录下的 wikipediaeditstreamdemo-1.0-SNAPSHOT.jar 文件就是构建成功的 jar 包;

在 Flink 验证

  • Flink 的安装和启动请参考《Flink1.7从安装到体验》

  • 我这边 Flink 所在机器的 IP 地址是 192.168.1.103,因此用浏览器访问的 Flink 的 web 地址为:http://192.168.1.103:8081 ;

  • 选择刚刚生成的 jar 文件作为一个新的任务,如下图:



  • 点击下图红框中的"upload",将文件提交:



  • 目前还只是将 jar 文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框 2 中填写的前面编写的 StreamingJob 类的完整名称:



  • 提交后的页面效果如下图所示,可见一个 job 已经在运行中了:



  • 接下来看看我们的 job 的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且 Details 后面的内容还展示了具体的聚合情况:



  • 至此,一个实施处理的 Flink 应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨 Flink;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 4
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Flink实战:消费Wikipedia实时消息_Java_程序员欣宸_InfoQ写作社区