写点什么

NIO 看破也说破(四)—— Java 的 NIO

发布于: 2020 年 05 月 19 日
NIO看破也说破(四)—— Java的NIO

Java的NIO有selector,系统内核也提供了多种非阻塞IO模型,Java社区也出现了像netty这种优秀的 NIO 框架。Java的NIO 与内核的阻塞模型到底什么关系,为什么Java有NIO的API还出现了netty这种框架,网上说的 reactor 到底是什么?本文通过分析代码,带你一步步搞清楚Java的NIO和系统函数之间的关系,以及Java NIO 是如何一步步衍生出来netty框架。

NIO概念



前几节我们提到了 Nonblocking IO 的概念,在Java中有Java NIO 的系列包,网上的大多数资料把Java的NIO等同于 Nonblocking IO ,这是错误的。Java 中的 NIO 指的是从1.4版本后,提供的一套可以替代标准的Java IO 的 new API。有三部分组成:

  • Buffer 缓冲区

  • Channel 通道

  • Selector 选择器



API的具体使用不在本文赘述。

代码模板



NIO大致分为这几步骤:

  1. 获取channel

  2. 设置非阻塞

  3. 创建多路复用器selector

  4. channel和selector做关联

  5. 根据selector返回的channel状态处理逻辑

// 开启一个channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(PORT));
// 打开一个多路复用器
Selector selector = Selector.open();
// 绑定多路复用器和channel
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 获取到达的事件
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.keys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
// 处理逻辑
}
if (selectionKey.isReadable()) {
// 处理逻辑
}
}
}



单线程示例



参考代码模板,我们用 NIO 实现一个Echo Server。server代码如下:

public static void main(String[] args) throws IOException {
Selector selector = initServer();
while (selector.select() > 0) {
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
try {
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
while ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").
newDecoder().decode(buffer));
buffer.clear();
}
System.out.printf("收到 %s 发来的:%s\n",
channel.getRemoteAddress(), sb);
buffer.clear();
// 模拟server端处理耗时
Thread.sleep((int) (Math.random() * 1000));
buffer.put(("收到,你发来的是:" + sb + "\r\n").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %s\n",
channel.getRemoteAddress(), sb);
channel.register(selector, SelectionKey.OP_READ, buffer.clear());
}
} catch (IOException | InterruptedException e) {
selectionKey.cancel();
selectionKey.channel().close();
System.err.println(e.getMessage());
}
iterator.remove();
}
}
}



写一个client ,模拟 50 个线程同时请求 server 端,在 readHandler 中模拟了随机sleep。client代码:

