写点什么

Day386&387

  • 2022 年 5 月 01 日
  • 本文字数:7675 字

    阅读完需:约 25 分钟

FileChannel fromChannel = aFile.getChannel();


RandomAccessFile bFile = new RandomAccessFile("d:\achang\02.txt", "rw");


FileChannel toChannel = bFile.getChannel();


//设置起始/结束位置


long position = 0;


long count = fromChannel.size();


//将 fromChannel 中全部的数据传到 toChannel 中


toChannel.transferFrom(fromChannel, position, count);


//关闭 channel 通道


aFile.close();


bFile.close();


System. out .println("over!");


}


}


方法的输入参数 position 表示从 position 处开始向目标文件写入数据,count 表示最多传输的字节数。


如果源通道的剩余空间小于 count 个字节,则所传输的字节数要小于请求的字节数。


此外要注意,在 SoketChannel 的实现中,SocketChannel 只会传输此刻准备好的数据(可能不足 count 字节)。


因此,SocketChannel 可能不会将请求的所有数据(count 个字节)全部传输到 FileChannel 中。


  • (2)transferTo()方法


transferTo()方法将数据从 FileChannel 传输到其他的 channel 中。


下面是一个 transferTo()方法的例子


public class FileChannelDemo {


public static void main(String args[]) throws Exception {


RandomAccessFile aFile = new RandomAccessFile("d:\achang\02.txt", "rw");


FileChannel fromChannel = aFile.getChannel();


RandomAccessFile bFile = new RandomAccessFile("d:\achang\03.txt", "rw");


FileChannel toChannel = bFile.getChannel();


long position = 0;


long count = fromChannel.size();


//将 fromChannel 传到 toChannel 中


fromChannel.transferTo(position, count, toChannel);


aFile.close();


bFile.close();


System. out .println("over!");


}


}


