写点什么

Netty 源码—客户端接入流程

  • 2025-03-24
    福建
  • 本文字数:26952 字

    阅读完需:约 88 分钟

1.关于 Netty 客户端连接接入问题整理


一.Netty 是在哪里检测有新连接接入的?


答:boss 线程第一个过程轮询出 ACCEPT 事件,然后 boss 线程第二个过程通过 JDK 底层 Channel 的 accept()方法创建一条连接。

 

二.新连接是怎样注册到 NioEventLoop 线程的?


答:boss 线程调用 chooser 的 next()方法拿到一个 NioEventLoop,然后将新连接注册到 NioEventLoop 的 Selector 上。

 

2.Reactor 线程模型和服务端启动流程


(1)Netty 中的 Reactor 线程模型


Netty 中最核心的是两种类型的 Reactor 线程,这两种类型的 Reactor 线程可以看作 Netty 中的两组发动机,驱动着 Netty 整个框架的运转。一种类型是 boss 线程,专门用来接收新连接,然后将连接封装成 Channel 对象传递给 worker 线程。另一种类型是 worker 线程,专门用来处理连接上的数据读写。

 

boss 线程和 worker 线程所做的事情均分为 3 步。第一是轮询注册在 Selector 上的 IO 事件,第二是处理 IO 事件,第三是执行异步任务。对 boss 线程来说,第一步轮询出来的基本都是 ACCEPT 事件,表示有新的连接。对 worker 线程来说,第一步轮询出来的基本都是 READ 事件或 WRITE 事件,表示网络的读写。

 

(2)服务端启动流程


服务端是在用户线程中开启的,通过 ServerBootstrap.bind()方法,在第一次添加异步任务的时候启动 boss 线程。启动之后,当前服务器就可以开启监听。

 

3.Netty 新连接接入的整体处理逻辑


新连接接入的处理总体就是:检测新连接 + 注册 Reactor 线程,具体就可以分为如下 4 个过程。

 

一.检测新连接


服务端 Channel 对应的 NioEventLoop 会轮询该 Channel 绑定的 Selector 中是否发生了 ACCEPT 事件,如果是则说明有新连接接入了。

 

二.创建 NioSocketChannel


检测出新连接之后,便会基于 JDK NIO 的 Channel 创建出一个 NioSocketChannel,也就是客户端 Channel。

 

三.分配 worker 线程及注册 Selector


接着 Netty 给客户端 Channel 分配一个 NioEventLoop,也就是分配 worker 线程。然后把这个客户端 Channel 注册到这个 NioEventLoop 对应的 Selector 上,之后这个客户端 Channel 的读写事件都会由这个 NioEventLoop 进行处理。

 

四.向 Selector 注册读事件


最后向这个客户端 Channel 对应的 Selector 注册 READ 事件,注册的逻辑和服务端 Channel 启动时注册 ACCEPT 事件的一样。

 

4.新连接接入之检测新连接


(1)何时会检测到有新连接


当调用辅助启动类 ServerBootstrap 的 bind()方法启动服务端之后,服务端的 Channel 也就是 NioServerSocketChannel 就会注册到 boss 的 Reactor 线程上。boss 的 Reactor 线程会不断检测是否有新的事件,直到检测出有 ACCEPT 事件发生即有新连接接入。此时 boss 的 Reactor 线程将通过服务端 Channel 的 unsafe 变量来进行实际操作。

 

