写点什么

谈一谈 Java 的网络编程

用户头像
CodeWithBuff
关注
发布于: 2 小时前

昨天坐车回家,在车上打卡了一道力扣,完事之后闲的无聊,看到自己之前收藏了 Akka,依稀记得这是一个比线程小的执行单元(当时是这么理解的),加上之前学 Golang,看到了那种无脑开 Goroutine 处理请求的方式,自己又刚好苦于 WebFlux+Reactor 写出来的那一堆狗屎代码。于是想:为什么 Java 没有这种东西,可以无脑开轻量级线程处理?


后来看到 Java 在推进一个项目,就是为了实现这个功能,名字我忘了。但既然是实现中,那就是目前不可用的,但是看到了 Golang 处理方式和 Java 的 NIO 的对比,发现二者本质都是让原本一个线程处理一件事,变成一个线程处理多个事。Golang 的协程我们撇开不说,实现在于 Golang 自定义的调度器;但是 Java 的 NIO 我们可以谈一谈。


我对于 NIO 似乎也就仅限于使用了,写一写 NIO 的单线程,多线程,主从多线程的 Echo Server。用用 Netty 写写 HelloWorld 之类的。一直没有深究一些实现,想起来挂在宿舍床上的“治学严谨”的牌子(这牌子也有来历的),未免心生惭愧,于是利用坐车时间好好 Google 了一番,加上自己的理解,决定写下这篇文章。

何为 I/O?

既然是 NIO/BIO/AIO,我们就必须先要搞懂 I/O 是什么。我之前以为是单纯的读写操作,后来发现没有这么简单。


I/O 单按照字面翻译来说的话,是输入/输出。比如从磁盘读/向磁盘写,从网卡接收数据/向网卡写入数据,数据库查询/增删改数据库。但凡涉及到磁盘和网络的操作,我们统称为 I/O 操作。我先给出一个这么笼统的定义。

何为阻塞?

一些约定:如果没有特别指出,这里的阻塞都是 I/O 阻塞,不包括同步时因为锁竞争而产生的阻塞。因为锁资源而导致某一线程无法执行被阻塞的情景和线程等待 I/O 操作而被阻塞的情景是一样的,都是为了获取某个资源但是失败了,而被阻塞无法继续执行,只有当资源获取到,线程才能继续执行


在了解阻塞之前,我们先了解各个设备的速度


CPU:1ns,寄存器:1ns,高速缓存 10ns,内存:10us,磁盘 10ms,网络:100ms


其中:1s=1000ms,1ms=1000us,1us=1000ns,1ns=1000ps。


I/O 是操作磁盘或网络的,而 I/O 操作与内存操作的速度差了 1000 倍不止,与 CPU 差了 10^6 不止,所以在 CPU 眼中,任何 I/O 操作都是很漫长的。


正因为此,I/O 操作产生的阻塞会导致当前线程被调度进等待队列(详见线程状态切换)中,然后在数据到达时,把这个线程切换到就绪列表,等待调度。


而所谓的阻塞指的是线程等待 I/O 操作完成的过程。当某个线程想要读取来自网络的数据,需要先等待网卡准备就绪,然后数据传输,然后由内核把数据拷贝到用户内存,这一系列。写入需要等到网卡准备就绪,且前面排队的写请求全部完成,然后数据传输完成,这是一个很漫长的过程。


网络编程时说的阻塞指的就是这个漫长的网络过程,线程被挂起,放到等待队列中,干等数据到来,干等数据被写出。程序就被停在这里了,也不会往下面执行。


我们来看一张图好了。



蓝色部分即为阻塞,网络操作时的阻塞过程。

阻塞/非阻塞 IO?同步/异步 IO?

好了,现在我们知道阻塞就是 I/O 操作太慢了造成的线程被挂起而无法继续执行这一回事。


一个标准的网络读取是这样的:


  • 1⃣️网卡接收数据完成并放到内核空间

  • 2⃣️内核把数据拷贝到用户空间。


一个标准的网络写出是这样的:


  • 1⃣️内核把数据拷贝到内核空间

  • 2⃣️网卡从内核空间读取数据并把数据发送出去。


判断一个操作是否是 I/O 阻塞的依据在于第 1⃣️步是否阻塞;判断一个 IO 是否是同步/异步的依据是第 2⃣️步是否阻塞


这样不知大家可以理解吗?


BIO/NIO/AIO