[](()2.5 Socket 通道—实现二




  • (1)新的 socket 通道类可以运行非阻塞模式并且是可选择的,可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个 socket 连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换开销。借助新的 NIO 类,一个或几个线程就可以管理成百上千的活动 socket 连接了并且只有很少甚至可能没有性能损失。


所有的 socket 通道类(DatagramChannel、SocketChannel 和 ServerSocketChannel)都继承了位于 java.nio.channels.spi 包中的AbstractSelectableChannel


这意味着我们可以用一个 Selector 对象来执行 socket 通道的就绪选择(readiness selection)。


  • (2)请注意 DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel 不实现。ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象它本身从不传输数据

  • (3)在我们具体讨论每一种 socket 通道前,您应该了解 socket 和 socket 通道之间的关系。


通道是一个连接 I/O 服-务导管并提供与该服务交互的方法。就某个 socket 而言,它不会再次实现与之对应的 socket 通道类中的 socket 协议 API,而 java.net 中已经存在的 socket 通道都可以被大多数协议操作重复使用。全部 socket 通道类(DatagramChannel、SocketChannel 和 ServerSocketChannel)在被实例化时都会创建一个对等 socket 对象。socket 不能被重复使用,channel 可以被重复使用


这些是我们所熟悉的来自 java.net 的类(Socket、ServerSocket 和 DatagramSocket),它们已经被更新以识别通道。对等 socket 可以通过调用 socket( )方法从一个通道上获取。此外,这三个 java.net 类现在都有 getChannel( )方法。


  • (4)要把一个 socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞 I/O 和可选择性是紧密相连的,那也正是管理阻塞模式的 API 代码要在 SelectableChannel 超级类中定义的原因。设置或重新设置一个通道的阻塞模式是很简单的,只要调用 configureBlocking( )方法即可,传递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。可以通过调用 isBlocking( )方法来判断某个 socket 通道当前处于哪种模式。


AbstractSelectableChannel.java 中实现的 configureBlocking()方法如下:


可以通过 configureBlocking()来设置 socket 是阻塞/非阻塞



非阻塞 socket 通常被认为是服务端使用的,因为它们使同时管理很多 socket 通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的 socket 通道也是有益处的,例如,借助非阻塞 socket 通道,GUI 程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。


偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。API 中有一个 blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。


下面分别介绍这 3 个通道



[](()2.5.1 ServerSocketChannel

ServerSocketChannel 是一个基于通道的 socket 监听器(本身不传数据,而是一个监听器)


它同我们所熟悉的 java.net.ServerSocket 执行相同的任务,不过它增加了通道语义,因此能够在非阻塞模式下运行


由于 ServerSocketChannel 没有 bind()方法,因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。


同 java.net.ServerSocket 一样,ServerSocketChannel 也有 accept( )方法。一旦创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept()。如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对


象。


如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行


换句话说:


ServerSocketChannel 的 accept()方法会返回 SocketChannel 类型对象,SocketChannel 可以在非阻塞模式下运行。其它 Socket 的 accept()方法会阻塞返回一个 Socket 对象。如果 ServerSocketChannel 以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册 ServerSocketChannel 对象以实现新连接到达时自动通知的功能。


以下代码演示了如何使用一个非阻塞的 accept( )方法:


public class FileChannelAccept {


public static final String GREETING = "Hello java nio.\r\n";


public static void main(String[] argv) throws Exception {


//端口号


int port = 1234; // default


if (argv.length > 0) {


port = Integer.parseInt(argv[0]);


}


//Buffer


ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());


//ServerSocketChannel


ServerSocketChannel ssc = ServerSocketChannel.open();


//绑定,监听端口


ssc.socket().bind(new InetSocketAddress(port));


//设置非阻塞模式


ssc.configureBlocking(false);


//监听循环,是否有新连接传入


while (true) {


System.out.println("Waiting for connections");


SocketChannel sc = ssc.accept();


//没有连接传入的情况


if (sc == null) {


System.out.println("null");


Thread.sleep (2000);


} else {


//有连接传入的情况


System.out.println("Incoming connection from: "+sc.socket().getRemoteSocketAddress());//获取连接 ip


buffer.rewind();//指针指向 0,就是位置制到 0


//向 Buffer 中写入数据


sc.write(buffer);


sc.close();


}


}


}


}



  • (1)打开 ServerSocketChannel


通过调用 ServerSocketChannel.open() 方法来打开 ServerSocketChannel.


ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();


  • (2)关闭 ServerSocketChannel


通过调用 ServerSocketChannel.close() 方法来关闭 ServerSocketChannel.


serverSocketChannel.close();


  • (3)监听新的连接


通过 ServerSocketChannel.accept() 方法监听新进的连接。当 accept()方法返回时候,它返回一个包含新进来的连接的 SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。


通常不会仅仅只监听一个连接,在 while 循环中调用 accept()方法. 如下面的例子:



  • (4)阻塞模式



会在 SocketChannel sc = ssc.accept();这里阻塞住进程。


  • (5)非阻塞模式


ServerSocketChannel 可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是 null。 因此,需要检查返回的 SocketChannel 是否是 null.如:




[](()2.5.2 SocketChannel

[](()1、SocketChannel 介绍

Java NIO 中的 SocketChannel 是一个连接到 TCP 网络套接字的通道操作面向Buffer缓冲区


A selectable channel for stream-oriented connecting sockets.


以上是 Java docs 中对于 SocketChannel 的描述:SocketChannel 是一种面向流连接 sockets 套接字的可选择通道。从这里可以看出:


  • SocketChannel 是用来连接 Socket 套接字

  • SocketChannel 主要用途用来处理网络 I/O 的通道

  • SocketChannel 是基于 TCP 连接传输

  • SocketChannel 实现了可选择通道,可以被多路复用

  • 用于 TCP 的网络 I/O 套接字的通道



[](()2、SocketChannel 特征:

(1)对于已经存在的 socket 不能创建 SocketChannel


(2)SocketChannel 中提供的 open 接口创建的 Channel 并没有进行网络级联,需要使用 connect 接口连接到指定地址


(3)未进行连接的 SocketChannle 执行 I/O 操作时,会抛出 NotYetConnectedException


(4)SocketChannel 支持两种 I/O 模式:阻塞式和非阻塞式


(5)SocketChannel 支持异步关闭。如果 SocketChannel 在一个线程上 read 阻塞,另一个线程对该 SocketChannel 调用 shutdownInput,则读阻塞的线程将返回-1 表示没有读取任何数据;如果 SocketChannel 在一个线程上 write 阻塞,另一个线程对该 SocketChannel 调用 shutdownWrite,则写阻塞的线程将抛出 AsynchronousCloseException


(6)SocketChannel 支持设定参数


SO_SNDBUF 套接字发送缓冲区大小



SO_RCVBUF 套接字接收缓冲区大小



SO_KEEPALIVE 保活连接



O_REUSEADDR 复用地址



SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用)



TCP_NODELAY 禁用 Nagle 算法



[](()3、SocketChannel 的使用

(1)创建 SocketChannel


  • 方式一:


SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 80));


  • 方式二:


SocketChannel socketChanne2 = SocketChannel.open();


socketChanne2.connect(new InetSocketAddress("www.baidu.com",80));


直接使用有参 open api 或者使用无参 open api,但是在无参 open 只是创建了一个 SocketChannel 对象,并没有进行实质的 tcp 连接。


(2)连接校验


socketChannel.isOpen(); // 测试 SocketChannel 是否为 open 状态


socketChannel.isConnected(); //测试 SocketChannel 是否已经被连接


socketChannel.isConnectionPending(); //测试 SocketChannel 是否正在进行连接


socketChannel.finishConnect(); //校验正在进行套接字连接的 SocketChannel 是否已经完成连接


(3)读写模式


前面提到 SocketChannel支持阻塞非阻塞两种模式:


socketChannel.configureBlocking(false);//非阻塞


通过以上方法设置 SocketChannel 的读写模式。false 表示非阻塞,true 表示阻塞。


(4)读写


SocketChannel socketChannel = SocketChannel.open (new InetSocketAddress("www.baidu.com", 80));


ByteBuffer byteBuffer = ByteBuffer.allocate (16);


socketChannel.read(byteBuffer);


socketChannel.close();


System.out.println("read over");


以上为阻塞式读,当执行到 read 出,线程将阻塞,控制台将无法打印 read over


SocketChannel socketChannel = SocketChannel. open (new InetSocketAddress("www.baidu.com", 80));


socketChannel.configureBlocking(false);


ByteBuffer byteBuffer = ByteBuffer.allocate (16);


socketChannel.read(byteBuffer);


socketChannel.close();


System.out.println("read over");


以上为非阻塞读,控制台将打印 read over


读写都是面向缓冲区,这个读写方式与前文中的 FileChannel 相同。


(5)设置和获取参数


socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,Boolean.TRUE).setOption(StandardSocketOptions.TCP_NODELAY,Boolean.TRUE);


通过 setOptions 方法可以设置 socket 套接字的相关参数


socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);


socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);


可以通过 getOption 获取相关参数的值。如默认的接收缓冲区大小是 8192byte


SocketChannel 还支持多路复用,但是多路复用在后续内容中会介绍到。



[](()2.5.3 DatagramChannel

向地址 ip 端口号发送内容;接收 ip 端口号发送来的内容


正如 SocketChannel 对应 Socket,ServerSocketChannel 对应 ServerSocket,每一个 DatagramChannel 对象也有一个关联的 DatagramSocket 对象。


正如 SocketChannel 模拟连接导向的流协议(如 TCP/IP),DatagramChannel 则模拟包导向的无连接协议(如 UDP/IP)。DatagramChannel 是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址


同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)

[](()1、打开 DatagramChannel

DatagramChannel server = DatagramChannel.open();


server.socket().bind(new InetSocketAddress(10086));


此例子是打开 10086 端口接收 UDP 数据包



[](()2、接收数据

通过 receive()接收 UDP 包


ByteBuffer receiveBuffer = ByteBuffer.allocate(64);


receiveBuffer.clear();


SocketAddress receiveAddr = server.receive(receiveBuffer);


SocketAddress 可以获得发包的 ip、端口等信息,用 toString 查看,格式如下


/127.0.0.1:57126



[](()3、发送数据

通过 send()发送 UDP 包


DatagramChannel server = DatagramChannel.open();


ByteBuffer sendBuffer = ByteBuffer. wrap ("client send".getBytes());//发送的内容


server.send(sendBuffer, new InetSocketAddress("127.0.0.1",10086));



[](()4、连接

UDP 不存在真正意义上的连接,这里的连接是向特定服务地址用 read 和 write 接收发送数据包


client.connect(new InetSocketAddress("127.0.0.1",10086));


int readSize= client.read(sendBuffer);


server.write(sendBuffer);


read()和 write()只有在 connect()后才能使用,不然会抛 NotYetConnectedException 异常。


用 read()接收时,如果没有接收到包,会抛 PortUnreachableException 异常。



[](()5、DatagramChannel 示例

客户端发送,服务端接收的例子


/**


  • 发包的 datagram

  • @throws IOException

  • @throws InterruptedException


*/


@Test


public void sendDatagram() throws IOException, InterruptedException {


DatagramChannel sendChannel= DatagramChannel.open();


//发送地址端口号


InetSocketAddress sendAddress= new InetSocketAddress("127.0.0.1",9999);


while (true) {


sendChannel.send(ByteBuffer.wrap ("发包".getBytes("UTF-8")),sendAddress);


System.out.println("发包端发包");


Thread.sleep (1000);


}


}


/**


  • 收包端

  • @throws IOException


*/


@Test


public void receive() throws IOException {


DatagramChannel receiveChannel= DatagramChannel.open();


//接受端口


InetSocketAddress receiveAddress= new InetSocketAddress(9999);


//绑定


receiveChannel.bind(receiveAddress);


ByteBuffer receiveBuffer= ByteBuffer.allocate(512);


while (true) {


receiveBuffer.clear();


SocketAddress sendAddress= receiveChannel.receive(receiveBuffer);


receiveBuffer.flip();//读写反转,转换为写操作


System.out.print(sendAddress.toString() + " ");//输出发来信息的 ip 地址


System.out.println(Charset.forName ("UTF-8").decode(receiveBuffer));//输出内容


}


}


/**


  • 只接收和发送 9999 的数据包

  • @throws IOException


*/


@Test


public void testConect1() throws IOException {


DatagramChannel connChannel= DatagramChannel.open();


connChannel.bind(new InetSocketAddress(9998));//绑定 DatagramChannel 自身端口


connChannel.connect(new InetSocketAddress("127.0.0.1",9999));//指定连接 ip 端口


//发送


connChannel.write(ByteBuffer.wrap("发包".getBytes("UTF-8")));


ByteBuffer readBuffer= ByteBuffer.allocate (512);


while (true) {


try {


readBuffer.clear();


//接收


connChannel.read(readBuffer);


readBuffer.flip();


System.out.println(Charset.forName("UTF-8").decode(readBuffer));


}catch(Exception e) {


}


}


}




[](()2.6 Scatter/Gather




Java NIO 开始支持 scatter/gather,scatter/gather 用于描述从 Channel 中读取或者写入到 Channel 的操作。


分散(scatter)从 Channel 中读取是指在读操作时将读取的数据写入多个 buffer中。因此,Channel 将从 Channel 中读取的数据“分散(scatter)”到多个 Buffer 中。


聚集(gather)写入 Channel 是指在写操作时将多个 buffer 的数据写入同一个Channel,因此,Channel 将多个 Buffer 中的数据“ 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 聚集(gather)”后发送到 Channel。


scatter / gather 经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的 buffer 中,这样你可以方便的处理消息头和消息体。

[](()2.6.1 Scattering Reads

Scattering Reads 是指数据从一个 channel 读取到多个 buffer 中。如下图描述:



ByteBuffer header = ByteBuffer.allocate (128);


ByteBuffer body = ByteBuffer.allocate (1024);


ByteBuffer[] bufferArray = {header,body};


channel.read(bufferArray);


注意 buffer 首先被插入到数组,然后再将数组作为 channel.read() 的输入参数。


read()方法按照 buffer 在数组中的顺序将从 channel 中读取的数据写入到 buffer当一个 buffer 被写满后,channel 紧接着向另一个 buffer 中写


Scattering Reads 在移动下一个 buffer 前,必须填满当前的 buffer,这也意味着它不适用于动态消息(译者注:消息大小不固定)。换句话说,如果存在消息头和消息体,消息头必须完成填充(例如 128byte),Scattering Reads 才能正常工作。



[](()2.6.2 Gathering Writes

Gathering Writes 是指数据从多个 buffer 写入到同一个 channel。如下图描述:



ByteBuffer header = ByteBuffer.allocate (128);


ByteBuffer body = ByteBuffer.allocate (1024);

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
Day386&387_Java_爱好编程进阶_InfoQ写作社区