注意:服务端 Channel 的 unsafe 变量是一个 NioMessageUnsafe 对象,客户端 Channel 的 unsafe 变量是一个 NioByteUnsafe 对象。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private SelectedSelectionKeySet selectedKeys;    private boolean needsToSelectAgain;    private int cancelledKeys;    ...        @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            //2.处理产生IO事件的Channel            needsToSelectAgain = false;            processSelectedKeys();            ...            //3.执行外部线程放入TaskQueue的任务            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);        }    }        private void processSelectedKeys() {        if (selectedKeys != null) {            //selectedKeys.flip()会返回一个数组            processSelectedKeysOptimized(selectedKeys.flip());        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {        for (int i = 0;; i ++) {            //1.首先取出IO事件            final SelectionKey k = selectedKeys[i];            if (k == null) {                break;            }            selectedKeys[i] = null;//Help GC            //2.然后获取对应的Channel和处理该Channel            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel            final Object a = k.attachment();            if (a instanceof AbstractNioChannel) {                //网络事件的处理                processSelectedKey(k, (AbstractNioChannel) a);            } else {                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //3.最后判断是否应该再进行一次轮询            if (needsToSelectAgain) {                for (;;) {                    i++;                    if (selectedKeys[i] == null) {                        break;                    }                    selectedKeys[i] = null;                }                selectAgain();                //selectedKeys.flip()会返回一个数组                selectedKeys = this.selectedKeys.flip();                i = -1;            }        }    }        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            final EventLoop eventLoop;            try {                eventLoop = ch.eventLoop();            } catch (Throwable ignored) {                //If the channel implementation throws an exception because there is no event loop,                 //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.                return;            }            //Only close ch if ch is still registerd to this EventLoop.             //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process,             //but the channel is still healthy and should not be closed.            if (eventLoop != this || eventLoop == null) {                return;            }            //close the channel if the key is not valid anymore            unsafe.close(unsafe.voidPromise());            return;        }
try { int readyOps = k.readyOps(); //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise //the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入 //此时将调用Channel的unsafe变量来进行实际操作 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //进行新连接接入处理 unsafe.read(); if (!ch.isOpen()) { //Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ...}
复制代码


(2)新连接接入的流程梳理


一.NioMessageUnsafe 的 read()方法说明


首先使用一条断言确保该 read()方法必须来自 Reactor 线程调用,然后获得 Channel 对应的 Pipeline 和 RecvByteBufAllocator.Handle。

 

接着调用 NioServerSocketChannel 的 doReadMessages()方法不断地读取新连接到 readBuf 容器。然后使用 for 循环处理 readBuf 容器里的新连接,也就是通过 pipeline.fireChannelRead()方法让每个新连接都经过一层服务端 Channel 的 Pipeline 逻辑处理,最后清理容器并执行 pipeline.fireChannelReadComplete()。


//AbstractNioChannel base class for Channels that operate on messages.public abstract class AbstractNioMessageChannel extends AbstractNioChannel {    ...    private final class NioMessageUnsafe extends AbstractNioUnsafe {        //临时存放读到的连接NioSocketChannel        private final List<Object> readBuf = new ArrayList<Object>();
@Override public void read() { //断言确保该read()方法必须来自Reactor线程调用 assert eventLoop().inEventLoop(); //获得Channel对应的Pipeline final ChannelPipeline pipeline = pipeline(); //获得Channel对应的RecvByteBufAllocator.Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接 //2.设置并绑定NioSocketChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } //3.清理容器并触发pipeline.fireChannelReadComplete() readBuf.clear(); pipeline.fireChannelReadComplete(); } } //Read messages into the given array and return the amount which was read. protected abstract int doReadMessages(List<Object> buf) throws Exception; ...}
复制代码


二.新连接接入的流程梳理


首先会从服务端 Channel 对应的 NioEventLoop 的 run()方法的第二个步骤处理 IO 事件开始。然后会调用服务端 Channel 的 unsafe 变量的 read()方法,也就是 NioMessageUnsafe 对象的 read()方法。

 

接着循环调用 NioServerSocketChannel 的 doReadMessages()方法来创建新连接对象 NioSocketChannel。其中创建新连接对象最核心的方法就是调用 JDK Channel 的 accept()方法来创建 JDK Channel。

 

与服务端启动一样,Netty 会把 JDK 底层 Channel 包装成 Netty 自定义的 NioSocketChannel。


NioEventLoop.processSelectedKeys(key, channel) //入口  NioMessageUnsafe.read() //新连接接入处理    NioServerSocketChannel.doReadMessages() //创建新连接对象NioSocketChannel      javaChannel.accept() //创建JDK Channel
复制代码


(3)新连接接入的总结


在服务端 Channel 对应的 NioEventLoop 的 run()方法的 processSelectedKeys()方法里,发现产生的 IO 事件是 ACCEPT 事件之后,会通过 JDK Channel 的 accept()方法取创建 JDK 的 Channel,并把它包装成 Netty 自定义的 NioSocketChannel。在这个过程中会通过一个 RecvByteBufAllocator.Handle 对象控制连接接入的速率,默认一次性读取 16 个连接。

 

5.新连接接入之创建 NioSocketChannel


(1)doReadMessages()方法相关说明


首先通过 javaChannel().accept()创建一个 JDK 的 Channel,即客户端 Channel。然后把服务端 Channel 和这个客户端 Channel 作为参数传入 NioSocketChannel 的构造方法中,从而把 JDK 的 Channel 封装成 Netty 自定义的 NioSocketChannel。最后把封装好的 NioSocketChannel 添加到一个 List 里,以便外层可以遍历 List 进行处理。


//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();    private final ServerSocketChannelConfig config;    ...        @Override    protected int doReadMessages(List<Object> buf) throws Exception {        //1.创建JDK的Channel        SocketChannel ch = javaChannel().accept();        //2.封装成Netty的Channel,即把服务端Channel和客户端Channel当作参数传递到NioSocketChannel的构造方法里        if (ch != null) {            //先创建一个NioSocketChannel对象,再添加到buf里            buf.add(new NioSocketChannel(this, ch));            return 1;        }        return 0;    }        //Create a new instance    public NioServerSocketChannel() {        //创建服务端Channel        this(newSocket(DEFAULT_SELECTOR_PROVIDER));    }        private static ServerSocketChannel newSocket(SelectorProvider provider) {        //创建服务端Channel        return provider.openServerSocketChannel();    }
//Create a new instance using the given ServerSocketChannel. public NioServerSocketChannel(ServerSocketChannel channel) { //创建服务端Channel,关注ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); //javaChannel().socket()会调用JDK Channel的socket()方法 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } @Override protected ServerSocketChannel javaChannel() { //返回一个JDK的Channel -> ServerSocketChannel return (ServerSocketChannel) super.javaChannel(); } ...}
//AbstractNioChannel base class for Channels that operate on messages.public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... //创建服务端Channel protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); } ...}
//SocketChannel which uses NIO selector based implementation.public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { private final SocketChannelConfig config; ... //Create a new instance //@param parent,the Channel which created this instance or null if it was created by the user //@param socket,the SocketChannel which will be used public NioSocketChannel(Channel parent, SocketChannel socket) { //创建客户端Channel super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } @Override protected SocketChannel javaChannel() { //返回一个JDK的Channel -> ServerSocketChannel return (SocketChannel) super.javaChannel(); } private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } ... } ...}
//The default SocketChannelConfig implementation.public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { protected final Socket javaSocket; //Creates a new instance. public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { ... this.javaSocket = javaSocket; setTcpNoDelay(true);//禁止Nagle算法 ... } ...}
//AbstractNioChannel base class for Channels that operate on bytes.public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null //@param ch,the underlying SelectableChannel on which it operates protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { //创建客户端Channel,关注READ事件 super(parent, ch, SelectionKey.OP_READ); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); } ...}
//Abstract base class for Channel implementations which use a Selector based approach.public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected final int readInterestOp; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null //@param ch,the underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; ch.configureBlocking(false); ... } protected SelectableChannel javaChannel() { return ch; } @Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); } ...}
//A skeletal Channel implementation.public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; ... //Creates a new instance. //@param parent,the parent of this channel. null if there's no parent. protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } //Returns a new DefaultChannelId instance. //Subclasses may override this method to assign custom ChannelIds to Channels that use the AbstractChannel#AbstractChannel(Channel) constructor. protected ChannelId newId() { return DefaultChannelId.newInstance(); } //Create a new AbstractUnsafe instance which will be used for the life-time of the Channel protected abstract AbstractUnsafe newUnsafe(); //Returns a new DefaultChannelPipeline instance. protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
@Override public Unsafe unsafe() { return unsafe; } @Override public ChannelPipeline pipeline() { return pipeline; } @Override public EventLoop eventLoop() { EventLoop eventLoop = this.eventLoop; if (eventLoop == null) throw new IllegalStateException("channel not registered to an event loop"); return eventLoop; } protected abstract class AbstractUnsafe implements Unsafe { @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... //绑定事件循环器,即绑定一个NioEventLoop到该Channel上 AbstractChannel.this.eventLoop = eventLoop; //注册Selector,并启动一个NioEventLoop if (eventLoop.inEventLoop()) { register0(promise); } else { ... //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上 //其实执行的是SingleThreadEventExecutor的execute()方法 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ... } } ... } ...}
复制代码


(2)创建 NioSocketChannel 的流程梳理


NioServerSocketChannel 和 NioSocketChannel 都有同一个父类 AbstractNioChannel,所以创建 NioSocketChannel 的模版和创建 NioServerSocketChannel 保持一致。

 

但要注意的是:客户端 Channel 是通过 new 关键字创建的,服务端 Channel 是通过反射的方式创建的。

 

此外,Nagle 算法会让小数据包尽量聚合成大的数据包再发送出去,Netty 为了使数据能够及时发送出去会禁止该算法。


new NioSocketChannel(p, ch) //入口,客户端Channel是通过new关键字创建的,服务端Channel是通过反射的方式创建的  new AbstractNioByteChannel(p, ch) //逐层调用父类的构造方法    new AbstractNioChannel(p, ch, op_read) //逐层调用父类的构造方法      ch.configureBlocking(false) + save op //配置此Channel为非阻塞,以及将感兴趣的读事件保存到成员变量以方便后续注册到Selector       new AbstractChannel() //创建Channel的相关组件:        newId() //id作为Channel的唯一标识        newUnsafe() //unsafe用来进行底层数据读写        newChannelPipeline() //pipeline作为业务逻辑载体  new NioSocketChannelConfig() //创建和NioSocketChannel绑定的配置类    setTcpNoDelay(true) //禁止Nagle算法
复制代码


(3)创建 NioSocketChannel 的总结


创建 NioSocketChannel 的逻辑可以分成两部分。

 

第一部分是逐层调用父类的构造方法,其中会设置这个客户端 Channel 的阻塞模式为 false,然后再把感兴趣的读事件 OP_READ 保存到这个 Channel 的成员变量中以便后续注册到 Selector,接着会创建一系列的组件,包括作为 Channel 唯一标识的 Id 组件、用来进行底层数据读写的 unsafe 组件、用来作为业务逻辑载体的 pipeline 组件。

 

第二部分是创建和这个客户端 Channel 相关的 config 对象,该 config 对象会设置关闭 Nagle 算法,从而让小数据包尽快发送出去、降低延时。

 

(4)Netty 中的 Channel 分类



说明一:

Channel 继承 Comparable 表示 Channel 是一个可以比较的对象。

 

说明二:

Channel 继承 AttributeMap 表示 Channel 是一个可以绑定属性的对象,我们经常在代码中使用 channel.attr(...)来给 Channel 绑定属性,其实就是把属性设置到 AttributeMap 中。

 

说明三:

AbstractChannel 用来实现 Channel 的大部分方法,在 AbstractChannel 的构造方法中会创建一个 Channel 对象所包含的基本组件,这里的 Channel 通常是指 SocketChannel 和 ServerSocketChannel。

 

说明四:

AbstractNioChannel 继承了 AbstractChannel,然后通过 Selector 处理一些 NIO 相关的操作。比如它会保存 JDK 底层 SelectableChannel 的引用,并且在构造方法中设置 Channel 为非阻塞模式。注意:设置非阻塞模式是 NIO 编程必须的。

 

说明五:

Netty 的两大 Channel 是指:服务端的 NioServerSocketChannel 和客户端 NioSocketChannel,分别对应着服务端接收新连接的过程和服务端新连接读写数据的过程。

 

说明六:

服务端 Channel 和客户端 Channel 的区别是:服务端 Channel 通过反射方式创建,客户端 Channel 通过 new 关键字创建。服务端 Channel 注册的是 ACCEPT 事件,对应接收新连接。客户端 Channel 注册的是 READ 事件,对应新连接读写。服务端 Channel 和客户端 Channel 底层都会依赖一个 unsafe 对象,这个 unsafe 对象会用来实现这两种 Channel 底层的数据读写操作。对于读操作,服务端的读是读一条连接 doReadMessages(),客户端的读是读取数据 doReadBytes()。最后每一个 Channel 都会绑定一个 ChannelConfig,每一个 ChannelConfig 都会实现 Channel 的一些配置。

 

6.新连接接入之绑定 NioEventLoop 线程


(1)将新连接绑定到 Reactor 线程的入口


创建完 NioSocketChannel 后,接下来便要对 NioSocketChannel 进行一些设置,并且需要将它绑定到一个正在执行的 Reactor 线程中。

 

NioMessageUnsafe.read()方法里的 readBuf 容器会承载着所有新建的连接,如果某个时刻 Netty 轮询到多个连接,那么通过使用 for 循环就可以批量处理这些 NioSocketChannel 连接。

 

处理每个 NioSocketChannel 连接时,是通过 NioServerSocketChannel 的 pipeline 的 fireChannelRead()方法来处理的。


//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop {    Selector selector;    private SelectedSelectionKeySet selectedKeys;    private boolean needsToSelectAgain;    private int cancelledKeys;    ...        @Override    protected void run() {        for (;;) {            ...            //1.调用select()方法执行一次事件轮询            select(wakenUp.getAndSet(false));            if (wakenUp.get()) {                selector.wakeup();            }            ...            //2.处理产生IO事件的Channel            needsToSelectAgain = false;            processSelectedKeys();            ...            //3.执行外部线程放入TaskQueue的任务            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);        }    }        private void processSelectedKeys() {        if (selectedKeys != null) {            //selectedKeys.flip()会返回一个数组            processSelectedKeysOptimized(selectedKeys.flip());        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }        private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {        for (int i = 0;; i ++) {            //1.首先取出IO事件            final SelectionKey k = selectedKeys[i];            if (k == null) {                break;            }            selectedKeys[i] = null;//Help GC            //2.然后获取对应的Channel和处理该Channel            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel            final Object a = k.attachment();            if (a instanceof AbstractNioChannel) {                //网络事件的处理                processSelectedKey(k, (AbstractNioChannel) a);            } else {                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //3.最后判断是否应该再进行一次轮询            if (needsToSelectAgain) {                for (;;) {                    i++;                    if (selectedKeys[i] == null) {                        break;                    }                    selectedKeys[i] = null;                }                selectAgain();                //selectedKeys.flip()会返回一个数组                selectedKeys = this.selectedKeys.flip();                i = -1;            }        }    }        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        ...        try {            int readyOps = k.readyOps();            //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise            //the NIO JDK channel implementation may throw a NotYetConnectedException.            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                unsafe.finishConnect();            }
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入 //此时将调用Channel的unsafe变量来进行实际操作 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //进行新连接接入处理 unsafe.read(); if (!ch.isOpen()) { //Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ...}
//AbstractNioChannel base class for Channels that operate on messages.public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { //临时存放读到的连接NioSocketChannel private final List<Object> readBuf = new ArrayList<Object>();
@Override public void read() { //断言确保该read()方法必须来自Reactor线程调用 assert eventLoop().inEventLoop(); //获得Channel对应的Pipeline final ChannelPipeline pipeline = pipeline(); //获得Channel对应的RecvByteBufAllocator.Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接 //2.设置并绑定NioSocketChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { //调用DefaultChannelPipeline的fireChannelRead()方法 //开始处理每个NioSocketChannel连接 pipeline.fireChannelRead(readBuf.get(i)); } //3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法 readBuf.clear(); //结束处理每个NioSocketChannel连接 pipeline.fireChannelReadComplete(); } } //Read messages into the given array and return the amount which was read. protected abstract int doReadMessages(List<Object> buf) throws Exception; ...}
复制代码


(2)服务端 Channel 的 Pipeline 介绍


在 Netty 的各种类型的 Channel 中,都会包含一个 Pipeline。Pipeline 可理解为一条流水线,流水线有起点有结束,中间还会有各种各样的流水线关卡。对 Channel 的处理会在流水线的起点开始,然后经过各个流水线关卡的加工,最后到达流水线的终点结束。

 

流水线 Pipeline 的开始是 HeadContext,结束是 TailContext。HeadContext 中会调用 Unsafe 进行具体的操作,TailContext 中会向用户抛出流水线 Pipeline 中未处理异常和未处理消息的警告。

 

在服务端的启动过程中,Netty 会给服务端 Channel 自动添加一个 Pipeline 处理器 ServerBootstrapAcceptor,并且会将用户代码中设置的一系列参数传入到这个 ServerBootstrapAcceptor 的构造方法中。

 

服务端 Channel 的 Pipeline 如下所示:



所以服务端 Channel 的 Pipeline 在传播 ChannelRead 事件时首先会从 HeadContext 处理器开始,然后传播到 ServerBootstrapAcceptor 处理器,最后传播到 TailContext 处理器结束。

 

(3)服务端 Channel 默认的 Pipeline 处理器


首先,服务端启动时会给服务端 Channel 的 Pipeline 添加一个 ServerBootstrapAcceptor 处理器。


//Bootstrap sub-class which allows easy bootstrap of ServerChannelpublic class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {    ...    @Override    void init(Channel channel) throws Exception {        //1.设置服务端Channel的Option与Attr        final Map<ChannelOption<?>, Object> options = options0();        synchronized (options) {            channel.config().setOptions(options);        }        final Map<AttributeKey<?>, Object> attrs = attrs0();        synchronized (attrs) {            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {                @SuppressWarnings("unchecked")                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();                channel.attr(key).set(e.getValue());            }        }               //2.设置客户端Channel的Option与Attr        final EventLoopGroup currentChildGroup = childGroup;        final ChannelHandler currentChildHandler = childHandler;        final Entry<ChannelOption<?>, Object>[] currentChildOptions;        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;        synchronized (childOptions) {            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));        }        synchronized (childAttrs) {            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));        }               //3.配置服务端启动逻辑        ChannelPipeline p = channel.pipeline();        //p.addLast()用于定义服务端启动过程中需要执行哪些逻辑        p.addLast(new ChannelInitializer<Channel>() {            @Override            public void initChannel(Channel ch) throws Exception {                //一.添加用户自定义的Handler,注意这是handler,而不是childHandler                final ChannelPipeline pipeline = ch.pipeline();                ChannelHandler handler = config.handler();                if (handler != null) pipeline.addLast(handler);                //二.添加一个特殊的Handler用于接收新连接                //自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor                ch.eventLoop().execute(new Runnable() {                    @Override                    public void run() {                        //调用DefaultChannelPipeline的addLast()方法                        pipeline.addLast(new ServerBootstrapAcceptor(                            currentChildGroup,                             currentChildHandler,                             currentChildOptions,                             currentChildAttrs)                        );                    }                });            }        });    }    ...}
//A skeletal Channel implementation.public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final DefaultChannelPipeline pipeline; ... //Creates a new instance. protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
@Override public ChannelPipeline pipeline() { return pipeline; } ...}
//The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created.public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ... protected DefaultChannelPipeline(Channel channel) { ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); }
@Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ... for (ChannelHandler h: handlers) { if (h == null) break; addLast(executor, null, h); } return this; } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); ... } ... } //往Pipeline中添加ChannelHandler处理器 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } ...}
复制代码