写过 Socket 通信,不论你是什么语言,基本都是这样的流程:


新建一个 ServerSocket=>设置监听地址=>accept 一个连接并返回一个 Socket=>继续监听,新的线程处理刚刚返回的 Socket。


这没什么不好的,一个很普通的 Socket/ServerSocket 服务器是吧!早期的 Tomcat 就是这样的。


现在我们来看看这整个过程中涉及到线程的部分,就是新建线程处理 Socket 的部分,为什么要这么做呢?


因为每一个 Socket 的网络的读/写都是一个耗时的过程,如果我们不开辟新的线程,就会阻塞后面的连接,这样整个系统的连接数,吞吐量就会大幅下降。


每个连接一个线程,似乎是一种不错的方案,也很好地解决了无法多连接的问题。但是每个连接一个线程,会不会在系统连接数很高的情况下崩溃呢?毕竟 Java 线程就是操作系统线程,而且 Linux 下一个线程接近于一个进程,创建销毁调度的开销一点都不小。即使我们有线程池这种东西,那也毕竟是有限的,况且线程池维护也是一种负担,有没有一种解决方案呢?


到目前为止 BIO 结束了,接下来就是引入 NIO/AIO 的时候了,这里需要说明一下,NIO 指的是非阻塞 IO,即每次读操作时不像 BIO 那样等待有数据可读,而是直接返回,返回值代表可读数据大小,为-1 表示不可读,所以 NIO 的非阻塞是在这里实现的,NIO 仅仅是 read 操作立刻返回,而不会等待读到了数据再返回。写操作同理


聪明的读者肯定立马意识到,那你这也没啥改变啊,毕竟你想读数据就得不停轮询返回值,看是否可读,这样还会造成 CPU 空转,还不如 BIO 呢!事实确实如此。所以 NIO 仅仅不阻塞程序,但是并不能加快总的时间。



到目前为止我们知道 Socket 处理之所以需要开线程,是因为网络读/写太慢了(这里我们暂时忽略业务逻辑中的耗时操作,比如查询数据库等),线程干不了活,所以产生了阻塞。此外,我们知道,在程序被阻塞时,CPU 是不干活的,这个期间我们为什么不能让另一个线程去执行呢?但这又引入一个新的问题,如果我调度了另一个线程,那当数据准备好时,或者可以写时,我该怎么得知呢?


嗯...嗅到了一丝异步+回调的味道。既然程序么得办法,那我们去看看操作系统能给我们提供什么解决方案吗?


Linux 提供了 Epoll(select 和 poll 现在基本不用,我就不说了),macOS 提供了 Kqueue,Windows 提供了 IOCP。它们是 I/O 多路复用技术


刚刚提到了 I/O 多路复用,这是什么意思?


再说这个之前,我再引入一些概念:


  • 1⃣️中断:中断指的是计算机除 CPU 之外的硬件,通过在总线放置一个信号,提醒 CPU 某件事情到达的手段。比如时钟中断,就是以固定频率发出中断信号,告诉 CPU 时间间隔;磁盘中断一般是数据读取完成并放到了指定的内存地址,网络中断一般是某个数据到达网卡并放置到指定的内存地址。

  • 2⃣️中断处理程序:一个小的程序,当中断被 CPU 捕获时,会根据中断类型调用相应的中断处理程序;比方式时钟中断可以用来触发线程调度程序,这适用于分时系统。

  • 3⃣️文件描述符 FD:Linux 中万物皆文件,对任何设备的处理不外乎四个操作:打开,关闭,读取,写入。所以文件描述符就是一个抽象文件的 ID,这个抽象文件可以是磁盘中的文件,也可以是远程进程(ip:port),还可以是键盘,鼠标。

  • 4⃣️DMA:直接内存访问,CPU 的助理,对于 I/O 设备的读写不再由 CPU 全权负责,而是由 CPU 交给 DMA,DMA 进行负责,这一般是主板上的某个芯片。


I/O 多路复用的本质就是网卡等设备的中断+对应的中断处理程序=>高级语言的封装使用。


