写点什么

Flink 数据源拆解分析 (WikipediaEditsSource)

作者:程序员欣宸
  • 2022 年 7 月 22 日
  • 本文字数:3672 字

    阅读完需:约 12 分钟

Flink数据源拆解分析(WikipediaEditsSource)

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


  • Wikipedia Edit Stream 是 Flink 官网上的经典 demo,功能是实时处理来自维基百科的消息,消息的内容是当前每个用户对维基内容的操作,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

  • 在 demo 中,WikipediaEditsSource 类作为数据源负责向 Flink 提供实时消息,今天咱们一起来分析其源码,了解 Flink 是怎么获取到来自 Wiki 的实时数据的,这对我们今后做自定义数据源也有很好的参考作用;

官方解释

  • 以下是官网对消息来源的说明,维基百科提供了一个 IRC 协议的通道,从这个通道可以获取对维基百科所做的编辑行为的日志:


Wikipedia provides an IRC channel where all edits to the wiki are logged.
复制代码


  • IRC 是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat

继承关系

  • 先看 WikipediaEditsSource 类的继承关系,做个初步了解,如下图:


  • 如上图所示,RichFunction 接口负责资源开启关闭以及环境上下文,而 SourceFunction 接口则是和数据生产行为的开始和停止有关,这些接口最终都在 WikipediaEditSource 实现;

构造方法

  • 通过构造方法来了解有哪些参数被确定了:


  //远程连接的域名  public static final String DEFAULT_HOST = "irc.wikimedia.org";  //远程连接的端口  public static final int DEFAULT_PORT = 6667;  //IRC协议的channel  public static final String DEFAULT_CHANNEL = "#en.wikipedia";
private final String host; private final int port; private final String channel;
public WikipediaEditsSource() { this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL); }
public WikipediaEditsSource(String host, int port, String channel) { this.host = host; this.port = port; this.channel = Objects.requireNonNull(channel); }
复制代码


  • 通过上述代码可以见到,数据的来源是 irc.wikimedia.org 这个网址;