然后,新连接接入调用到服务端 Channel 的 Pipeline 的 fireChannelRead()方法时,便会触发调用 ServerBootstrapAcceptor 处理器的 channelRead()方法。最终会调用 NioEventLoop 的 register()方法注册这个新连接 Channel,即给新连接 Channel 绑定一个 Reactor 线程。


//The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created.public class DefaultChannelPipeline implements ChannelPipeline {    final AbstractChannelHandlerContext head;    final AbstractChannelHandlerContext tail;    ...        protected DefaultChannelPipeline(Channel channel) {        ...        tail = new TailContext(this);        head = new HeadContext(this);        head.next = tail;        tail.prev = head;    }        @Override    public final ChannelPipeline fireChannelRead(Object msg) {        //从Pipeline的第一个HeadContext处理器开始调用        AbstractChannelHandlerContext.invokeChannelRead(head, msg);        return this;    }        final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {        ...        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            //调用AbstractChannelHandlerContext的fireChannelRead()方法            ctx.fireChannelRead(msg);        }             @Override        public ChannelHandler handler() {            return this;        }        ...    }    ...}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //首先调用的是Pipeline的第一个处理器HeadContext的channelRead()方法 //注意:HeadContext继承了AbstractChannelHandlerContext ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { //注意:HeadContext继承了AbstractChannelHandlerContext //所以如果this是HeadContext,那么这里会获取下一个节点ServerBootstrapAcceptor ctx = ctx.next; } while (!ctx.inbound); return ctx; } ...}
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; ... //channelRead()方法在新连接接入时被调用 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //1.给新连接的Channel添加用户自定义的Handler处理器 //这里的childHandler其实是一个特殊的Handler: ChannelInitializer child.pipeline().addLast(childHandler); //2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关 for (Entry<ChannelOption<?>, Object> e: childOptions) { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } //3.设置新连接Channel的属性 for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } //4.绑定Reactor线程 //childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法 childGroup.register(child); } ... } ...}
// MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.public class NioEventLoopGroup extends MultithreadEventLoopGroup { ... ...}
//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { ... @Override public ChannelFuture register(Channel channel) { //最终会调用NioEventLoop的register()方法注册这个新连接Channel return next().register(channel); } @Override public EventLoop next() { //获取一个NioEventLoop return (EventLoop) super.next(); } ...}
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutorChooserFactory.EventExecutorChooser chooser; ... //Create a new instance. protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); } //Create a new instance. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { ... children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); ... } //创建chooser chooser = chooserFactory.newChooser(children); ... } @Override public EventExecutor next() { //调用chooser的next()方法获得一个NioEventLoop return chooser.next(); } ...}
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } }
private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } }}
复制代码