说了这么多干嘛呢?引入 FD 就是为了说明每一个 Socket(Socket 本质就是一个远程进程,在互联网世界,IP:PORT 可以全球唯一定位一个进程)都有唯一的代表它的 FD。在这里我们使用 NIO 的 SocketChannel 替代 BIO 中的 Socket,二者都是代表了远程进程的套接字。Epoll 就是把这些 FD 注册到 Linux 的文件系统下,通过红黑树来管理,这个红黑树保存了(FD, SocketChannel)这样的结构,通过 FD 可以快速找到对应的 SocketChannel(只能说大概是这样的对应关系),当这个红黑树下的某个 FD 可读(网卡中断实现)或可写(网卡中断实现)时,添加到就绪列表中。


通过对中断程序注册一个处理函数,可以实现每次网卡中断时,把这个中断对应的 FD 添加到就绪列表中,唤醒 Epoll 的 select 方法。Epoll 的 select 方法就是遍历就绪列表,找到每个 FD 对应的 SocketChannel,返回。如果为空就阻塞。当有新的中断产生时被唤醒。


现在我们把远程进程可读/可写这件事丢给了操作系统,操作系统使用网卡中断实现异步通知。就可以解放我们的程序了,而不必让它干等了。


等一下,解放了我们的程序?读/写可用自动通知?这怎么看起来很 Nice 啊!我们把它和 NIO 拼接一下试试。


哦吼吼~就是 NIO+I/O 多路复用=>Reactor 模型。


所以 Reactor 模型是依赖于操作系统的,其实是依赖于硬件中断。整个计算机内部所有的硬件通知都是通过中断实现的(这里多嘴一句,其他硬件实现中断是通过轮询自己的状态实现的,其实 CPU 响应中断也是通过每个时钟周期查看引脚信号实现的)。


NIO+I/O 多路复用在 Java 中体现就是 ServerSocketChannel+SocketChannel+Selector+SelectionKey+Buffer 那一套。来看一个典型的 Echo 服务器,基于 NIO+I/O 多路复用实现的。



