前言
本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第三篇博文,主要内容是介绍通过使用 Selector,一个单独的线程可以有效地监视多个通道,从而提高应用程序的处理效率,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
介绍
在 Java 中,Selector 是 NIO(New Input/Output)库中的一种对象,用于监控多个通道的状态,例如文件 I/O 或者网络 I/O。
Selector 的工作原理是使用 select()
方法轮询已注册的通道,获取它们的就绪状态,并返回一个已准备好进行 I/O 操作的通道集合。通过使用此机制,可以监视几个通道的状态,并且只有当至少一个通道处于就绪状态时才会执行 I/O 操作,从根本上避免了 CPU 的浪费。
总之,Selector 是一种强大的工具,可实现高效的 I/O 操作和网络编程,因为它能够轻松地监视多个通道的状态并在需要时对它们进行操作。
使用
1、创建 selector
,管理多个 channel
;
Selector selector = Selector.open();
复制代码
2、注册 selector
和 channel
的联系;
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
复制代码
SelectionKey
表示一个通道(Channel)与一个选择器(Selector)之间的注册关系。每个通道在与选择器进行注册时都会创建一个对应的 SelectionKey
对象,这个对象包含了关于通道和选择器的一些元数据信息。
这里 SelectionKey
调用 register()
方法指定感兴趣的事件类型,绑定的事件类型有以下几种:
3、通过 selector
监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞:
int count = selector.select();
复制代码
int count = selector.select(long timeout);
复制代码
int count = selector.selectNow();
复制代码
4、处理事件,SelectionKey
内部包含了所有发生的事件:
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
}
复制代码
5、整体代码如下所示:
@Slf4j
public class SelectorTest {
public static void main(String[] args) {
try {
// 1. 创建选择器来管理多个 channel
Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
// 通道必须设置为非阻塞模式
ssc.configureBlocking(false);
// 2. 注册 selector 和 channel 的联系
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) {
// 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行
selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("Key: {}", key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
log.debug("{}", sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
运行结果:
20:22:20 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
20:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
20:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:61605]
复制代码
进阶
在上个阶段,我们只是简单地使用了 selector
,但对于其绑定的事件类型,我们并没有进行特别的关注,然而,在实际应用中,我们不可能只使用一种事件类型,因此,我们需要改进我们的代码;
这里我们需要对事件类型进行判断,SelectionKey
正好提供了相关的方法:
改进代码如下所示:
while (true) {
// 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行
selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iter = keySet.iterator();
log.debug("count: {}", keySet.size());
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("Selection Key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("sc Key: {}", sc);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugRead(buffer);
buffer.clear();
}
}
}
复制代码
但是在运行时会发现报错空指针异常 NullPointerException
:
16:42:24 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - count: 1
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700]
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - count: 2
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
+--------+-------------------- read -----------------------+----------------+
position: [0], limit: [6]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 |sidiot |
+--------+-------------------------------------------------+----------------+
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
Exception in thread "main" java.lang.NullPointerException
at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:52)
复制代码
这是由于我们没有及时 remove()
造成的,当调用了 server.register()
方法后,Selector 中维护了一个集合,用于存放 SelectionKey
以及其对应的通道;
当 Selector 的通道对应的事件发生后,SelecionKey
会被放到另一个集合中,在这个集合中,即使 SelecionKey
被使用了,它也不会自动移除,所以在处理完一个事件后,需要手动移除迭代器中的 SelecionKey
,否则会导致已被处理过的事件再次被处理,引发一些错误,例如上述的空指针异常。
当客户端主动断开连接时,也会出现异常,控制台输出如下:
java.net.SocketException: Connection reset
at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:345)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:376)
at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:60)
复制代码
这是因为当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,因此,我们需要进行判断,当 channel.read()
的返回值为-1 时,表示连接断开,需要调用 key.cancel()
方法取消此事件;
改进代码如下所示:
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
try {
int read = channel.read(buffer);
if (read == -1) {
key.cancel();
channel.close();
} else {
buffer.flip();
debugRead(buffer);
buffer.clear();
}
iter.remove();
} catch (IOException e) {
e.printStackTrace();
key.cancel();
channel.close();
iter.remove();
}
}
复制代码
整体代码如下所示:
@Slf4j
public class SelectorTest {
public static void main(String[] args) {
try {
// 1. 创建选择器来管理多个 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
// 通道必须设置为非阻塞模式
ssc.configureBlocking(false);
// 2. 注册 selector 和 channel 的联系
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) {
// 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行
selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iter = keySet.iterator();
log.debug("count: {}", keySet.size());
while (iter.hasNext()) {
SelectionKey key = iter.next();
log.debug("Selection Key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("sc Key: {}", sc);
iter.remove();
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4);
try {
int read = channel.read(buffer);
if (read == -1) {
key.cancel();
channel.close();
} else {
buffer.flip();
debugRead(buffer);
buffer.clear();
}
iter.remove();
} catch (IOException e) {
e.printStackTrace();
key.cancel();
channel.close();
iter.remove();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
后记
以上就是 剖析 Selector 的所有内容了,希望本篇博文对大家有所帮助!
参考:
📝 上篇精讲:「NIO」(二)阻塞模式与非阻塞模式
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;
👍 创作不易,请多多支持;
🔥 系列专栏:探索 Netty:源码解析与应用案例分享
评论