写点什么

Java Selector 模型

用户头像
邱学喆
关注
发布于: 2021 年 06 月 17 日
Java Selector模型

一. 概述

在以前开发服务器时,都是通过用户态线程循环遍历连接请求,一个请求获取一个 socket 对象,接着通过 socket 对象获取请求数据,以及通过 socket 写入数据返回给客户端;而这种模式是阻塞型的,性能低效;现在操作系统层面提供了 Selector 模式,对网络层的连接提供高效的操作,其具体原理交由操作系统层面去监控这些 socket;具体原理介绍,可以去查阅聊聊Linux 五种IO模型

二. Buffer 家族

上面省略了相关的子类,如 LongBuffer、CharBuffer 等;然而其与 ByteBuffer 的结构差不多,只是 hb 的类型换了而已;那么我们仔细对属性进行简单介绍一下:

  • position 指的当前的数组的位置,可以理解为指针(游标),每次写入一个字节时,就会+1;

  • limit 限制大小,写入最大的位置,读取最大的位置,常规情况与 capacity 是一样的;

  • capacity 是当前数组容量,在分配 buffer 时,就填入,过程中是不能更改的,只有 truncate 方法被更改,但我不是很明白该方法的具体使用场景

  • mark 像是给当前 buffer 打上一个标志,其值就是记录当前的 position 的位置,当调用 reset 方法可以回滚到之前打上标志的位置;

  • address 记录当前 buffer 的地址,通过 Unsafe 获取子类的数组的起始地址;如果是堆内内存的,则是通过 Unsafe 获取到该数组的地址;如果是对外内存的(MappedByteBuffer)的,则通过文件句柄获取到该地址;

byteBuffer 下的属性就就不过多介绍,字眼都能猜测出什么意思;堆外内存的,是不归垃圾回收器处理的,需要程序自行释放该内存,我们看到 DirectByteBuffer 中有 cleaner 属性,通过获取到该属性就调用该类的方法进行内存释放;

我们在常用的 buffer 类中的方法有如下:flip ,rewind,slice ,reset,clear 等;具体可以看源码就即可明白了;我们常规创建 Buffe 时,都是通过 allocateDirect, allocate 方法进行创建;稍微提醒的是:将 buffer 的内容写入对应的目标对象,都需要先调用 flip 方法

具体的详细的可以参考网络的;不过多详细介绍;

三. Selector 模型

Selector 是 IO 多路复用模型。

其核心原理是交给操作系统层面去监控该通道,之所以有这样的结论,主要是用户态与内核态的切换要耗一定的时间,交由操作系统层面监控,可以减少不必要的切换;

同时也需要讲解的“零拷贝”这个俗语,其原理是在内核态使用的内存中划一块内存作为共享内存,用户态与内核态共同访问,这样子可以避免用户态的数据需要拷贝到内核态内存中,这样子内核态才能访问。

在 Selector 模型,主要是对三个类型操作,其类图如下:

1. Selector

创建 Selector 是通过 Selector.open 方法去创建,而底层最终是交给 SelectorProvider 的实现创建;

关闭 Selector 是通过 Selector.close 方法去关闭;

其属性我这边没有过多对其源码进行解读,所以就不在此班门弄斧了;

2. SelectableChannel 抽象类

创建:SelectableChannel 子类的创建是由各个子类中的 open 方法进行创建,其底层最终是交给 SelectorProvider 来创建;

相关的配置,具体可以对应的方法进行配置即可。

关闭:直接调用 close 方法即可

3. SelectionKey

其是将 Selector 与 SelectableChannel 关联起来;

事件:OP_READ(就绪读事件)、OP_WRITE(就绪写事件)、OP_CONNECT(就绪连接事件)、OP_ACCEPT(就绪接收事件);

我们感兴趣的事件标志都保存到 interestOps,已经就绪的事件标志都保存到 readyOps;

三. 配置项

1. SocketChannel 参数

通过 SocketChannel.setOption 方法进行添加参数;其内部维护了一个内部类,只允许部分的参数有效;SocketChannelImpl 类中的 DefaultOptionsHolder。我们简单了解一下参数的意义:

2. ServerSocketChannel 参数

与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。

3. DatagramChannel 参数

与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。

四. 实战

1. 客户端

public class Client {    public  static void main(String [] args) throws IOException, InterruptedException {        Selector selector = Selector.open();        SocketChannel socketChannel = SocketChannel.open();        socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,true);        socketChannel.connect(new InetSocketAddress("127.0.0.1", 6666));        socketChannel.configureBlocking(false);        socketChannel.register(selector, SelectionKey.OP_WRITE);        ByteBuffer buffer = ByteBuffer.allocate(10);        for (int i = 0; i <1 ; i++) {            buffer.clear();            buffer.put("hello5555".getBytes());            buffer.flip();            socketChannel.write(buffer);            Thread.sleep(10000);        }        socketChannel.socket().close();        socketChannel.close();    }}
复制代码

2. 服务端

public class TimeServer {    public static void main(String[] args) throws Exception {        int port = 6666;        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();    }}public class MultiplexerTimeServer implements Runnable{    private Selector selector;    private ServerSocketChannel serverSocketChannel;    private volatile boolean stop;
public MultiplexerTimeServer(int port) throws Exception { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port),1024); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println(" time server have configured "); } public void stop(){ this.stop = true; } public void run(){ while(!stop){ try{ int cnt = selector.select(1000); if(cnt>0){ Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); iterator.remove(); try { processKey(selectionKey); } catch (Exception e) { e.printStackTrace(); System.out.println(" 异常 "+e); selectionKey.cancel(); try { selectionKey.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } } } }else{ //说明没有就绪事件,可以先休眠 System.out.println(" 休眠 "+LocalDateTime.now()); Thread.sleep(5000); } }catch (Exception ex){ ex.printStackTrace(); } } if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }
private void processKey(SelectionKey selectionKey) throws Exception{ if(selectionKey.isValid()){ if(selectionKey.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = channel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); } if(selectionKey.isReadable()){ SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int readBytes = channel.read(buffer); System.out.println("读取长度: "+readBytes ); if(readBytes>0){ buffer.flip(); byte [] revContent = new byte[buffer.remaining()]; buffer.get(revContent); System.out.println("receive data from client: "+ new String(revContent,"UTF-8"));
doWrite(channel); }else if(readBytes<0){ //关闭事件 System.out.println(" 拦截已经关闭。。。。 "); selectionKey.cancel(); channel.close(); } } } }
private void doWrite(SocketChannel channel) throws Exception { String date = LocalDateTime.now().toString(); ByteBuffer buffer = ByteBuffer.wrap(date.getBytes()); channel.write(buffer); }}
复制代码


用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
Java Selector模型