写点什么

网络编程四 - 原生 JDK 的 NIO 及其应用

  • 2021 年 11 月 12 日
  • 本文字数:12592 字

    阅读完需:约 41 分钟

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存( 其实就是数组)。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。

2.3.1 buffer 重要属性

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。


为了理解 Buffer 的工作原理,需要熟悉它的三个属性:


  • capacity

  • position

  • limit


position 和 limit 的含义取决于 Buffer 处在读模式还是写模式。不管 Buffer 处在什么模式,capacity 的含义总是一样的。


这里有一个关于 capacity,position 和 limit 在读写模式中的说明,详细的解释在插图后面。



capacity


作为一个内存块,Buffer 有一个固定的大小值,也叫“capacity”.你只能往里写 capacity 个 byte、long,char 等类型。一旦 Buffer 满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。


position


当你写数据到 Buffer 中时,position 表示当前的位置。初始的 position 值为 0.当一个 byte、long 等数据写到 Buffer 后, position 会向前移动到下一个可插入数据的 Buffer 单元。position 最大可为 capacity – 1.


当读取数据时,也是从某个特定位置读。当将 Buffer 从写模式切换到读模式,position 会被重置为 0. 当从 Buffer 的 position 处读取数据时,position 向前移动到下一个可读的位置。


limit


在写模式下,Buffer 的 limit 表示你最多能往 Buffer 里写多少数据。 写模式下,limit 等于 Buffer 的 capacity。


当切换 Buffer 到读模式时, limit 表示你最多能读到多少数据。因此,当切换 Buffer 到读模式时,limit 会被设置成写模式下的 position 值。换句话说,你能读到之前写入的所有数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position)

2.3.2 Buffer 的分配

堆内内存


要想获得一个 Buffer 对象首先要进行分配。 每一个 Buffer 类都有 allocate 方法(可以在堆上分配,也可以在直接内存上分配)。


分配 48 字节 capacity 的 ByteBuffer 的例子:ByteBuffer buf = ByteBuffer.allocate(48);


分配一个可存储 1024 个字符的 CharBuffer:CharBuffer buf = CharBuffer.allocate(1024);


wrap 方法:把一个 byte 数组或 byte 数组的一部分包装成 ByteBuffer:


ByteBuffer?wrap(byte []?array)


ByteBuffer?wrap(byte []?array,?int?offset,?int?length)


直接内存(堆外内存)


HeapByteBuffer 与 DirectByteBuffer,在原理上,前者可以看出分配的 buffer 是在 heap 区域的,其实真正 flush 到远程的时候会先拷贝到直接内存,再做下一步操作;在 NIO 的框架下,很多框架会采用 DirectByteBuffer 来操作,这样分配的内存不再是在 java heap 上,而是在操作系统的 C heap 上,经过性能测试,可以得到非常快速的网络交互,在大量的网络交互下,一般速度会比 HeapByteBuffer 要快速好几倍。


直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是 Java 虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用,而且也可能导致 OutOfMemoryError 异常出现。?


NIO 可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆里面的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆中来回复制数据。


堆内内存和对外内存分配 code:


/**


  • @author DarkKing

  • 类说明:Buffer 的分配


*/


public class AllocateBuffer {


public static void main(String[] args) {


System.out.println("----------Test allocate--------");


System.out.println("before alocate:"


  • Runtime.getRuntime().freeMemory());


//堆上分配


ByteBuffer buffer = ByteBuffer.allocate(1024000);


System.out.println("buffer = " + buffer);


System.out.println("after alocate:"


  • Runtime.getRuntime().freeMemory());


// 直接内存分配


ByteBuffer directBuffer = ByteBuffer.allocateDirect(102400);


System.out.println("directBuffer = " + directBuffer);


System.out.println("after direct alocate:"


  • Runtime.getRuntime().freeMemory());


System.out.println("----------Test wrap--------");


byte[] bytes = new byte[32];


buffer = ByteBuffer.wrap(bytes);


System.out.println(buffer);


buffer = ByteBuffer.wrap(bytes, 10, 10);


System.out.println(buffer);


}


}


堆外内存的优点和缺点


堆外内存,其实就是不受 JVM 控制的内存。相比于堆内内存有几个优势:?


1 减少了垃圾回收的工作,因为垃圾回收会暂停其他的工作(可能使用多线程或者时间片的方式,根本感觉不到)?