import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.channels.spi.AbstractSelectableChannel;import java.nio.charset.StandardCharsets;import java.util.*;
/** * @author CodeWithBuff */public class NioTcpSingleThread {
public static void main(String[] args) { NioTcpSingleThread.Server.builder().build().run(); }
private static final HashMap<SocketChannel, List<DataLoad>> dataLoads = new LinkedHashMap<>();
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
@Data @Builder @NoArgsConstructor @AllArgsConstructor private static class DataLoad { private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray; }
/** * Java NIO处理网络的核心组件只有四个:{@link Channel},{@link Selector},{@link SelectionKey}和{@link java.nio.Buffer} * <br/> * 说一下{@link ServerSocketChannel},{@link SocketChannel},{@link Selector}和{@link SelectionKey}之间的关系。 * <br/> * {@link ServerSocketChannel}和{@link SocketChannel}不说了,无非就是一个用来在服务端建立连接,一个处理连接(实际I/O交互)的区别,在这里统称为{@link AbstractSelectableChannel},也就是它俩都继承的类。 * <br/> * {@link Selector#select()}调用系统调用,轮询端口,记录已注册的{@link AbstractSelectableChannel}感兴趣的事件,如果发生了所有已注册的{@link AbstractSelectableChannel}感兴趣的事件之一的话,就返回。否则阻塞。 * <br/> * 对于{@link AbstractSelectableChannel}来说,怎么让{@link Selector}帮自己记录并轮询自己感兴趣的事件呢?答案是:注册到{@link Selector}上即可,同时设置感兴趣的事件类型。 * <br/> * 在注册成功后,会返回一个{@link SelectionKey}类型的变量,通过它,可以操作{@link AbstractSelectableChannel}和{@link Selector}。{@link SelectionKey}本身就是{@link AbstractSelectableChannel}和它注册到的{@link Selector}的凭证。 * 就像是订单一样,记录着它们俩的关系,所以在注册成功的后续操作里,一般都是用{@link SelectionKey}来实现的。同时,{@link SelectionKey}还有一个attachment()方法,可以获取附加到它上面的对象。 * 一般我们用这个附属对象来处理当前{@link SelectionKey}所包含的{@link AbstractSelectableChannel}和{@link Selector}的实际业务。 * <br/> * 刚才说到了{@link Selector#select()},它会一直阻塞直到发生了感兴趣的事件,但是有时候我们这边可以确定某一事件马上或已经发生,就可以调用{@link Selector#wakeup()}方法,让{@link Selector#select()}立即返回,然后获取 * {@link SelectionKey}集合也好,重新{@link Selector#select()}(这已经是下一次循环了)也罢。 * <br/> * <br/> * 注意!!!如果某一个{@link AbstractSelectableChannel}在同一个{@link Selector}上注册了两个不同的感兴趣的事件类型,那么返回的两个{@link SelectionKey}是没有任何关系的。虽然可以通过{@link SelectionKey}再次修改 * {@link AbstractSelectableChannel}感兴趣的事件类型。{@link SelectionKey}只在注册时生成返回,所以有(Channel + Selector) = SelectionKey。但是吧,啧,注册多个时会卡死,所以千万不要同一个Channel和同一个Selector注册多个!!! */ @Builder private static class Server implements Runnable {
@Override public void run() { System.out.println("Server开始运行..."); Selector globalSelector; ServerSocketChannel serverSocketChannel; SelectionKey serverSelectionKey; try { globalSelector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8190)); serverSocketChannel.configureBlocking(false); serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT); serverSelectionKey.attach(Acceptor.builder() .globalSelector(globalSelector) .serverSocketChannel(serverSocketChannel) .build() ); while (true) { // select()是正儿八经的阻塞方法,它会一直阻塞直到发生了任何注册过的(Server)SocketChannel感兴趣的事件之一。比如有新的连接建立,Channel可以读了,或者Channel可以写了 // 它的返回值指出了有几个感兴趣事件,实际没啥用,所以在此直接忽略 globalSelector.select(); Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys(); for (SelectionKey selectionKey : selectionKeySet) { dispatch(selectionKey); selectionKeySet.remove(selectionKey); } } } catch (IOException ignored) { } }
private void dispatch(SelectionKey selectionKey) { Runnable runnable = (Runnable) selectionKey.attachment(); runnable.run(); } }
@Data @Builder private static class Acceptor implements Runnable {
private final Selector globalSelector;
private final ServerSocketChannel serverSocketChannel;
@Override public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("已建立连接..."); socketChannel.configureBlocking(false); SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ); socketSelectionKey.attach(Handler.builder() .socketSelectionKey(socketSelectionKey) .build() ); // 此时注册了读感兴趣Channel,所以为了快速开启读,直接唤醒selector。其实就是让它别等了,我这边准备好了,你那边应该已经有数据了,直接返回吧。 globalSelector.wakeup(); } catch (IOException ignored) { } } }
/** * "写"操作依赖于"读"操作读取到的数据,所以"写"之后不能再次"写",必须"读"或"关闭"。 * <br/> * "读"操作之后可以继续"读"而无需等待"写完成",所以"写完"可以把感兴趣类型设置为"读"|"写"而不是单单的"写"。 */ @Data @Builder private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); if (!socketChannel.isOpen()) { System.out.println("连接已关闭"); try { socketChannel.shutdownInput(); socketChannel.shutdownOutput(); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } return ; } if (socketSelectionKey.isReadable()) { System.out.println("读事件发生,准备读..."); Reader.builder() .socketChannel(socketChannel) .build() .run(); // 说明即对读感兴趣,也对写感兴趣(因为客户端可能是长连接,还要再次发送消息),但是同一个SelectionKey只能是读或写之一 socketSelectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 读完了,就要准备写 socketSelectionKey.selector().wakeup(); } if (socketSelectionKey.isWritable()) { System.out.println("写事件发生,准备写..."); Writer.builder() .socketChannel(socketChannel) .build() .run(); socketSelectionKey.interestOps(SelectionKey.OP_READ); // 写完了,立即返回就免了 // socketSelectionKey.selector().wakeup(); } } }
@Data @Builder private static class Reader implements Runnable {
private final SocketChannel socketChannel;
@Override public void run() { try { byteBuffer.clear(); int readable = socketChannel.read(byteBuffer); byte[] bytes = byteBuffer.array(); String value = new String(bytes, 0, readable); System.out.println("读到了: " + value); DataLoad dataLoad = DataLoad.builder() .stringValue(value) .build(); List<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedList<>()); tmp.add(dataLoad); } catch (IOException ignored) { } } }
@Data @Builder private static class Writer implements Runnable {
private final SocketChannel socketChannel;
@Override public void run() { try { String value = "Server get: " + dataLoads.get(socketChannel).get(0).getStringValue(); dataLoads.get(socketChannel).remove(0); socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); } catch (IOException ignored) { } } }}
复制代码


NIO 做到了数据可读/可写时的通知机制,然后去读,去写,而 AIO 则是直接做到了数据传输到指定区域,说白了就是不需要自己去读,自己去写,当它被调用时数据已经全部准备好了,更加地异步。但是 Linux 和实际生产使用不多,我们就不提了,只给出一个示例代码:


import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.StandardCharsets;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.locks.LockSupport;import java.util.concurrent.locks.ReentrantLock;
/** * @author CodeWithBuff */public class AioTcpSingleThread {
public static void main(String[] args) { Server.builder().build().run(); // 防止主线程退出 LockSupport.park(Long.MAX_VALUE); }
private static final ConcurrentHashMap<AsynchronousSocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ReentrantLock READ_LOCK = new ReentrantLock();
private static final ReentrantLock WRITE_LOCK = new ReentrantLock();
private static final ByteBuffer READ_BUFFER = ByteBuffer.allocate(1024 * 4);
private static final ByteBuffer WRITE_BUFFER = ByteBuffer.allocate(1024 * 4);
@Data @Builder @NoArgsConstructor @AllArgsConstructor private static class DataLoad { private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray; }
@Builder private static class Server implements Runnable {
@Override public void run() { try { System.out.println("服务器启动..."); asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(8190)); asynchronousServerSocketChannel.accept(null, ACCEPTOR); } catch (IOException ignored) { } } }
private static AsynchronousServerSocketChannel asynchronousServerSocketChannel = null;
private static final Acceptor ACCEPTOR = new Acceptor();
private static class Acceptor implements CompletionHandler<AsynchronousSocketChannel, Object> { // 这个方法是异步调用的,所以不用担心阻塞会阻塞到主线程 @Override public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println("连接建立: " + Thread.currentThread().getName()); System.out.println("连接建立"); dataLoads.computeIfAbsent(result, k -> new LinkedBlockingQueue<>()); // 使用循环来进行多次读取,写入 while (result.isOpen()) { READ_LOCK.lock(); // 这个方法也是异步的 result.read(READ_BUFFER, attachment, new Reader(result, READ_BUFFER.array())); READ_BUFFER.clear(); READ_LOCK.unlock(); WRITE_LOCK.lock(); String ans = ""; try { ans = "Server get: " + dataLoads.get(result).take().getStringValue(); } catch (InterruptedException e) { e.printStackTrace(); } // 异步的 result.write(ByteBuffer.wrap(ans.getBytes(StandardCharsets.UTF_8)), attachment, new Writer(result)); WRITE_LOCK.unlock(); } System.out.println("结束通信一次"); // 尝试建立第二波通信 asynchronousServerSocketChannel.accept(attachment, ACCEPTOR); }
@Override public void failed(Throwable exc, Object attachment) { System.out.println("建立连接失败"); } }
private static class Reader implements CompletionHandler<Integer, Object> {
private final AsynchronousSocketChannel asynchronousSocketChannel;
private final byte[] bytes;
public Reader(AsynchronousSocketChannel asynchronousSocketChannel, byte[] bytes) { this.asynchronousSocketChannel = asynchronousSocketChannel; this.bytes = bytes; }
@Override public void completed(Integer result, Object attachment) { System.out.println("读取数据: " + Thread.currentThread().getName()); if (result == 0 || !asynchronousSocketChannel.isOpen()) { return ; } else if (result < 0) { shutdown(asynchronousSocketChannel); return ; } System.out.println("读取数据: " + result); String value = new String(bytes, 0, result); System.out.println("读到了: " + value); LinkedBlockingQueue<DataLoad> tmp = dataLoads.get(asynchronousSocketChannel); DataLoad dataLoad = DataLoad.builder() .stringValue(value) .build(); tmp.add(dataLoad); }
@Override public void failed(Throwable exc, Object attachment) { System.out.println("读取失败"); } }
private static class Writer implements CompletionHandler<Integer, Object> {
private final AsynchronousSocketChannel asynchronousSocketChannel;
public Writer(AsynchronousSocketChannel asynchronousSocketChannel) { this.asynchronousSocketChannel = asynchronousSocketChannel; }
@Override public void completed(Integer result, Object attachment) { System.out.println("写入数据: " + Thread.currentThread().getName()); if (!asynchronousSocketChannel.isOpen()) { return ; } System.out.println("写入数据: " + result); }
@Override public void failed(Throwable exc, Object attachment) { System.out.println("写入失败"); } }
private static void shutdown(AsynchronousSocketChannel asynchronousSocketChannel) { try { asynchronousSocketChannel.shutdownInput(); asynchronousSocketChannel.shutdownOutput(); asynchronousSocketChannel.close(); } catch (IOException ignore) { } }}
复制代码

