写点什么

Flink 数据源拆解分析 (WikipediaEditsSource)

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:2017 字

    阅读完需:约 7 分钟

public static final int DEFAULT_PORT = 6667;


//IRC 协议的 channel


public static final Strin


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


g DEFAULT_CHANNEL = "#en.wikipedia";


private final String host;


private final int port;


private final String channel;


public WikipediaEditsSource() {


this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);


}


public WikipediaEditsSource(String host, int port, String channel) {


this.host = host;


this.port = port;


this.channel = Objects.requireNonNull(channel);


}


通过上述代码可以见到,数据的来源是 irc.wikimedia.org 这个网址;

主业务代码

主要的业务逻辑是 WikipediaEditsSource 的 run 方法,该方法在任务启动的时候会被 StreamSource.run 方法调用:


@Override


public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {


try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {


// 创建一个 IRC 协议的连接


ircStream.connect();


//进入指定的 channel


ircStream.join(channel);


try {


while (isRunning) {


//从阻塞队列中获取数据


WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);


//如果取到了数据,就调用 ctx.collect 方法,将数据生产到 Flink 环境,给其他 operator 使用


if (edit != null) {


ctx.collect(edit);


}


}


} finally {


//结束时要向服务器发送数据表示离开


ircStream.leave(channel);


}


}


}


上面的代码,我们挑几处重要的展开看一看;

和维基百科消息服务器建立连接后做的事情

  1. 为了弄明白 Flink 是如何与维基百科的数据源建立连接的,先把 ircStream.connect()这段代码展开,对应的是 IRCConnection 类的 connect 方法:


public void connect() throws IOException {


if (level != 0) // otherwise disconnected or connect


throw new SocketException("Socket closed or already open ("+ level +")");


IOException exception = null;


Socket s = null;


for (int i = 0; i < ports.length && s == null; i++) {


try {


//建立的是普通 Socket 连接


s = new Socket(host, ports[i]);


exception = null;


} catch (IOException exc) {


if (s != null)


s.close();


s = null;


exception = exc;


}


}


if (exception != null)


throw exception; // connection wasn't successful at any port


prepare(s);


}


上述代码表明,Flink 与维基百科的数据源服务器之间建立的是普通的 Socket 连接,至于 IRC 协议,都是在这个 Socket 连接的通道里的一些读写操作;


  1. 上面的 prepare 方法比较关键,展开看看:


protected void prepare(Socket s) throws IOException {


if (s == null)


throw new SocketException("Socket s is null, not connected");


socket = s;


level = 1;


s.setSoTimeout(timeout);


in = new BufferedReader(new InputStreamReader(s.getInputStream(),


encoding));


out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),


encoding));


//IRCConnection 是 Thread 的子类,执行 start 方法就表明会启动一个线程来执行 IRCConnection 的 run 方法


start();


//遵守 IRC 协议约定,发送一些注册相关的内容


register();


}


可以看出,prepare 方法做了两个重要的事情:启动一个子线程、发送 IRC 协议的注册信息,接下来看启动的子线程做了什么;


  1. 打开 IRCConnection 的 run 方法:


public void run() {


try {


String line;


while (!isInterrupted()) {


line = in.readLine();


if (line != null)


get(line);


else


close();


}


} catch (IOException exc) {


close();


}


}


run 方法中的内容很简单,就是让这个子线程负责读取远端发送的字符串,每读到一行就调用 get 方法去处理;


  1. get 方法的内容很多,做的事情是根据 IRC 协议解析这个字符串再做不同的处理,这里我们只要关注下面这段,就是收到一条业务消息后如何处理:


//每当有人编辑了维基百科,这里就会收到一条 command 为 PRIVMSG 的记录


if (command.equalsIgnoreCase("PRIVMSG")) { // MESSAGE


IRCUser user = p.getUser();


String middle = p.getMiddle();


String trailing = p.getTrailing();


for (int i = listeners.length - 1; i >= 0; i--)


//调用 listener 的 onPrivmsg 方法


listeners[i].onPrivmsg(middle, user, trailing);


}


如上所示,每收到一条远端发来的消息,都会调用 listener 的 onPrivmsg 方法,这里的注册的 linstener 是 WikipediaIrcChannelListener 对象;


  1. 打开 WikipediaIrcChannelListener 的 onPrivmsg 方法,看看收到消息后做了什么:


@Override


public void onPrivmsg(String target, IRCUser user, String msg) {


LOG.debug("[{}] {}: {}.", target, user.getNick(), msg);


//根据消息构造一个 WikipediaEditEvent 对象,就是 Flink 的业务流程中用到的数据对象


WikipediaEditEvent event = WikipediaEditEvent.fromRawEvent(


System.currentTimeMillis(),


target,


msg);


if (event != null) {

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Flink数据源拆解分析(WikipediaEditsSource)