2 加快了复制的速度。因为堆内在 flush 到远程时,会先复制到直接内存(非堆内存),然后在发送;而堆外内存相当于省略掉了这个工作。(零拷贝原理)?


而福之祸所依,自然也有不好的一面:?


1 堆外内存难以控制,如果内存泄漏,那么很难排查?


2 堆外内存相对来说,不适合存储很复杂的对象。一般简单的对象或者扁平化的比较适合。


直接内存(堆外内存)与堆内存比较


直接内存申请空间耗费更高的性能,当频繁申请到一定量时尤为明显


直接内存 IO 读写的性能要优于普通的堆内存,在多次读写操作的情况下差异明显


性能测试


/**


  • @author DarkKing

  • 类说明:


*/


public class ByteBufferCompare {


public static void main(String[] args) {


allocateCompare(); //分配比较


operateCompare(); //读写比较


}


/**


  • 直接内存 和 堆内存的 分配空间比较

  • 结论: 在数据量提升时,直接内存相比非直接内的申请,有很严重的性能问题


*/


public static void allocateCompare() {


int time = 10000000; //操作次数


long st = System.currentTimeMillis();


for (int i = 0; i < time; i++) {


//ByteBuffer.allocate(int capacity) 分配一个新的字节缓冲区。


ByteBuffer buffer = ByteBuffer.allocate(2); //非直接内存分配申请


}


long et = System.currentTimeMillis();


System.out.println("在进行" + time + "次分配操作时,堆内存 分配耗时:" + (et - st) + "ms");


long st_heap = System.currentTimeMillis();


for (int i = 0; i < time; i++) {


//ByteBuffer.allocateDirect(int capacity) 分配新的直接字节缓冲区。


ByteBuffer buffer = ByteBuffer.allocateDirect(2); //直接内存分配申请


}


long et_direct = System.currentTimeMillis();


System.out.println("在进行" + time + "次分配操作时,直接内存 分配耗时:" + (et_direct - st_heap) + "ms");


}


/**


  • 直接内存 和 堆内存的 读写性能比较

  • 结论:直接内存在直接的 IO 操作上,在频繁的读写时 会有显著的性能提升


*/


public static void operateCompare() {


int time = 100000000;


ByteBuffer buffer = ByteBuffer.allocate(2 * time);


long st = System.currentTimeMillis();


for (int i = 0; i < time; i++) {


// putChar(char value) 用来写入 char 值的相对 put 方法


buffer.putChar('a');


}


buffer.flip();


for (int i = 0; i < time; i++) {


buffer.getChar();


}


long et = System.currentTimeMillis();


System.out.println("在进行" + time + "次读写操作时,非直接内存读写耗时:" + (et - st) + "ms");


ByteBuffer buffer_d = ByteBuffer.allocateDirect(2 * time);


long st_direct = System.currentTimeMillis();


for (int i = 0; i < time; i++) {


// putChar(char value) 用来写入 char 值的相对 put 方法


buffer_d.putChar('a');


}


buffer_d.flip();


for (int i = 0; i < time; i++) {


buffer_d.getChar();


}


long et_direct = System.currentTimeMillis();


System.out.println("在进行" + time + "次读写操作时,直接内存读写耗时:" + (et_direct - st_direct) + "ms");


}


}


执行程序后



可以看到,


1、内存分配方面,在数据量提升时,直接内存相比非直接内的申请,有很严重的性能问题。


2、IO 读写方面,直接内存在直接的 IO 操作上,在频繁的读写时 会有显著的性能提升

2.3.3 Buffer 的读写

从 Buffer 中写数据


写数据到 Buffer 有两种方式:


  1. ?读取 Channel 写到 Buffer。

  2. ?通过 Buffer 的 put()方法写到 Buffer 里。


从 Channel 写到 Buffer 的例子 int?bytesRead = inChannel.read(buf);?//read into buffer.


通过 put 方法写 Buffer 的例子:buf.put(127);


put 方法有很多版本,允许你以不同的方式把数据写入到 Buffer 中。例如, 写到一个指定的位置,或者把一个字节数组写入到 Buffer。 更多 Buffer 实现的细节参考 JavaDoc。


**flip()**方法


flip 方法将 Buffer 从写模式切换到读模式。调用 flip()方法会将 position 设回 0,并将 limit 设置成之前 position 的值。


换句话说,position 现在用于标记读的位置,limit 表示之前写进了多少个 byte、char 等 —— 现在能读取多少个 byte、char 等。


