01-Unix 下的五种 IO 模型
《UNIX 网络编程卷 1:套接字联网 API(第 3 版)》中描述了 UNIX 系统下的五种 IO 模型,它们分别是:
阻塞式 IO 模型
非阻塞式 IO 模型
IO 复用模型
信号驱动型 IO 模型
异步 IO 模型
关于这五个不同的 IO 模型的详细解释大家可以去阅读一下。这里我也向大家提供一个比较好理解的在线资源 [1]
IO 过程一般可分为两个阶段:等待数据就绪、将数据从内核拷贝到用户进程空间。上述模型 1-4 都属于同步 IO,具体不同是阻塞的时机不同:
阻塞式 IO,两阶段全部阻塞
非阻塞式 IO,第一阶段非阻塞(轮训),效率比较低,第二阶段阻塞
IO 复用模型,第一阶段阻塞(阻塞在 select 上),第二阶段阻塞
信号驱动 IO 模型,第一阶段基于信号(或事件),第二阶段阻塞。
图 1.《UNIX 网络编程卷 1:套接字联网 API(第 3 版)》中对五种 IO 模型的比较
[1] Unix IO 模型简介
02-传统 BIO 实现 client / server
传统的 Java IO 模型便是基于阻塞式 IO,实现包位于java.io.*
。其中的核心类是java.io.InputStream
和java.io.OutputStream
。
在实现各种类型的 IO 流时,采用了设计模式中的“装饰模式”。如果想进一步了解这一设计模式请参考[1],在我的 gitee 中有一个实现的示例。
通过原生 BIO API 实现一个 Server 的代码比较直观:
// 创建一个服务端 socket
this.port = 8888;
this.serverSocket = new ServerSocket(port);
try (Socket accept = this.serverSocket.accept();
InputStream request = accept.getInputStream();
OutputStream response = accept.getOutputStream()) {
int maxLen = 2048;
byte[] bytes = new byte[maxLen];
// 通过 input stream 从请求消息中读取
int read = request.read(bytes, 0, maxLen);
String msg = new String(bytes, 0, read);
int port = accept.getPort();
System.out.printf("server received a request from port %d, msg=[%s]%n", port, msg);
// 通过 output stream 向响应中写消息
response.write("response to client.".getBytes());
response.flush();
} catch (Exception e) {
}
复制代码
实现 Client 的代码:
try (Socket socket = new Socket("localhost", this.port);
OutputStream request = socket.getOutputStream();
InputStream response = socket.getInputStream()) {
request.write(String.format("client %d try connecting", this.clientId).getBytes());
request.flush();
System.out.printf("client %d send a request, and waiting response...%n", this.clientId);
int maxLen = 1024;
byte[] bytes = new byte[maxLen];
String msg = "";
int read;
while ((read = response.read(bytes, 0, maxLen)) != -1) {
msg += new String(bytes, 0, read);
}
System.out.printf("client %d received response from the server, msg=[%s]%n", this.clientId, msg);
} catch (Exception e) {
e.printStackTrace();
}
复制代码
完整的代码可以参考 gitee
[1] Decorator
03-Java NIO 原生 API 实现 client / server
Java NIO 是 Java 新一代的 IO API。有时候也会有人将其称之为“non-blocking IO”。但这是不准确的,Java NIO 并非完全是非阻塞的 IO,它实现的非阻塞式 IO,但同时也有一部分方法仍然是阻塞的。
在 Java NIO 中,取消了流的概念,取而代之的使用 Channel 概念。可以将 Channel 理解成各类的数据源,例如 SocketChannle 表示套接字源,FileChannel 表示文件源。
与流的单方向不同,Channel 是支持读写双向(双工)的。
使用 Java NIO 实现 server 时,可以很方便的获得高并发性能。这得益于 Java NIO 中提供的java.nio.channels.Selector
。通过 Selector 类,一个线程可以监控多个 SocketChannel,与每个套接字连接分配一个线程的模型相比极大地提高了并发性。
使用 Selector 实现 server 的逻辑如下:
try {
// 1. 初始化 ServerSocketChannel,并绑定到本机端口 port
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", port));
// 将 channel 注册到选择器上
final Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey next = iterator.next();
iterator.remove();
// 连接建立成功
if (next.isAcceptable()) {
// 服务端会为每一个连接创建一个 socket 需要将其注册到 selector 中,已被后续可以收到客户端的消息
final SocketChannel accept = serverSocketChannel.accept();
// 忽略其他代码
}
if (next.isReadable()) {
// 某个 socket 收到数据
SocketChannel socket = (SocketChannel) next.channel();
// 忽略其他代码
}
}
}
} catch (Exception e) { }
复制代码
实现 Client 端的代码可以使用 BIO,也可以使用 NIO 方式。
我的 gitee 中提供了一个基于 NIO 的 client 实现,感兴趣的朋友可以自行参考,如有其他想法也可以互相交流。
04-Apache MINA API 实现 client / server
Apache MINA 是一个著名的开源项目,提供了一个高性能、高扩展性网络应用程序框架。
MINA 与应用程序之间的关系是:
图 2. 应用程序与 MINA 的关系
图 3. MINA 的架构
从上图中可以看出,MINA 作为应用程序与网络通讯层交互的桥梁。它提供了丰富的接口用于 client 与 server 进程通讯。
MINA 的核心接口是org.apache.mina.core.service.IoService
。它的两个实现org.apache.mina.core.service.IoConnector
和org.apache.mina.core.service.IoAcceptor
,分别用于为 client 端程序和 server 端程序提供网络通讯的基本接口。
另外一个核心接口是org.apache.mina.core.service.IoHandler
,它是我们实现业务逻辑的地方。它提供了一系列的回调函数,方便我们在多个关键节点介入处理流程。常用的回调有:
IoHandler#sessionCreated,当链接建立好后,MINA 会在内存中创建一个 Session,然后回调该方法
IoHandler#messageReceived,当 Session 中有消息到达时,回调此函数
IoHandler#exceptionCaught,当 Session 异常时,回调此函数
IoHandler#event,事件触发回调
基于 Java NIO 和 Socket 的两个实现,它们与 IoConnector 和 IoAcceptor 的关系如下图所示:
我们可以借助这两个类,实现基于 Socket 通讯的 client 和 server。
server 的代码如下:
// 创建一个 IoAcceptor
IoAcceptor acceptor = new NioSocketAcceptor();
// 自定义 filter chain
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
// 设置 IoHandler,这里借助了mina-example 中的一个 TimeServerHandler
acceptor.setHandler(new TimeServerHandler());
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// 监听 444 端口
acceptor.bind(new InetSocketAddress(444));
// org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler 中重写的 messageReceived 方法
public void messageReceived(IoSession session, Object message) throws Exception {
String str = message.toString();
if (str.trim().equalsIgnoreCase("quit")) {
session.closeNow();
} else {
Date date = new Date();
session.write(date.toString());
System.out.println("Message written...");
}
}
复制代码
client 的实现代码如下:
// 创建一个 IoConnector
IoConnector connector = new NioSocketConnector();
// 自定义 filter chain
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain()addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
// 设置 handler,ClientIoHandler 重写了 sessionOpened 方法,
// 当链接建立成功后,发送 Hello Server.. 到服务端
connector.setHandler(new ClientIoHandler("Hello Server.."));
// MINA 基于异步实现
ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", 444));
future.awaitUninterruptibly();
if (future.isConnected()) {
IoSession session = future.getSession();
session.getConfig().setUseReadOperation(true);
session.getCloseFuture().awaitUninterruptibly();
System.out.println("After Writing");
connector.dispose();
}
复制代码
完整的代码示例可以参考我的 gitee 。
05-Netty API 实现 client / server
Netty 是一个更著名的项目,它提供了异步事件驱动的网络应用框架和工具,以便用户快速的开发高性能、高可维护性、高扩展性的网络应用。
Netty 不仅仅是一个框架,它还是一个工具集
它提供更统一、便捷的 Byte Buffer 操作 API
它提供了统一的网络通讯接口 API,可在不同的通讯层协议实现上迁移
它提供了可扩展的、事件驱动模型
基于 Netty 实现 server 的代码如下:
// Server 端需要两个 事件循环组,boss 组和 worker 组
// boss 负责接收链接,并将链接交给 worker 处理
final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 帮助类,创建一个 Server
final ServerBootstrap bootstrap = new ServerBootstrap();
// 设置事件循环组
bootstrap.group(bossGroup, workerGroup)
// 指定 Channel 类型,此处使用 Socket 通讯
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 在 pipeline 中添加 handler
socketChannel.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 同步监听
ChannelFuture f = bootstrap.bind(8888).sync();
// 同步关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
复制代码
添加到 pipeline 中的 handler,会在消息到来后回调对应的函数:
class TimeServerHandler extends ChannelInboundHandlerAdapter {
/**
* 链接建立后,调用此回调
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
assert f == channelFuture;
ctx.close();
}
});
}
/**
* 异常时,调用此回调
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
client 端代码实现如下:
// client 只需要一个事件循环组
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
// 类型与服务端的互相匹配
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
final ChannelFuture f = bootstrap.connect("127.0.0.1", 8888).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
复制代码
handler 的实现为:
class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
try {
long time = (byteBuf.readUnsignedInt() - 2208988800L) * 2000L;
System.out.println(new Date(time));
ctx.close();
} finally {
byteBuf.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
06-总结
本文介绍了 Java 中几种创建 client / server 应用的方法。根据应用场景的不同,选择合适的方法。
当并发量不是很大或对性能要求不高时,建议选用基于 BIO 的实现。因为实现容易,代码逻辑也更直观。
当并发量(或者说性能)是主要考虑的因素时,推荐使用基于 Netty API 实现。但是这样也有个缺点,就是 Netty 相对学习成本较高。
评论