昨天坐车回家,在车上打卡了一道力扣,完事之后闲的无聊,看到自己之前收藏了 Akka,依稀记得这是一个比线程小的执行单元(当时是这么理解的),加上之前学 Golang,看到了那种无脑开 Goroutine 处理请求的方式,自己又刚好苦于 WebFlux+Reactor 写出来的那一堆狗屎代码。于是想:为什么 Java 没有这种东西,可以无脑开轻量级线程处理?
后来看到 Java 在推进一个项目,就是为了实现这个功能,名字我忘了。但既然是实现中,那就是目前不可用的,但是看到了 Golang 处理方式和 Java 的 NIO 的对比,发现二者本质都是让原本一个线程处理一件事,变成一个线程处理多个事。Golang 的协程我们撇开不说,实现在于 Golang 自定义的调度器;但是 Java 的 NIO 我们可以谈一谈。
我对于 NIO 似乎也就仅限于使用了,写一写 NIO 的单线程,多线程,主从多线程的 Echo Server。用用 Netty 写写 HelloWorld 之类的。一直没有深究一些实现,想起来挂在宿舍床上的“治学严谨”的牌子(这牌子也有来历的),未免心生惭愧,于是利用坐车时间好好 Google 了一番,加上自己的理解,决定写下这篇文章。
何为 I/O?
既然是 NIO/BIO/AIO,我们就必须先要搞懂 I/O 是什么。我之前以为是单纯的读写操作,后来发现没有这么简单。
I/O 单按照字面翻译来说的话,是输入/输出。比如从磁盘读/向磁盘写,从网卡接收数据/向网卡写入数据,数据库查询/增删改数据库。但凡涉及到磁盘和网络的操作,我们统称为 I/O 操作。我先给出一个这么笼统的定义。
何为阻塞?
一些约定:如果没有特别指出,这里的阻塞都是 I/O 阻塞,不包括同步时因为锁竞争而产生的阻塞。因为锁资源而导致某一线程无法执行被阻塞的情景和线程等待 I/O 操作而被阻塞的情景是一样的,都是为了获取某个资源但是失败了,而被阻塞无法继续执行,只有当资源获取到,线程才能继续执行。
在了解阻塞之前,我们先了解各个设备的速度:
CPU:1ns,寄存器:1ns,高速缓存 10ns,内存:10us,磁盘 10ms,网络:100ms
其中:1s=1000ms,1ms=1000us,1us=1000ns,1ns=1000ps。
I/O 是操作磁盘或网络的,而 I/O 操作与内存操作的速度差了 1000 倍不止,与 CPU 差了 10^6 不止,所以在 CPU 眼中,任何 I/O 操作都是很漫长的。
正因为此,I/O 操作产生的阻塞会导致当前线程被调度进等待队列(详见线程状态切换)中,然后在数据到达时,把这个线程切换到就绪列表,等待调度。
而所谓的阻塞指的是线程等待 I/O 操作完成的过程。当某个线程想要读取来自网络的数据,需要先等待网卡准备就绪,然后数据传输,然后由内核把数据拷贝到用户内存,这一系列。写入需要等到网卡准备就绪,且前面排队的写请求全部完成,然后数据传输完成,这是一个很漫长的过程。
而网络编程时说的阻塞指的就是这个漫长的网络过程,线程被挂起,放到等待队列中,干等数据到来,干等数据被写出。程序就被停在这里了,也不会往下面执行。
我们来看一张图好了。
蓝色部分即为阻塞,网络操作时的阻塞过程。
阻塞/非阻塞 IO?同步/异步 IO?
好了,现在我们知道阻塞就是 I/O 操作太慢了造成的线程被挂起而无法继续执行这一回事。
一个标准的网络读取是这样的:
1⃣️网卡接收数据完成并放到内核空间
2⃣️内核把数据拷贝到用户空间。
一个标准的网络写出是这样的:
1⃣️内核把数据拷贝到内核空间
2⃣️网卡从内核空间读取数据并把数据发送出去。
判断一个操作是否是 I/O 阻塞的依据在于第 1⃣️步是否阻塞;判断一个 IO 是否是同步/异步的依据是第 2⃣️步是否阻塞。
这样不知大家可以理解吗?
BIO/NIO/AIO
写过 Socket 通信,不论你是什么语言,基本都是这样的流程:
新建一个 ServerSocket=>设置监听地址=>accept 一个连接并返回一个 Socket=>继续监听,新的线程处理刚刚返回的 Socket。
这没什么不好的,一个很普通的 Socket/ServerSocket 服务器是吧!早期的 Tomcat 就是这样的。
现在我们来看看这整个过程中涉及到线程的部分,就是新建线程处理 Socket 的部分,为什么要这么做呢?
因为每一个 Socket 的网络的读/写都是一个耗时的过程,如果我们不开辟新的线程,就会阻塞后面的连接,这样整个系统的连接数,吞吐量就会大幅下降。
每个连接一个线程,似乎是一种不错的方案,也很好地解决了无法多连接的问题。但是每个连接一个线程,会不会在系统连接数很高的情况下崩溃呢?毕竟 Java 线程就是操作系统线程,而且 Linux 下一个线程接近于一个进程,创建销毁调度的开销一点都不小。即使我们有线程池这种东西,那也毕竟是有限的,况且线程池维护也是一种负担,有没有一种解决方案呢?
到目前为止 BIO 结束了,接下来就是引入 NIO/AIO 的时候了,这里需要说明一下,NIO 指的是非阻塞 IO,即每次读操作时不像 BIO 那样等待有数据可读,而是直接返回,返回值代表可读数据大小,为-1 表示不可读,所以 NIO 的非阻塞是在这里实现的,NIO 仅仅是 read 操作立刻返回,而不会等待读到了数据再返回。写操作同理。
聪明的读者肯定立马意识到,那你这也没啥改变啊,毕竟你想读数据就得不停轮询返回值,看是否可读,这样还会造成 CPU 空转,还不如 BIO 呢!事实确实如此。所以 NIO 仅仅不阻塞程序,但是并不能加快总的时间。
到目前为止我们知道 Socket 处理之所以需要开线程,是因为网络读/写太慢了(这里我们暂时忽略业务逻辑中的耗时操作,比如查询数据库等),线程干不了活,所以产生了阻塞。此外,我们知道,在程序被阻塞时,CPU 是不干活的,这个期间我们为什么不能让另一个线程去执行呢?但这又引入一个新的问题,如果我调度了另一个线程,那当数据准备好时,或者可以写时,我该怎么得知呢?
嗯...嗅到了一丝异步+回调的味道。既然程序么得办法,那我们去看看操作系统能给我们提供什么解决方案吗?
Linux 提供了 Epoll(select 和 poll 现在基本不用,我就不说了),macOS 提供了 Kqueue,Windows 提供了 IOCP。它们是 I/O 多路复用技术。
刚刚提到了 I/O 多路复用,这是什么意思?
再说这个之前,我再引入一些概念:
1⃣️中断:中断指的是计算机除 CPU 之外的硬件,通过在总线放置一个信号,提醒 CPU 某件事情到达的手段。比如时钟中断,就是以固定频率发出中断信号,告诉 CPU 时间间隔;磁盘中断一般是数据读取完成并放到了指定的内存地址,网络中断一般是某个数据到达网卡并放置到指定的内存地址。
2⃣️中断处理程序:一个小的程序,当中断被 CPU 捕获时,会根据中断类型调用相应的中断处理程序;比方式时钟中断可以用来触发线程调度程序,这适用于分时系统。
3⃣️文件描述符 FD:Linux 中万物皆文件,对任何设备的处理不外乎四个操作:打开,关闭,读取,写入。所以文件描述符就是一个抽象文件的 ID,这个抽象文件可以是磁盘中的文件,也可以是远程进程(ip:port),还可以是键盘,鼠标。
4⃣️DMA:直接内存访问,CPU 的助理,对于 I/O 设备的读写不再由 CPU 全权负责,而是由 CPU 交给 DMA,DMA 进行负责,这一般是主板上的某个芯片。
I/O 多路复用的本质就是网卡等设备的中断+对应的中断处理程序=>高级语言的封装使用。
说了这么多干嘛呢?引入 FD 就是为了说明每一个 Socket(Socket 本质就是一个远程进程,在互联网世界,IP:PORT 可以全球唯一定位一个进程)都有唯一的代表它的 FD。在这里我们使用 NIO 的 SocketChannel 替代 BIO 中的 Socket,二者都是代表了远程进程的套接字。Epoll 就是把这些 FD 注册到 Linux 的文件系统下,通过红黑树来管理,这个红黑树保存了(FD, SocketChannel)这样的结构,通过 FD 可以快速找到对应的 SocketChannel(只能说大概是这样的对应关系),当这个红黑树下的某个 FD 可读(网卡中断实现)或可写(网卡中断实现)时,添加到就绪列表中。
通过对中断程序注册一个处理函数,可以实现每次网卡中断时,把这个中断对应的 FD 添加到就绪列表中,唤醒 Epoll 的 select 方法。Epoll 的 select 方法就是遍历就绪列表,找到每个 FD 对应的 SocketChannel,返回。如果为空就阻塞。当有新的中断产生时被唤醒。
现在我们把远程进程可读/可写这件事丢给了操作系统,操作系统使用网卡中断实现异步通知。就可以解放我们的程序了,而不必让它干等了。
等一下,解放了我们的程序?读/写可用自动通知?这怎么看起来很 Nice 啊!我们把它和 NIO 拼接一下试试。
哦吼吼~就是 NIO+I/O 多路复用=>Reactor 模型。
所以 Reactor 模型是依赖于操作系统的,其实是依赖于硬件中断。整个计算机内部所有的硬件通知都是通过中断实现的(这里多嘴一句,其他硬件实现中断是通过轮询自己的状态实现的,其实 CPU 响应中断也是通过每个时钟周期查看引脚信号实现的)。
NIO+I/O 多路复用在 Java 中体现就是 ServerSocketChannel+SocketChannel+Selector+SelectionKey+Buffer 那一套。来看一个典型的 Echo 服务器,基于 NIO+I/O 多路复用实现的。
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
* @author CodeWithBuff
*/
public class NioTcpSingleThread {
public static void main(String[] args) {
NioTcpSingleThread.Server.builder().build().run();
}
private static final HashMap<SocketChannel, List<DataLoad>> dataLoads = new LinkedHashMap<>();
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class DataLoad {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray;
}
/**
* Java NIO处理网络的核心组件只有四个:{@link Channel},{@link Selector},{@link SelectionKey}和{@link java.nio.Buffer}
* <br/>
* 说一下{@link ServerSocketChannel},{@link SocketChannel},{@link Selector}和{@link SelectionKey}之间的关系。
* <br/>
* {@link ServerSocketChannel}和{@link SocketChannel}不说了,无非就是一个用来在服务端建立连接,一个处理连接(实际I/O交互)的区别,在这里统称为{@link AbstractSelectableChannel},也就是它俩都继承的类。
* <br/>
* {@link Selector#select()}调用系统调用,轮询端口,记录已注册的{@link AbstractSelectableChannel}感兴趣的事件,如果发生了所有已注册的{@link AbstractSelectableChannel}感兴趣的事件之一的话,就返回。否则阻塞。
* <br/>
* 对于{@link AbstractSelectableChannel}来说,怎么让{@link Selector}帮自己记录并轮询自己感兴趣的事件呢?答案是:注册到{@link Selector}上即可,同时设置感兴趣的事件类型。
* <br/>
* 在注册成功后,会返回一个{@link SelectionKey}类型的变量,通过它,可以操作{@link AbstractSelectableChannel}和{@link Selector}。{@link SelectionKey}本身就是{@link AbstractSelectableChannel}和它注册到的{@link Selector}的凭证。
* 就像是订单一样,记录着它们俩的关系,所以在注册成功的后续操作里,一般都是用{@link SelectionKey}来实现的。同时,{@link SelectionKey}还有一个attachment()方法,可以获取附加到它上面的对象。
* 一般我们用这个附属对象来处理当前{@link SelectionKey}所包含的{@link AbstractSelectableChannel}和{@link Selector}的实际业务。
* <br/>
* 刚才说到了{@link Selector#select()},它会一直阻塞直到发生了感兴趣的事件,但是有时候我们这边可以确定某一事件马上或已经发生,就可以调用{@link Selector#wakeup()}方法,让{@link Selector#select()}立即返回,然后获取
* {@link SelectionKey}集合也好,重新{@link Selector#select()}(这已经是下一次循环了)也罢。
* <br/>
* <br/>
* 注意!!!如果某一个{@link AbstractSelectableChannel}在同一个{@link Selector}上注册了两个不同的感兴趣的事件类型,那么返回的两个{@link SelectionKey}是没有任何关系的。虽然可以通过{@link SelectionKey}再次修改
* {@link AbstractSelectableChannel}感兴趣的事件类型。{@link SelectionKey}只在注册时生成返回,所以有(Channel + Selector) = SelectionKey。但是吧,啧,注册多个时会卡死,所以千万不要同一个Channel和同一个Selector注册多个!!!
*/
@Builder
private static class Server implements Runnable {
@Override
public void run() {
System.out.println("Server开始运行...");
Selector globalSelector;
ServerSocketChannel serverSocketChannel;
SelectionKey serverSelectionKey;
try {
globalSelector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8190));
serverSocketChannel.configureBlocking(false);
serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT);
serverSelectionKey.attach(Acceptor.builder()
.globalSelector(globalSelector)
.serverSocketChannel(serverSocketChannel)
.build()
);
while (true) {
// select()是正儿八经的阻塞方法,它会一直阻塞直到发生了任何注册过的(Server)SocketChannel感兴趣的事件之一。比如有新的连接建立,Channel可以读了,或者Channel可以写了
// 它的返回值指出了有几个感兴趣事件,实际没啥用,所以在此直接忽略
globalSelector.select();
Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys();
for (SelectionKey selectionKey : selectionKeySet) {
dispatch(selectionKey);
selectionKeySet.remove(selectionKey);
}
}
} catch (IOException ignored) {
}
}
private void dispatch(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
@Data
@Builder
private static class Acceptor implements Runnable {
private final Selector globalSelector;
private final ServerSocketChannel serverSocketChannel;
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("已建立连接...");
socketChannel.configureBlocking(false);
SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ);
socketSelectionKey.attach(Handler.builder()
.socketSelectionKey(socketSelectionKey)
.build()
);
// 此时注册了读感兴趣Channel,所以为了快速开启读,直接唤醒selector。其实就是让它别等了,我这边准备好了,你那边应该已经有数据了,直接返回吧。
globalSelector.wakeup();
} catch (IOException ignored) {
}
}
}
/**
* "写"操作依赖于"读"操作读取到的数据,所以"写"之后不能再次"写",必须"读"或"关闭"。
* <br/>
* "读"操作之后可以继续"读"而无需等待"写完成",所以"写完"可以把感兴趣类型设置为"读"|"写"而不是单单的"写"。
*/
@Data
@Builder
private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
if (!socketChannel.isOpen()) {
System.out.println("连接已关闭");
try {
socketChannel.shutdownInput();
socketChannel.shutdownOutput();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return ;
}
if (socketSelectionKey.isReadable()) {
System.out.println("读事件发生,准备读...");
Reader.builder()
.socketChannel(socketChannel)
.build()
.run();
// 说明即对读感兴趣,也对写感兴趣(因为客户端可能是长连接,还要再次发送消息),但是同一个SelectionKey只能是读或写之一
socketSelectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 读完了,就要准备写
socketSelectionKey.selector().wakeup();
}
if (socketSelectionKey.isWritable()) {
System.out.println("写事件发生,准备写...");
Writer.builder()
.socketChannel(socketChannel)
.build()
.run();
socketSelectionKey.interestOps(SelectionKey.OP_READ);
// 写完了,立即返回就免了
// socketSelectionKey.selector().wakeup();
}
}
}
@Data
@Builder
private static class Reader implements Runnable {
private final SocketChannel socketChannel;
@Override
public void run() {
try {
byteBuffer.clear();
int readable = socketChannel.read(byteBuffer);
byte[] bytes = byteBuffer.array();
String value = new String(bytes, 0, readable);
System.out.println("读到了: " + value);
DataLoad dataLoad = DataLoad.builder()
.stringValue(value)
.build();
List<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedList<>());
tmp.add(dataLoad);
} catch (IOException ignored) {
}
}
}
@Data
@Builder
private static class Writer implements Runnable {
private final SocketChannel socketChannel;
@Override
public void run() {
try {
String value = "Server get: " + dataLoads.get(socketChannel).get(0).getStringValue();
dataLoads.get(socketChannel).remove(0);
socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
} catch (IOException ignored) {
}
}
}
}
复制代码
NIO 做到了数据可读/可写时的通知机制,然后去读,去写,而 AIO 则是直接做到了数据传输到指定区域,说白了就是不需要自己去读,自己去写,当它被调用时数据已经全部准备好了,更加地异步。但是 Linux 和实际生产使用不多,我们就不提了,只给出一个示例代码:
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author CodeWithBuff
*/
public class AioTcpSingleThread {
public static void main(String[] args) {
Server.builder().build().run();
// 防止主线程退出
LockSupport.park(Long.MAX_VALUE);
}
private static final ConcurrentHashMap<AsynchronousSocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ReentrantLock READ_LOCK = new ReentrantLock();
private static final ReentrantLock WRITE_LOCK = new ReentrantLock();
private static final ByteBuffer READ_BUFFER = ByteBuffer.allocate(1024 * 4);
private static final ByteBuffer WRITE_BUFFER = ByteBuffer.allocate(1024 * 4);
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class DataLoad {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray;
}
@Builder
private static class Server implements Runnable {
@Override
public void run() {
try {
System.out.println("服务器启动...");
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(8190));
asynchronousServerSocketChannel.accept(null, ACCEPTOR);
} catch (IOException ignored) {
}
}
}
private static AsynchronousServerSocketChannel asynchronousServerSocketChannel = null;
private static final Acceptor ACCEPTOR = new Acceptor();
private static class Acceptor implements CompletionHandler<AsynchronousSocketChannel, Object> {
// 这个方法是异步调用的,所以不用担心阻塞会阻塞到主线程
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("连接建立: " + Thread.currentThread().getName());
System.out.println("连接建立");
dataLoads.computeIfAbsent(result, k -> new LinkedBlockingQueue<>());
// 使用循环来进行多次读取,写入
while (result.isOpen()) {
READ_LOCK.lock();
// 这个方法也是异步的
result.read(READ_BUFFER, attachment, new Reader(result, READ_BUFFER.array()));
READ_BUFFER.clear();
READ_LOCK.unlock();
WRITE_LOCK.lock();
String ans = "";
try {
ans = "Server get: " + dataLoads.get(result).take().getStringValue();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 异步的
result.write(ByteBuffer.wrap(ans.getBytes(StandardCharsets.UTF_8)), attachment, new Writer(result));
WRITE_LOCK.unlock();
}
System.out.println("结束通信一次");
// 尝试建立第二波通信
asynchronousServerSocketChannel.accept(attachment, ACCEPTOR);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("建立连接失败");
}
}
private static class Reader implements CompletionHandler<Integer, Object> {
private final AsynchronousSocketChannel asynchronousSocketChannel;
private final byte[] bytes;
public Reader(AsynchronousSocketChannel asynchronousSocketChannel, byte[] bytes) {
this.asynchronousSocketChannel = asynchronousSocketChannel;
this.bytes = bytes;
}
@Override
public void completed(Integer result, Object attachment) {
System.out.println("读取数据: " + Thread.currentThread().getName());
if (result == 0 || !asynchronousSocketChannel.isOpen()) {
return ;
} else if (result < 0) {
shutdown(asynchronousSocketChannel);
return ;
}
System.out.println("读取数据: " + result);
String value = new String(bytes, 0, result);
System.out.println("读到了: " + value);
LinkedBlockingQueue<DataLoad> tmp = dataLoads.get(asynchronousSocketChannel);
DataLoad dataLoad = DataLoad.builder()
.stringValue(value)
.build();
tmp.add(dataLoad);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读取失败");
}
}
private static class Writer implements CompletionHandler<Integer, Object> {
private final AsynchronousSocketChannel asynchronousSocketChannel;
public Writer(AsynchronousSocketChannel asynchronousSocketChannel) {
this.asynchronousSocketChannel = asynchronousSocketChannel;
}
@Override
public void completed(Integer result, Object attachment) {
System.out.println("写入数据: " + Thread.currentThread().getName());
if (!asynchronousSocketChannel.isOpen()) {
return ;
}
System.out.println("写入数据: " + result);
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("写入失败");
}
}
private static void shutdown(AsynchronousSocketChannel asynchronousSocketChannel) {
try {
asynchronousSocketChannel.shutdownInput();
asynchronousSocketChannel.shutdownOutput();
asynchronousSocketChannel.close();
} catch (IOException ignore) {
}
}
}
复制代码
NIO 解决了什么?未解决什么?
NIO 实现了一个线程管理多个连接的 I/O 操作,而不用像 BIO 每个连接一个线程,本质是通过中断机制+系统调用实现的。这样就可以在一个线程处理所有可用的 I/O 事件。
注意,如果一个 Selector(一般来说一个 Selector 对应一个线程,一个线程对应多个 Selector 会降低 Selector 效率)仅注册一个连接,那 NIO 和 BIO 就没什么区别了。
还记得我们为什么从 BIO 走到了 NIO 吗?是因为 BIO 无法抗住大量的连接,所以 NIO 解决了大连接的问题。
但是 NIO 并不会带来每个请求的速度的提升,这点请记住,甚至在连接数不多时,处理速度还不如 BIO。
NIO 使用注意事项
前面我们假设 NIO 中不要出现耗时业务,但是如果必须有,比如数据库操作,那怎么办呢?在这里我们参考 NIO 框架 Netty 的实现。
Netty 建议对于耗时操作,应该通过传入的自定义线程池处理,即把这个任务提交到线程池,然后添加异步调用,当任务处理完毕,继续下一步处理。总之耗时任务线程池处理,普通任务直接在 NIO 线程处理就好。
其实这也对应了多线程 Reactor 模型。此外还有主从多线程 Reactor,就是把连接操作独立出来,做一个单独的 Selector 专门处理连接,连接之后的 I/O 操作都放在其他的 Selector 中,业务放在线程池中。
来看看这两种模型各自对应的代码:
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author CodewithBuff
*/
public class NioTcpMultiThread {
public static void main(String[] args) {
Server.builder().build().run();
Runnable target = executorService::shutdown;
Thread shutdown = new Thread(target);
Runtime.getRuntime().addShutdownHook(shutdown);
}
private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
private static final ReentrantLock reentrantLock = new ReentrantLock();
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class DataLoad {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray;
}
@Builder
private static class Server implements Runnable {
@Override
public void run() {
System.out.println("Server开始运行...");
Selector globalSelector;
ServerSocketChannel serverSocketChannel;
SelectionKey serverSelectionKey;
try {
globalSelector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8190));
serverSocketChannel.configureBlocking(false);
serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT);
serverSelectionKey.attach(Acceptor.builder()
.serverSelectionKey(serverSelectionKey)
.build()
);
while (true) {
int a = globalSelector.select();
Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys();
for (SelectionKey selectionKey : selectionKeySet) {
dispatch(selectionKey);
selectionKeySet.remove(selectionKey);
}
}
} catch (IOException ignored) {
}
}
private void dispatch(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
@Data
@Builder
private static class Acceptor implements Runnable {
private final SelectionKey serverSelectionKey;
@Override
public void run() {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) serverSelectionKey.channel();
Selector globalSelector = serverSelectionKey.selector();
SocketChannel socketChannel;
try {
socketChannel = serverSocketChannel.accept();
System.out.println("已建立连接...");
socketChannel.configureBlocking(false);
SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ);
socketSelectionKey.attach(Handler.builder()
.socketSelectionKey(socketSelectionKey)
.build()
);
globalSelector.wakeup();
} catch (IOException ignored) {
}
}
}
@Data
@Builder
private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
if (!socketSelectionKey.channel().isOpen()) {
System.out.println("连接已关闭");
try {
socketSelectionKey.channel().close();
} catch (IOException ignored) {
}
return ;
}
dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>());
if (socketSelectionKey.isReadable()) {
Reader reader = Reader.builder()
.socketSelectionKey(socketSelectionKey)
.build();
Thread thread = new Thread(reader);
socketSelectionKey.interestOps(SelectionKey.OP_WRITE);
thread.start();
} else if (socketSelectionKey.isWritable()) {
Writer writer = Writer.builder()
.socketSelectionKey(socketSelectionKey)
.build();
Thread thread = new Thread(writer);
socketSelectionKey.interestOps(SelectionKey.OP_READ);
thread.start();
}
}
}
@Data
@Builder
private static class Reader implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
String value;
reentrantLock.lock();
if (socketChannel.isOpen()) {
int readable = socketChannel.read(byteBuffer);
if (readable == 0) {
value = null;
// System.out.println("读到空请求");
} else if (readable < 0) {
value = null;
shutdownSocketChannel(socketChannel);
} else {
value = new String(byteBuffer.array(), 0, readable);
}
} else {
value = null;
}
reentrantLock.unlock();
if (value == null) {
return ;
}
System.out.println("读到了: " + value);
DataLoad dataLoad = DataLoad.builder()
.stringValue(value)
.build();
LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>());
tmp.add(dataLoad);
socketSelectionKey.selector().wakeup();
} catch (IOException ignored) {
}
}
}
@Data
@Builder
private static class Writer implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel);
String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue();
if (socketChannel.isOpen())
socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
else {
shutdownSocketChannel(socketChannel);
}
} catch (IOException | InterruptedException ignored) {
}
}
}
private static void shutdownSocketChannel(SocketChannel socketChannel) {
try {
socketChannel.shutdownInput();
socketChannel.shutdownOutput();
socketChannel.close();
} catch (IOException ignored) {
}
}
}
复制代码
主从多线程:
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 十三月之夜
*/
public class NioTcpMainSubThread {
public static void main(String[] args) throws IOException {
new Server(Runtime.getRuntime().availableProcessors()).run();
}
private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
private static final ReentrantLock reentrantLock = new ReentrantLock();
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class DataLoad {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray;
}
/**
* BossSelector只和ServerSocketChannel进行建立连接操作,且是单线程的,运行在主线程中。
* <br/>
* 然后把建立的连接扔给Workers处理,Workers是一组Worker,每个Worker都有一个独立的WorkSelector用来处理当前Worker被安排的SocketChannel。
* <br/>
* 这里采取的策略是依次提交,尽可能让每个Worker所负责的SocketChannel数量相同。
* <br/>
* 每个Worker运行在独立的线程上,仅做轮询Read/Write操作,耗时的业务操作(比如I/O,Compute)均交给线程工作池处理。
*/
private static class Server implements Runnable {
private final Selector bossSelector;
private final int workerCount;
public Server(int workerCount) throws IOException {
this.workerCount = workerCount;
bossSelector = Selector.open();
}
@Override
public void run() {
try {
System.out.println("服务器启动...");
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8190));
serverSocketChannel.configureBlocking(false);
SelectionKey serverSelectionKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
serverSelectionKey.attach(new Boss(serverSocketChannel, workerCount));
while (true) {
bossSelector.select();
Set<SelectionKey> selectionKeySet = bossSelector.selectedKeys();
// 特殊化处理,因为有且只有一个SelectionKey,所以不遍历了
SelectionKey key = selectionKeySet.iterator().next();
Runnable runnable = (Runnable) key.attachment();
runnable.run();
selectionKeySet.remove(key);
}
} catch (IOException ignored) {
}
}
}
/**
* 处理新的连接,生成SocketChannel并选择某一个Worker提交。
*/
private static class Boss implements Runnable {
private final ServerSocketChannel serverSocketChannel;
private final int workerCount;
private final Set<SocketChannel>[] socketChannelSets;
private final Worker[] workers;
private int index = 0;
@SuppressWarnings("unchecked")
public Boss(ServerSocketChannel serverSocketChannel, int workerCount) throws IOException {
this.serverSocketChannel = serverSocketChannel;
this.workerCount = workerCount;
ExecutorService executorService = Executors.newFixedThreadPool(workerCount);
socketChannelSets = new Set[workerCount];
workers = new Worker[workerCount];
for (int i = 0; i < workerCount; ++ i) {
workers[i] = new Worker();
socketChannelSets[i] = workers[i].getSocketChannels();
executorService.submit(workers[i]);
}
}
@Override
public void run() {
Set<SocketChannel> socketChannelSet = socketChannelSets[index];
Selector workerSelector = workers[index].getWorkerSelector();
++ index;
if (index == this.workerCount)
index = 0;
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("建立连接...");
socketChannelSet.add(socketChannel);
workerSelector.wakeup();
} catch (IOException ignore) {
}
}
}
/**
* 处理新添加的SocketChannel和轮询Read/Write。
*/
private static class Worker implements Runnable {
private final Selector workerSelector;
private final Set<SocketChannel> socketChannels = new HashSet<>();
public Worker() throws IOException {
workerSelector = Selector.open();
}
@Override
public void run() {
while (true) {
try {
if (socketChannels.size() > 0) {
for (SocketChannel socketChannel : socketChannels) {
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(workerSelector, SelectionKey.OP_READ);
selectionKey.attach(new Handler(selectionKey));
socketChannels.remove(socketChannel);
}
System.out.println("已添加新的SocketChannel");
}
workerSelector.select();
Set<SelectionKey> selectionKeySet = workerSelector.selectedKeys();
for (SelectionKey key : selectionKeySet) {
Runnable runnable = (Runnable) key.attachment();
runnable.run();
selectionKeySet.remove(key);
}
} catch (IOException ignored) {
}
}
}
public Set<SocketChannel> getSocketChannels() {
return socketChannels;
}
public Selector getWorkerSelector() {
return workerSelector;
}
}
/**
* 分发业务处理
*/
@Data
@Builder
private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
private static final ExecutorService workPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override
public void run() {
if (!socketSelectionKey.channel().isOpen()) {
System.out.println("连接已关闭");
try {
socketSelectionKey.channel().close();
} catch (IOException ignored) {
}
return ;
}
dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>());
if (socketSelectionKey.isReadable()) {
Reader reader = Reader.builder()
.socketSelectionKey(socketSelectionKey)
.build();
workPool.submit(reader);
socketSelectionKey.interestOps(SelectionKey.OP_WRITE);
} else if (socketSelectionKey.isWritable()) {
Writer writer = Writer.builder()
.socketSelectionKey(socketSelectionKey)
.build();
workPool.submit(writer);
socketSelectionKey.interestOps(SelectionKey.OP_READ);
}
}
}
@Data
@Builder
private static class Reader implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
String value;
reentrantLock.lock();
if (socketChannel.isOpen()) {
int readable = socketChannel.read(byteBuffer);
if (readable == 0) {
value = null;
// System.out.println("读到空请求");
} else if (readable < 0) {
value = null;
shutdownSocketChannel(socketChannel);
} else {
value = new String(byteBuffer.array(), 0, readable);
}
} else {
value = null;
}
reentrantLock.unlock();
if (value == null) {
return ;
}
System.out.println("读到了: " + value);
DataLoad dataLoad = DataLoad.builder()
.stringValue(value)
.build();
LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>());
tmp.add(dataLoad);
socketSelectionKey.selector().wakeup();
} catch (IOException ignored) {
}
}
}
@Data
@Builder
private static class Writer implements Runnable {
private final SelectionKey socketSelectionKey;
@Override
public void run() {
try {
SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel);
String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue();
if (socketChannel.isOpen())
socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
else {
shutdownSocketChannel(socketChannel);
}
} catch (IOException | InterruptedException ignored) {
}
}
}
private static void shutdownSocketChannel(SocketChannel socketChannel) {
try {
socketChannel.shutdownInput();
socketChannel.shutdownOutput();
socketChannel.close();
} catch (IOException ignored) {
}
}
}
复制代码
在这里我们知道了。NIO 线程(Selector 绑定的线程)只做 I/O 处理。无论你这个业务逻辑什么时候完成,耗时多少,你只要数据准备好了,并且注册一个写事件,那么我 NIO 会在可以写时通知你,你来写就是了。这样只要耗时业务不在 NIO 线程中,就不会影响其他 ServerSocket 的 I/O 操作。
最后我希望明确一下,Reactor 模型的 Selector 仅仅实现了数据可读/数据可写的通知(Boss 版 Selector 可能仅做可连接通知)。这里我们强调的是“可”,表示可以的意思,即某个连接的 I/O 操作可用性通知。所以它需要搭配 NIO 使用,完成 NIO 的通知操作,而不必 NIO 不停地轮询。
你想让它帮你通知某个操作也非常简单,注册给它就行了,并添加一个回调程序(attachment 实现),这样 Selector 就能在可用时调用你的回调程序完成进一步操作,但是记得别在这个程序里阻塞你的 NIO Selector,不然会影响后续的其他连接的 I/O 可用性通知。
参考
Selector实现原理
彻底理解同步 异步 阻塞 非阻塞
I/O会一直占用CPU吗? - 赵鑫磊的回答 - 知乎
epoll 或者 kqueue 的原理是什么? - 关于夏天的一切的回答 - 知乎
《Java 高并发程序设计》
《现代操作系统》
《深入理解计算机系统》
评论