public static void main(String[] args) throws IOException {
for (int i = 0; i < 50; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
clientHandler();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
return;
}
private static void clientHandler() throws IOException {
long start = System.currentTimeMillis();
Socket socket = new Socket();
socket.setSoTimeout(10000);
socket.connect(new InetSocketAddress(9999));
OutputStream outputStream = socket.getOutputStream();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
bw.write("你好,我是client " + socket.getLocalSocketAddress() + "\r\n");
bw.flush();
InputStream inputStream = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
System.out.printf("接到服务端响应:%s,处理了%d\r\n", br.readLine(), (System.currentTimeMillis() - start));
br.close();
inputStream.close();
bw.close();
outputStream.close();
}



Java NIO 单线程



Selector 实现原理



用strace启动,[root@f00e68119764 tmp]# strace -ff -o out /usr/lib/jvm/java-1.8.0/bin/java NIOServerSingle 分析执行的过程,在日志中可以看到如下片段:

20083 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
20084 setsockopt(4, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
20085 clock_gettime(CLOCK_MONOTONIC, {tv_sec=242065, tv_nsec=887240727}) = 0
20086 fcntl(4, F_GETFL) = 0x2 (flags O_RDWR)
20087 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
20088 bind(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
20089 listen(4, 50) = 0
20090 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20091 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20092 epoll_create(256) = 7
21100 epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0
21101 epoll_wait(7, [{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1

可以看出,在Java的 NIO 中(java1.8)底层是调用的系统 epoll ,关于 epoll 请出门右转 这里不再啰嗦。

从源码中也可以看出:

public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

openSelector是抽象方法具体实现类,在Linux上代码如下:

public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}



跟踪代码可以看到最后调用 native 方法,说明:Java NIO 是利用系统内核提供的能力。

多线程处理



我们把单线程示例中,readHandler 随机 sleep,稍稍做些修改。模拟 server 端执行某一次请求时,处理过慢,如图示:



第十五个请求过来时,随机sleep:

// 模拟server端处理耗时
if (t.get() == 15) {
Thread.sleep((int) (Math.random() * 10000));
}

结果第十五个线程之后,所有 client 的执行都有一个短暂的等待



很容易解释,因为在单线程处理中,channel创建、IO读写均为一个 Thread ,面对50个 client,IO时间需要排队处理。因此我们Redis系列中也提到了在Redis中,尽量避免某一个key的操作会很耗时的情况。可以参考 出门右转



我们对代码做一些改造,client 端代码不动,server 端代码稍作调整。增加一个线程来处理读写时间,代码片段如下:

if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
service.execute(new Runnable() {
@Override
public void run() {
try {
// 处理逻辑……………………
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

这样相当于server端有两个线程,一个是主线程启动的 selector 来监听 channel 的 OP_ACCEPT 状态,另一个线程是处理 channel 的读写。程序也可以继续执行,稍稍快了一些。

Java NIO 多线程



reactor模式



接触 NIO 就一定听过reactor 这个名词,reactor 经常被混入 NIO 中,让很多人混淆概念。Reactor 到底是什么,维基百科的解释:



The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.



核心点四个:

  1. reactor design pattern (reactor是一种设计模式,不是专属于某个语言或框架的)

  2. event handling pattern (事件处理模式)

  3. delivered concurrently to a service handler by one or more inputs(一次处理一个或多个输入)

  4. demultiplexes the incoming requests and dispatches them(多路分解,分发)



我们对单线程示例做些修改:

public static void main(String[] args) throws IOException {
Selector selector = initServer();
while (selector.select() > 0) {
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
}
}

initServer的实现:

private static Selector initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server 启动");
return selector;
}

dispatcher 的实现:

private static void dispatcher(SelectionKey selectionKey) {
try {
if (selectionKey.isAcceptable()) {
// 我只负责处理链接
acceptHandler(selector, selectionKey);
} else if (selectionKey.isReadable()) {
// 我只处理读写数据
readHandler(selector, selectionKey);
}
} catch (IOException | InterruptedException e) {
selectionKey.cancel();
selectionKey.channel().close();
System.err.println(e.getMessage());
}
}

acceptHandler的实现:

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);

readHandler的实现:

SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
while ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
buffer.clear();
}
System.out.printf("收到 %s 发来的:%s\n", channel.getRemoteAddress(), sb);
buffer.clear();
// 模拟server端处理耗时
Thread.sleep((int) (Math.random() * 1000));
buffer.put(("收到,你发来的是:" + sb + "\r\n").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %s\n", channel.getRemoteAddress(), sb);
channel.register(selector, SelectionKey.OP_READ, buffer.clear());

单线程Reactor



改造后的代码,具备了以下特点:



  1. 基于事件驱动( NIO 的 selector,底层对事件驱动的epoll实现 jdk1.8)

  2. 统一分派中心(dispatcher方法)

  3. 不同的事件处理(accept 和 read write 拆分)



已经基本上实现了 Reactor 的单线程模式,我们把示例代码再做一些改造:

public class ReactorDemo {
private Selector selector;
public ReactorDemo() throws IOException {
initServer();
}
private void initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));
selector = Selector.open();
SelectionKey selectionKey = serverChannel.
register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor());
System.out.println("server 启动");
}
public void start() throws IOException {
while (selector.select() > 0) {
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispater(selectionKey);
iterator.remove();
}
}
}
public void dispater(SelectionKey selectionKey) {
Hander hander = (Hander) selectionKey.attachment();
if (hander != null) {
hander.process(selectionKey);
}
}
private interface Hander {
void process(SelectionKey selectionKey);
}
private class Acceptor implements Hander {
@Override
public void process(SelectionKey selectionKey) {
try {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
selectionKey.attach(new ProcessHander());
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ProcessHandler implements Hander {
@Override
public void process(SelectionKey selectionKey) {
}
}
public static void main(String[] args) throws IOException {
new ReactorDemo().start();
}
}

我们实现了最基本的单Reactor的单线程模型,程序启动后 selector 负责获取、分离可用的 socket 交给dispatcher处理,dispatcher 交给不同的 handler 处理。其中 Acceptor 只负责 socket 链接,IO 的处理交给 ProcessHandler。



我把网上流传的Reactor的图,按自己的理解重新画了一份



多线程Reactor



上面的示例代码中,从 socket 建立到 IO 完成,只有一个线程在处理。NIO 单线程示例中我们尝试加入线程池来加速 IO 任务的处理,reactor 模式中该如何实现呢?



简单理解,参考 NIO 多线程加入线程池处理所有的processHandler ,可以利用 CPU 多核心加快业务处理,代码不再赘述。



多Reactor模式



参考ReactorDemo ,我们的 acceptor 处理 socket 链接时和 handler 处理 IO 都是用的同一个 selector 。如果我们在多线程基础上有两个 selector ,一个只负责处理 socket 链接一个处理网路 IO 各司其职将会更高大的提升系统吞吐量,该怎么实现呢?

public class ReactorDemo {
private Selector selector;
private Selector ioSelector;
public ReactorDemo() throws IOException {
initServer();
}
private void initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));
selector = Selector.open();
ioSelector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server 启动");
}
public void startServer() {
Executors.newFixedThreadPool(1).execute(new Runnable() {
@Override
public void run() {
try {
majorListen();
} catch (IOException e) {
e.printStackTrace();
}
}
});
Executors.newFixedThreadPool(1).execute(new Runnable() {
@Override
public void run() {
try {
subListen();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void majorListen() throws IOException {
System.out.println("主selector启动");
while (selector.select() > 0) {
System.out.println("主selector有事件");
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
new Acceptor().process(selectionKey);
}
iterator.remove();
}
}
}
public void subListen() throws IOException {
System.out.println("子selector启动");
while (true) {
if (ioSelector.select(100) <= 0) {
continue;
}
System.out.println("子selector有事件");
Set<SelectionKey> set = ioSelector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
selectionKey.attach(new ProcessHander());
dispater(selectionKey, true);
iterator.remove();
}
}
}
public void dispater(SelectionKey selectionKey, boolean isSub) {
Hander hander = (Hander) selectionKey.attachment();
if (hander != null) {
hander.process(selectionKey);
}
}
private interface Hander {
void process(SelectionKey selectionKey);
}
private class Acceptor implements Hander {
@Override
public void process(SelectionKey selectionKey) {
try {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("获取一个链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
channel.register(ioSelector,
SelectionKey.OP_READ, ByteBuffer.allocate(1024));
ioSelector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private class ProcessHander implements Hander {
@Override
public void process(SelectionKey selectionKey) {
try {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
if ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
buffer.clear();
} else if (read == 0) {
return;
} else if (read == -1) {
if (selectionKey.isValid()) {
selectionKey.cancel();
channel.close();
}
}
System.out.printf("收到 %s 发来的:%s\n",
channel.getRemoteAddress(), sb);
buffer.clear();
buffer.put(("收到,你发来的是:" + sb + "\r\n").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %s\n",
channel.getRemoteAddress(), sb);
channel.register(ioSelector, SelectionKey.OP_READ, buffer.clear());
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new ReactorDemo().startServer();
}
}

示例中创建了 selector 和 ioSelector ,其中 selector 只处理 socket 的建立,在 Acceptor.process 方法中把 socket 注册给 ioSelector。在 ProcessHander.process 方法中 ioSelector 只负责处理 IO 事件。这样,我们把 selector 进行了拆分。参考多线程实现,同理我们可以创建 N 个线程,处理 ioSelector 对应的 IO 事件。



总结



至此,我们了解了 Reactor 的三种模型结果,分别是单 Reactor 单线程、单 Reactor 多线程、多 Reactor 多线程。所有代码不够严谨,只为了表示可以使用多个线程或者多个 selector 之间的关系。总结重点:



  1. reactor 是一种设计模式

  2. 基于事件驱动来处理

  3. 利用多路分发的策略,让不同业务处理各司其职

  4. 有单线程,单Reactor 多线程 和 多 Reactor 多线程,三种实现方式



参考:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf



系列

NIO 看破也说破(一)—— Linux/IO 基础

NIO 看破也说破(二)—— Java 中的两种 BIO

NIO 看破也说破(三)—— 不同的 IO 模型

NIO 看破也说破(四)—— Java 的 NIO

NIO 看破也说破(五): 搞,今天就搞,搞懂Buffer



关注我



如果您在微信阅读,请您点击链接 关注我 ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误



发布于: 2020 年 05 月 19 日阅读数: 3045
用户头像

欢迎关注公众号“小眼睛聊技术” 2018.11.12 加入

互联网老兵,关注产品、技术、管理

评论 (7 条评论)

发布
用户头像
还是别忘了把之前几篇链接加到文章末尾哈。
2020 年 05 月 19 日 17:00
回复
感谢提醒
2020 年 05 月 19 日 20:53
回复
该评论已删除
2020 年 05 月 22 日 13:10
回复
正在研究,稍等。
2020 年 05 月 22 日 14:16
回复
查看更多回复
用户头像
第四篇如期而至,辛苦码字,InfoQ首页推荐。
2020 年 05 月 19 日 17:00
回复
系列更新,求推荐
2020 年 06 月 04 日 16:04
回复
没有更多了
NIO看破也说破(四)—— Java的NIO