从 Buffer 中读取数据


从 Buffer 中读取数据有两种方式:


  1. 从 Buffer 读取数据写入到 Channel。

  2. 使用 get()方法从 Buffer 中读取数据。


从 Buffer 读取数据到 Channel 的例子:int?bytesWritten = inChannel.write(buf);


使用 get()方法从 Buffer 中读取数据的例子:byte?aByte = buf.get();


get 方法有很多版本,允许你以不同的方式从 Buffer 中读取数据。例如,从指定 position 读取,或者从 Buffer 中读取数据到字节数组。更多 Buffer 实现的细节参考 JavaDoc。


使用 Buffer 读写数据常见步骤:


  1. 写入数据到 Buffer

  2. 调用 flip()方法

  3. 从 Buffer 中读取数据

  4. 调用 clear()方法或者 compact()方法


当向 buffer 写入数据时,buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip()方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 buffer 的所有数据。


一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear()或 compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

2.3.4 buffer 其他常用方法

**rewind()**方法


Buffer.rewind()将 position 设回 0,所以你可以重读 Buffer 中的所有数据。limit 保持不变,仍然表示能从 Buffer 中读取多少个元素(byte、char 等)。


**clear()compact()**方法


一旦读完 Buffer 中的数据,需要让 Buffer 准备好再次被写入。可以通过 clear()或 compact()方法来完成。


如果调用的是 clear()方法,position 将被设回 0,limit 被设置成 capacity 的值。换句话说,Buffer 被清空了。Buffer 中的数据并未清除,只是这些标记告诉我们可以从哪里开始往 Buffer 里写数据。


如果 Buffer 中有一些未读的数据,调用 clear()方法,数据将“被遗忘”,意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。


如果 Buffer 中仍有未读的数据,且后续还需要这些数据,但是此时想要先先写些数据,那么使用 compact()方法。


compact()方法将所有未读的数据拷贝到 Buffer 起始处。然后将 position 设到最后一个未读元素正后面。limit 属性依然像 clear()方法一样,设置成 capacity。现在 Buffer 准备好写数据了,但是不会覆盖未读的数据。


**mark()reset()**方法


通过调用 Buffer.mark()方法,可以标记 Buffer 中的一个特定 position。之后可以通过调用 Buffer.reset()方法恢复到这个 position。例如:


buffer.mark();//call buffer.get() a couple of times, e.g. during parsing.


buffer.reset(); //set position back to mark.


**equals()compareTo()**方法


可以使用 equals()和 compareTo()方法两个 Buffer。


equals()


当满足下列条件时,表示两个 Buffer 相等:


  1. 有相同的类型(byte、char、int 等)。

  2. Buffer 中剩余的 byte、char 等的个数相等。

  3. Buffer 中所有剩余的 byte、char 等都相同。


如你所见,equals 只是比较 Buffer 的一部分,不是每一个在它里面的元素都比较。实际上,它只比较 Buffer 中的剩余元素。


**compareTo()**方法


compareTo()方法比较两个 Buffer 的剩余元素(byte、char 等), 如果满足下列条件,则认为一个 Buffer“小于”另一个 Buffer:


  1. 第一个不相等的元素小于另一个 Buffer 中对应的元素 。

  2. 所有元素都相等,但第一个 Buffer 比另一个先耗尽(第一个 Buffer 的元素个数比另一个少)。


Buffer 方法总结


