写点什么

Java Core「19」使用 Java IO API 创建 C/S 程序的方法

作者:Samson
  • 2022 年 6 月 28 日
  • 本文字数:5541 字

    阅读完需:约 18 分钟

01-Unix 下的五种 IO 模型

《UNIX 网络编程卷 1:套接字联网 API(第 3 版)》中描述了 UNIX 系统下的五种 IO 模型,它们分别是:

  1. 阻塞式 IO 模型

  2. 非阻塞式 IO 模型

  3. IO 复用模型

  4. 信号驱动型 IO 模型

  5. 异步 IO 模型

关于这五个不同的 IO 模型的详细解释大家可以去阅读一下。这里我也向大家提供一个比较好理解的在线资源 [1]

IO 过程一般可分为两个阶段:等待数据就绪、将数据从内核拷贝到用户进程空间。上述模型 1-4 都属于同步 IO,具体不同是阻塞的时机不同:

  1. 阻塞式 IO,两阶段全部阻塞

  2. 非阻塞式 IO,第一阶段非阻塞(轮训),效率比较低,第二阶段阻塞

  3. IO 复用模型,第一阶段阻塞(阻塞在 select 上),第二阶段阻塞

  4. 信号驱动 IO 模型,第一阶段基于信号(或事件),第二阶段阻塞。


图 1.《UNIX 网络编程卷 1:套接字联网 API(第 3 版)》中对五种 IO 模型的比较

[1] Unix IO 模型简介

02-传统 BIO 实现 client / server

传统的 Java IO 模型便是基于阻塞式 IO,实现包位于java.io.*。其中的核心类是java.io.InputStreamjava.io.OutputStream

在实现各种类型的 IO 流时,采用了设计模式中的“装饰模式”。如果想进一步了解这一设计模式请参考[1],在我的 gitee 中有一个实现的示例。

通过原生 BIO API 实现一个 Server 的代码比较直观:

// 创建一个服务端 socketthis.port = 8888;this.serverSocket = new ServerSocket(port);
try (Socket accept = this.serverSocket.accept(); InputStream request = accept.getInputStream(); OutputStream response = accept.getOutputStream()) {
int maxLen = 2048; byte[] bytes = new byte[maxLen];
// 通过 input stream 从请求消息中读取 int read = request.read(bytes, 0, maxLen); String msg = new String(bytes, 0, read);
int port = accept.getPort(); System.out.printf("server received a request from port %d, msg=[%s]%n", port, msg);
// 通过 output stream 向响应中写消息 response.write("response to client.".getBytes()); response.flush();
} catch (Exception e) {
}
复制代码

实现 Client 的代码:

try (Socket socket = new Socket("localhost", this.port);	 OutputStream request = socket.getOutputStream();	 InputStream response = socket.getInputStream()) {
request.write(String.format("client %d try connecting", this.clientId).getBytes()); request.flush();
System.out.printf("client %d send a request, and waiting response...%n", this.clientId);
int maxLen = 1024; byte[] bytes = new byte[maxLen];
String msg = "";
int read; while ((read = response.read(bytes, 0, maxLen)) != -1) { msg += new String(bytes, 0, read); } System.out.printf("client %d received response from the server, msg=[%s]%n", this.clientId, msg);
} catch (Exception e) { e.printStackTrace();}
复制代码

完整的代码可以参考 gitee

[1] Decorator

03-Java NIO 原生 API 实现 client / server

Java NIO 是 Java 新一代的 IO API。有时候也会有人将其称之为“non-blocking IO”。但这是不准确的,Java NIO 并非完全是非阻塞的 IO,它实现的非阻塞式 IO,但同时也有一部分方法仍然是阻塞的。

在 Java NIO 中,取消了流的概念,取而代之的使用 Channel 概念。可以将 Channel 理解成各类的数据源,例如 SocketChannle 表示套接字源,FileChannel 表示文件源。

与流的单方向不同,Channel 是支持读写双向(双工)的。