(4)服务端 Channel 处理新连接的步骤


ServerBootstrapAcceptor 处理新连接的步骤:

 

一.给客户端 Channel 添加 childHandler


给客户端 Channel 添加 childHandler 也就是将用户自定义的 childHandler 添加到新连接的 pipeline 里。

 

pipeline.fireChannelRead(NioSocketChannel)最终会调用到 ServerBootstrapAcceptor 的 channelRead()方法,而且这个 channelRead()方法一上来就会把入参的 msg 强制转换为 Channel。

 

拿到新连接的 Channel 后就可以拿到其对应的 Pipeline,这个 Pipeline 是在调用 AbstractChannel 构造方法时创建的。于是可以将用户代码中的 childHandler 添加到 Pipeline 中,而 childHandler 其实就是用户代码中的 ChannelInitializer。所以新连接 Channel 的 Pipeline 的构成是:Head -> ChannelInitializer -> Tail。

 

二.设置客户端 Channel 的 options 和 attr


所设置的 childOptions 和 childAttrs 也是在用户代码中设置的,这些设置项最终会传递到 ServerBootstrapAcceptor 的 channelRead()方法中进行具体设置。

 

三.选择 NioEventLoop 绑定客户端 Channel


childGroup.register(child)中的 childGroup 就是用户代码里创建的 workerNioEventLoopGroup。NioEventLoopGroup 的 register()方法会调用 next()由其父类通过线程选择器 chooser 返回一个 NioEventLoop。所以 childGroup.register(child)最终会调用到 NioEventLoop 的 register()方法,这和注册服务端 Channel 时调用 config().group().register(channel)一样。

 

