一. 概述
在以前开发服务器时,都是通过用户态线程循环遍历连接请求,一个请求获取一个 socket 对象,接着通过 socket 对象获取请求数据,以及通过 socket 写入数据返回给客户端;而这种模式是阻塞型的,性能低效;现在操作系统层面提供了 Selector 模式,对网络层的连接提供高效的操作,其具体原理交由操作系统层面去监控这些 socket;具体原理介绍,可以去查阅聊聊Linux 五种IO模型
二. Buffer 家族
上面省略了相关的子类,如 LongBuffer、CharBuffer 等;然而其与 ByteBuffer 的结构差不多,只是 hb 的类型换了而已;那么我们仔细对属性进行简单介绍一下:
position 指的当前的数组的位置,可以理解为指针(游标),每次写入一个字节时,就会+1;
limit 限制大小,写入最大的位置,读取最大的位置,常规情况与 capacity 是一样的;
capacity 是当前数组容量,在分配 buffer 时,就填入,过程中是不能更改的,只有 truncate 方法被更改,但我不是很明白该方法的具体使用场景;
mark 像是给当前 buffer 打上一个标志,其值就是记录当前的 position 的位置,当调用 reset 方法可以回滚到之前打上标志的位置;
address 记录当前 buffer 的地址,通过 Unsafe 获取子类的数组的起始地址;如果是堆内内存的,则是通过 Unsafe 获取到该数组的地址;如果是对外内存的(MappedByteBuffer)的,则通过文件句柄获取到该地址;
byteBuffer 下的属性就就不过多介绍,字眼都能猜测出什么意思;堆外内存的,是不归垃圾回收器处理的,需要程序自行释放该内存,我们看到 DirectByteBuffer 中有 cleaner 属性,通过获取到该属性就调用该类的方法进行内存释放;
我们在常用的 buffer 类中的方法有如下:flip ,rewind,slice ,reset,clear 等;具体可以看源码就即可明白了;我们常规创建 Buffe 时,都是通过 allocateDirect, allocate 方法进行创建;稍微提醒的是:将 buffer 的内容写入对应的目标对象,都需要先调用 flip 方法;
具体的详细的可以参考网络的;不过多详细介绍;
三. Selector 模型
Selector 是 IO 多路复用模型。
其核心原理是交给操作系统层面去监控该通道,之所以有这样的结论,主要是用户态与内核态的切换要耗一定的时间,交由操作系统层面监控,可以减少不必要的切换;
同时也需要讲解的“零拷贝”这个俗语,其原理是在内核态使用的内存中划一块内存作为共享内存,用户态与内核态共同访问,这样子可以避免用户态的数据需要拷贝到内核态内存中,这样子内核态才能访问。
在 Selector 模型,主要是对三个类型操作,其类图如下:
1. Selector
创建 Selector 是通过 Selector.open 方法去创建,而底层最终是交给 SelectorProvider 的实现创建;
关闭 Selector 是通过 Selector.close 方法去关闭;
其属性我这边没有过多对其源码进行解读,所以就不在此班门弄斧了;
2. SelectableChannel 抽象类
创建:SelectableChannel 子类的创建是由各个子类中的 open 方法进行创建,其底层最终是交给 SelectorProvider 来创建;
相关的配置,具体可以对应的方法进行配置即可。
关闭:直接调用 close 方法即可
3. SelectionKey
其是将 Selector 与 SelectableChannel 关联起来;
事件:OP_READ(就绪读事件)、OP_WRITE(就绪写事件)、OP_CONNECT(就绪连接事件)、OP_ACCEPT(就绪接收事件);
我们感兴趣的事件标志都保存到 interestOps,已经就绪的事件标志都保存到 readyOps;
三. 配置项
1. SocketChannel 参数
通过 SocketChannel.setOption 方法进行添加参数;其内部维护了一个内部类,只允许部分的参数有效;SocketChannelImpl 类中的 DefaultOptionsHolder。我们简单了解一下参数的意义:
2. ServerSocketChannel 参数
与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。
3. DatagramChannel 参数
与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。
四. 实战
1. 客户端
public class Client { public static void main(String [] args) throws IOException, InterruptedException { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(); socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,true); socketChannel.connect(new InetSocketAddress("127.0.0.1", 6666)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_WRITE); ByteBuffer buffer = ByteBuffer.allocate(10); for (int i = 0; i <1 ; i++) { buffer.clear(); buffer.put("hello5555".getBytes()); buffer.flip(); socketChannel.write(buffer); Thread.sleep(10000); } socketChannel.socket().close(); socketChannel.close(); }}
复制代码
2. 服务端
public class TimeServer { public static void main(String[] args) throws Exception { int port = 6666; MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); }}public class MultiplexerTimeServer implements Runnable{ private Selector selector; private ServerSocketChannel serverSocketChannel; private volatile boolean stop;
public MultiplexerTimeServer(int port) throws Exception { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port),1024); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println(" time server have configured "); } public void stop(){ this.stop = true; } public void run(){ while(!stop){ try{ int cnt = selector.select(1000); if(cnt>0){ Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); iterator.remove(); try { processKey(selectionKey); } catch (Exception e) { e.printStackTrace(); System.out.println(" 异常 "+e); selectionKey.cancel(); try { selectionKey.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } } } }else{ //说明没有就绪事件,可以先休眠 System.out.println(" 休眠 "+LocalDateTime.now()); Thread.sleep(5000); } }catch (Exception ex){ ex.printStackTrace(); } } if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }
private void processKey(SelectionKey selectionKey) throws Exception{ if(selectionKey.isValid()){ if(selectionKey.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = channel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); } if(selectionKey.isReadable()){ SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int readBytes = channel.read(buffer); System.out.println("读取长度: "+readBytes ); if(readBytes>0){ buffer.flip(); byte [] revContent = new byte[buffer.remaining()]; buffer.get(revContent); System.out.println("receive data from client: "+ new String(revContent,"UTF-8"));
doWrite(channel); }else if(readBytes<0){ //关闭事件 System.out.println(" 拦截已经关闭。。。。 "); selectionKey.cancel(); channel.close(); } } } }
private void doWrite(SocketChannel channel) throws Exception { String date = LocalDateTime.now().toString(); ByteBuffer buffer = ByteBuffer.wrap(date.getBytes()); channel.write(buffer); }}
复制代码
评论