<table border="1" cellspacing="0"><tbody><tr><td><p>limit(), limit(10)等</p></td><td><p>其中读取和设置这 4 个属性的方法的命名和 jQuery 中的 val(),val(10)类似,一个负责 get,一个负责 set</p></td></tr><tr><td><p>reset()</p></td><td><p>把 position 设置成 mark 的值,相当于之前做过一个标记,现在要退回到之前标记的地方</p></td></tr><tr><td><p>clear()</p></td><td><p>position = 0;limit = capacity;mark = -1;  有点初始化的味道,但是并不影响底层 byte 数组的内容</p></td></tr><tr><td><p>flip()</p></td><td><p>limit = position;position = 0;mark = -1;  翻转,也就是让 flip 之后的 position 到 limit 这块区域变成之前的 0 到 position 这块,翻转就是将一个处于存数据状态的缓冲区变为一个处于准备取数据的状态</p></td></tr><tr><td><p>rewind()</p></td><td><p>把 position 设为 0,mark 设为-1,不改变 limit 的值</p></td></tr><tr><td><p>remaining()</p></td><td><p>return limit - position;返回 limit 和 position 之间相对位置差</p></td></tr><tr><td><p>hasRemaining()</p></td><td><p>return position < limit 返回是否还有未读内容</p></td></tr><tr><td><p>compact()</p></td><td><p>把从 position 到 limit 中的内容移到 0 到 limit-position 的区域内,position 和 limit 的取值也分别变成 limit-position、capacity。如果先将 positon 设置到 limit,再 compact,那么相当于 clear()</p></td></tr><tr><td><p>get()</p></td><td><p>相对读,从 position 位置读取一个 byte,并将 position+1,为下次读写作准备</p></td></tr><tr><td><p>get(int index)</p></td><td><p>绝对读,读取 byteBuffer 底层的 bytes 中下标为 index 的 byte,不改变 position</p></td></tr><tr><td><p>get(byte[] dst, int offset, int length)</p></td><td><p>从 position 位置开始相对读,读 length 个 byte,并写入 dst 下标从 offset 到 offset+length 的区域</p></td></tr><tr><td><p>put(byte b)</p></td><td><p>相对写,向 position 的位置写入一个 byte,并将 postion+1,为下次读写作准备</p></td></tr><tr><td><p>put(int index, byte b)</p></td><td><p>绝对写,向 byteBuffer 底层的 bytes 中下标为 index 的位置插入 byte b,不改变 position</p></td></tr><tr><td><p>put(ByteBuffer src)</p></td><td><p>用相对写,把 src 中可读的部分(也就是 position 到 limit)写入此 byteBuffer</p></td></tr><tr><td><p>put(byte[] src, int offset, int length)</p></td><td><p>从 src 数组中的 offset 到 offset+length 区域读取数据并使用相对写写入此 byteBuffer</p></td></tr></tbody></table>


buffer 方法演示


/**


  • @author DarkKing

  • 类说明:Buffer 方法演示


*/


public class BufferMethod {


public static void main(String[] args) {


System.out.println("------Test get-------------");


ByteBuffer buffer = ByteBuffer.allocate(32);


buffer.put((byte) 'a')//0


.put((byte) 'b')//1


.put((byte) 'c')//2


.put((byte) 'd')//3


.put((byte) 'e')//4


.put((byte) 'f');//5


System.out.println("before flip()" + buffer);


/* 转换为读取模式*/


buffer.flip();


System.out.println("before get():" + buffer);


System.out.println((char) buffer.get());


System.out.println("after get():" + buffer);


/* get(index)不影响 position 的值*/


System.out.println((char) buffer.get(2));


System.out.println("after get(index):" + buffer);


byte[] dst = new byte[10];


/* position 移动两位*/


buffer.get(dst, 0, 2);


/这里的 buffer 是 abcdef[pos=3 lim=6 cap=32]/


System.out.println("after get(dst, 0, 2):" + buffer);


System.out.println("dst:" + new String(dst));


System.out.println("--------Test put-------");


ByteBuffer bb = ByteBuffer.allocate(32);


System.out.println("before put(byte):" + bb);


System.out.println("after put(byte):" + bb.put((byte) 'z'));


// put(2,(byte) 'c')不改变 position 的位置


bb.put(2, (byte) 'c');


System.out.println("after put(2,(byte) 'c'):" + bb);


System.out.println(new String(bb.array()));


// 这里的 buffer 是 abcdef[pos=3 lim=6 cap=32]


bb.put(buffer);


System.out.println("after put(buffer):" + bb);


System.out.println(new String(bb.array()));


System.out.println("--------Test reset----------");


buffer = ByteBuffer.allocate(20);


System.out.println("buffer = " + buffer);


buffer.clear();


buffer.position(5);//移动 position 到 5


buffer.mark();//记录当前 position 的位置


buffer.position(10);//移动 position 到 10


System.out.println("before reset:" + buffer);


buffer.reset();//复位 position 到记录的地址


System.out.println("after reset:" + buffer);


System.out.println("--------Test rewind--------");


buffer.clear();


buffer.position(10);//移动 position 到 10


buffer.limit(15);//限定最大可写入的位置为 15


System.out.println("before rewind:" + buffer);


buffer.rewind();//将 position 设回 0


System.out.println("before rewind:" + buffer);


System.out.println("--------Test compact--------");


buffer.clear();


//放入 4 个字节,position 移动到下个可写入的位置,也就是 4


buffer.put("abcd".getBytes());


System.out.println("before compact:" + buffer);


System.out.println(new String(buffer.array()));


buffer.flip();//将 position 设回 0,并将 limit 设置成之前 position 的值


System.out.println("after flip:" + buffer);


//从 Buffer 中读取数据的例子,每读一次,position 移动一次


System.out.println((char) buffer.get());


System.out.println((char) buffer.get());


System.out.println((char) buffer.get());


System.out.println("after three gets:" + buffer);


System.out.println(new String(buffer.array()));


//compact()方法将所有未读的数据拷贝到 Buffer 起始处。


// 然后将 position 设到最后一个未读元素正后面。


buffer.compact();


System.out.println("after compact:" + buffer);


System.out.println(new String(buffer.array()));


}


}