(5)总结


服务端 Channel 在检测到新连接并且创建完客户端 Channel 后,会通过服务端 Channel 的 Pipeline 的一个处理器 ServerBootstrapAcceptor 做一些处理。这些处理包括:给客户端 Channel 的 Pipeline 添加 childHandler 处理器、设置客户端 Channel 的 options 和 attrs、调用线程选择器 chooser 选择一个 NioEventLoop 进行绑定。绑定时会将该客户端 Channel 注册到 NioEventLoop 的 Selector 上,此时还不会关心事件。

 

7.新连接接入之注册 Selector 和注册读事件


NioEventLoop 的 register()方法是由其父类 SingleThreadEventLoop 实现的,并最终调用到 AbstractChannel 的内部类 AbstractUnsafe 的 register0()方法。

 

步骤一:注册 Selector


和服务端启动过程一样,先调用 AbstractNioChannel 的 doRegister()方法进行注册。其中 javaChannel().register()会将新连接 NioSocketChannel 绑定到 Reactor 线程的 Selector 上,这样后续这个新连接 NioSocketChannel 所有的事件都由绑定的 Reactor 线程的 Selector 来轮询。

 

步骤二:配置自定义 Handler


此时新连接 NioSocketChannel 的 Pipeline 中有三个 Handler:Head -> ChannelInitializer -> Tail。invokeHandlerAddedIfNeeded()最终会调用 ChannelInitializer 的 handlerAdded()方法。

 

