
How should we evaluate Netty's io_uring wrapper?


A brief history of Linux AIO


Before continuing our journey into io_uring, let's first review the various kinds of asynchronous I/O (AIO) available on Linux.

1. glibc aio

Official documentation: "Perform I/O Operations in Parallel" (the official docs choose their wording rather carefully).

glibc is the libc implementation released by GNU. The asynchronous I/O it provides is called glibc aio, and in some places it is also referred to as POSIX aio. glibc aio simulates asynchronous I/O with multiple threads doing synchronous I/O, and the callbacks run on a single thread.

This implementation has drawn a lot of criticism: it has some hard-to-live-with defects and bugs, and its use is strongly discouraged. See: http://davmac.org/davpage/linux/async-io.html

2. libaio

Linux kernel 2.6 introduced native asynchronous I/O support, known as libaio (also called native AIO).

Unlike glibc aio's multithreaded pseudo-asynchrony, libaio uses genuine kernel-side asynchronous notification; it is true asynchronous I/O.

Genuinely asynchronous as it is, its limitation is just as obvious: libaio only supports the O_DIRECT flag, i.e. direct I/O. That means it cannot benefit from the page cache, and the size and offset of every read and write must be block-aligned.

3. libeio

Since neither of the above is reliable, Marc Lehmann wrote yet another AIO library: libeio.

It follows the same idea as glibc aio, simulating asynchronous I/O in user space with multiple threads doing synchronous I/O, but libeio is implemented more efficiently and its code is more stable. Early versions of the well-known node.js were driven by libev and libeio (in newer versions, libuv has dropped both libev and libeio).

libeio provides a full set of asynchronous file-operation interfaces, letting users write fully non-blocking programs, but libeio is still not true asynchronous I/O.

libeio project page: https://github.com/kindy/libeio

4. io_uring

That brings us to io_uring, introduced in Linux kernel 5.1.

io_uring is the rough counterpart of IOCP in the Windows world, but it has not yet reached a comparable status; so far there are essentially no products using io_uring in production. My feeling is that there is still no mature programming model that fits it, the way Netty's programming model fits epoll so well.

As for Netty's io_uring wrapper, my overall impression is that, in order to keep its programming model uniform, Netty does not really exploit io_uring's strengths. Let's walk through how Netty wraps it.


How Netty wraps io_uring


1. Usage

public class EchoIOUringServer {
    private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
        EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
        final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(IOUringServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


Judging from this official Netty example, io_uring is used in exactly the same way as epoll, and at first glance the threading model is the same too: there are still two EventLoopGroups, bossGroup and workerGroup, and from the names we can guess that bossGroup still handles connection establishment while workerGroup handles network reads and writes.

The io_uring-specific logic is encapsulated in IOUringEventLoopGroup and IOUringServerSocketChannel.
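Since the io_uring transport lives in an incubator module and needs a recent kernel, a typical setup probes for availability at startup and falls back to the portable NIO transport otherwise. The sketch below assumes the incubator exposes IOUring.isAvailable() in the same way Epoll.isAvailable() works for the epoll transport; take it as an illustration of the pattern rather than canonical bootstrap code.

import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;

final class TransportSelector {
    // Pick io_uring when the native bits load and the kernel supports it,
    // otherwise fall back to the portable NIO transport.
    static EventLoopGroup newGroup(int nThreads) {
        return IOUring.isAvailable() ? new IOUringEventLoopGroup(nThreads)
                                     : new NioEventLoopGroup(nThreads);
    }

    static Class<? extends ServerChannel> serverChannelClass() {
        return IOUring.isAvailable() ? IOUringServerSocketChannel.class
                                     : NioServerSocketChannel.class;
    }
}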

2. IOUringEventLoopGroup

Netty's threading model will not be repeated here; see chapter 7 of "Netty in Action".

Let's start with the IOUringEventLoop constructor:

IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold,
                 RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
    IOUring.ensureAvailability();

    ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold);

    eventfd = Native.newBlockingEventFd();
    logger.trace("New EventLoop: {}", this.toString());
}


So every event-loop thread creates its own io_uring ring buffer, plus an eventfd file descriptor used for event notification.

Digging into Native.createRingBuffer(ringSize, iosqeAsyncThreshold): ringSize defaults to 4096 and iosqeAsyncThreshold defaults to 25.


static RingBuffer createRingBuffer(int ringSize, int iosqeAsyncThreshold) {
    long[][] values = ioUringSetup(ringSize);
    assert values.length == 2;
    long[] submissionQueueArgs = values[0];
    assert submissionQueueArgs.length == 11;
    IOUringSubmissionQueue submissionQueue = new IOUringSubmissionQueue(
            submissionQueueArgs[0],
            submissionQueueArgs[1],
            submissionQueueArgs[2],
            submissionQueueArgs[3],
            submissionQueueArgs[4],
            submissionQueueArgs[5],
            submissionQueueArgs[6],
            submissionQueueArgs[7],
            (int) submissionQueueArgs[8],
            submissionQueueArgs[9],
            (int) submissionQueueArgs[10],
            iosqeAsyncThreshold);
    long[] completionQueueArgs = values[1];
    assert completionQueueArgs.length == 9;
    IOUringCompletionQueue completionQueue = new IOUringCompletionQueue(
            completionQueueArgs[0],
            completionQueueArgs[1],
            completionQueueArgs[2],
            completionQueueArgs[3],
            completionQueueArgs[4],
            completionQueueArgs[5],
            (int) completionQueueArgs[6],
            completionQueueArgs[7],
            (int) completionQueueArgs[8]);
    return new RingBuffer(submissionQueue, completionQueue);
}



Netty's RingBuffer wrapper maps essentially one-to-one onto io_uring's own structures.

Looking further into the JNI wrapper around io_uring_setup, Netty's current implementation sets no flags at all; it uses the default mode, in which work is submitted via io_uring_enter. That mode happens to fit Netty's threading model well, and supporting SQPOLL would likely require substantial changes.

Now back to the event loop in IOUringEventLoop:


@Override
protected void run() {
    final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
    final IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();

    // Lets add the eventfd related events before starting to do any real work.
    addEventFdRead(submissionQueue);

    for (;;) {
        try {
            logger.trace("Run IOUringEventLoop {}", this);

            // Prepare to block wait
            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
            if (curDeadlineNanos == -1L) {
                curDeadlineNanos = NONE; // nothing on the calendar
            }
            nextWakeupNanos.set(curDeadlineNanos);

            // Only submit a timeout if there are no tasks to process and do a blocking operation
            // on the completionQueue.
            try {
                if (!hasTasks()) {
                    if (curDeadlineNanos != prevDeadlineNanos) {
                        prevDeadlineNanos = curDeadlineNanos;
                        submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
                    }

                    // Check there were any completion events to process
                    if (!completionQueue.hasCompletions()) {
                        // Block if there is nothing to process after this try again to call process(....)
                        logger.trace("submitAndWait {}", this);
                        submissionQueue.submitAndWait();
                    }
                }
            } finally {
                if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                    pendingWakeup = true;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }

        // Avoid blocking for as long as possible - loop until available work exhausted
        boolean maybeMoreWork = true;
        do {
            try {
                // CQE processing can produce tasks, and new CQEs could arrive while
                // processing tasks. So run both on every iteration and break when
                // they both report that nothing was done (| means always run both).
                maybeMoreWork = completionQueue.process(callback) != 0 | runAllTasks();
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                    if (!maybeMoreWork) {
                        maybeMoreWork = hasTasks() || completionQueue.hasCompletions();
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        } while (maybeMoreWork);
    }
}


First, two details that are off the main path:

  1. addEventFdRead(submissionQueue) submits a read on the eventfd to io_uring; its main purpose is to wake up the event-loop thread. Since submissionQueue.submitAndWait() blocks, waking the loop only takes a write to the eventfd (see the sketch after this list).

  2. submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0) handles delayed (scheduled) tasks and can be ignored for now.
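To make the wakeup path concrete, here is a minimal sketch. The EventFd interface is a stand-in for whatever JNI-backed handle the transport actually uses (the real code writes to the Linux eventfd created by Native.newBlockingEventFd()); the class below is illustrative, not Netty's actual IOUringEventLoop code.

// Illustrative only: how a blocked io_uring event loop gets woken up.
final class EventLoopWakeup {
    interface EventFd {
        // Writes an 8-byte counter increment to the eventfd (eventfd_write(2)).
        void write(long value);
    }

    private final EventFd eventfd; // a READ on this fd is always pending in the ring

    EventLoopWakeup(EventFd eventfd) {
        this.eventfd = eventfd;
    }

    // Called from any thread that is not the event-loop thread.
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop) {
            // Completing the pending eventfd READ produces a CQE, which makes
            // submissionQueue.submitAndWait() return on the loop thread.
            eventfd.write(1L);
        }
    }
}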

With those two details cleared up, the main flow is straightforward:

  1. submissionQueue.submitAndWait() submits the pending tasks and waits for at least one of them to complete;

  2. completionQueue.process(callback) processes the completed tasks; the callback is void handle(int fd, int res, int flags, byte op, short data) (a simplified dispatch sketch follows this list);

  3. finally, new tasks are added to the submissionQueue (the concrete task types and when they are submitted are analyzed below).
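To make the callback contract concrete, here is a deliberately simplified sketch of how completions might be dispatched on their operation byte. The Ops constants and on*() methods are hypothetical placeholders; Netty's real dispatch lives inside the event loop and the channel "unsafe" classes.

// Simplified illustration of a CQE dispatcher keyed on the operation byte.
final class CompletionDispatcher {
    interface Ops {
        byte ACCEPT = 1;
        byte READ = 2;
        byte WRITE = 3;
    }

    // Matches the shape of the callback: handle(fd, res, flags, op, data).
    void handle(int fd, int res, int flags, byte op, short data) {
        switch (op) {
            case Ops.ACCEPT: onAccept(fd, res); break; // res: accepted fd, or -errno on failure
            case Ops.READ:   onRead(fd, res);   break; // res: bytes read, 0 means EOF, <0 is -errno
            case Ops.WRITE:  onWrite(fd, res);  break; // res: bytes written, or -errno
            default:         /* ignore unknown ops in this sketch */ break;
        }
    }

    private void onAccept(int fd, int res) { /* create child channel, fire channelRead */ }
    private void onRead(int fd, int res)   { /* fill ByteBuf, fire channelRead */ }
    private void onWrite(int fd, int res)  { /* update write index, maybe schedule more writes */ }
}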

3. IOUringServerSocketChannel

Netty's generic ServerSocket handling flow will not be repeated here either.

After the IOUringServerSocketChannel is registered, the channelActive() callback triggers the beginRead method:


@Override
protected void doBeginRead() {
    if ((ioState & POLL_IN_SCHEDULED) == 0) {
        ioUringUnsafe().schedulePollIn();
    }
}

final void schedulePollIn() {
    assert (ioState & POLL_IN_SCHEDULED) == 0;
    if (!isActive() || shouldBreakIoUringInReady(config())) {
        return;
    }
    ioState |= POLL_IN_SCHEDULED;
    IOUringSubmissionQueue submissionQueue = submissionQueue();
    submissionQueue.addPollIn(socket.intValue());
}


In other words, a POLLIN task is submitted to io_uring, so the bossGroup is notified when a new connection arrives (note that this is not yet the "accept the connection" task).


final void pollIn(int res) {
    ioState &= ~POLL_IN_SCHEDULED;

    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
        return;
    }

    scheduleFirstReadIfNeeded();
}

@Override
protected int scheduleRead0() {
    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    allocHandle.attemptedBytesRead(1);

    IOUringSubmissionQueue submissionQueue = submissionQueue();
    submissionQueue.addAccept(fd().intValue(),
            acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
    return 1;
}


Once the POLLIN task completes, an IORING_OP_ACCEPT task is submitted; that is the actual "accept the connection" task.

When the connection has been established, the completion callback fires:


@Override
protected void readComplete0(int res, int data, int outstanding) {
    final IOUringRecvByteAllocatorHandle allocHandle =
            (IOUringRecvByteAllocatorHandle) unsafe()
                    .recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();
    allocHandle.lastBytesRead(res);

    if (res >= 0) {
        allocHandle.incMessagesRead(1);
        try {
            Channel channel = newChildChannel(
                    res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
            pipeline.fireChannelRead(channel);
            if (allocHandle.continueReading()) {
                scheduleRead();
            } else {
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
            }
        } catch (Throwable cause) {
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);
        }
    } else {
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        // Check if we did fail because there was nothing to accept atm.
        if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
            // Something bad happened. Convert to an exception.
            pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
        }
    }
}


Creating the child channel for the new connection:


@Override
Channel newChildChannel(int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) {
    final InetSocketAddress address;
    if (socket.isIpv6()) {
        byte[] ipv6Array = ((IOUringEventLoop) eventLoop()).inet6AddressArray();
        byte[] ipv4Array = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
        address = SockaddrIn.readIPv6(acceptedAddressMemoryAddress, ipv6Array, ipv4Array);
    } else {
        byte[] addressArray = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
        address = SockaddrIn.readIPv4(acceptedAddressMemoryAddress, addressArray);
    }
    return new IOUringSocketChannel(this, new LinuxSocket(fd), address);
}


4. IOUringSocketChannel

The new connection is registered with the workerGroup. Registration again triggers the beginRead method, which again submits a POLLIN task to io_uring, and when that task completes the workerGroup runs the callback.


@Override
protected int scheduleRead0() {
    assert readBuffer == null;

    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    ByteBuf byteBuf = allocHandle.allocate(alloc());
    IOUringSubmissionQueue submissionQueue = submissionQueue();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());

    readBuffer = byteBuf;

    submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
            byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
    return 1;
}


Only here is an IORING_OP_READ task submitted, i.e. the actual "read data" task.


@Override
protected void readComplete0(int res, int data, int outstanding) {
    boolean close = false;

    final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();
    ByteBuf byteBuf = this.readBuffer;
    this.readBuffer = null;
    assert byteBuf != null;

    try {
        if (res < 0) {
            // If res is negative we should pass it to ioResult(...) which will either throw
            // or convert it to 0 if we could not read because the socket was not readable.
            allocHandle.lastBytesRead(ioResult("io_uring read", res));
        } else if (res > 0) {
            byteBuf.writerIndex(byteBuf.writerIndex() + res);
            allocHandle.lastBytesRead(res);
        } else {
            // EOF which we signal with -1.
            allocHandle.lastBytesRead(-1);
        }
        if (allocHandle.lastBytesRead() <= 0) {
            // nothing was read, release the buffer.
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            if (close) {
                // There is nothing left to read as we received an EOF.
                shutdownInput(false);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
        if (allocHandle.continueReading()) {
            // Let's schedule another read.
            scheduleRead();
        } else {
            // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    }
}


Under the old epoll model, epoll_wait waits for readiness events and then the relevant I/O system calls are issued. Here, a POLLIN task is submitted first, and only when it fires is the concrete I/O task submitted. The number of system calls does go down, but an extra round of asynchronous tasks is added; will this really improve performance?

5. What iosqeAsyncThreshold does

Finally, let's look at what the iosqeAsyncThreshold threshold is for.


private int flags() {
    return numHandledFds < iosqeAsyncThreshold ? 0 : Native.IOSQE_ASYNC;
}


If the number of connections (handled fds) exceeds this threshold, submitted tasks get the IOSQE_ASYNC flag. With the flag set, the task is placed directly onto the io-wq queue; without it, io_uring first tries the operation described by the SQE once in non-blocking mode. For example, when executing a read, if the data is already in the page cache the non-blocking attempt succeeds and the result is returned immediately; if it fails, the request is handed off to io-wq.
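To restate that policy in code, here is a minimal sketch of the submission-side decision. The IOSQE_ASYNC bit value comes from <linux/io_uring.h>; the class and field names merely echo Netty's flags() method shown above and are not Netty's actual implementation.

// Illustrative only: decide whether an SQE should carry IOSQE_ASYNC once
// "enough" file descriptors are being handled by this event loop.
final class SqeFlagPolicy {
    private static final int IOSQE_ASYNC = 1 << 4; // IOSQE_ASYNC_BIT in <linux/io_uring.h>

    private final int iosqeAsyncThreshold;
    private int numHandledFds;

    SqeFlagPolicy(int iosqeAsyncThreshold) {
        this.iosqeAsyncThreshold = iosqeAsyncThreshold;
    }

    void fdAdded()   { numHandledFds++; }
    void fdRemoved() { numHandledFds--; }

    // Below the threshold: let io_uring try the op inline (non-blocking) first.
    // At or above it: force the op straight onto the io-wq worker pool.
    int sqeFlags() {
        return numHandledFds < iosqeAsyncThreshold ? 0 : IOSQE_ASYNC;
    }
}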



It is hard to draw a firm conclusion about this flag's impact on performance. For a workload like network I/O, if most of the data is already in the cache, forcing requests onto the io-wq queue is likely to hurt both latency and throughput.

A quick benchmark


I provisioned an Alibaba Cloud VM running Ubuntu 20.04.2 LTS and upgraded the kernel to Linux 5.12.13-051213-generic.

Using Netty's official Redis codec, let's drive the server with redis-benchmark:
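For reference, a benchmark server along these lines can be wired up with the netty-codec-redis classes as sketched below. SetCommandHandler is a hypothetical handler that simply answers +OK to every command, which is enough for redis-benchmark's SET workload; this is an assumption-laden sketch, not necessarily the exact code behind the numbers that follow.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.redis.*;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;

public final class RedisSetBenchmarkServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new IOUringEventLoopGroup(1);
        EventLoopGroup worker = new IOUringEventLoopGroup(1);
        try {
            ServerBootstrap b = new ServerBootstrap()
                    .group(boss, worker)
                    .channel(IOUringServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisDecoder());              // bytes -> RedisMessage parts
                            p.addLast(new RedisBulkStringAggregator()); // reassemble bulk strings
                            p.addLast(new RedisArrayAggregator());      // reassemble command arrays
                            p.addLast(new RedisEncoder());              // RedisMessage -> bytes
                            p.addLast(new SetCommandHandler());         // hypothetical: replies +OK
                        }
                    });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    static final class SetCommandHandler extends SimpleChannelInboundHandler<ArrayRedisMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ArrayRedisMessage msg) {
            // Good enough for "redis-benchmark -t set": acknowledge every command with +OK.
            ctx.writeAndFlush(new SimpleStringRedisMessage("OK"));
        }
    }
}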

io_uring

socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 10 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
  20000000 requests completed in 229.39 seconds
  10 parallel clients
  1024 bytes payload
  keep alive: 1

100.00% <= 1 milliseconds
100.00% <= 3 milliseconds
100.00% <= 5 milliseconds
100.00% <= 5 milliseconds
87188.90 requests per second


epoll


socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 10 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
  20000000 requests completed in 200.70 seconds
  10 parallel clients
  1024 bytes payload
  keep alive: 1

99.99% <= 1 milliseconds
100.00% <= 2 milliseconds
100.00% <= 3 milliseconds
100.00% <= 5 milliseconds
100.00% <= 5 milliseconds
99653.21 requests per second


With 10 connections and a 1 KB payload, io_uring's throughput comes out about 12.5% lower than epoll's.

Now increase the connection count past the iosqeAsyncThreshold and try again:

io_uring


socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 50 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
  20000000 requests completed in 293.22 seconds
  50 parallel clients
  1024 bytes payload
  keep alive: 1

99.77% <= 1 milliseconds
99.98% <= 2 milliseconds
99.99% <= 3 milliseconds
100.00% <= 4 milliseconds
100.00% <= 5 milliseconds
100.00% <= 6 milliseconds
100.00% <= 7 milliseconds
100.00% <= 8 milliseconds
100.00% <= 9 milliseconds
100.00% <= 10 milliseconds
100.00% <= 12 milliseconds
100.00% <= 13 milliseconds
100.00% <= 14 milliseconds
100.00% <= 15 milliseconds
100.00% <= 16 milliseconds
100.00% <= 18 milliseconds
100.00% <= 23 milliseconds
100.00% <= 26 milliseconds
100.00% <= 27 milliseconds
100.00% <= 32 milliseconds
100.00% <= 32 milliseconds
68208.17 requests per second


epoll


socket.yuan@xxx-iouring-test-001:~$ redis-benchmark -c 50 -p 8080 -n 20000000 -t set -d 1024
====== SET ======
  20000000 requests completed in 190.76 seconds
  50 parallel clients
  1024 bytes payload
  keep alive: 1

99.54% <= 1 milliseconds
99.91% <= 2 milliseconds
99.98% <= 3 milliseconds
100.00% <= 4 milliseconds
100.00% <= 8 milliseconds
100.00% <= 8 milliseconds
104846.53 requests per second


This time io_uring's throughput is about 34.9% lower than epoll's, and io_uring's CPU usage is twice that of epoll (mostly the kernel's io-wq processing).

io_uring has major advantages in principle, but getting real benefit out of it still takes a lot of work; a wrapper done the way Netty has done it does not look like it is enough.






