写点什么

netty 系列之:kequeue 传输协议详解

作者:程序那些事
  • 2022 年 5 月 20 日
  • 本文字数:5676 字

    阅读完需:约 19 分钟

netty系列之:kequeue传输协议详解

简介

在前面的章节中,我们介绍了在 netty 中可以使用 kequeue 或者 epoll 来实现更为高效的 native 传输方式。那么 kequeue 和 epoll 和 NIO 传输协议有什么不同呢?


本章将会以 kequeue 为例进行深入探讨。


在上面我们介绍的 native 的例子中,关于 kqueue 的类有这样几个,分别是 KQueueEventLoopGroup,KQueueServerSocketChannel 和 KQueueSocketChannel,通过简单的替换和添加对应的依赖包,我们可以轻松的将普通的 NIO netty 服务替换成为 native 的 Kqueue 服务。


是时候揭开 Kqueue 的秘密了。

KQueueEventLoopGroup

eventLoop 和 eventLoopGroup 是用来接受 event 和事件处理的。先来看下 KQueueEventLoopGroup 的定义:


public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup
复制代码


作为一个 MultithreadEventLoopGroup,必须实现一个 newChild 方法,用来创建 child EventLoop。在 KQueueEventLoopGroup 中,除了构造函数之外,额外需要实现的方法就是 newChild:


    protected EventLoop newChild(Executor executor, Object... args) throws Exception {        Integer maxEvents = (Integer) args[0];        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];        EventLoopTaskQueueFactory taskQueueFactory = null;        EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length; if (argsLength > 3) { taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } if (argsLength > 4) { tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; } return new KQueueEventLoop(this, executor, maxEvents, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); }
复制代码


newChild 中的所有参数都是从 KQueueEventLoopGroup 的构造函数中传入的。除了 maxEvents,selectStrategyFactory 和 rejectedExecutionHandler 之外,还可以接收 taskQueueFactory 和 tailTaskQueueFactory 两个参数,最后把这些参数都传到 KQueueEventLoop 的构造函数中去,最终返回一个 KQueueEventLoop 对象。


另外在使用 KQueueEventLoopGroup 之前我们还需要确保 Kqueue 在系统中是可用的,这个判断是通过调用KQueue.ensureAvailability();来实现的。


KQueue.ensureAvailability 首先判断是否定义了系统属性 io.netty.transport.noNative,如果定了,说明 native transport 被禁用了,后续也就没有必要再进行判断了。


如果 io.netty.transport.noNative 没有被定义,那么会调用Native.newKQueue()来尝试从 native 中获取一个 kqueue 的 FileDescriptor,如果上述的获取过程中没有任何异常,则说明 kqueue 在 native 方法中存在,我们可以继续使用了。