NIO 解决了什么?未解决什么?

NIO 实现了一个线程管理多个连接的 I/O 操作,而不用像 BIO 每个连接一个线程,本质是通过中断机制+系统调用实现的。这样就可以在一个线程处理所有可用的 I/O 事件。


注意,如果一个 Selector(一般来说一个 Selector 对应一个线程,一个线程对应多个 Selector 会降低 Selector 效率)仅注册一个连接,那 NIO 和 BIO 就没什么区别了。


还记得我们为什么从 BIO 走到了 NIO 吗?是因为 BIO 无法抗住大量的连接,所以 NIO 解决了大连接的问题


但是 NIO 并不会带来每个请求的速度的提升,这点请记住,甚至在连接数不多时,处理速度还不如 BIO。

NIO 使用注意事项

前面我们假设 NIO 中不要出现耗时业务,但是如果必须有,比如数据库操作,那怎么办呢?在这里我们参考 NIO 框架 Netty 的实现。


Netty 建议对于耗时操作,应该通过传入的自定义线程池处理,即把这个任务提交到线程池,然后添加异步调用,当任务处理完毕,继续下一步处理。总之耗时任务线程池处理,普通任务直接在 NIO 线程处理就好。


其实这也对应了多线程 Reactor 模型。此外还有主从多线程 Reactor,就是把连接操作独立出来,做一个单独的 Selector 专门处理连接,连接之后的 I/O 操作都放在其他的 Selector 中,业务放在线程池中。


来看看这两种模型各自对应的代码:


import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.*;import java.util.concurrent.*;import java.util.concurrent.locks.ReentrantLock;
/** * @author CodewithBuff */public class NioTcpMultiThread {
public static void main(String[] args) { Server.builder().build().run(); Runnable target = executorService::shutdown; Thread shutdown = new Thread(target); Runtime.getRuntime().addShutdownHook(shutdown); }
private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
private static final ReentrantLock reentrantLock = new ReentrantLock();
@Data @Builder @NoArgsConstructor @AllArgsConstructor private static class DataLoad { private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray; }
@Builder private static class Server implements Runnable {
@Override public void run() { System.out.println("Server开始运行..."); Selector globalSelector; ServerSocketChannel serverSocketChannel; SelectionKey serverSelectionKey; try { globalSelector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8190)); serverSocketChannel.configureBlocking(false); serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT); serverSelectionKey.attach(Acceptor.builder() .serverSelectionKey(serverSelectionKey) .build() ); while (true) { int a = globalSelector.select(); Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys(); for (SelectionKey selectionKey : selectionKeySet) { dispatch(selectionKey); selectionKeySet.remove(selectionKey); } } } catch (IOException ignored) { } }
private void dispatch(SelectionKey selectionKey) { Runnable runnable = (Runnable) selectionKey.attachment(); runnable.run(); } }
@Data @Builder private static class Acceptor implements Runnable {
private final SelectionKey serverSelectionKey;
@Override public void run() { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) serverSelectionKey.channel(); Selector globalSelector = serverSelectionKey.selector(); SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); System.out.println("已建立连接..."); socketChannel.configureBlocking(false); SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ); socketSelectionKey.attach(Handler.builder() .socketSelectionKey(socketSelectionKey) .build() ); globalSelector.wakeup(); } catch (IOException ignored) { } } }
@Data @Builder private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { if (!socketSelectionKey.channel().isOpen()) { System.out.println("连接已关闭"); try { socketSelectionKey.channel().close(); } catch (IOException ignored) { } return ; } dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>()); if (socketSelectionKey.isReadable()) { Reader reader = Reader.builder() .socketSelectionKey(socketSelectionKey) .build(); Thread thread = new Thread(reader); socketSelectionKey.interestOps(SelectionKey.OP_WRITE); thread.start(); } else if (socketSelectionKey.isWritable()) { Writer writer = Writer.builder() .socketSelectionKey(socketSelectionKey) .build(); Thread thread = new Thread(writer); socketSelectionKey.interestOps(SelectionKey.OP_READ); thread.start(); } } }
@Data @Builder private static class Reader implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); String value; reentrantLock.lock(); if (socketChannel.isOpen()) { int readable = socketChannel.read(byteBuffer); if (readable == 0) { value = null; // System.out.println("读到空请求"); } else if (readable < 0) { value = null; shutdownSocketChannel(socketChannel); } else { value = new String(byteBuffer.array(), 0, readable); } } else { value = null; } reentrantLock.unlock(); if (value == null) { return ; } System.out.println("读到了: " + value); DataLoad dataLoad = DataLoad.builder() .stringValue(value) .build(); LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>()); tmp.add(dataLoad); socketSelectionKey.selector().wakeup(); } catch (IOException ignored) { } } }
@Data @Builder private static class Writer implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel); String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue(); if (socketChannel.isOpen()) socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); else { shutdownSocketChannel(socketChannel); } } catch (IOException | InterruptedException ignored) { } } }
private static void shutdownSocketChannel(SocketChannel socketChannel) { try { socketChannel.shutdownInput(); socketChannel.shutdownOutput(); socketChannel.close(); } catch (IOException ignored) { } }}
复制代码