使用 Java NIO 实现 server 时,可以很方便的获得高并发性能。这得益于 Java NIO 中提供的java.nio.channels.Selector。通过 Selector 类,一个线程可以监控多个 SocketChannel,与每个套接字连接分配一个线程的模型相比极大地提高了并发性。

使用 Selector 实现 server 的逻辑如下:

try {	// 1. 初始化 ServerSocketChannel,并绑定到本机端口 port	final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();	serverSocketChannel.configureBlocking(false);	serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", port));	 // 将 channel 注册到选择器上	final Selector selector = Selector.open();	serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) { selector.select(); final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey next = iterator.next(); iterator.remove(); // 连接建立成功 if (next.isAcceptable()) { // 服务端会为每一个连接创建一个 socket 需要将其注册到 selector 中,已被后续可以收到客户端的消息 final SocketChannel accept = serverSocketChannel.accept(); // 忽略其他代码 } if (next.isReadable()) { // 某个 socket 收到数据 SocketChannel socket = (SocketChannel) next.channel(); // 忽略其他代码 } } }} catch (Exception e) { }
复制代码

实现 Client 端的代码可以使用 BIO,也可以使用 NIO 方式。

我的 gitee 中提供了一个基于 NIO 的 client 实现,感兴趣的朋友可以自行参考,如有其他想法也可以互相交流。

04-Apache MINA API 实现 client / server

Apache MINA 是一个著名的开源项目,提供了一个高性能、高扩展性网络应用程序框架。

MINA 与应用程序之间的关系是:


图 2. 应用程序与 MINA 的关系


图 3. MINA 的架构

从上图中可以看出,MINA 作为应用程序与网络通讯层交互的桥梁。它提供了丰富的接口用于 client 与 server 进程通讯。

MINA 的核心接口是org.apache.mina.core.service.IoService。它的两个实现org.apache.mina.core.service.IoConnectororg.apache.mina.core.service.IoAcceptor,分别用于为 client 端程序和 server 端程序提供网络通讯的基本接口。

另外一个核心接口是org.apache.mina.core.service.IoHandler,它是我们实现业务逻辑的地方。它提供了一系列的回调函数,方便我们在多个关键节点介入处理流程。常用的回调有:

  • IoHandler#sessionCreated,当链接建立好后,MINA 会在内存中创建一个 Session,然后回调该方法

  • IoHandler#messageReceived,当 Session 中有消息到达时,回调此函数

  • IoHandler#exceptionCaught,当 Session 异常时,回调此函数

  • IoHandler#event,事件触发回调

基于 Java NIO 和 Socket 的两个实现,它们与 IoConnector 和 IoAcceptor 的关系如下图所示:



我们可以借助这两个类,实现基于 Socket 通讯的 client 和 server。

server 的代码如下:

// 创建一个 IoAcceptorIoAcceptor acceptor = new NioSocketAcceptor();// 自定义 filter chainacceptor.getFilterChain().addLast("logger", new LoggingFilter());acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));// 设置 IoHandler,这里借助了mina-example 中的一个 TimeServerHandler acceptor.setHandler(new TimeServerHandler());acceptor.getSessionConfig().setReadBufferSize(2048);acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 监听 444 端口acceptor.bind(new InetSocketAddress(444));
// org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler 中重写的 messageReceived 方法public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); if (str.trim().equalsIgnoreCase("quit")) { session.closeNow(); } else { Date date = new Date(); session.write(date.toString()); System.out.println("Message written..."); }}
复制代码

client 的实现代码如下:

// 创建一个 IoConnector IoConnector connector = new NioSocketConnector();// 自定义 filter chainconnector.getFilterChain().addLast("logger", new LoggingFilter());connector.getFilterChain()addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));// 设置 handler,ClientIoHandler 重写了 sessionOpened 方法,// 当链接建立成功后,发送 Hello Server.. 到服务端connector.setHandler(new ClientIoHandler("Hello Server.."));// MINA 基于异步实现ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", 444));future.awaitUninterruptibly();if (future.isConnected()) {	IoSession session = future.getSession();	session.getConfig().setUseReadOperation(true);	session.getCloseFuture().awaitUninterruptibly();	System.out.println("After Writing");	connector.dispose();}
复制代码

完整的代码示例可以参考我的 gitee

05-Netty API 实现 client / server

Netty 是一个更著名的项目,它提供了异步事件驱动的网络应用框架和工具,以便用户快速的开发高性能、高可维护性、高扩展性的网络应用。

Netty 不仅仅是一个框架,它还是一个工具集

它提供更统一、便捷的 Byte Buffer 操作 API

它提供了统一的网络通讯接口 API,可在不同的通讯层协议实现上迁移

它提供了可扩展的、事件驱动模型



基于 Netty 实现 server 的代码如下:

// Server 端需要两个 事件循环组,boss 组和 worker 组// boss 负责接收链接,并将链接交给 worker 处理final NioEventLoopGroup bossGroup = new NioEventLoopGroup();final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try { // 帮助类,创建一个 Server final ServerBootstrap bootstrap = new ServerBootstrap(); // 设置事件循环组 bootstrap.group(bossGroup, workerGroup) // 指定 Channel 类型,此处使用 Socket 通讯 .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 在 pipeline 中添加 handler socketChannel.pipeline().addLast(new TimeServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 同步监听 ChannelFuture f = bootstrap.bind(8888).sync(); // 同步关闭 f.channel().closeFuture().sync();} catch (InterruptedException e) { e.printStackTrace();} finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully();}
复制代码

添加到 pipeline 中的 handler,会在消息到来后回调对应的函数:

class TimeServerHandler extends ChannelInboundHandlerAdapter {	/**	* 链接建立后,调用此回调	*/	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		final ByteBuf time = ctx.alloc().buffer(4);		time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { assert f == channelFuture; ctx.close(); } }); }
/** * 异常时,调用此回调 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
复制代码

client 端代码实现如下:

// client 只需要一个事件循环组EventLoopGroup workerGroup = new NioEventLoopGroup();try {	Bootstrap bootstrap = new Bootstrap();	bootstrap.group(workerGroup)			// 类型与服务端的互相匹配			.channel(NioSocketChannel.class)			.option(ChannelOption.SO_KEEPALIVE, true)			.handler(new ChannelInitializer<SocketChannel>() {				@Override				protected void initChannel(SocketChannel socketChannel) throws Exception {					socketChannel.pipeline().addLast(new TimeClientHandler());				}			});
final ChannelFuture f = bootstrap.connect("127.0.0.1", 8888).sync(); f.channel().closeFuture().sync();} catch (InterruptedException e) { e.printStackTrace();} finally { workerGroup.shutdownGracefully();}
复制代码

handler 的实现为:

class TimeClientHandler extends ChannelInboundHandlerAdapter {	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {		ByteBuf byteBuf = (ByteBuf) msg;		try {			long time = (byteBuf.readUnsignedInt() - 2208988800L) * 2000L;			System.out.println(new Date(time));			ctx.close();		} finally {			byteBuf.release();		}	}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
复制代码

06-总结

本文介绍了 Java 中几种创建 client / server 应用的方法。根据应用场景的不同,选择合适的方法。

当并发量不是很大或对性能要求不高时,建议选用基于 BIO 的实现。因为实现容易,代码逻辑也更直观。

当并发量(或者说性能)是主要考虑的因素时,推荐使用基于 Netty API 实现。但是这样也有个缺点,就是 Netty 相对学习成本较高。


发布于: 刚刚阅读数: 4
用户头像

Samson

关注

还未添加个人签名 2019.07.22 加入

还未添加个人简介

评论

发布
暂无评论
Java Core「19」使用 Java IO API 创建 C/S 程序的方法_学习笔记_Samson_InfoQ写作社区