三、NIO 之 Reactor 模式


===============


“反应”器名字中”反应“的由来:


“反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应;这种控制逆转又称为“好莱坞法则”(不要调用我,让我来调用你)


NIO 为实现 Reactor 模式提供了基础,上面的 NIO 图示其实就是 Reactor 模式的雏形,只是 Reactor 以 OO 的方式抽象出了几个概念,使得职责划分更加明确。


  • Reactor:Reactor 是 IO 事件的派发者,对应 NIO 的 Selector;

  • Acceptor:Acceptor 接受 client 连接,建立对应 client 的 Handler,并向 Reactor 注册此 Handler,对应 NIO 中注册 Channel 和事件触发时的判断分支(上述 NIO 服务端示例代码的 38-46 行);

  • Handler:IO 处理类,对应 NIO 中 Channel[使用 socket]操作 Buffer 的过程。

3.1 单线程 Reactor 模式流程


  1. 服务器端的 Reactor 是一个线程对象,该线程会启动事件循环,并使用 Selector(选择器)来实现 IO 的多路复用。注册一个 Acceptor 事件处理器到 Reactor 中,Acceptor 事件处理器所关注的事件是 ACCEPT 事件,这样 Reactor 会监听客户端向服务器端发起的连接请求事件(ACCEPT 事件)。

  2. ?客户端向服务器端发起一个连接请求,Reactor 监听到了该 ACCEPT 事件的发生并将该 ACCEPT 事件派发给相应的 Acceptor 处理器来进行处理。Acceptor 处理器通过 accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的 READ 事件以及对应的 READ 事件处理器注册到 Reactor 中,这样一来 Reactor 就会监听该连接的 READ 事件了。

  3. ?当 Reactor 监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过 SocketChannel 的 read()方法读取数据,此时 read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。

  4. 每当处理完所有就绪的感兴趣的 I/O 事件后,Reactor 线程会再次执行 select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。


注意,Reactor 的单线程模式的单线程主要是针对于 I/O 操作而言,也就是所有的 I/O 的 accept()、read()、write()以及 connect()操作都在一个线程上完成的。



但在目前的单线程 Reactor 模式中,不仅 I/O 操作在该 Reactor 线程上,连非 I/O 的业务操作也在该线程上进行处理了,这可能会大大延迟 I/O 请求的响应。所以我们应该将非 I/O 的业务逻辑操作从 Reactor 线程上卸载,以此来加速 Reactor 线程对 I/O 请求的响应。

3.2 单线程 Reactor,工作者线程池

与单线程 Reactor 模式不同的是,添加了一个工作者线程池,并将非 I/O 操作从 Reactor 线程中移出转交给工作者线程池来执行。这样能够提高 Reactor 线程的 I/O 响应,不至于因为一些耗时的业务逻辑而延迟对后面 I/O 请求的处理。



使用线程池的优势:


  1. ?通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程产生的巨大开销。

  2. ?另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。

  3. ?通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态。同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。


改进的版本中,所以的 I/O 操作依旧由一个 Reactor 来完成,包括 I/O 的 accept()、read()、write()以及 connect()操作。


对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:


  1. ?一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,即便 NIO 线程的 CPU 负荷达到 100%,也无法满足海量消息的读取和发送;

  2. 当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

3.3 多 Reactor 线程模式

Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的事件循环逻辑。


mainReactor 可以只有一个,但 subReactor 一般会有多个。mainReactor 线程主要负责接收客户端的连接请求,然后将接收到的 SocketChannel 传递给 subReactor,由 subReactor 来完成和客户端的通信。