主业务代码

  • 主要的业务逻辑是 WikipediaEditsSource 的 run 方法,该方法在任务启动的时候会被 StreamSource.run 方法调用:


  @Override  public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {    try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {      // 创建一个IRC协议的连接      ircStream.connect();      //进入指定的channel      ircStream.join(channel);
try { while (isRunning) { //从阻塞队列中获取数据 WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS); //如果取到了数据,就调用ctx.collect方法,将数据生产到Flink环境,给其他operator使用 if (edit != null) { ctx.collect(edit); } } } finally { //结束时要向服务器发送数据表示离开 ircStream.leave(channel); } } }
复制代码


  • 上面的代码,我们挑几处重要的展开看一看;

和维基百科消息服务器建立连接后做的事情

  • 为了弄明白 Flink 是如何与维基百科的数据源建立连接的,先把**ircStream.connect()**这段代码展开,对应的是 IRCConnection 类的 connect 方法:


public void connect() throws IOException {    if (level != 0) // otherwise disconnected or connect      throw new SocketException("Socket closed or already open ("+ level +")");    IOException exception = null;    Socket s = null;    for (int i = 0; i < ports.length && s == null; i++) {      try {        //建立的是普通Socket连接        s = new Socket(host, ports[i]);        exception = null;      } catch (IOException exc) {        if (s != null)          s.close();        s = null;        exception = exc;      }    }    if (exception != null)      throw exception; // connection wasn't successful at any port
prepare(s); }
复制代码


  • 上述代码表明,Flink 与维基百科的数据源服务器之间建立的是普通的 Socket 连接,至于 IRC 协议,都是在这个 Socket 连接的通道里的一些读写操作;

  • 上面的 prepare 方法比较关键,展开看看:


protected void prepare(Socket s) throws IOException {    if (s == null)      throw new SocketException("Socket s is null, not connected");    socket = s;    level = 1;    s.setSoTimeout(timeout);    in  = new BufferedReader(new InputStreamReader(s.getInputStream(),        encoding));    out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),        encoding));
//IRCConnection是Thread的子类,执行start方法就表明会启动一个线程来执行IRCConnection的run方法 start(); //遵守IRC协议约定,发送一些注册相关的内容 register(); }
复制代码


  • 可以看出,prepare 方法做了两个重要的事情:启动一个子线程、发送 IRC 协议的注册信息,接下来看启动的子线程做了什么;

  • 打开 IRCConnection 的 run 方法:


public void run() {    try {      String line;      while (!isInterrupted()) {        line = in.readLine();        if (line != null)          get(line);        else          close();      }    } catch (IOException exc) {      close();    }  }
复制代码


  • run 方法中的内容很简单,就是让这个子线程负责读取远端发送的字符串,每读到一行就调用 get 方法去处理;

  • get 方法的内容很多,做的事情是根据 IRC 协议解析这个字符串再做不同的处理,这里我们只要关注下面这段,就是收到一条业务消息后如何处理:


//每当有人编辑了维基百科,这里就会收到一条command为PRIVMSG的记录if (command.equalsIgnoreCase("PRIVMSG")) { // MESSAGE      IRCUser user = p.getUser();      String middle = p.getMiddle();      String trailing = p.getTrailing();      for (int i = listeners.length - 1; i >= 0; i--)        //调用listener的onPrivmsg方法        listeners[i].onPrivmsg(middle, user, trailing);    }
复制代码


  • 如上所示,每收到一条远端发来的消息,都会调用 listener 的 onPrivmsg 方法,这里的注册的 linstener 是 WikipediaIrcChannelListener 对象;

  • 打开 WikipediaIrcChannelListener 的 onPrivmsg 方法,看看收到消息后做了什么:


@Overridepublic void onPrivmsg(String target, IRCUser user, String msg) {  LOG.debug("[{}] {}: {}.", target, user.getNick(), msg);  //根据消息构造一个WikipediaEditEvent对象,就是Flink的业务流程中用到的数据对象  WikipediaEditEvent event = WikipediaEditEvent.fromRawEvent(    System.currentTimeMillis(),    target,    msg);
if (event != null) { //eidts是个阻塞队列,WikipediaEditEvent被放入队列 if (!edits.offer(event)) { LOG.debug("Dropping message, because of full queue."); } }}
复制代码


  • 上面的代码已经分析把主要逻辑展现出来了,从 Socket 读到的数据被解析成 Flink 实时计算时用到的 WikipediaEditEvent 对象后,被放入阻塞队列中,这也就是负责读取的子线程的主要工作了;

如何消费队列中的数据

  • 前面的分析中我们得知:收到的数据被放入了阻塞队列中,现在回到 WikipediaEditsSource 的 run 方法再看看,这里面就有从阻塞队列取出数据的操作:


while (isRunning) {    //从阻塞队列中获取数据    WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);    //如果取到了数据,就调用ctx.collect方法,将数据生产到Flink环境,给其他operator使用    if (edit != null) {      ctx.collect(edit);    }}
复制代码


  • 如上所示,一个 while 循环不停的从阻塞队列中获取数据,取到了就调用 SourceContext 的 collect,把一条数据生产到在 Flink 环境中,给后面的流程使用;

小结

  • 至此,WikipediaEditsSource 源码的分析就完成了,在此小结一下:


  1. 和 irc.wikimedia.org 这个网站建立 Socket 连接;

  2. 连接建立后,读写相关的内容都是基于 IRC 协议的,这是个应用层的协议,有自己的格式、关键字、命令字等约定,本次分析中我们没有花太多时间在这个协议上,有兴趣的读者在这里了解更多:https://en.wikipedia.org/wiki/Internet_Relay_Chat

  3. 启动一个子线程读取 Socket 信息,收到数据后,构造成 WikipediaEditEvent 对象,放入阻塞队列中;

  4. 原先的那个线程在一个 while 循环中从阻塞队列中取数据,如果取到了数据就调用 ctx.collect 方法,这样数据就生产到了 Flink 环境,其他 operator 就可以使用了;


  • 以上就是拆解 WikipediaEditsSource 的过程,现在我们对 Flink 数据源有了更进一步的了解,后续在开发自定义数据源的时候也有了参考实现;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 2 小时前阅读数: 7
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Flink数据源拆解分析(WikipediaEditsSource)_Java_程序员欣宸_InfoQ写作社区