写点什么

【Netty】「NIO」(三)剖析 Selector

作者:sidiot
  • 2023-06-07
    浙江
  • 本文字数:5795 字

    阅读完需:约 19 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第三篇博文,主要内容是介绍通过使用 Selector,一个单独的线程可以有效地监视多个通道,从而提高应用程序的处理效率,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍


在 Java 中,Selector 是 NIO(New Input/Output)库中的一种对象,用于监控多个通道的状态,例如文件 I/O 或者网络 I/O。


Selector 的工作原理是使用 select() 方法轮询已注册的通道,获取它们的就绪状态,并返回一个已准备好进行 I/O 操作的通道集合。通过使用此机制,可以监视几个通道的状态,并且只有当至少一个通道处于就绪状态时才会执行 I/O 操作,从根本上避免了 CPU 的浪费。


总之,Selector 是一种强大的工具,可实现高效的 I/O 操作和网络编程,因为它能够轻松地监视多个通道的状态并在需要时对它们进行操作。


使用


1、创建 selector,管理多个 channel


Selector selector = Selector.open();
复制代码


2、注册 selectorchannel 的联系;


SelectionKey sscKey = ssc.register(selector, 0, null);  sscKey.interestOps(SelectionKey.OP_ACCEPT);
复制代码


SelectionKey 表示一个通道(Channel)与一个选择器(Selector)之间的注册关系。每个通道在与选择器进行注册时都会创建一个对应的 SelectionKey 对象,这个对象包含了关于通道和选择器的一些元数据信息。


这里 SelectionKey 调用 register() 方法指定感兴趣的事件类型,绑定的事件类型有以下几种:


  • connect - 客户端连接成功时触发;

  • accept - 服务器端成功接受连接时触发;

  • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况;

  • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况;


3、通过 selector 监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞:


  • 阻塞直到绑定事件发生;


    int count = selector.select();
复制代码


  • 阻塞直到绑定事件发生,或是超时(时间单位为 ms);


    int count = selector.select(long timeout);
复制代码


  • 不会阻塞,即不管有没有事件,立刻返回,根据返回值检查是否有事件;


    int count = selector.selectNow();
复制代码


4、处理事件,SelectionKey 内部包含了所有发生的事件:


selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {    SelectionKey key = iter.next();    ServerSocketChannel channel = (ServerSocketChannel) key.channel();    SocketChannel sc = channel.accept();}
复制代码


5、整体代码如下所示:


@Slf4jpublic class SelectorTest {    public static void main(String[] args) {        try {            // 1. 创建选择器来管理多个 channel            Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16); ServerSocketChannel ssc = ServerSocketChannel.open(); // 通道必须设置为非阻塞模式 ssc.configureBlocking(false);
// 2. 注册 selector 和 channel 的联系 SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) { // 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行 selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) { SelectionKey key = iter.next(); log.debug("Key: {}", key); ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); log.debug("{}", sc); } } } catch (IOException e) { e.printStackTrace(); } }}
复制代码


运行结果:


20:22:20 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=020:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=1620:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:61605]
复制代码


进阶


在上个阶段,我们只是简单地使用了 selector,但对于其绑定的事件类型,我们并没有进行特别的关注,然而,在实际应用中,我们不可能只使用一种事件类型,因此,我们需要改进我们的代码;


这里我们需要对事件类型进行判断,SelectionKey 正好提供了相关的方法:



改进代码如下所示:


while (true) {    // 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行    selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件 Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iter = keySet.iterator(); log.debug("count: {}", keySet.size());
while (iter.hasNext()) { SelectionKey key = iter.next(); log.debug("Selection Key: {}", key);
// 5. 区分事件类型 if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); log.debug("sc Key: {}", sc); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16); channel.read(buffer); buffer.flip(); debugRead(buffer); buffer.clear(); }
}}
复制代码


但是在运行时会发现报错空指针异常 NullPointerException


16:42:24 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=016:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - count: 116:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=1616:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700]16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - count: 216:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1
+--------+-------------------- read -----------------------+----------------+position: [0], limit: [6] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 |sidiot |+--------+-------------------------------------------------+----------------+
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16Exception in thread "main" java.lang.NullPointerException at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:52)
复制代码


这是由于我们没有及时 remove() 造成的,当调用了 server.register() 方法后,Selector 中维护了一个集合,用于存放 SelectionKey 以及其对应的通道



Selector 的通道对应的事件发生后SelecionKey 会被放到另一个集合中,在这个集合中,即使 SelecionKey 被使用了,它也不会自动移除,所以在处理完一个事件后,需要手动移除迭代器中的 SelecionKey,否则会导致已被处理过的事件再次被处理,引发一些错误,例如上述的空指针异常。





当客户端主动断开连接时,也会出现异常,控制台输出如下:


java.net.SocketException: Connection reset        at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:345)        at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:376)        at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:60)
复制代码


这是因为当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,因此,我们需要进行判断,当 channel.read() 的返回值为-1 时,表示连接断开,需要调用 key.cancel() 方法取消此事件;


改进代码如下所示:


if (key.isReadable()) {    SocketChannel channel = (SocketChannel) key.channel();    ByteBuffer buffer = ByteBuffer.allocate(16);    try {        int read = channel.read(buffer);        if (read == -1) {            key.cancel();            channel.close();        } else {            buffer.flip();            debugRead(buffer);            buffer.clear();        }        iter.remove();    } catch (IOException e) {        e.printStackTrace();        key.cancel();        channel.close();        iter.remove();    }}
复制代码




整体代码如下所示:


@Slf4jpublic class SelectorTest {    public static void main(String[] args) {        try {            // 1. 创建选择器来管理多个 channel            Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open(); // 通道必须设置为非阻塞模式 ssc.configureBlocking(false);
// 2. 注册 selector 和 channel 的联系 SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); log.debug("Register Key: {}", sscKey);
ssc.bind(new InetSocketAddress(7999));
while (true) { // 3. 在没有事件发生时,线程阻塞;反之,则线程恢复运行 selector.select();
// 4. 处理事件,SelectionKey 内部包含了所有发生的事件 Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iter = keySet.iterator(); log.debug("count: {}", keySet.size());
while (iter.hasNext()) { SelectionKey key = iter.next(); log.debug("Selection Key: {}", key);
// 5. 区分事件类型 if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); log.debug("sc Key: {}", sc); iter.remove(); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(4); try { int read = channel.read(buffer); if (read == -1) { key.cancel(); channel.close(); } else { buffer.flip(); debugRead(buffer); buffer.clear(); } iter.remove(); } catch (IOException e) { e.printStackTrace(); key.cancel(); channel.close(); iter.remove(); } } } } } catch (IOException e) { e.printStackTrace(); } }}
复制代码


后记


以上就是 剖析 Selector 的所有内容了,希望本篇博文对大家有所帮助!


参考:



📝 上篇精讲:「NIO」(二)阻塞模式与非阻塞模式

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

🔥 系列专栏:探索 Netty:源码解析与应用案例分享

发布于: 20 小时前阅读数: 24
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「NIO」(三)剖析 Selector_Java_sidiot_InfoQ写作社区