流程:


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


  1. 注册一个 Acceptor 事件处理器到 mainReactor 中,Acceptor 事件处理器所关注的事件是 ACCEPT 事件,这样 mainReactor 会监听客户端向服务器端发起的连接请求事件(ACCEPT 事件)。启动 mainReactor 的事件循环。

  2. ?客户端向服务器端发起一个连接请求,mainReactor 监听到了该 ACCEPT 事件并将该 ACCEPT 事件派发给 Acceptor 处理器来进行处理。Acceptor 处理器通过 accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个 SocketChannel 传递给 subReactor 线程池。

  3. subReactor 线程池分配一个 subReactor 线程给这个 SocketChannel,即,将 SocketChannel 关注的 READ 事件以及对应的 READ 事件处理器注册到 subReactor 线程中。当然你也注册 WRITE 事件以及 WRITE 事件处理器到 subReactor 线程中以完成 I/O 写操作。Reactor 线程池中的每一 Reactor 线程都会有自己的 Selector、线程和分发的循环逻辑。

  4. 当有 I/O 事件就绪时,相关的 subReactor 就将事件派发给响应的处理器处理。注意,这里 subReactor 线程只负责完成 I/O 的 read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的 I/O 的 write 操作还是会被提交回 subReactor 线程来完成。


注意,所以的 I/O 操作(包括,I/O 的 accept()、read()、write()以及 connect()操作)依旧还是在 Reactor 线程(mainReactor 线程 或 subReactor 线程)中完成的。Thread Pool(线程池)仅用来处理非 I/O 操作的逻辑。


多 Reactor 线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个 Reactor 线程来完成。mainReactor 完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给 subReactor 线程来完成与客户端的通信,这样一来就不会因为 read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多 Reactor 线程模式在海量的客户端并发请求的情况下,还可以通过实现 subReactor 线程池来将海量的连接分发给多个 subReactor 线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。

3.4 和观察者模式的区别

观察者模式:


也可以称为为 发布-订阅 模式,主要适用于多个对象依赖某一个对象的状态并,当某对象状态发生改变时,要通知其他依赖对象做出更新。是一种一对多的关系。当然,如果依赖的对象只有一个时,也是一种特殊的一对一关系。通常,观察者模式适用于消息事件处理,监听者监听到事件时通知事件处理者对事件进行处理(这一点上面有点像是回调,容易与反应器模式和前摄器模式的回调搞混淆)。


Reactor****模式:


reactor 模式,即反应器模式,是一种高效的异步 IO 模式,特征是回调,当 IO 完成时,回调对应的函数进行处理。这种模式并非是真正的异步,而是运用了异步的思想,当 IO 事件触发时,通知应用程序作出 IO 处理。模式本身并不调用系统的异步 IO 函数。


reactor 模式与观察者模式有点像。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。当一个主体发生改变时,所有依属体都得到通知。


四、NIO 使用举例


=========


4.1、NioServerHandle




Nio 通信服务端处理器


/**


  • @author DarkKing

  • 类说明:nio 通信服务端处理器


*/