步骤三:传播 ChannelRegistered 事件


pipeline.fireChannelRegistered()会把新连接的注册事件从 HeadContext 开始往下传播,调用每一个 ChannelHandler 的 channelRegistered()方法。

 

步骤四:注册读事件


接着还会传播 ChannelActive 事件。传播完 ChannelActive 事件后,便会继续调用 HeadContetx 的 readIfIsAutoRead()方法注册读事件。由于创建 NioSocketChannel 时已将 SelectionKey.OP_READ 的事件代码保存到其成员变量中,所以 AbstractNioChannel 的 doBeginRead()方法,就可以将 SelectionKey.OP_READ 事件注册到 Selector 中完成读事件的注册。


//A skeletal Channel implementation.public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {    private final ChannelId id;    private final Unsafe unsafe;    private final DefaultChannelPipeline pipeline;    private volatile EventLoop eventLoop;    ...        //Creates a new instance.    protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }        //Returns a new DefaultChannelPipeline instance.    protected DefaultChannelPipeline newChannelPipeline() {        return new DefaultChannelPipeline(this);    }
//Unsafe implementation which sub-classes must extend and use. protected abstract class AbstractUnsafe implements Unsafe { ... @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... //绑定事件循环器,即绑定一个NioEventLoop到该Channel上 AbstractChannel.this.eventLoop = eventLoop; //注册Selector,并启动一个NioEventLoop if (eventLoop.inEventLoop()) { register0(promise); } else { ... //通过启动这个NioEventLoop线程来调用register0()方法将这个Channel注册到Selector上 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ... } }
private void register0(ChannelPromise promise) { ... boolean firstRegistration = this.neverRegistered; //1.调用JDK底层注册Channel到Selector上 doRegister(); this.neverRegistered = false; this.registered = true; //2.配置自定义Handler this.pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //3.传播channelRegisterd事件 this.pipeline.fireChannelRegistered(); //4.注册读事件 if (isActive()) { if (firstRegistration) { //会进入这个方法,传播完ChannelActive事件后,再注册读事件 this.pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } ... } @Override public final void beginRead() { ... //调用AbstractNioChannel实现的doBeginRead()方法 doBeginRead(); ... } ... } //Is called after the Channel is registered with its EventLoop as part of the register process. //Sub-classes may override this method protected void doRegister() throws Exception { // NOOP } //Schedule a read operation. protected abstract void doBeginRead() throws Exception; @Override public Channel read() { //调用DefaultChannelPipeline的read()方法 pipeline.read(); return this; } ...}
//Abstract base class for Channel implementations which use a Selector based approach.public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch;//这是NIO中的Channel protected final int readInterestOp; volatile SelectionKey selectionKey; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null. //@param ch,he underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中 //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch this.ch = ch; this.readInterestOp = readInterestOp; ... //设置Channel对象为非阻塞模式 ch.configureBlocking(false); ... } @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... //首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法, //将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上; //这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来; //而且注册的ops值是0,表示此时还不关注任何事件; selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; ... } } protected SelectableChannel javaChannel() { return ch; } @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; }
readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { //将SelectionKey.OP_READ读事件注册到Selector上,表示这个客户端Channel可以处理读事件了 selectionKey.interestOps(interestOps | readInterestOp); } } ...}
//The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created.public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ... @Override public final ChannelPipeline fireChannelActive() { //调用HeadContext的channelActive()方法 AbstractChannelHandlerContext.invokeChannelActive(head); return this; } @Override public final ChannelPipeline read() { //从TailContext开始,最终会调用到HeadContext的read()方法 tail.read(); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; ... @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive();//传播ChannelActive事件 readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //调用AbstractChannel的read()方法 channel.read(); } } @Override public void read(ChannelHandlerContext ctx) { //调用AbstractChannel.AbstractUnsafe的beginRead()方法 unsafe.beginRead(); } ... }}
复制代码


8.注册 Reactor 线程总结


一.首先当 boss Reactor 线程在检测到有 ACCEPT 事件之后,会创建 JDK 底层的 Channel。

 

二.然后使用一个 NioSocketChannel 包装 JDK 底层的 Channel,把用户设置的 ChannelOption、ChannelAttr、ChannelHandler 都设置到该 NioSocketChannel 中。

 

三.接着从 worker Reactor 线程组中,也就是 worker NioEventLoopGroup 中,通过线程选择器 chooser 选择一个 NioEventLoop 出来。

 

四.最后把 NioSocketChannel 包装的 JDK 底层 Channel 当作 key,自身 NioSocketChannel 当作 attachment,注册到 NioEventLoop 对应的 Selector 上。这样后续有读写事件发生时,就可以从底层 Channel 直接获得 attachment 即 NioSocketChannel 来进行读写数据的逻辑处理。

 

9.新连接接入总结


新连接接入整体可以分为两部分:一是检测新连接,二是注册 Reactor 线程。

 

一.首先在 Netty 服务端的 Channel(也就是 NioServerSocketChannel)绑定的 NioEventLoop(也就是 boss 线程)中,轮询到 ACCEPT 事件。

 

二.然后调用 JDK 的服务端 Channel 的 accept()方法获取一个 JDK 的客户端 Channel,并且将其封装成 Netty 的客户端 Channel(即 NioSocketChannel)。

 

三.封装过程中会创建这个 NioSocketChannel 一系列的组件,如 unsafe 组件和 pipeline 组件。unsafe 组件主要用于进行 Channel 的读写,pipeline 组件主要用于处理 Channel 数据的业务逻辑。

 

四.接着 Netty 服务端 Channel 的 Pipeline 的一个处理器 ServerBootstrapAcceptor,会给当前 Netty 客户端 Channel 分配一个 NioEventLoop 并将客户端 Channel 绑定到 Selector 上。

 

五.最后会传播 ChannelRegistered 事件和 ChannelActive 事件,并将客户端 Channel 的读事件注册到 Selector 上。

 

至此,新连接 NioSocketChannel 便可以开始正常读写数据了。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18788382

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Netty源码—客户端接入流程_Java_不在线第一只蜗牛_InfoQ写作社区