主从多线程:


import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.HashSet;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.locks.ReentrantLock;
/** * @author 十三月之夜 */public class NioTcpMainSubThread {
public static void main(String[] args) throws IOException { new Server(Runtime.getRuntime().availableProcessors()).run(); }
private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
private static final ReentrantLock reentrantLock = new ReentrantLock();
@Data @Builder @NoArgsConstructor @AllArgsConstructor private static class DataLoad { private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private int[] intArray;
private long[] longArray;
private double[] doubleArray;
private String[] stringArray; }
/** * BossSelector只和ServerSocketChannel进行建立连接操作,且是单线程的,运行在主线程中。 * <br/> * 然后把建立的连接扔给Workers处理,Workers是一组Worker,每个Worker都有一个独立的WorkSelector用来处理当前Worker被安排的SocketChannel。 * <br/> * 这里采取的策略是依次提交,尽可能让每个Worker所负责的SocketChannel数量相同。 * <br/> * 每个Worker运行在独立的线程上,仅做轮询Read/Write操作,耗时的业务操作(比如I/O,Compute)均交给线程工作池处理。 */ private static class Server implements Runnable {
private final Selector bossSelector;
private final int workerCount;
public Server(int workerCount) throws IOException { this.workerCount = workerCount; bossSelector = Selector.open(); }
@Override public void run() { try { System.out.println("服务器启动..."); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8190)); serverSocketChannel.configureBlocking(false); SelectionKey serverSelectionKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT); serverSelectionKey.attach(new Boss(serverSocketChannel, workerCount)); while (true) { bossSelector.select(); Set<SelectionKey> selectionKeySet = bossSelector.selectedKeys(); // 特殊化处理,因为有且只有一个SelectionKey,所以不遍历了 SelectionKey key = selectionKeySet.iterator().next(); Runnable runnable = (Runnable) key.attachment(); runnable.run(); selectionKeySet.remove(key); } } catch (IOException ignored) { } } }
/** * 处理新的连接,生成SocketChannel并选择某一个Worker提交。 */ private static class Boss implements Runnable {
private final ServerSocketChannel serverSocketChannel;
private final int workerCount;
private final Set<SocketChannel>[] socketChannelSets;
private final Worker[] workers;
private int index = 0;
@SuppressWarnings("unchecked") public Boss(ServerSocketChannel serverSocketChannel, int workerCount) throws IOException { this.serverSocketChannel = serverSocketChannel; this.workerCount = workerCount; ExecutorService executorService = Executors.newFixedThreadPool(workerCount); socketChannelSets = new Set[workerCount]; workers = new Worker[workerCount]; for (int i = 0; i < workerCount; ++ i) { workers[i] = new Worker(); socketChannelSets[i] = workers[i].getSocketChannels(); executorService.submit(workers[i]); } }
@Override public void run() { Set<SocketChannel> socketChannelSet = socketChannelSets[index]; Selector workerSelector = workers[index].getWorkerSelector(); ++ index; if (index == this.workerCount) index = 0; try { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("建立连接..."); socketChannelSet.add(socketChannel); workerSelector.wakeup(); } catch (IOException ignore) { } } }
/** * 处理新添加的SocketChannel和轮询Read/Write。 */ private static class Worker implements Runnable {
private final Selector workerSelector;
private final Set<SocketChannel> socketChannels = new HashSet<>();
public Worker() throws IOException { workerSelector = Selector.open(); }
@Override public void run() { while (true) { try { if (socketChannels.size() > 0) { for (SocketChannel socketChannel : socketChannels) { socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(workerSelector, SelectionKey.OP_READ); selectionKey.attach(new Handler(selectionKey)); socketChannels.remove(socketChannel); } System.out.println("已添加新的SocketChannel"); } workerSelector.select(); Set<SelectionKey> selectionKeySet = workerSelector.selectedKeys(); for (SelectionKey key : selectionKeySet) { Runnable runnable = (Runnable) key.attachment(); runnable.run(); selectionKeySet.remove(key); } } catch (IOException ignored) { } } }
public Set<SocketChannel> getSocketChannels() { return socketChannels; }
public Selector getWorkerSelector() { return workerSelector; } }
/** * 分发业务处理 */ @Data @Builder private static class Handler implements Runnable {
private final SelectionKey socketSelectionKey;
private static final ExecutorService workPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override public void run() { if (!socketSelectionKey.channel().isOpen()) { System.out.println("连接已关闭"); try { socketSelectionKey.channel().close(); } catch (IOException ignored) { } return ; } dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>()); if (socketSelectionKey.isReadable()) { Reader reader = Reader.builder() .socketSelectionKey(socketSelectionKey) .build(); workPool.submit(reader); socketSelectionKey.interestOps(SelectionKey.OP_WRITE); } else if (socketSelectionKey.isWritable()) { Writer writer = Writer.builder() .socketSelectionKey(socketSelectionKey) .build(); workPool.submit(writer); socketSelectionKey.interestOps(SelectionKey.OP_READ); } } }
@Data @Builder private static class Reader implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); String value; reentrantLock.lock(); if (socketChannel.isOpen()) { int readable = socketChannel.read(byteBuffer); if (readable == 0) { value = null; // System.out.println("读到空请求"); } else if (readable < 0) { value = null; shutdownSocketChannel(socketChannel); } else { value = new String(byteBuffer.array(), 0, readable); } } else { value = null; } reentrantLock.unlock(); if (value == null) { return ; } System.out.println("读到了: " + value); DataLoad dataLoad = DataLoad.builder() .stringValue(value) .build(); LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>()); tmp.add(dataLoad); socketSelectionKey.selector().wakeup(); } catch (IOException ignored) { } } }
@Data @Builder private static class Writer implements Runnable {
private final SelectionKey socketSelectionKey;
@Override public void run() { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel); String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue(); if (socketChannel.isOpen()) socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); else { shutdownSocketChannel(socketChannel); } } catch (IOException | InterruptedException ignored) { } } }
private static void shutdownSocketChannel(SocketChannel socketChannel) { try { socketChannel.shutdownInput(); socketChannel.shutdownOutput(); socketChannel.close(); } catch (IOException ignored) { } }}
复制代码


