一. 概述
在以前开发服务器时,都是通过用户态线程循环遍历连接请求,一个请求获取一个 socket 对象,接着通过 socket 对象获取请求数据,以及通过 socket 写入数据返回给客户端;而这种模式是阻塞型的,性能低效;现在操作系统层面提供了 Selector 模式,对网络层的连接提供高效的操作,其具体原理交由操作系统层面去监控这些 socket;具体原理介绍,可以去查阅聊聊Linux 五种IO模型
二. Buffer 家族
上面省略了相关的子类,如 LongBuffer、CharBuffer 等;然而其与 ByteBuffer 的结构差不多,只是 hb 的类型换了而已;那么我们仔细对属性进行简单介绍一下:
position 指的当前的数组的位置,可以理解为指针(游标),每次写入一个字节时,就会+1;
limit 限制大小,写入最大的位置,读取最大的位置,常规情况与 capacity 是一样的;
capacity 是当前数组容量,在分配 buffer 时,就填入,过程中是不能更改的,只有 truncate 方法被更改,但我不是很明白该方法的具体使用场景;
mark 像是给当前 buffer 打上一个标志,其值就是记录当前的 position 的位置,当调用 reset 方法可以回滚到之前打上标志的位置;
address 记录当前 buffer 的地址,通过 Unsafe 获取子类的数组的起始地址;如果是堆内内存的,则是通过 Unsafe 获取到该数组的地址;如果是对外内存的(MappedByteBuffer)的,则通过文件句柄获取到该地址;
byteBuffer 下的属性就就不过多介绍,字眼都能猜测出什么意思;堆外内存的,是不归垃圾回收器处理的,需要程序自行释放该内存,我们看到 DirectByteBuffer 中有 cleaner 属性,通过获取到该属性就调用该类的方法进行内存释放;
我们在常用的 buffer 类中的方法有如下:flip ,rewind,slice ,reset,clear 等;具体可以看源码就即可明白了;我们常规创建 Buffe 时,都是通过 allocateDirect, allocate 方法进行创建;稍微提醒的是:将 buffer 的内容写入对应的目标对象,都需要先调用 flip 方法;
具体的详细的可以参考网络的;不过多详细介绍;
三. Selector 模型
Selector 是 IO 多路复用模型。
其核心原理是交给操作系统层面去监控该通道,之所以有这样的结论,主要是用户态与内核态的切换要耗一定的时间,交由操作系统层面监控,可以减少不必要的切换;
同时也需要讲解的“零拷贝”这个俗语,其原理是在内核态使用的内存中划一块内存作为共享内存,用户态与内核态共同访问,这样子可以避免用户态的数据需要拷贝到内核态内存中,这样子内核态才能访问。
在 Selector 模型,主要是对三个类型操作,其类图如下:
1. Selector
创建 Selector 是通过 Selector.open 方法去创建,而底层最终是交给 SelectorProvider 的实现创建;
关闭 Selector 是通过 Selector.close 方法去关闭;
其属性我这边没有过多对其源码进行解读,所以就不在此班门弄斧了;
2. SelectableChannel 抽象类
创建:SelectableChannel 子类的创建是由各个子类中的 open 方法进行创建,其底层最终是交给 SelectorProvider 来创建;
相关的配置,具体可以对应的方法进行配置即可。
关闭:直接调用 close 方法即可
3. SelectionKey
其是将 Selector 与 SelectableChannel 关联起来;
事件:OP_READ(就绪读事件)、OP_WRITE(就绪写事件)、OP_CONNECT(就绪连接事件)、OP_ACCEPT(就绪接收事件);
我们感兴趣的事件标志都保存到 interestOps,已经就绪的事件标志都保存到 readyOps;
三. 配置项
1. SocketChannel 参数
通过 SocketChannel.setOption 方法进行添加参数;其内部维护了一个内部类,只允许部分的参数有效;SocketChannelImpl 类中的 DefaultOptionsHolder。我们简单了解一下参数的意义:
2. ServerSocketChannel 参数
与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。
3. DatagramChannel 参数
与 SocketChannel 参数的介绍差不多,同样也维护了一个内部类:DefaultOptionsHolder。
四. 实战
1. 客户端
public class Client {
public static void main(String [] args) throws IOException, InterruptedException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,true);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 6666));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.allocate(10);
for (int i = 0; i <1 ; i++) {
buffer.clear();
buffer.put("hello5555".getBytes());
buffer.flip();
socketChannel.write(buffer);
Thread.sleep(10000);
}
socketChannel.socket().close();
socketChannel.close();
}
}
复制代码
2. 服务端
public class TimeServer {
public static void main(String[] args) throws Exception {
int port = 6666;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
public class MultiplexerTimeServer implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) throws Exception {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println(" time server have configured ");
}
public void stop(){
this.stop = true;
}
public void run(){
while(!stop){
try{
int cnt = selector.select(1000);
if(cnt>0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
try {
processKey(selectionKey);
} catch (Exception e) {
e.printStackTrace();
System.out.println(" 异常 "+e);
selectionKey.cancel();
try {
selectionKey.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}else{
//说明没有就绪事件,可以先休眠
System.out.println(" 休眠 "+LocalDateTime.now());
Thread.sleep(5000);
}
}catch (Exception ex){
ex.printStackTrace();
}
}
if(selector!=null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey selectionKey) throws Exception{
if(selectionKey.isValid()){
if(selectionKey.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
}
if(selectionKey.isReadable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readBytes = channel.read(buffer);
System.out.println("读取长度: "+readBytes );
if(readBytes>0){
buffer.flip();
byte [] revContent = new byte[buffer.remaining()];
buffer.get(revContent);
System.out.println("receive data from client: "+ new String(revContent,"UTF-8"));
doWrite(channel);
}else if(readBytes<0){
//关闭事件
System.out.println(" 拦截已经关闭。。。。 ");
selectionKey.cancel();
channel.close();
}
}
}
}
private void doWrite(SocketChannel channel) throws Exception {
String date = LocalDateTime.now().toString();
ByteBuffer buffer = ByteBuffer.wrap(date.getBytes());
channel.write(buffer);
}
}
复制代码
评论