写点什么

Reactor 模式和 Java NIO

作者:Java·课代表
  • 2021 年 12 月 26 日
  • 本文字数:7865 字

    阅读完需:约 26 分钟

概述

本文先从基本的 Socket 编程模式说起,介绍了 Java 传统的同步阻塞 IO 网络编程的基本实现,以及存在的性能问题,从而引出 Reactor 设计模式,最后通过 Java NIO 给出单 Reactor 单线程的实现方案。

Socket 编程模式

Unix 有几个统一性的理念或象征,并塑造了它的 API 及由此形成的开发风格。其中最重要的一点应当是“一切皆文件”模型及在此基础上建立的管道概念。


在 Unix/Linux 环境下,网络中的进程通过 Socket 进行通信,Socket 本质上也是一种特殊的文件,可以按照“打开,读写,关闭”的模式来操作。Socket 编程的基本模式如图 1 所示:


图 1 Socket 编程模式


  1. 创建 socket:本质上就是创建一个文件,每个文件都有一个整型的文件描述符(fd)来指代这个文件;

  2. 绑定端口:一台服务器可以同时运行多个不同的应用,在 TCP/IP 协议下通过端口进行区分,因此接下来需要绑定端口,所有连接到该端口的请求都会被我们的服务处理;

  3. 监听端口:执行创建 socket 和bind之后,socket 还处于closed状态,不对外监听,需要调用listen方法,让 socket 进入被动监听状态;其 API 定义如下:


int listen(int sockfd, int backlog);// 在TCP协议下,建立连接需要完成三次握手,当连接建立完成后会先放到一个连接队列,backlog就是指定这个队列的大小。
复制代码


  1. 接收请求:通过调用accept()从已完成连接的队列中拿到连接进行处理,如果没有连接则调用会被阻塞。

基于同步阻塞 IO 的 Java Socket 编程

下面介绍在 Java 语言下如何完成 Socket 通信。传统的 Java Socket 编程模式使用同步阻塞 IO,如图 2 所示。



图 2 经典的 Java Socket 编程模式


服务端通过new ServerSocket(端口号)完成了绑定端口和监听端口的工作,接着循环调用 accept() 方法获取客户端请求(如果没有新的请求,程序就会阻塞),并为每一个客户端请求创建一个处理线程,避免因为主线程正在处理请求而无法响应其他连接。具体实现代码如下:


import java.io.*;import java.net.ServerSocket;import java.net.Socket;
public class Server { public static void main(String[] args) { try { // 绑定并监听端口 ServerSocket server = new ServerSocket(5566); Socket client; while (!Thread.interrupted()) { // 接受请求,没有请求会阻塞 client = server.accept(); new Thread(new Handler(client)).start(); } } catch (IOException e) { e.printStackTrace(); } }
static class Handler implements Runnable { final Socket client;
public Handler(Socket client) { this.client = client; }
@Override public void run() { try { BufferedReader reader = new BufferedReader( new InputStreamReader(client.getInputStream())); // 接收客户端发送的内容 String line; PrintWriter writer = new PrintWriter( new OutputStreamWriter(client.getOutputStream())); // 客户端连接未关闭前,readLine返回值不为null,没有数据时会阻塞 while ((line = reader.readLine()) != null) { writer.println("你输入的是:" + line); writer.flush();
// 通过约定特定输入,结束通讯 if ("end".equals(line)) { break; } } writer.close(); reader.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } }}
复制代码


在本地启动服务端程序,通过 telnet 请求连接。

这个模式在并发请求量不高的情况下是完全没有问题的,也建议使用这种模式。但在在高并发环境下,由于线程本身需要耗费系统资源,其次多线程下需要频繁进行上下文切换也会消耗性能,再者连接建立后还需要等待客户端数据到来,因此并不是最有效的解决方案。问题的关键在于两点:

  1. 频繁的上下文切换;

  2. 一个连接作为一个整体进行处理,粒度太粗。连接建立后,还需要等待数据可读才能进行真正的处理。

基于事件驱动的 Reactor 模式

解决上述问题的方式就是分治法和事件驱动。即:

  1. 把一个连接的处理过程拆成多个小任务;

  2. 只有在任务等待的事件触发时才执行任务。


Reactor 模式的核心理念即是如此。Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分配(Dispatch)给某个进程。


论文《Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events》详细介绍了 Reactor 模式的设计和实现,下面先理解几个关键的组成部分的作用:

主要概念

Handles

Handle(注意和 handler 区分开),中文常见翻译为句柄。句柄的叫法比较抽象,这里引用知乎的一个回答帮助大家理解。


句柄的英文是 handle。在英文中,有操作、处理、控制之类的意义。作为一个名词时,是指某个中间媒介,通过这个中间媒介可控制、操作某样东西。

这样说有点抽象,举个例子。door handle 是指门把手,通过门把手可以去控制门,但 door handle 并非 door 本身,只是一个中间媒介。又比如 knife handle 是刀柄,通过刀柄可以使用刀。

跟 door handle 类似,我们可以用 file handle 去操作 file, 但 file handle 并非 file 本身。这个 file handle 就被翻译成文件句柄,同理还有各种资源句柄。


一个句柄代表操作系统的一个资源,例如文件。event 会在句柄上触发,所以监控 event 需要在句柄上等待。

Synchronous Event Demultiplexer

直译过来就是同步事件分用器,它的作用是阻塞等待上面提到的一系列句柄集合产生事件。为什么叫做分用器?因为它监控了多个句柄,当某一个事件发生了,它会返回对应的那个句柄。一个常见的 I/O 事件分用器就是 select 系统调用。

Initiation Dispatcher

初始分发器,提供接口用于注册事件处理器 Event Handler。当同步事件分发器检测有一个句柄上产生事件时,会通知初始分发器,初始分发器通过查询已注册的事件处理器找到对应的处理对象。

Event Handler

事件处理器抽象类,定义了事件的处理方法。

Concrete Event Handler

具体的事件处理实现。

工作流程



Java NIO 下实现 Reactor 模式

Doug Lea 在《Scalable IO in Java》中介绍 Reactor 模式时引入了三种角色:

  1. Reactor:负责响应 IO 事件,分发给对应的事件处理,相当于上文提到的 Initiation Dispatcher

  2. Handler:事件处理器,与上文一致;

  3. Acceptor:特殊的事件处理器,专门处理connection事件。


单 Reactor 单线程的实现方案:

图片引用自李运华的《单服务器高性能模式:Reactor 与 Proactor》一文


在给出具体实现前,需要先介绍下 Java NIO 的基本用法,它支持面向缓冲的,基于通道的 I/O 操作方法,主要由以下几个核心部分组成:ChannelBufferSelector

Channel 和 Buffer

一般翻译为通道和缓存,数据可以从 Channel 读到 Buffer 中,也可以从 Buffer 写到 Channel 中。



Selector

Selector 允许单线程处理多个 Channel。要使用 Selector,得向 Selector 注册 Channel,然后调用它的 select() 方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。


代码示例

下面使用 Java NIO 实现单 Reactor 单线程。参考《Scalable IO in Java》的示例,需要实现三个角色,首先是 Reactor,负责事件监听和分发。事件监听使用 Selector 实现 IO 多路复用。从前面我们已经了解到,需要先创建 Channel,再向 Selector 注册。


Java NIO 中定义了四种事件类型,分别是 OP_CONNECT(客户端使用)、OP_ACCEPT(服务端使用)、OP_READOP_WRITE



显然,Reactor 需要负责监听 OP_ACCEPT 事件。


import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;
public class Reactor { final ServerSocketChannel serverSocket; final Selector selector;
public Reactor(int port) throws IOException { // 创建Channel serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false);
// 绑定端口 serverSocket.bind(new InetSocketAddress(port));
// 创建Selector selector = Selector.open(); // 向Selector注册Channel,通过Selector监听Channel上的 Accept事件 serverSocket.register(selector, SelectionKey.OP_ACCEPT); }}
复制代码


Selector 监听到 OP_ACCEPT 事件后,如前面所说,连接建立完成后会先放到一个连接队列,还需要调用 accept() 从已完成连接的队列中拿到连接进行处理。因此引入 Acceptor 角色来专门处理。AcceptorHandler 都是事件处理器,为了减少篇幅,这里用 Runnable 作为他们共同的抽象处理类。


Acceptor 为每个客户端连接绑定一个对应的处理类实例,后续其他事件都由对应的事件处理类处理。需要说明的是,虽然这里用 Runnable 作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()方法执行。


import java.io.IOException;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;
public class Acceptor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket;
public Acceptor(Selector selector, ServerSocketChannel serverSocket) { this.selector = selector; this.serverSocket = serverSocket; } @Override public void run() { try { // 调用accept()从已完成连接的队列中拿到连接进行处理 SocketChannel clientSocket = serverSocket.accept(); if (clientSocket != null) { // 绑定连接对应的事件处理器实例 new Handler(selector, clientSocket); } } catch (IOException e) { e.printStackTrace(); } }}
复制代码


接下来需要打通 ReactorAcceptor 两个角色的联系:当Selector监听到 OP_ACCEPT 事件后,把事件分发给 Acceptor 。在前面的基础上,我们在 Reactor 角色上加上事件分发逻辑:


  1. 轮询 select() 获取有事件就绪的通道;

  2. 获取所有就绪通道对应的 SelectedKey 集合;

  3. 依次分发处理 SelectedKey 实例;

  • 每个 SelectedKey 都附着了对应的处理器实例,获取实例;

  • 执行每个实例的处理方法。


ServerSocketChannel 注册到 Selector 后,生成对应的 SelectedKey 对象,在上面附着 Acceptor 实例。当 ServerSocketChannel 监听的事件 OP_ACCEPT 就绪时,Selector.select() 返回对应的 SelectedKey,从而能获取到 Acceptor 实例,进而由 Acceptor 接管连接处理事件。


import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.util.Set;
public class Reactor { final ServerSocketChannel serverSocket; final Selector selector;
public Reactor(int port) throws IOException { // 创建Channel serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false);
// 绑定端口 serverSocket.bind(new InetSocketAddress(port));
// 创建Selector selector = Selector.open();
// 把Channel注册到Selector进行监听,监听连接的 Accept事件 SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道 selectionKey.attach(new Acceptor(selector, serverSocket)); }
public void run() { while (!Thread.interrupted()) { try { // 一直阻塞到某个注册的通道有事件就绪 selector.select();
// 每一个注册的通道都对应一个SelectionKey Set<SelectionKey> selected = selector.selectedKeys(); for (SelectionKey selectionKey : selected) { // 分发处理 dispatch(selectionKey); } // 移除避免重复处理 selected.clear(); } catch (IOException e) { e.printStackTrace(); } } }
private void dispatch(SelectionKey selectionKey) { // 虽然这里用 Runnable 作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()方法执行 Runnable r = (Runnable) selectionKey.attachment(); if (r != null) { r.run(); } }}
复制代码


目前为止,我们通过在 Selector 上注册 OP_ACCEPT 事件打通了 ReactorAcceptorAcceptor 为每个连接绑定了对应的 Handler 实例。当读缓冲区有数据可读时,我们希望响应事件,并且由Handler 来处理。因此,仿照前面的做法:


  1. 需要在 Selector 上注册 OP_READ

  2. Selector 返回的 SelectedKey 上附着 Handler 实例。


import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;
public class Handler implements Runnable { final Selector selector; final SocketChannel socket; final SelectionKey sk;
public Handler(Selector selector, SocketChannel socket) throws IOException { this.selector = selector; this.socket = socket; this.socket.configureBlocking(false);
// 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件 sk = socket.register(selector, SelectionKey.OP_READ);
// 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理 sk.attach(this); }
@Override public void run() { }}
复制代码


接下来完善读事件和写事件的处理逻辑。


import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;
public class Handler implements Runnable { final Selector selector; final SocketChannel socket; final SelectionKey sk;
ByteBuffer inputBuffer = ByteBuffer.allocate(1024); ByteBuffer outputBuffer = ByteBuffer.allocate(1024);
public Handler(Selector selector, SocketChannel socket) throws IOException { this.selector = selector; this.socket = socket; this.socket.configureBlocking(false);
// 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件 sk = socket.register(selector, SelectionKey.OP_READ);
// 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理 sk.attach(this); }
@Override public void run() { if (sk.isReadable()) { read(); } else if (sk.isWritable()) { write(); } }
public void read() { try { while (socket.read(inputBuffer) > 0) { inputBuffer.flip(); while(inputBuffer.hasRemaining()){ System.out.print((char) inputBuffer.get()); } inputBuffer.clear();
// 注册监听写缓冲区空闲事件,与前面通过channel注册到selector的写法等价 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } catch (IOException e) { e.printStackTrace(); } }
public void write() { try { outputBuffer.put("YYDS-ABCD-EFG\n".getBytes()); outputBuffer.flip(); while (outputBuffer.hasRemaining()) { socket.write(outputBuffer); } outputBuffer.clear(); // 取消监听缓冲区空闲事件 sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } }}
复制代码


启动程序


import java.io.IOException;
public class Main { public static void main(String[] args) throws IOException { Reactor reactor = new Reactor(5566); reactor.run(); }}
复制代码


在本地启动服务端程序,通过 telnet 发起请求验证。

补充说明

水平触发和边缘触发

Java 的 NIO 属于水平触发,即条件触发,在使用 Java 的 NIO 编程的时候,在没有数据可以往外写的时候要取消写事件,在有数据往外写的时候再注册写事件。


水平触发(level-triggered,也被称为条件触发)LT: 只要满足条件,就触发一个事件(只要有数据没有被获取,内核就不断通知你)


边缘触发(edge-triggered)ET: 每当状态变化时,触发一个事件。


参考资料

[1]. UNIX 编程艺术,[美国] Eric S·Raymond

[2]. 为什么网络 I/O 会被阻塞?, yes 的练级攻略,https://mp.weixin.qq.com/s/BH1H-JUhKqgaAjaIx5k0Fw

[3]. Scalable IO in Java,Doug Lea,http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

[4]. Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events,Douglas C. Schmidt,https://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

[5]. 单服务器高性能模式:Reactor 与 Proactor,李运华,https://time.geekbang.org/column/article/8805?code=QlFu-hKw1A4UJB1cTk%2FNhziNpEeC2UV2F670BhdrVcc%3D&source=app_share

[6]. 句柄是什么? - 黄兢成的回答 - 知乎https://www.zhihu.com/question/27656256/answer/943130123

[7]. Java NIO 系列教程(一) Java NIO 概述,Jakob Jenkov,http://ifeve.com/overview/

[8]. JAVA NIO 操作类型,sarchitect,https://blog.51cto.com/stevex/1581934

[9]. java nio 使用的是水平触发还是边缘触发?, wuxinliuleihttps://www.zhihu.com/question/22524908

发布于: 3 小时前
用户头像

还未添加个人签名 2018.07.17 加入

还未添加个人简介

评论

发布
暂无评论
Reactor模式和Java NIO