在这里我们知道了。NIO 线程(Selector 绑定的线程)只做 I/O 处理。无论你这个业务逻辑什么时候完成,耗时多少,你只要数据准备好了,并且注册一个写事件,那么我 NIO 会在可以写时通知你,你来写就是了。这样只要耗时业务不在 NIO 线程中,就不会影响其他 ServerSocket 的 I/O 操作。



最后我希望明确一下,Reactor 模型的 Selector 仅仅实现了数据可读/数据可写的通知(Boss 版 Selector 可能仅做可连接通知)。这里我们强调的是“可”,表示可以的意思,即某个连接的 I/O 操作可用性通知。所以它需要搭配 NIO 使用,完成 NIO 的通知操作,而不必 NIO 不停地轮询。


你想让它帮你通知某个操作也非常简单,注册给它就行了,并添加一个回调程序(attachment 实现),这样 Selector 就能在可用时调用你的回调程序完成进一步操作,但是记得别在这个程序里阻塞你的 NIO Selector,不然会影响后续的其他连接的 I/O 可用性通知。

参考

Selector实现原理


彻底理解同步 异步 阻塞 非阻塞


I/O会一直占用CPU吗? - 赵鑫磊的回答 - 知乎


epoll 或者 kqueue 的原理是什么? - 关于夏天的一切的回答 - 知乎


《Java 高并发程序设计》


《现代操作系统》


《深入理解计算机系统》

发布于: 2 小时前阅读数: 6
用户头像

CodeWithBuff

关注

还未添加个人签名 2021.03.13 加入

还未添加个人简介

评论

发布
暂无评论
谈一谈Java的网络编程