以下是判断 kqueue 是否可用的代码:


    static {        Throwable cause = null;        if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {            cause = new UnsupportedOperationException(                    "Native transport was explicit disabled with -Dio.netty.transport.noNative=true");        } else {            FileDescriptor kqueueFd = null;            try {                kqueueFd = Native.newKQueue();            } catch (Throwable t) {                cause = t;            } finally {                if (kqueueFd != null) {                    try {                        kqueueFd.close();                    } catch (Exception ignore) {                        // ignore                    }                }            }        }        UNAVAILABILITY_CAUSE = cause;    }
复制代码

KQueueEventLoop

KQueueEventLoop 是从 KQueueEventLoopGroup 中创建出来的,用来执行具体的 IO 任务。


先来看一下 KQueueEventLoop 的定义:


final class KQueueEventLoop extends SingleThreadEventLoop 
复制代码


不管是 NIO 还是 KQueue 或者是 Epoll,因为使用了更加高级的 IO 技术,所以他们使用的 EventLoop 都是 SingleThreadEventLoop,也就是说使用单线程就够了。


和 KQueueEventLoopGroup 一样,KQueueEventLoop 也需要判断当前的系统环境是否支持 kqueue:


    static {        KQueue.ensureAvailability();    }
复制代码


上一节讲到了,KQueueEventLoopGroup 会调用 KQueueEventLoop 的构造函数来返回一个 eventLoop 对象, 我们先来看下 KQueueEventLoop 的构造函数:


    KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,                    SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,                    EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),                rejectedExecutionHandler);        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");        this.kqueueFd = Native.newKQueue();        if (maxEvents == 0) {            allowGrowing = true;            maxEvents = 4096;        } else {            allowGrowing = false;        }        this.changeList = new KQueueEventArray(maxEvents);        this.eventList = new KQueueEventArray(maxEvents);        int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);        if (result < 0) {            cleanup();            throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));        }    }
复制代码


传入的 maxEvents 表示的是这个 KQueueEventLoop 能够接受的最大的 event 个数。如果 maxEvents=0,则表示 KQueueEventLoop 的 event 容量可以动态扩展,并且最大值是 4096。否则的话,KQueueEventLoop 的 event 容量不能扩展。


maxEvents 是作为数组的大小用来构建 changeList 和 eventList。


KQueueEventLoop 中还定义了一个 map 叫做 channels,用来保存注册的 channels:


private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
复制代码


来看一下 channel 的 add 和 remote 方法:


    void add(AbstractKQueueChannel ch) {        assert inEventLoop();        AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);        assert old == null || !old.isOpen();    }
void remove(AbstractKQueueChannel ch) throws Exception { assert inEventLoop(); int fd = ch.fd().intValue(); AbstractKQueueChannel old = channels.remove(fd); if (old != null && old != ch) { channels.put(fd, old); assert !ch.isOpen(); } else if (ch.isOpen()) { ch.unregisterFilters(); } }
复制代码


可以看到添加和删除的都是 AbstractKQueueChannel,后面的章节中我们会详细讲解 KQueueChannel,这里我们只需要知道 channel map 中的 key 是 kequeue 中特有的 FileDescriptor 的 int 值。


再来看一下 EventLoop 中最重要的 run 方法:


   protected void run() {        for (;;) {            try {                int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;
case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1); if (wakenUp == 1) { wakeup(); } default: }
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { if (strategy > 0) { processReady(strategy); } } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime();
try { if (strategy > 0) { processReady(strategy); } } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
复制代码


它的逻辑是先使用 selectStrategy.calculateStrategy 获取当前的 select strategy,然后根据 strategy 的值来判断是否需要执行 processReady 方法,最后执行 runAllTasks,从 task queue 中拿到要执行的任务去执行。


selectStrategy.calculateStrategy 用来判断当前的 select 状态,默认情况下有三个状态,分别是:SELECT,CONTINUE,BUSY_WAIT。 这三个状态都是负数:


    int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;
复制代码


分别表示当前的 IO 在 slect 的 block 状态,或者跳过当前 IO 的状态,和正在 IO loop pull 的状态。BUSY_WAIT 是一个非阻塞的 IO PULL,kqueue 并不支持,所以会 fallback 到 SELECT。


除了这三个状态之外,calculateStrategy 还会返回一个正值,表示当前要执行的任务的个数。


在 run 方法中,如果 strategy 的结果是 SELECT,那么最终会调用 Native.keventWait 方法返回当前 ready 的 events 个数,并且将 ready 的 event 放到 KQueueEventArray 的 eventList 中去。


如果 ready 的 event 个数大于零,则会调用 processReady 方法对这些 event 进行状态回调处理。


怎么处理的呢?下面是处理的核心逻辑:


            AbstractKQueueChannel channel = channels.get(fd);
AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
if (filter == Native.EVFILT_WRITE) { unsafe.writeReady(); } else if (filter == Native.EVFILT_READ) { unsafe.readReady(eventList.data(i)); } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) { unsafe.readEOF(); }
复制代码


这里的 fd 是从 eventList 中读取到的:


final int fd = eventList.fd(i);
复制代码


根据 eventList 的 fd,我们可以从 channels 中拿到对应的 KQueueChannel,然后根据 event 的 filter 状态来决定 KQueueChannel 的具体操作,是 writeReady,readReady 或者 readEOF。


最后就是执行 runAllTasks 方法了,runAllTasks 的逻辑很简单,就是从 taskQueue 中读取任务然后执行。

KQueueServerSocketChannel 和 KQueueSocketChannel

KQueueServerSocketChannel 是用在 server 端的 channel:


public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel implements ServerSocketChannel {
复制代码


KQueueServerSocketChannel 继承自 AbstractKQueueServerChannel,除了构造函数之外,最重要的一个方法就是 newChildChannel:


    @Override    protected Channel newChildChannel(int fd, byte[] address, int offset, int len) throws Exception {        return new KQueueSocketChannel(this, new BsdSocket(fd), address(address, offset, len));    }
复制代码


这个方法用来创建一个新的 child channel。从上面的代码中,我们可以看到生成的 child channel 是一个 KQueueSocketChannel 的实例。


它的构造函数接受三个参数,分别是 parent channel,BsdSocket 和 InetSocketAddress。


    KQueueSocketChannel(Channel parent, BsdSocket fd, InetSocketAddress remoteAddress) {        super(parent, fd, remoteAddress);        config = new KQueueSocketChannelConfig(this);    }
复制代码


这里的 fd 是 socket accept acceptedAddress 的结果:


int acceptFd = socket.accept(acceptedAddress);
复制代码


下面是 KQueueSocketChannel 的定义:


public final class KQueueSocketChannel extends AbstractKQueueStreamChannel implements SocketChannel {
复制代码


KQueueSocketChannel 和 KQueueServerSocketChannel 的关系是父子的关系,在 KQueueSocketChannel 中有一个 parent 方法,用来返回 ServerSocketChannel 对象,这也是前面提到的 newChildChannel 方法中传入 KQueueSocketChannel 构造函数中的 serverChannel:


public ServerSocketChannel parent() {        return (ServerSocketChannel) super.parent();    }
复制代码


KQueueSocketChannel 还有一个特性就是支持 tcp fastopen,它的本质是调用 BsdSocket 的 connectx 方法,在建立连接的同时传递数据:


int bytesSent = socket.connectx(                                (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true);
复制代码

总结

以上就是 KqueueEventLoop 和 KqueueSocketChannel 的详细介绍,基本上和 NIO 没有太大的区别,只不过性能根据优秀。


更多内容请参考 http://www.flydean.com/53-1-netty-kqueue-transport/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 2022 年 05 月 20 日阅读数: 22
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:kequeue传输协议详解_Java_程序那些事_InfoQ写作社区