A Brief History of Linux AIO
Before continuing our journey into io_uring, let's first review the various forms of asynchronous I/O (AIO) on Linux.
1. glibc aio
Official documentation: "Perform I/O Operations in Parallel" (the wording in the official docs is chosen quite deliberately).
glibc is the GNU C library. The asynchronous I/O it provides is known as glibc aio, and in some places it is also called POSIX AIO. glibc aio simulates asynchronous I/O with multi-threaded synchronous I/O, and the callbacks are executed in a single thread.
This implementation has drawn a lot of criticism: it has some hard-to-tolerate flaws and bugs, and its use is strongly discouraged. For details see: http://davmac.org/davpage/linux/async-io.html
2. libaio
Linux kernel 2.6 introduced native asynchronous I/O support, libaio, also known as native AIO.
Unlike glibc aio's multi-threaded pseudo-asynchrony, libaio provides genuine asynchronous notification from the kernel; it is true asynchronous I/O.
Real as it may be, its limitations are just as obvious: libaio only supports the O_DIRECT flag, i.e. Direct I/O. That means it cannot take advantage of the page cache, and read/write sizes and offsets must be block-aligned.
3. libeio
Since neither of the above is dependable, Marc Lehmann developed yet another AIO library: libeio.
The idea is the same as glibc aio: simulate asynchronous I/O in user space with multiple threads doing synchronous I/O, but libeio's implementation is more efficient and its code more stable. Early versions of the well-known node.js were driven by libev and libeio (in newer versions, libuv no longer uses libev and libeio).
libeio provides a complete set of asynchronous file-operation interfaces that let users write fully non-blocking programs, but libeio is still not true asynchronous I/O.
libeio project: https://github.com/kindy/libeio
4. io_uring
Next up is io_uring, introduced in Linux kernel 5.1.
io_uring is roughly the counterpart of IOCP in the Windows world, but it has not reached a comparable status yet. As far as I can tell, there are hardly any products using io_uring in production so far. My feeling is that no mature programming model has emerged to match it, the way Netty's programming model is such a good fit for epoll.
As for Netty's wrapper around io_uring, my overall impression after reading through it is that, in order to keep its programming model uniform, Netty does not really play to io_uring's strengths. Let's take a closer look at how Netty wraps it.
How Netty Wraps io_uring
1. Usage
public class EchoIOUringServer {
    private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));

    public static void main(String []args) {
        EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
        EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
        final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(IOUringServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Judging from this official Netty example, io_uring is used exactly the same way as epoll, and at first glance the threading model is the same too: there are still two EventLoopGroups, bossGroup and workerGroup. Judging by the names, bossGroup still handles connection establishment and workerGroup still handles network reads and writes.
The io_uring-specific logic is encapsulated in IOUringEventLoopGroup and IOUringServerSocketChannel.
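Since io_uring requires a recent kernel and a native library, code that adopts this transport usually falls back to epoll or NIO at runtime. Below is a minimal sketch of such a selection, assuming the netty-incubator-transport-native-io_uring artifact (package io.netty.incubator.channel.uring) and the regular epoll transport are on the classpath; the ServerBootstrap wiring stays the same as in the example above. This is an illustration, not part of the article's original code.

import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;

final class TransportSelection {

    // Pick the "best" available transport at runtime: io_uring if the kernel and
    // native library support it, otherwise epoll, otherwise plain NIO.
    static EventLoopGroup newEventLoopGroup(int nThreads) {
        if (IOUring.isAvailable()) {
            return new IOUringEventLoopGroup(nThreads);
        }
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads);
        }
        return new NioEventLoopGroup(nThreads);
    }

    // The server channel class must match the chosen event-loop group.
    static Class<? extends ServerChannel> serverChannelClass() {
        if (IOUring.isAvailable()) {
            return IOUringServerSocketChannel.class;
        }
        if (Epoll.isAvailable()) {
            return EpollServerSocketChannel.class;
        }
        return NioServerSocketChannel.class;
    }

    private TransportSelection() { }
}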
2. IOUringEventLoopGroup
I won't repeat Netty's threading model here; see Chapter 7 of Netty in Action.
Let's start with the IOUringEventLoop constructor:
IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold,
                 RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
    IOUring.ensureAvailability();

    ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold);
    eventfd = Native.newBlockingEventFd();
    logger.trace("New EventLoop: {}", this.toString());
}
As you can see, each event-loop thread creates its own io_uring ring buffer, plus an eventfd file descriptor used for event notification.
Digging into Native.createRingBuffer(ringSize, iosqeAsyncThreshold): ringSize defaults to 4096 and iosqeAsyncThreshold defaults to 25.
static RingBuffer createRingBuffer(int ringSize, int iosqeAsyncThreshold) {
    long[][] values = ioUringSetup(ringSize);
    assert values.length == 2;
    long[] submissionQueueArgs = values[0];
    assert submissionQueueArgs.length == 11;
    IOUringSubmissionQueue submissionQueue = new IOUringSubmissionQueue(
            submissionQueueArgs[0],
            submissionQueueArgs[1],
            submissionQueueArgs[2],
            submissionQueueArgs[3],
            submissionQueueArgs[4],
            submissionQueueArgs[5],
            submissionQueueArgs[6],
            submissionQueueArgs[7],
            (int) submissionQueueArgs[8],
            submissionQueueArgs[9],
            (int) submissionQueueArgs[10],
            iosqeAsyncThreshold);
    long[] completionQueueArgs = values[1];
    assert completionQueueArgs.length == 9;
    IOUringCompletionQueue completionQueue = new IOUringCompletionQueue(
            completionQueueArgs[0],
            completionQueueArgs[1],
            completionQueueArgs[2],
            completionQueueArgs[3],
            completionQueueArgs[4],
            completionQueueArgs[5],
            (int) completionQueueArgs[6],
            completionQueueArgs[7],
            (int) completionQueueArgs[8]);
    return new RingBuffer(submissionQueue, completionQueue);
}
Netty's RingBuffer wrapper maps almost one-to-one onto io_uring's data structures.
Digging further into the JNI wrapper around io_uring_setup, Netty's current implementation does not set any flags, so the default mode is used: tasks are submitted via io_uring_enter. That mode does match Netty's threading model well; supporting SQPOLL would probably require substantial changes.
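For reference, here is a small sketch of what those long arrays roughly describe: the submission and completion rings that io_uring_setup maps into user space (head/tail indices, ring mask, number of entries, and the addresses of the SQE/CQE arrays). The class and field names are illustrative only; they are not Netty's actual classes, and the real constructors take a few more values than shown here.

// Illustrative only: a conceptual mirror of what the long[] values returned by
// ioUringSetup(...) describe. The real IOUringSubmissionQueue / IOUringCompletionQueue
// keep raw memory addresses of the mmap'ed rings and read/write them natively.
final class RingLayoutSketch {
    // Submission queue (SQ) side.
    long sqKHeadAddress;     // address of the kernel-visible SQ head index
    long sqKTailAddress;     // address of the SQ tail index (userspace advances this)
    long sqRingMask;         // ringEntries - 1, used to wrap indices
    long sqRingEntries;      // number of SQE slots
    long sqeArrayAddress;    // address of the SQE array itself
    int  ringFd;             // fd returned by io_uring_setup, passed to io_uring_enter

    // Completion queue (CQ) side.
    long cqKHeadAddress;     // address of the CQ head index (userspace advances this)
    long cqKTailAddress;     // address of the CQ tail index (kernel advances this)
    long cqRingMask;
    long cqRingEntries;
    long cqeArrayAddress;    // address of the CQE array
}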
With that in place, let's look back at IOUringEventLoop's event loop:
@Override
protected void run() {
    final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    final IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

    // Lets add the eventfd related events before starting to do any real work.
    addEventFdRead(submissionQueue);

    for (;;) {
        try {
            logger.trace("Run IOUringEventLoop {}", this);

            // Prepare to block wait
            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
            if (curDeadlineNanos == -1L) {
                curDeadlineNanos = NONE; // nothing on the calendar
            }
            nextWakeupNanos.set(curDeadlineNanos);

            // Only submit a timeout if there are no tasks to process and do a blocking operation
            // on the completionQueue.
            try {
                if (!hasTasks()) {
                    if (curDeadlineNanos != prevDeadlineNanos) {
                        prevDeadlineNanos = curDeadlineNanos;
                        submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
                    }

                    // Check there were any completion events to process
                    if (!completionQueue.hasCompletions()) {
                        // Block if there is nothing to process after this try again to call process(....)
                        logger.trace("submitAndWait {}", this);
                        submissionQueue.submitAndWait();
                    }
                }
            } finally {
                if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                    pendingWakeup = true;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }

        // Avoid blocking for as long as possible - loop until available work exhausted
        boolean maybeMoreWork = true;
        do {
            try {
                // CQE processing can produce tasks, and new CQEs could arrive while
                // processing tasks. So run both on every iteration and break when
                // they both report that nothing was done (| means always run both).
                maybeMoreWork = completionQueue.process(callback) != 0 | runAllTasks();
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                    if (!maybeMoreWork) {
                        maybeMoreWork = hasTasks() || completionQueue.hasCompletions();
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        } while (maybeMoreWork);
    }
}
First, two details that are off the main path:
addEventFdRead(submissionQueue) submits a read on the eventfd to io_uring; its main purpose is to let other threads wake up the event-loop thread. Since submissionQueue.submitAndWait() blocks, waking the event loop is just a matter of writing to the eventfd.
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0) handles scheduled (delayed) tasks and can be ignored for now.
With those two details out of the way, the main flow is clear:
submissionQueue.submitAndWait() submits the pending tasks and waits for at least one of them to complete;
completionQueue.process(callback) handles completed tasks; the callback method is void handle(int fd, int res, int flags, byte op, short data) (a simplified dispatch sketch follows below);
finally, new tasks are added to the submissionQueue (the concrete task types and when they are submitted are analyzed below).
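To make the completion-handling step more concrete, here is a minimal, self-contained sketch of a callback with the same handle(int fd, int res, int flags, byte op, short data) shape, dispatching on the op field. The CompletionCallback interface and the OP_* constants are hypothetical stand-ins for illustration; they are not Netty's internal classes.

// A simplified model of how completions are dispatched by operation type.
public final class CompletionDispatchSketch {

    // Mirrors the callback signature quoted above.
    interface CompletionCallback {
        void handle(int fd, int res, int flags, byte op, short data);
    }

    // Hypothetical op codes, named after the io_uring operations Netty submits.
    static final byte OP_POLL_IN = 1;
    static final byte OP_ACCEPT  = 2;
    static final byte OP_READ    = 3;

    static final CompletionCallback CALLBACK = (fd, res, flags, op, data) -> {
        switch (op) {
            case OP_POLL_IN:
                // The fd became readable; the channel would now submit an ACCEPT or READ SQE.
                System.out.printf("POLLIN completed on fd=%d%n", fd);
                break;
            case OP_ACCEPT:
                // res >= 0 is the accepted socket's fd, a negative value is -errno.
                System.out.printf("ACCEPT completed, child fd or -errno: %d%n", res);
                break;
            case OP_READ:
                // res > 0: bytes read, res == 0: EOF, res < 0: -errno.
                System.out.printf("READ completed on fd=%d, result=%d%n", fd, res);
                break;
            default:
                System.out.printf("unhandled op=%d on fd=%d%n", op, fd);
        }
    };

    public static void main(String[] args) {
        // Simulate a few completion events to show the dispatch.
        CALLBACK.handle(10, 0, 0, OP_POLL_IN, (short) 0);
        CALLBACK.handle(10, 42, 0, OP_ACCEPT, (short) 0);
        CALLBACK.handle(42, 1024, 0, OP_READ, (short) 0);
    }
}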
3. IOUringServerSocketChannel
I won't repeat Netty's generic ServerSocket handling flow here either.
Once the IOUringServerSocketChannel has been registered and channelActive() is fired, the beginRead method is triggered:
@Override
protected void doBeginRead() {
    if ((ioState & POLL_IN_SCHEDULED) == 0) {
        ioUringUnsafe().schedulePollIn();
    }
}

final void schedulePollIn() {
    assert (ioState & POLL_IN_SCHEDULED) == 0;
    if (!isActive() || shouldBreakIoUringInReady(config())) {
        return;
    }
    ioState |= POLL_IN_SCHEDULED;
    IOUringSubmissionQueue submissionQueue = submissionQueue();
    submissionQueue.addPollIn(socket.intValue());
}
In other words, a POLLIN task is submitted to io_uring, so the bossGroup gets an event as soon as a new connection arrives (note that this is not yet the "accept connection" task).
final void pollIn(int res) {
    ioState &= ~POLL_IN_SCHEDULED;

    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }

    scheduleFirstReadIfNeeded();
}

@Override
protected int scheduleRead0() {
    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    allocHandle.attemptedBytesRead(1);

    IOUringSubmissionQueue submissionQueue = submissionQueue();
    submissionQueue.addAccept(fd().intValue(),
            acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
    return 1;
}
When the POLLIN task completes, an IORING_OP_ACCEPT task is submitted next; this is the actual "accept connection" task.
Once the connection has been accepted, the completion callback fires:
@Override
protected void readComplete0(int res, int data, int outstanding) {
    final IOUringRecvByteAllocatorHandle allocHandle =
            (IOUringRecvByteAllocatorHandle) unsafe()
                    .recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();
    allocHandle.lastBytesRead(res);

    if (res >= 0) {
        allocHandle.incMessagesRead(1);
        try {
            Channel channel = newChildChannel(
                    res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
            pipeline.fireChannelRead(channel);
            if (allocHandle.continueReading()) {
                scheduleRead();
            } else {
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
        } catch (Throwable cause) {
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);
        }
    } else {
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        // Check if we did fail because there was nothing to accept atm.
        if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
            // Something bad happened. Convert to an exception.
            pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
        }
    }
}
Creating the child channel:
@Override
Channel newChildChannel(int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) {
    final InetSocketAddress address;
    if (socket.isIpv6()) {
        byte[] ipv6Array = ((IOUringEventLoop) eventLoop()).inet6AddressArray();
        byte[] ipv4Array = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
        address = SockaddrIn.readIPv6(acceptedAddressMemoryAddress, ipv6Array, ipv4Array);
    } else {
        byte[] addressArray = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
        address = SockaddrIn.readIPv4(acceptedAddressMemoryAddress, addressArray);
    }
    return new IOUringSocketChannel(this, new LinuxSocket(fd), address);
}
4. IOUringSocketChannel
The new connection is registered with the workerGroup. Registration again triggers beginRead, which again submits a POLLIN task to io_uring; when that task completes, the workerGroup runs the callback.
@Override
protected int scheduleRead0() {
    assert readBuffer == null;
    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    ByteBuf byteBuf = allocHandle.allocate(alloc());
    IOUringSubmissionQueue submissionQueue = submissionQueue();

    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    readBuffer = byteBuf;

    submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
            byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
    return 1;
}
Only now is an IORING_OP_READ task submitted, i.e. the actual "read data" task.
@Override
protected void readComplete0(int res, int data, int outstanding) {
    boolean close = false;

    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();
    ByteBuf byteBuf = this.readBuffer;
    this.readBuffer = null;
    assert byteBuf != null;

    try {
        if (res < 0) {
            // If res is negative we should pass it to ioResult(...) which will either throw
            // or convert it to 0 if we could not read because the socket was not readable.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            byteBuf.writerIndex(byteBuf.writerIndex() + res);
            allocHandle.lastBytesRead(res);
        } else {
            // EOF which we signal with -1.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // nothing was read, release the buffer.
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            if (close) {
                // There is nothing left to read as we received an EOF.
                shutdownInput(false);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
        if (allocHandle.continueReading()) {
            // Let's schedule another read.
            scheduleRead();
        } else {
            // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    }
}
The old epoll model was: epoll_wait waits for readiness events, then the relevant I/O system calls are issued. Here, a POLLIN task is submitted first, and only when it completes is the concrete I/O task submitted. The number of system calls does go down, but an extra asynchronous round trip is added. Does this actually improve performance?
5. The role of iosqeAsyncThreshold
Finally, let's look at what the iosqeAsyncThreshold threshold does.
private int flags() {
    return numHandledFds < iosqeAsyncThreshold ? 0 : Native.IOSQE_ASYNC;
}
Once the number of fds handled by an event loop reaches this threshold, SQEs are submitted with the IOSQE_ASYNC flag set. With that flag, the task is placed directly on the io-wq queue; without it, io_uring first tries the operation described by the SQE once in non-blocking mode. For example, for a read, if the data is already in the page cache, the non-blocking attempt succeeds and the result is returned directly; if it fails, the task is handed off to io-wq.
It is hard to draw a firm conclusion about this flag's effect on performance. For a workload like network I/O, where the data is often already available and a non-blocking attempt would have succeeded, forcing every task onto the io-wq queue is likely to hurt both latency and throughput.
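As a self-contained illustration of that threshold logic (not Netty's actual code), the snippet below reproduces the decision with a hypothetical handled-fd counter; the IOSQE_ASYNC value (1 << 4) matches the flag bit defined in the kernel's io_uring headers.

// Illustration only: reproduces the threshold decision shown above with hypothetical fields.
final class SqeFlagsSketch {
    static final int IOSQE_ASYNC = 1 << 4; // bit 4 of the SQE flags byte in the kernel headers

    private final int iosqeAsyncThreshold;
    private int numHandledFds;

    SqeFlagsSketch(int iosqeAsyncThreshold) {
        this.iosqeAsyncThreshold = iosqeAsyncThreshold;
    }

    void channelRegistered()   { numHandledFds++; }
    void channelDeregistered() { numHandledFds--; }

    // Below the threshold: submit SQEs with no flags, letting the kernel try the operation
    // inline (non-blocking) first. At or above it: force IOSQE_ASYNC so the operation goes
    // straight to the io-wq worker pool.
    int flags() {
        return numHandledFds < iosqeAsyncThreshold ? 0 : IOSQE_ASYNC;
    }

    public static void main(String[] args) {
        SqeFlagsSketch sketch = new SqeFlagsSketch(25); // 25 is Netty's default threshold
        for (int i = 0; i < 30; i++) {
            sketch.channelRegistered();
        }
        // Prints "10" (hex for 16, i.e. IOSQE_ASYNC) once 30 fds are being handled.
        System.out.println(Integer.toHexString(sketch.flags()));
    }
}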
Finally, a Quick Benchmark
I provisioned an Alibaba Cloud VM running Ubuntu 20.04.2 LTS and upgraded the kernel to Linux 5.12.13-051213-generic.
Using Netty's official Redis codec implementation, I ran redis-benchmark against it:
io_uring
socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 10 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
20000000 requests completed in 229.39 seconds
10 parallel clients
1024 bytes payload
keep alive: 1
100.00% <= 1 milliseconds
100.00% <= 3 milliseconds
100.00% <= 5 milliseconds
100.00% <= 5 milliseconds
87188.90 requests per second
epoll
socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 10 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
20000000 requests completed in 200.70 seconds
10 parallel clients
1024 bytes payload
keep alive: 1
99.99% <= 1 milliseconds
100.00% <= 2 milliseconds
100.00% <= 3 milliseconds
100.00% <= 5 milliseconds
100.00% <= 5 milliseconds
99653.21 requests per second
With 10 connections and a 1 KB payload, io_uring's throughput is about 12.5% lower than epoll's (87,188.90 vs 99,653.21 requests per second).
Let's raise the connection count past the iosqeAsyncThreshold threshold and try again:
io_uring
socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 50 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
20000000 requests completed in 293.22 seconds
50 parallel clients
1024 bytes payload
keep alive: 1
99.77% <= 1 milliseconds
99.98% <= 2 milliseconds
99.99% <= 3 milliseconds
100.00% <= 4 milliseconds
100.00% <= 5 milliseconds
100.00% <= 6 milliseconds
100.00% <= 7 milliseconds
100.00% <= 8 milliseconds
100.00% <= 9 milliseconds
100.00% <= 10 milliseconds
100.00% <= 12 milliseconds
100.00% <= 13 milliseconds
100.00% <= 14 milliseconds
100.00% <= 15 milliseconds
100.00% <= 16 milliseconds
100.00% <= 18 milliseconds
100.00% <= 23 milliseconds
100.00% <= 26 milliseconds
100.00% <= 27 milliseconds
100.00% <= 32 milliseconds
100.00% <= 32 milliseconds
68208.17 requests per second
epoll
socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 50 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
20000000 requests completed in 190.76 seconds
50 parallel clients
1024 bytes payload
keep alive: 1
99.54% <= 1 milliseconds
99.91% <= 2 milliseconds
99.98% <= 3 milliseconds
100.00% <= 4 milliseconds
100.00% <= 8 milliseconds
100.00% <= 8 milliseconds
104846.53 requests per second
This time io_uring's throughput is about 34.9% lower than epoll's (68,208.17 vs 104,846.53 requests per second), and io_uring's CPU usage is twice that of epoll (mostly kernel time spent on io-wq).
io_uring has major advantages in principle, but making good use of it still takes real effort, and a wrapper like Netty's current one does not look like it is enough.