概述
本文先从基本的 Socket 编程模式说起,介绍了 Java 传统的同步阻塞 IO 网络编程的基本实现,以及存在的性能问题,从而引出 Reactor 设计模式,最后通过 Java NIO 给出单 Reactor 单线程的实现方案。
Socket 编程模式
Unix 有几个统一性的理念或象征,并塑造了它的 API 及由此形成的开发风格。其中最重要的一点应当是“一切皆文件”模型及在此基础上建立的管道概念。
在 Unix/Linux 环境下,网络中的进程通过 Socket 进行通信,Socket 本质上也是一种特殊的文件,可以按照“打开,读写,关闭”的模式来操作。Socket 编程的基本模式如图 1 所示:
图 1 Socket 编程模式
创建 socket:本质上就是创建一个文件,每个文件都有一个整型的文件描述符(fd)来指代这个文件;
绑定端口:一台服务器可以同时运行多个不同的应用,在 TCP/IP 协议下通过端口进行区分,因此接下来需要绑定端口,所有连接到该端口的请求都会被我们的服务处理;
监听端口:执行创建 socket 和bind
之后,socket 还处于closed
状态,不对外监听,需要调用listen
方法,让 socket 进入被动监听状态;其 API 定义如下:
int listen(int sockfd, int backlog);
// 在TCP协议下,建立连接需要完成三次握手,当连接建立完成后会先放到一个连接队列,backlog就是指定这个队列的大小。
复制代码
接收请求:通过调用accept()
从已完成连接的队列中拿到连接进行处理,如果没有连接则调用
会被阻塞。
基于同步阻塞 IO 的 Java Socket 编程
下面介绍在 Java 语言下如何完成 Socket 通信。传统的 Java Socket 编程模式使用同步阻塞 IO,如图 2 所示。
图 2 经典的 Java Socket 编程模式
服务端通过new ServerSocket(端口号)
完成了绑定端口和监听端口的工作,接着循环调用 accept()
方法获取客户端请求(如果没有新的请求,程序就会阻塞),并为每一个客户端请求创建一个处理线程,避免因为主线程正在处理请求而无法响应其他连接。具体实现代码如下:
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
try {
// 绑定并监听端口
ServerSocket server = new ServerSocket(5566);
Socket client;
while (!Thread.interrupted()) {
// 接受请求,没有请求会阻塞
client = server.accept();
new Thread(new Handler(client)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Handler implements Runnable {
final Socket client;
public Handler(Socket client) {
this.client = client;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(client.getInputStream()));
// 接收客户端发送的内容
String line;
PrintWriter writer = new PrintWriter(
new OutputStreamWriter(client.getOutputStream()));
// 客户端连接未关闭前,readLine返回值不为null,没有数据时会阻塞
while ((line = reader.readLine()) != null) {
writer.println("你输入的是:" + line);
writer.flush();
// 通过约定特定输入,结束通讯
if ("end".equals(line)) {
break;
}
}
writer.close();
reader.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
在本地启动服务端程序,通过 telnet
请求连接。
这个模式在并发请求量不高的情况下是完全没有问题的,也建议使用这种模式。但在在高并发环境下,由于线程本身需要耗费系统资源,其次多线程下需要频繁进行上下文切换也会消耗性能,再者连接建立后还需要等待客户端数据到来,因此并不是最有效的解决方案。问题的关键在于两点:
频繁的上下文切换;
一个连接作为一个整体进行处理,粒度太粗。连接建立后,还需要等待数据可读才能进行真正的处理。
基于事件驱动的 Reactor 模式
解决上述问题的方式就是分治法和事件驱动。即:
把一个连接的处理过程拆成多个小任务;
只有在任务等待的事件触发时才执行任务。
Reactor 模式的核心理念即是如此。Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分配(Dispatch)给某个进程。
论文《Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events》详细介绍了 Reactor 模式的设计和实现,下面先理解几个关键的组成部分的作用:
主要概念
Handles
Handle(注意和 handler 区分开),中文常见翻译为句柄。句柄的叫法比较抽象,这里引用知乎的一个回答帮助大家理解。
句柄的英文是 handle。在英文中,有操作、处理、控制之类的意义。作为一个名词时,是指某个中间媒介,通过这个中间媒介可控制、操作某样东西。
这样说有点抽象,举个例子。door handle 是指门把手,通过门把手可以去控制门,但 door handle 并非 door 本身,只是一个中间媒介。又比如 knife handle 是刀柄,通过刀柄可以使用刀。
跟 door handle 类似,我们可以用 file handle 去操作 file, 但 file handle 并非 file 本身。这个 file handle 就被翻译成文件句柄,同理还有各种资源句柄。
一个句柄代表操作系统的一个资源,例如文件。event
会在句柄上触发,所以监控 event
需要在句柄上等待。
Synchronous Event Demultiplexer
直译过来就是同步事件分用器,它的作用是阻塞等待上面提到的一系列句柄集合产生事件。为什么叫做分用器?因为它监控了多个句柄,当某一个事件发生了,它会返回对应的那个句柄。一个常见的 I/O 事件分用器就是 select
系统调用。
Initiation Dispatcher
初始分发器,提供接口用于注册事件处理器 Event Handler
。当同步事件分发器检测有一个句柄上产生事件时,会通知初始分发器,初始分发器通过查询已注册的事件处理器找到对应的处理对象。
Event Handler
事件处理器抽象类,定义了事件的处理方法。
Concrete Event Handler
具体的事件处理实现。
工作流程
Java NIO 下实现 Reactor 模式
Doug Lea 在《Scalable IO in Java》中介绍 Reactor 模式时引入了三种角色:
Reactor:负责响应 IO 事件,分发给对应的事件处理,相当于上文提到的 Initiation Dispatcher
;
Handler:事件处理器,与上文一致;
Acceptor:特殊的事件处理器,专门处理connection
事件。
单 Reactor 单线程的实现方案:
图片引用自李运华的《单服务器高性能模式:Reactor 与 Proactor》一文
在给出具体实现前,需要先介绍下 Java NIO 的基本用法,它支持面向缓冲的,基于通道的 I/O 操作方法,主要由以下几个核心部分组成:Channel
、Buffer
和 Selector
。
Channel 和 Buffer
一般翻译为通道和缓存,数据可以从 Channel
读到 Buffer
中,也可以从 Buffer
写到 Channel
中。
Selector
Selector
允许单线程处理多个 Channel
。要使用 Selector
,得向 Selector
注册 Channel
,然后调用它的 select()
方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。
代码示例
下面使用 Java NIO 实现单 Reactor 单线程。参考《Scalable IO in Java》的示例,需要实现三个角色,首先是 Reactor
,负责事件监听和分发。事件监听使用 Selector
实现 IO 多路复用。从前面我们已经了解到,需要先创建 Channel
,再向 Selector
注册。
Java NIO 中定义了四种事件类型,分别是 OP_CONNECT
(客户端使用)、OP_ACCEPT
(服务端使用)、OP_READ
、OP_WRITE
。
显然,Reactor
需要负责监听 OP_ACCEPT
事件。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
public class Reactor {
final ServerSocketChannel serverSocket;
final Selector selector;
public Reactor(int port) throws IOException {
// 创建Channel
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
// 绑定端口
serverSocket.bind(new InetSocketAddress(port));
// 创建Selector
selector = Selector.open();
// 向Selector注册Channel,通过Selector监听Channel上的 Accept事件
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
}
}
复制代码
当 Selector
监听到 OP_ACCEPT
事件后,如前面所说,连接建立完成后会先放到一个连接队列,还需要调用 accept()
从已完成连接的队列中拿到连接进行处理。因此引入 Acceptor
角色来专门处理。Acceptor
和 Handler
都是事件处理器,为了减少篇幅,这里用 Runnable
作为他们共同的抽象处理类。
Acceptor
为每个客户端连接绑定一个对应的处理类实例,后续其他事件都由对应的事件处理类处理。需要说明的是,虽然这里用 Runnable
作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()
方法执行。
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Acceptor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
this.selector = selector;
this.serverSocket = serverSocket;
}
@Override
public void run() {
try {
// 调用accept()从已完成连接的队列中拿到连接进行处理
SocketChannel clientSocket = serverSocket.accept();
if (clientSocket != null) {
// 绑定连接对应的事件处理器实例
new Handler(selector, clientSocket);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
接下来需要打通 Reactor
和 Acceptor
两个角色的联系:当Selector
监听到 OP_ACCEPT
事件后,把事件分发给 Acceptor
。在前面的基础上,我们在 Reactor
角色上加上事件分发逻辑:
轮询 select()
获取有事件就绪的通道;
获取所有就绪通道对应的 SelectedKey
集合;
依次分发处理 SelectedKey
实例;
ServerSocketChannel
注册到 Selector
后,生成对应的 SelectedKey
对象,在上面附着 Acceptor
实例。当 ServerSocketChannel
监听的事件 OP_ACCEPT
就绪时,Selector.select()
返回对应的 SelectedKey
,从而能获取到 Acceptor
实例,进而由 Acceptor
接管连接处理事件。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
public class Reactor {
final ServerSocketChannel serverSocket;
final Selector selector;
public Reactor(int port) throws IOException {
// 创建Channel
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
// 绑定端口
serverSocket.bind(new InetSocketAddress(port));
// 创建Selector
selector = Selector.open();
// 把Channel注册到Selector进行监听,监听连接的 Accept事件
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道
selectionKey.attach(new Acceptor(selector, serverSocket));
}
public void run() {
while (!Thread.interrupted()) {
try {
// 一直阻塞到某个注册的通道有事件就绪
selector.select();
// 每一个注册的通道都对应一个SelectionKey
Set<SelectionKey> selected = selector.selectedKeys();
for (SelectionKey selectionKey : selected) {
// 分发处理
dispatch(selectionKey);
}
// 移除避免重复处理
selected.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey selectionKey) {
// 虽然这里用 Runnable 作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()方法执行
Runnable r = (Runnable) selectionKey.attachment();
if (r != null) {
r.run();
}
}
}
复制代码
目前为止,我们通过在 Selector
上注册 OP_ACCEPT
事件打通了 Reactor
和 Acceptor
,Acceptor
为每个连接绑定了对应的 Handler
实例。当读缓冲区有数据可读时,我们希望响应事件,并且由Handler
来处理。因此,仿照前面的做法:
需要在 Selector
上注册 OP_READ
;
在 Selector
返回的 SelectedKey
上附着 Handler
实例。
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
public class Handler implements Runnable {
final Selector selector;
final SocketChannel socket;
final SelectionKey sk;
public Handler(Selector selector, SocketChannel socket) throws IOException {
this.selector = selector;
this.socket = socket;
this.socket.configureBlocking(false);
// 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件
sk = socket.register(selector, SelectionKey.OP_READ);
// 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理
sk.attach(this);
}
@Override
public void run() {
}
}
复制代码
接下来完善读事件和写事件的处理逻辑。
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
public class Handler implements Runnable {
final Selector selector;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
ByteBuffer outputBuffer = ByteBuffer.allocate(1024);
public Handler(Selector selector, SocketChannel socket) throws IOException {
this.selector = selector;
this.socket = socket;
this.socket.configureBlocking(false);
// 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件
sk = socket.register(selector, SelectionKey.OP_READ);
// 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理
sk.attach(this);
}
@Override
public void run() {
if (sk.isReadable()) {
read();
} else if (sk.isWritable()) {
write();
}
}
public void read() {
try {
while (socket.read(inputBuffer) > 0) {
inputBuffer.flip();
while(inputBuffer.hasRemaining()){
System.out.print((char) inputBuffer.get());
}
inputBuffer.clear();
// 注册监听写缓冲区空闲事件,与前面通过channel注册到selector的写法等价
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void write() {
try {
outputBuffer.put("YYDS-ABCD-EFG\n".getBytes());
outputBuffer.flip();
while (outputBuffer.hasRemaining()) {
socket.write(outputBuffer);
}
outputBuffer.clear();
// 取消监听缓冲区空闲事件
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
启动程序
import java.io.IOException;
public class Main {
public static void main(String[] args) throws IOException {
Reactor reactor = new Reactor(5566);
reactor.run();
}
}
复制代码
在本地启动服务端程序,通过 telnet
发起请求验证。
补充说明
水平触发和边缘触发
Java 的 NIO 属于水平触发,即条件触发,在使用 Java 的 NIO 编程的时候,在没有数据可以往外写的时候要取消写事件,在有数据往外写的时候再注册写事件。
水平触发(level-triggered,也被称为条件触发)LT: 只要满足条件,就触发一个事件(只要有数据没有被获取,内核就不断通知你)
边缘触发(edge-triggered)ET: 每当状态变化时,触发一个事件。
参考资料
[1]. UNIX 编程艺术,[美国] Eric S·Raymond
[2]. 为什么网络 I/O 会被阻塞?, yes 的练级攻略,https://mp.weixin.qq.com/s/BH1H-JUhKqgaAjaIx5k0Fw
[3]. Scalable IO in Java,Doug Lea,http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
[4]. Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events,Douglas C. Schmidt,https://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
[5]. 单服务器高性能模式:Reactor 与 Proactor,李运华,https://time.geekbang.org/column/article/8805?code=QlFu-hKw1A4UJB1cTk%2FNhziNpEeC2UV2F670BhdrVcc%3D&source=app_share
[6]. 句柄是什么? - 黄兢成的回答 - 知乎https://www.zhihu.com/question/27656256/answer/943130123
[7]. Java NIO 系列教程(一) Java NIO 概述,Jakob Jenkov,http://ifeve.com/overview/
[8]. JAVA NIO 操作类型,sarchitect,https://blog.51cto.com/stevex/1581934
[9]. java nio 使用的是水平触发还是边缘触发?, wuxinliulei,https://www.zhihu.com/question/22524908
评论