写点什么

IO 流中「线程」模型总结

作者:知了一笑
  • 2023-04-07
    浙江
  • 本文字数:7382 字

    阅读完需:约 24 分钟

IO流中「线程」模型总结

IO 流模块:经常看、经常用、经常忘;

一、基础简介

在 IO 流的网络模型中,以常见的「客户端-服务端」交互场景为例;



客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;


Java 提供「三种」IO 网络编程模型,即:「BIO 同步阻塞」、「NIO 同步非阻塞」、「AIO 异步非阻塞」;

二、同步阻塞

1、模型图解

BIO 即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;



这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源;

2、参考案例

服务端】启动 ServerSocket 接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的 10 秒休眠;


public class SocketServer01 {    public static void main(String[] args) throws Exception {        // 1、创建Socket服务端        ServerSocket serverSocket = new ServerSocket(8080);        // 2、方法阻塞等待,直到有客户端连接        Socket socket = serverSocket.accept();        // 3、输入流,输出流        InputStream inStream = socket.getInputStream();        OutputStream outStream = socket.getOutputStream();        // 4、数据接收和响应        int readLen = 0;        byte[] buf = new byte[1024];        if ((readLen=inStream.read(buf)) != -1){            // 接收数据            String readVar = new String(buf, 0, readLen) ;            System.out.println("readVar======="+readVar);        }        // 响应数据        Thread.sleep(10000);        outStream.write("sever-8080-write;".getBytes());        // 5、资源关闭        IoClose.ioClose(outStream,inStream,socket,serverSocket);    }}
复制代码


客户端】Socket 连接,先向 ServerSocket 发送请求,再接收其响应,由于 Server 端模拟耗时,Client 处于长时间阻塞状态;


public class SocketClient01 {    public static void main(String[] args) throws Exception {        // 1、创建Socket客户端        Socket socket = new Socket(InetAddress.getLocalHost(), 8080);        // 2、输入流,输出流        OutputStream outStream = socket.getOutputStream();        InputStream inStream = socket.getInputStream();        // 3、数据发送和响应接收        // 发送数据        outStream.write("client-hello".getBytes());        // 接收数据        int readLen = 0;        byte[] buf = new byte[1024];        if ((readLen=inStream.read(buf)) != -1){            String readVar = new String(buf, 0, readLen) ;            System.out.println("readVar======="+readVar);        }        // 4、资源关闭        IoClose.ioClose(inStream,outStream,socket);    }}
复制代码

三、同步非阻塞

1、模型图解

NIO 即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;



这种模式下客户端的请求连接都会注册到 Selector 多路复用器上,多路复用器会进行轮询,对请求连接的 IO 流进行处理;

2、参考案例

服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有 IO 请求;


public class SocketServer01 {    public static void main(String[] args) throws Exception {        try {            //启动服务开启监听            ServerSocketChannel socketChannel = ServerSocketChannel.open();            socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));            // 设置非阻塞,接受客户端            socketChannel.configureBlocking(false);            // 打开多路复用器            Selector selector = Selector.open();            // 服务端Socket注册到多路复用器,指定兴趣事件            socketChannel.register(selector, SelectionKey.OP_ACCEPT);            // 多路复用器轮询            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);            while (selector.select() > 0){                Set<SelectionKey> selectionKeys = selector.selectedKeys();                Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();                while (selectionKeyIter.hasNext()){                    SelectionKey selectionKey = selectionKeyIter.next() ;                    selectionKeyIter.remove();                    if(selectionKey.isAcceptable()) {                        // 接受新的连接                        SocketChannel client = socketChannel.accept();                        // 设置读非阻塞                        client.configureBlocking(false);                        // 注册到多路复用器                        client.register(selector, SelectionKey.OP_READ);                    } else if (selectionKey.isReadable()) {                        // 通道可读                        SocketChannel client = (SocketChannel) selectionKey.channel();                        int len = client.read(buffer);                        if (len > 0){                            buffer.flip();                            byte[] readArr = new byte[buffer.limit()];                            buffer.get(readArr);                            System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));                            buffer.clear();                        }                    }                }            }        } catch (Exception e) {            e.printStackTrace();        }    }}
复制代码


客户端】每隔 3 秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;


public class SocketClient01 {    public static void main(String[] args) throws Exception {        try {            // 连接服务端            SocketChannel socketChannel = SocketChannel.open();            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);            String conVar = "client-hello";            writeBuffer.put(conVar.getBytes());            writeBuffer.flip();            // 每隔3S发送一次数据            while (true) {                Thread.sleep(3000);                writeBuffer.rewind();                socketChannel.write(writeBuffer);                writeBuffer.clear();            }        } catch (Exception e) {            e.printStackTrace();        }    }}
复制代码

四、异步非阻塞

1、模型图解

AIO 即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;



这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:

2、参考案例

服务端】各种「accept」、「read」、「write」动作是异步,通过 Future 来获取计算的结果;


public class SocketServer01 {    public static void main(String[] args) throws Exception {        // 启动服务开启监听        AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;        socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));        // 指定30秒内获取客户端连接,否则超时        Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();        AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);
if (asyChannel != null && asyChannel.isOpen()){ // 读数据 ByteBuffer inBuffer = ByteBuffer.allocate(1024); Future<Integer> readResult = asyChannel.read(inBuffer); readResult.get(); System.out.println("read:"+new String(inBuffer.array()));
// 写数据 inBuffer.flip(); Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes())); writeResult.get(); }
// 关闭资源 asyChannel.close(); }}
复制代码


客户端】相关「connect」、「read」、「write」方法调用是异步的,通过 Future 来获取计算的结果;


public class SocketClient01 {    public static void main(String[] args) throws Exception {        // 连接服务端        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();        Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));        result.get();
// 写数据 String conVar = "client-hello"; ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes()); Future<Integer> writeFuture = socketChannel.write(reqBuffer); writeFuture.get();
// 读数据 ByteBuffer inBuffer = ByteBuffer.allocate(1024); Future<Integer> readFuture = socketChannel.read(inBuffer); readFuture.get(); System.out.println("read:"+new String(inBuffer.array()));
// 关闭资源 socketChannel.close(); }}
复制代码

五、Reactor 模型

1、模型图解

这部分内容,可以参考「Doug Lea 的《IO》」文档,查看更多细节;

1.1 Reactor 设计原理

Reactor 模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;



Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read 读」、「decode 解码」、「compute 业务计算」、「encode 编码」、「send 应答数据」;

1.2 单 Reactor 单线程


【1】Reactor 线程通过 select 监听客户端的请求事件,收到事件后通过 Dispatch 进行分发;


【2】如果是建立连接请求事件,Acceptor 通过「accept」方法获取连接,并创建一个 Handler 对象来处理后续业务;


【3】如果不是连接请求事件,则 Reactor 会将该事件交由当前连接的 Handler 来处理;


【4】在 Handler 中,会完成相应的业务流程;


这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;

1.3 单 Reactor 多线程


【1】Reactor 线程通过 select 监听客户端的请求事件,收到事件后通过 Dispatch 进行分发;


【2】如果是建立连接请求事件,Acceptor 通过「accept」方法获取连接,并创建一个 Handler 对象来处理后续业务;


【3】如果不是连接请求事件,则 Reactor 会将该事件交由当前连接的 Handler 来处理;


【4】在 Handler 中,只负责事件响应不处理具体业务,将数据发送给 Worker 线程池来处理;


【5】Worker 线程池会分配具体的线程来处理业务,最后把结果返回给 Handler 做响应;


这种模式将业务从 Reactor 单线程分离处理,可以让其更专注于事件的分发和调度,Handler 使用多线程也充分的利用 cpu 的处理能力,导致逻辑变的更加复杂,Reactor 单线程依旧存在高并发的性能问题;

1.4 主从 Reactor 多线程


【1】 MainReactor 主线程通过 select 监听客户端的请求事件,收到事件后通过 Dispatch 进行分发;


【2】如果是建立连接请求事件,Acceptor 通过「accept」方法获取连接,之后 MainReactor 将连接分配给 SubReactor;


【3】如果不是连接请求事件,则 MainReactor 将连接分配给 SubReactor,SubReactor 调用当前连接的 Handler 来处理;


【4】在 Handler 中,只负责事件响应不处理具体业务,将数据发送给 Worker 线程池来处理;


【5】Worker 线程池会分配具体的线程来处理业务,最后把结果返回给 Handler 做响应;


这种模式 Reactor 线程分工明确,MainReactor 负责接收新的请求连接,SubReactor 负责后续的交互业务,适应于高并发的处理场景,是 Netty 组件通信框架的所采用的模式;

2、参考案例

服务端】提供两个 EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即 Reactor 多线程模型;


@Slf4jpublic class NettyServer {    public static void main(String[] args) {        // EventLoop组,处理事件和IO        EventLoopGroup parentGroup = new NioEventLoopGroup();        EventLoopGroup childGroup = new NioEventLoopGroup();        try {            // 服务端启动引导类            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(parentGroup, childGroup)                    .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());
// 异步IO的结果 ChannelFuture channelFuture = serverBootstrap.bind(8989).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } }}
class ServerChannelInit extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { // 获取管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 编码、解码器 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加自定义的handler pipeline.addLast("serverHandler", new ServerHandler()); }}
class ServerHandler extends ChannelInboundHandlerAdapter { /** * 通道读和写 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server-Msg【"+msg+"】"); TimeUnit.MILLISECONDS.sleep(2000); String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ; ctx.channel().writeAndFlush("hello-client;time:" + nowTime); ctx.fireChannelActive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
复制代码


客户端】通过 Bootstrap 类,与服务器建立连接,服务端通过 ServerBootstrap 启动服务,绑定在8989端口,然后服务端和客户端进行通信;


public class NettyClient {    public static void main(String[] args) {        // EventLoop处理事件和IO        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();        try {            // 客户端通道引导            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(eventLoopGroup)                    .channel(NioSocketChannel.class).handler(new ClientChannelInit());
// 异步IO的结果 ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e){ e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } }}
class ClientChannelInit extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { // 获取管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 编码、解码器 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加自定义的handler pipeline.addLast("clientHandler", new ClientHandler()); }}
class ClientHandler extends ChannelInboundHandlerAdapter { /** * 通道读和写 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client-Msg【"+msg+"】"); TimeUnit.MILLISECONDS.sleep(2000); String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ; ctx.channel().writeAndFlush("hello-server;time:" + nowTime); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().writeAndFlush("channel...active"); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
复制代码

六、参考源码

编程文档:https://gitee.com/cicadasmile/butte-java-note
应用仓库:https://gitee.com/cicadasmile/butte-flyer-parent
复制代码


发布于: 刚刚阅读数: 4
用户头像

知了一笑

关注

公众号:知了一笑 2020-04-08 加入

源码仓库:https://gitee.com/cicadasmile

评论

发布
暂无评论
IO流中「线程」模型总结_Java_知了一笑_InfoQ写作社区