public class NioServerHandle implements Runnable {


private Selector selector;


private ServerSocketChannel serverChannel;


private volatile boolean started;


/**


  • 构造方法

  • @param port 指定要监听的端口号


*/


public NioServerHandle(int port) {


try {


//创建选择器


selector = Selector.open();


//打开监听通道


serverChannel = ServerSocketChannel.open();


//如果为 true,则此通道将被置于阻塞模式;


// 如果为 false,则此通道将被置于非阻塞模式


serverChannel.configureBlocking(false);//开启非阻塞模式


serverChannel.socket().bind(new InetSocketAddress(port));


serverChannel.register(selector, SelectionKey.OP_ACCEPT);


//标记服务器已开启


started = true;


System.out.println("服务器已启动,端口号:" + port);


} catch (IOException e) {


e.printStackTrace();


System.exit(1);


}


}


public void stop() {


started = false;


}


@Override


public void run() {


//循环遍历 selector


while (started) {


try {


//阻塞,只有当至少一个注册的事件发生的时候才会继续.


selector.select();


Set<SelectionKey> keys = selector.selectedKeys();


Iterator<SelectionKey> it = keys.iterator();


SelectionKey key = null;


while (it.hasNext()) {


key = it.next();


it.remove();


try {


handleInput(key);


} catch (Exception e) {


if (key != null) {


key.cancel();


if (key.channel() != null) {


key.channel().close();


}


}


}


}


} catch (Throwable t) {


t.printStackTrace();


}


}


//selector 关闭后会自动释放里面管理的资源


if (selector != null)


try {


selector.close();


} catch (Exception e) {


e.printStackTrace();


}


}


private void handleInput(SelectionKey key) throws IOException {


if (key.isValid()) {


//处理新接入的请求消息


if (key.isAcceptable()) {


ServerSocketChannel ssc = (ServerSocketChannel) key.channel();


SocketChannel sc = ssc.accept();


System.out.println("=======建立连接===");


sc.configureBlocking(false);


sc.register(selector, SelectionKey.OP_READ);


}


//读消息


if (key.isReadable()) {


System.out.println("======socket channel 数据准备完成," +


"可以去读==读取=======");


SocketChannel sc = (SocketChannel) key.channel();


//创建 ByteBuffer,并开辟一个 1M 的缓冲区


ByteBuffer buffer = ByteBuffer.allocate(1024);


//读取请求码流,返回读取到的字节数


int readBytes = sc.read(buffer);


//读取到字节,对字节进行编解码


if (readBytes > 0) {


//将缓冲区当前的 limit 设置为 position,position=0,


// 用于后续对缓冲区的读取操作


buffer.flip();


//根据缓冲区可读字节数创建字节数组


byte[] bytes = new byte[buffer.remaining()];


//将缓冲区可读字节数组复制到新建的数组中


buffer.get(bytes);


String message = new String(bytes, "UTF-8");


System.out.println("服务器收到消息:" + message);


//处理数据


String result = Const.response(message);


//发送应答消息


doWrite(sc, result);


}


//链路已经关闭,释放资源


else if (readBytes < 0) {


key.cancel();


sc.close();


}


}


}


}


//发送应答消息


private void doWrite(SocketChannel channel, String response)


throws IOException {


//将消息编码为字节数组


byte[] bytes = response.getBytes();


//根据数组容量创建 ByteBuffer


ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);


//将字节数组复制到缓冲区


writeBuffer.put(bytes);


//flip 操作


writeBuffer.flip();


//发送缓冲区的字节数组


channel.write(writeBuffer);


}


}


4.2、NioServer




nio 通信服务端


/**


  • @author DarkKing

  • 类说明:nio 通信服务端


*/


public class NioServer {


private static NioServerHandle nioServerHandle;


public static void start() {


if (nioServerHandle != null)


nioServerHandle.stop();


nioServerHandle = new NioServerHandle(Const.DEFAULT_PORT);


new Thread(nioServerHandle, "Server").start();


}


public static void main(String[] args) {


start();


}


}


4.3NioClientHandlenio




通信客户端处理器


/**


  • @author DarkKing

  • 类说明:nio 通信客户端处理器


*/


public class NioClientHandle implements Runnable {


private String host;


private int port;


private volatile boolean started;


private Selector selector;


private SocketChannel socketChannel;


public NioClientHandle(String ip, int port) {


this.host = ip;


this.port = port;


try {


/创建选择器/


this.selector = Selector.open();


/打开监听通道/


socketChannel = SocketChannel.open();


/*如果为 true,则此通道将被置于阻塞模式;


  • 如果为 false,则此通道将被置于非阻塞模式

  • 缺省为 true*/


socketChannel.configureBlocking(false);


started = true;


} catch (IOException e) {


e.printStackTrace();


System.exit(-1);


}


}


public void stop() {


started = false;


}


@Override


public void run() {


//连接服务器


try {


doConnect();


} catch (IOException e) {


e.printStackTrace();


System.exit(-1);


}


/循环遍历 selector/


while (started) {


try {


/阻塞方法,当至少一个注册的事件发生的时候就会继续/


selector.select();


/获取当前有哪些事件可以使用/


Set<SelectionKey> keys = selector.selectedKeys();


/转换为迭代器/


Iterator<SelectionKey> it = keys.iterator();


SelectionKey key = null;


while (it.hasNext()) {


key = it.next();


/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。


如果我们没有删除处理过的键,那么它仍然会在事件集合中以一个激活


的键出现,这会导致我们尝试再次处理它。*/

评论

发布
暂无评论
网络编程四-原生JDK的NIO及其应用