写点什么

Netty 之旅三:Netty 服务端启动源码分析,一梭子带走!

发布于: 2020 年 09 月 15 日

Netty 服务端启动流程源码分析



前记


哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启今天的 Netty 源码系列。源码分析的第一篇文章,将由<吴小飞>分享 《Netty 服务端启动流程源码分析》,下一篇会由<王猛>分享客户端的启动过程源码分析。通过源码的阅读,我们将会知道,Netty 服务端启动的调用链是非常长的,同时肯定也会发现一些新的问题,随着我们源码阅读的不断深入,相信这些问题我们也会一一攻破。


废话不多说,直接上号!


一、从`EchoServer `示例入手



<div align="center", style="color:#A8A8A8;font-size:14px">netty-example:EchoServer</div>


示例从哪里来?任何开源框架都会有自己的示例代码,Netty 源码也不例外,如模块netty-example中就包括了最常见的EchoServer示例,下面通过这个示例进入服务端启动流程篇章。


public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; }
// 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 EchoServerHandler 对象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 声明服务端启动引导器,并设置相关属性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
// 3. 绑定端口即启动服务端,并同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync();
// 4. 监听服务端关闭,并阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 优雅地关闭两个EventLoopGroup线程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
复制代码


  1. [代码行 18、19]声明 Main-Sub Reactor 模式线程池:EventLoopGroup


创建两个 EventLoopGroup 对象。其中,bossGroup用于服务端接受客户端的连接,workerGroup用于进行客户端的 SocketChannel 的数据读写。


(<u>关于EventLoopGroup不是本文重点所以在后续文章中进行分析</u>)


  1. [代码行 23-39]声明服务端启动引导器,并设置相关属性


AbstractBootstrap是一个帮助类,通过方法链(method chaining)的方式,提供了一个简单易用的方式来配置启动一个Channelio.netty.bootstrap.ServerBootstrap ,实现 AbstractBootstrap 抽象类,用于 Server 的启动器实现类。io.netty.bootstrap.Bootstrap ,实现 AbstractBootstrap 抽象类,用于 Client 的启动器实现类。如下类图所示:


[

](https://imgchr.com/i/wwvVKA)


<div align="center", style="color:#A8A8A8;font-size:14px">AbstractBootstrap 类继承</div>


(<u>在EchoServer示例代码中,我们看到 ServerBootstrap groupchanneloptionchildHandler 等属性链式设置都放到关于AbstractBootstrap体系代码中详细介绍。</u>)


  1. [代码行 43]绑定端口即启动服务端,并同步等待


先调用 #bind(int port) 方法,绑定端口,后调用 ChannelFuture#sync() 方法,阻塞等待成功。对于bind操作就是本文要详细介绍的"服务端启动流程"。


  1. [代码行 47]监听服务端关闭,并阻塞等待


先调用 #closeFuture() 方法,监听服务器关闭,后调用 ChannelFuture#sync() 方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel的监听关闭。


  1. [代码行 51、52]优雅地关闭两个EventLoopGroup线程池


finally代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully() 方法,分别关闭两个 EventLoopGroup 对象,终止所有线程。


二、服务启动过程


在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO编程在服务端启动初始的代码:


 serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码


这 5 行代码标示一个最为熟悉的过程:


  • 打开serverSocketChannel

  • 配置非阻塞模式

  • channelsocket绑定监听端口

  • 创建Selector

  • serverSocketChannel注册到 selector


后面等分析完 Netty 的启动过程后,会对这些步骤有一个新的认识。在EchoServer示例中,进入 #bind(int port) 方法,AbstractBootstrap#bind()其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress) 方法,代码如下:


private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister();        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}
复制代码


  • [代码行 2] :调用 #initAndRegister() 方法,初始化并注册一个 Channel 对象。因为注册是异步的过程,所以返回一个 ChannelFuture 对象。详细解析,见 「initAndRegister()」。

  • [代码行 4-6]]:若发生异常,直接进行返回。

  • [代码行 9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。

- 核心在[第 11 、29 行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) 方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey 中。

- 如果异步注册对应的 ChanelFuture 未完成,则调用 ChannelFuture#addListener(ChannelFutureListener) 方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...) 方法的逻辑。


通过doBind方法可以知道服务端启动流程大致如下几个步骤:


[

](https://imgchr.com/i/wwvl8g)


<div align="center", style="color:#A8A8A8;font-size:14px">服务端启动流程</div>


1. 创建 Channel



<div align="center", style="color:#A8A8A8;font-size:14px">创建服务端 channel</div>


#doBind(final SocketAddress localAddress)进入到initAndRegister():


 final ChannelFuture initAndRegister() {     Channel channel = null;     try {         channel = channelFactory.newChannel();         init(channel);     } catch (Throwable t) {         if (channel != null) {             // channel can be null if newChannel crashed (eg SocketException("too many open files"))             channel.unsafe().closeForcibly();             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);         }         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);     }
ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
return regFuture;}
复制代码


[代码行 4]调用 ChannelFactory#newChannel() 方法,创建 Channel 对象。 ChannelFactory类继承如下:



<div align="center", style="color:#A8A8A8;font-size:14px">ChannelFactroy 类继承</div>


可以在ChannelFactory注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.,这里只是包名的调整,对于继承结构不变。netty默认使用ReflectiveChannelFactory,我们可以看到重载方法:


@Overridepublic T newChannel() {    try {        return constructor.newInstance();    } catch (Throwable t) {        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);    }}
复制代码


很明显,正如其名是通过反射机制构造Channel对象实例的。constructor是在其构造方法初始化的:this.constructor = clazz.getConstructor();这个clazz按理说应该是我们要创建的Channel的 Class 对象。那Class对象是什么呢?我们接着看channelFactory是怎么初始化的。


首先在AbstractBootstrap找到如下代码:


@Deprecatedpublic B channelFactory(ChannelFactory<? extends C> channelFactory) {    ObjectUtil.checkNotNull(channelFactory, "channelFactory");    if (this.channelFactory != null) {        throw new IllegalStateException("channelFactory set already");    }
this.channelFactory = channelFactory; return self();}
复制代码


调用这个方法的递推向上看到:


public B channel(Class<? extends C> channelClass) {    return channelFactory(new ReflectiveChannelFactory<C>(        ObjectUtil.checkNotNull(channelClass, "channelClass")    ));}
复制代码


这个方法正是在EchoServerServerBootstrap链式设置时调用.channel(NioServerSocketChannel.class)的方法。我们看到,channelClass就是NioServerSocketChannel.classchannelFactory也是以ReflectiveChannelFactory作为具体实例,并且将NioServerSocketChannel.class作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel对象。


继续看NioServerSocketChannel构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel类继承关系:



<div align="center", style="color:#A8A8A8;font-size:14px">Channel 类继承</div>


NioServerSocketChannelNioSocketChannel分别对应服务端和客户端,公共父类都是AbstractNioChannelAbstractChannel,下面介绍创建过程可以参照这个Channel类继承图。进入NioServerSocketChannel构造方法:


/**  * Create a new instance  */public NioServerSocketChannel() {    this(newSocket(DEFAULT_SELECTOR_PROVIDER));}
复制代码


点击newSocket进去:


private static ServerSocketChannel newSocket(SelectorProvider provider) {    try {        /**          *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in          *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.          *          *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.          */        return provider.openServerSocketChannel();    } catch (IOException e) {        throw new ChannelException(            "Failed to open a server socket.", e);    }}
复制代码


以上传进来的providerDEFAULT_SELECTOR_PROVIDER即默认的java.nio.channels.spi.SelectorProvider,[代码行 9]就是熟悉的jdk nio创建ServerSocketChannel。这样newSocket(DEFAULT_SELECTOR_PROVIDER)就返回了结果ServerSocketChannel,回到NioServerSocketChannel()#this()点进去:


/**  * Create a new instance using the given {@link ServerSocketChannel}.  */public NioServerSocketChannel(ServerSocketChannel channel) {    super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
复制代码


以上super代表父类AbstractNioMessageChannel构造方法,点进去看到:


 /**   * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)   */protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    super(parent, ch, readInterestOp);}
复制代码


以上super代表父类AbstractNioChannel构造方法,点进去看到:


 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {     super(parent);     this.ch = ch;     this.readInterestOp = readInterestOp;     try {         ch.configureBlocking(false);     } catch (IOException e) {         try {             ch.close();         } catch (IOException e2) {             if (logger.isWarnEnabled()) {                 logger.warn("Failed to close a partially initialized socket.", e2);             }         }
throw new ChannelException("Failed to enter non-blocking mode.", e); }}
复制代码


以上[代码行 3]将ServerSocketChannel保存到了AbstractNioChannel#ch成员变量,在上面提到的NioServerSocketChannel构造方法的[代码行 6]javaChannel()拿到的就是ch保存的ServerSocketChannel变量。


以上[代码行 6]就是熟悉的jdk nio编程设置ServerSocketChannel非阻塞方式。这里还有super父类构造方法,点击进去看到:


 protected AbstractChannel(Channel parent) {     this.parent = parent;     id = newId();     unsafe = newUnsafe();     pipeline = newChannelPipeline();}
复制代码


以上构造方法中:


  • parent 属性,代表父 Channel 对象。对于 NioServerSocketChannel parentnull

  • id 属性,Channel 编号对象。在构造方法中,通过调用 #newId() 方法进行创建。(<u>这里不细展开 Problem-1</u>)

  • unsafe 属性,Unsafe 对象。因为Channel 真正的具体操作,是通过调用对应的 Unsafe 对象实施。所以需要在构造方法中,通过调用 #newUnsafe() 方法进行创建。这里的 Unsafe 并不是我们常说的 jdk自带的sun.misc.Unsafe ,而是 io.netty.channel.Channel#Unsafe。(<u>这里不细展开 Problem-2</u>)

  • pipeline属性默认是DefaultChannelPipeline对象,赋值后在后面为 channel 绑定端口的时候会用到


通过以上创建channel源码过程分析,总结的流程时序图如下:



2. 初始化 Channel



<div align="center", style="color:#A8A8A8;font-size:14px">初始化 channel</div>


回到一开始创建ChannelinitAndRegister()入口方法,在创建Channel后紧接着init(channel)进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel),代码如下:


@Overridevoid init(Channel channel) throws Exception {    final Map<ChannelOption<?>, Object> options = options0();    synchronized (options) {        setChannelOptions(channel, options, logger);    }
final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } }
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); }
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); }
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
复制代码


  • [代码 3 - 6 行]: options0()方法返回的options保存了用户在EchoServer中设置自定义的可选项集合,这样ServerBootstrap将配置的选项集合,设置到了 Channel 的可选项集合中。

  • [代码 8 - 15 行]: attrs0()方法返回的attrs保存了用户在EchoServer中设置自定义的属性集合,这样ServerBootstrap将配置的属性集合,设置到了 Channel 的属性集合中。

  • [代码 21-28 行]:通过局部变量currentChildOptionscurrentChildAttrs保存了用户自定义的childOptionschildAttrs,用于[代码 43 行] ServerBootstrapAcceptor 构造方法。

  • [代码 30-47]]:创建ChannelInitializer 对象,添加到 pipeline 中,用于后续初始化 ChannelHandler pipeline 中,包括用户在EchoServer配置的LoggingHandler和创建的创建 ServerBootstrapAcceptor 对象。

- [代码行 34-37]:添加启动器配置的 LoggingHandlerpipeline 中。

- [代码行 39-45]:创建 ServerBootstrapAcceptor 对象,添加到 pipeline 中。从名字上就可以看出来,ServerBootstrapAcceptor 也是一个 ChannelHandler 实现类,专门用于接受客户端的新连接请求,把新的请求扔给某个事件循环器,我们先不做过多分析。我们发现是使用EventLoop.execute 执行添加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3)

- 需要说明的是pipeline 在之前介绍 Netty 核心组件的时候提到是一个包含ChannelHandlerContext的双向链表,每一个context对于唯一一个ChannelHandler,这里初始化后,ChannelPipeline里就是如下一个结构:


<div align="center", style="color:#A8A8A8;font-size:14px">ChannelPipeline 内部结构</div>


3. 注册 Channel


<img src="https://s1.ax1x.com/2020/09/13/wwvXi8.png" alt="wwvXi8.png" style="zoom:80%;" />


<div align="center", style="color:#A8A8A8;font-size:14px">注册 channel</div>


初始化Channel一些基本配置和属性完毕后,回到一开始创建ChannelinitAndRegister()入口方法,在初始化Channel后紧接着[代码行 17] ChannelFuture regFuture = config().group().register(channel);明显这里是通过EventLoopGroup进入注册流程(EventLoopGroup体系将在后续文章讲解)


EchoServer中启动器同样通过ServerBootstrap#group()设置了NioEventLoopGroup,它继承自MultithreadEventLoopGroup,所以注册流程会进入MultithreadEventLoopGroup重载的register(Channel channel)方法,代码如下:


@Overridepublic ChannelFuture register(Channel channel) {    return next().register(channel);}
复制代码


这里会调用 next() 方法选择出来一个 EventLoop 来注册 Channel,里面实际上使用的是一个叫做 EventExecutorChooser 的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooserGenericEventExecutorChooser,本质上就是从 EventExecutor 数组中选择一个 EventExecutor,我们这里就是 NioEventLoop,那么,它们有什么区别呢?(<u>Problem-4:在介绍EventLoopGroup体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。</u>)


接着,来到 NioEventLoopregister(channel) 方法,你会不会问找不到该方法?提示NioEventLoop 继承SingleThreadEventLoop,所以父类方法:


@Overridepublic ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}
复制代码


可以看到,先创建了一个叫做 ChannelPromise 的东西,它是 ChannelFuture 的子类。[代码行 9]又调回了 ChannelUnsaferegister () 方法,这里第一个参数是 this,也就是 NioEventLoop,第二个参数是刚创建的 ChannelPromise


点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 方法进去,代码如下:


 public final void register(EventLoop eventLoop, final ChannelPromise promise) {     if (eventLoop == null) {         throw new NullPointerException("eventLoop");     }     if (isRegistered()) {         promise.setFailure(new IllegalStateException("registered to an event loop already"));         return;     }     if (!isCompatible(eventLoop)) {         promise.setFailure(             new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));         return;     }
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }}
复制代码


[代码行 15]这行代码是设置 ChanneleventLoop 属性。这行前面的代码主要是在校验传入的 eventLoop 参数非空,校验是否有注册过以及校验 ChanneleventLoop 类型是否匹配。


[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise) 方法中:


private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}
复制代码


[代码行 9]进入 AbstractNioChannel#doRegister() 方法:


protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}
复制代码


[代码行 5]关键一行代码,将 Java 原生 NIO Selector 与 Java 原生 NIOChannel 对象(ServerSocketChannel) 绑定在一起,并将当前 Netty 的 Channel 通过 attachment 的形式绑定到 SelectionKey 上:


  • 调用 #unwrappedSelector() 方法,返回 Java 原生 NIO Selector 对象,而且每个 NioEventLoop Selector 唯一一对应。

  • 调用 SelectableChannel#register(Selector sel, int ops, Object att) 方法,注册 Java 原生NIOChannel 对象到 NIO Selector 对象上。


通过以上注册 channel 源码分析,总结流程的时序图如下:



4. 绑定端口



<div align="center", style="color:#A8A8A8;font-size:14px">绑定端口</div>


注册完Channel最后回到AbstractBootstrap#doBind() 方法,分析 Channel 的端口绑定逻辑。进入doBind0代码如下:


private static void doBind0(    final ChannelFuture regFuture, final Channel channel,    final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}
复制代码


  • [代码行 7]:在前面Channel 注册成功的条件下,调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语注释,这里作为一个问题记着(<u>Problem-5</u>)。

  • [代码行 11]:进入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同样立即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE监听事件。

  • [代码行 13]:如果绑定端口之前的操作并没有成功,自然也就不能进行端口绑定操作了,通过 promise 记录异常原因。


AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)方法如下:


 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return pipeline.bind(localAddress, promise); }
复制代码


pipeline是之前创建channel的时候创建的DefaultChannelPipeline,进入该方法:


 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return tail.bind(localAddress, promise); }
复制代码


[在分析初始化流程的时候最后画一个DefaultChannelPipeline内部的结构,能够便于分析后面进入DefaultChannelPipeline一系列bind方法。]


首先,tail代表TailContext,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法:


 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {     //省略部分代码     final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);     EventExecutor executor = next.executor();     if (executor.inEventLoop()) {         next.invokeBind(localAddress, promise);     } else {         safeExecute(executor, new Runnable() {             @Override             public void run() {                 next.invokeBind(localAddress, promise);             }         }, promise, null);     }     return promise;}
复制代码


[代码行 3]:findContextOutbound方法里主要是执行ctx = ctx.prev;那么得到的next就是绑定LoggingHandlercontext


[代码行 6]:进入invokeBind(localAddress, promise)方法并直接执行LoggingHandler#bind(this, localAddress, promise),进入后的方法如下:


 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {     if (logger.isEnabled(internalLevel)) {         logger.log(internalLevel, format(ctx, "BIND", localAddress));     }     ctx.bind(localAddress, promise); }
复制代码


设置了LoggingHandler的日志基本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法执行ctx = ctx.prev取出HeadContext进入到 bind 方法:


 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {     unsafe.bind(localAddress, promise); }
复制代码


兜兜转转,最终跳出了pipeline轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 方法,Channel 的端口绑定逻辑。代码如下:


public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {    //此处有省略...    boolean wasActive = isActive();    try {        doBind(localAddress);    } catch (Throwable t) {        safeSetFailure(promise, t);        closeIfClosed();        return;    }    //此处有省略...}
复制代码


做实事方法doBind进入后如下:


@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {    if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}
复制代码


到了此处,服务端的 Java 原生 NIO ServerSocketChannel 终于绑定上了端口。


三、问题归纳


  • Problem-1: 创建Channel流程中AbstractChannel构造函数中为channel分配 ID 的算法如何实现?

  • Problem-2: AbstractChannel内部类AbstractUnsafe的作用?

  • Problem-3: 初始化channel流程中pipeline 添加ServerBootstrapAcceptor 是通过EventLoop.execute 执行添加的过程,这是为什么呢?

  • Problem-4:注册channel流程中PowerOfTwoEventExecutorChooserGenericEventExecutorChooser的区别和优化原理?

  • Problem-5:绑定端口流程中调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?


小结


通过对 Netty 服务端启动流程源码分析,我们发现了在使用NIO的模式下,服务端启动流程其实就是封装了JDK NIO编程在服务端启动的流程。只不过对原生JDK NIO进行了增强和优化,同时从架构设计上简化了服务端流程的编写。


最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,能够让更多人学习源码的旅途少走很多弯路,谢谢!


Netty 服务端启动流程源码分析



前记


哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启今天的 Netty 源码系列。源码分析的第一篇文章,将由<吴小飞>分享 《Netty 服务端启动流程源码分析》,下一篇会由<王猛>分享客户端的启动过程源码分析。通过源码的阅读,我们将会知道,Netty 服务端启动的调用链是非常长的,同时肯定也会发现一些新的问题,随着我们源码阅读的不断深入,相信这些问题我们也会一一攻破。


废话不多说,直接上号!


一、从`EchoServer `示例入手



<div align="center", style="color:#A8A8A8;font-size:14px">netty-example:EchoServer</div>


示例从哪里来?任何开源框架都会有自己的示例代码,Netty 源码也不例外,如模块netty-example中就包括了最常见的EchoServer示例,下面通过这个示例进入服务端启动流程篇章。


public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; }
// 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 EchoServerHandler 对象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 声明服务端启动引导器,并设置相关属性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
// 3. 绑定端口即启动服务端,并同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync();
// 4. 监听服务端关闭,并阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 优雅地关闭两个EventLoopGroup线程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
复制代码


  1. [代码行 18、19]声明 Main-Sub Reactor 模式线程池:EventLoopGroup


创建两个 EventLoopGroup 对象。其中,bossGroup用于服务端接受客户端的连接,workerGroup用于进行客户端的 SocketChannel 的数据读写。


(<u>关于EventLoopGroup不是本文重点所以在后续文章中进行分析</u>)


  1. [代码行 23-39]声明服务端启动引导器,并设置相关属性


AbstractBootstrap是一个帮助类,通过方法链(method chaining)的方式,提供了一个简单易用的方式来配置启动一个Channelio.netty.bootstrap.ServerBootstrap ,实现 AbstractBootstrap 抽象类,用于 Server 的启动器实现类。io.netty.bootstrap.Bootstrap ,实现 AbstractBootstrap 抽象类,用于 Client 的启动器实现类。如下类图所示:


[

](https://imgchr.com/i/wwvVKA)


<div align="center", style="color:#A8A8A8;font-size:14px">AbstractBootstrap 类继承</div>


(<u>在EchoServer示例代码中,我们看到 ServerBootstrap groupchanneloptionchildHandler 等属性链式设置都放到关于AbstractBootstrap体系代码中详细介绍。</u>)


  1. [代码行 43]绑定端口即启动服务端,并同步等待


先调用 #bind(int port) 方法,绑定端口,后调用 ChannelFuture#sync() 方法,阻塞等待成功。对于bind操作就是本文要详细介绍的"服务端启动流程"。


  1. [代码行 47]监听服务端关闭,并阻塞等待


先调用 #closeFuture() 方法,监听服务器关闭,后调用 ChannelFuture#sync() 方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel的监听关闭。


  1. [代码行 51、52]优雅地关闭两个EventLoopGroup线程池


finally代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully() 方法,分别关闭两个 EventLoopGroup 对象,终止所有线程。


二、服务启动过程


在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO编程在服务端启动初始的代码:


 serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码


这 5 行代码标示一个最为熟悉的过程:


  • 打开serverSocketChannel

  • 配置非阻塞模式

  • channelsocket绑定监听端口

  • 创建Selector

  • serverSocketChannel注册到 selector


后面等分析完 Netty 的启动过程后,会对这些步骤有一个新的认识。在EchoServer示例中,进入 #bind(int port) 方法,AbstractBootstrap#bind()其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress) 方法,代码如下:


private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister();        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}
复制代码


  • [代码行 2] :调用 #initAndRegister() 方法,初始化并注册一个 Channel 对象。因为注册是异步的过程,所以返回一个 ChannelFuture 对象。详细解析,见 「initAndRegister()」。

  • [代码行 4-6]]:若发生异常,直接进行返回。

  • [代码行 9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。

- 核心在[第 11 、29 行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) 方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey 中。

- 如果异步注册对应的 ChanelFuture 未完成,则调用 ChannelFuture#addListener(ChannelFutureListener) 方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...) 方法的逻辑。


通过doBind方法可以知道服务端启动流程大致如下几个步骤:


[

](https://imgchr.com/i/wwvl8g)


<div align="center", style="color:#A8A8A8;font-size:14px">服务端启动流程</div>


1. 创建 Channel



<div align="center", style="color:#A8A8A8;font-size:14px">创建服务端 channel</div>


#doBind(final SocketAddress localAddress)进入到initAndRegister():


 final ChannelFuture initAndRegister() {     Channel channel = null;     try {         channel = channelFactory.newChannel();         init(channel);     } catch (Throwable t) {         if (channel != null) {             // channel can be null if newChannel crashed (eg SocketException("too many open files"))             channel.unsafe().closeForcibly();             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);         }         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);     }
ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
return regFuture;}
复制代码


[代码行 4]调用 ChannelFactory#newChannel() 方法,创建 Channel 对象。 ChannelFactory类继承如下:



<div align="center", style="color:#A8A8A8;font-size:14px">ChannelFactroy 类继承</div>


可以在ChannelFactory注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.,这里只是包名的调整,对于继承结构不变。netty默认使用ReflectiveChannelFactory,我们可以看到重载方法:


@Overridepublic T newChannel() {    try {        return constructor.newInstance();    } catch (Throwable t) {        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);    }}
复制代码


很明显,正如其名是通过反射机制构造Channel对象实例的。constructor是在其构造方法初始化的:this.constructor = clazz.getConstructor();这个clazz按理说应该是我们要创建的Channel的 Class 对象。那Class对象是什么呢?我们接着看channelFactory是怎么初始化的。


首先在AbstractBootstrap找到如下代码:


@Deprecatedpublic B channelFactory(ChannelFactory<? extends C> channelFactory) {    ObjectUtil.checkNotNull(channelFactory, "channelFactory");    if (this.channelFactory != null) {        throw new IllegalStateException("channelFactory set already");    }
this.channelFactory = channelFactory; return self();}
复制代码


调用这个方法的递推向上看到:


public B channel(Class<? extends C> channelClass) {    return channelFactory(new ReflectiveChannelFactory<C>(        ObjectUtil.checkNotNull(channelClass, "channelClass")    ));}
复制代码


这个方法正是在EchoServerServerBootstrap链式设置时调用.channel(NioServerSocketChannel.class)的方法。我们看到,channelClass就是NioServerSocketChannel.classchannelFactory也是以ReflectiveChannelFactory作为具体实例,并且将NioServerSocketChannel.class作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel对象。


继续看NioServerSocketChannel构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel类继承关系:



<div align="center", style="color:#A8A8A8;font-size:14px">Channel 类继承</div>


NioServerSocketChannelNioSocketChannel分别对应服务端和客户端,公共父类都是AbstractNioChannelAbstractChannel,下面介绍创建过程可以参照这个Channel类继承图。进入NioServerSocketChannel构造方法:


/**  * Create a new instance  */public NioServerSocketChannel() {    this(newSocket(DEFAULT_SELECTOR_PROVIDER));}
复制代码


点击newSocket进去:


private static ServerSocketChannel newSocket(SelectorProvider provider) {    try {        /**          *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in          *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.          *          *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.          */        return provider.openServerSocketChannel();    } catch (IOException e) {        throw new ChannelException(            "Failed to open a server socket.", e);    }}
复制代码


以上传进来的providerDEFAULT_SELECTOR_PROVIDER即默认的java.nio.channels.spi.SelectorProvider,[代码行 9]就是熟悉的jdk nio创建ServerSocketChannel。这样newSocket(DEFAULT_SELECTOR_PROVIDER)就返回了结果ServerSocketChannel,回到NioServerSocketChannel()#this()点进去:


/**  * Create a new instance using the given {@link ServerSocketChannel}.  */public NioServerSocketChannel(ServerSocketChannel channel) {    super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
复制代码


以上super代表父类AbstractNioMessageChannel构造方法,点进去看到:


 /**   * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)   */protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    super(parent, ch, readInterestOp);}
复制代码


以上super代表父类AbstractNioChannel构造方法,点进去看到:


 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {     super(parent);     this.ch = ch;     this.readInterestOp = readInterestOp;     try {         ch.configureBlocking(false);     } catch (IOException e) {         try {             ch.close();         } catch (IOException e2) {             if (logger.isWarnEnabled()) {                 logger.warn("Failed to close a partially initialized socket.", e2);             }         }
throw new ChannelException("Failed to enter non-blocking mode.", e); }}
复制代码


以上[代码行 3]将ServerSocketChannel保存到了AbstractNioChannel#ch成员变量,在上面提到的NioServerSocketChannel构造方法的[代码行 6]javaChannel()拿到的就是ch保存的ServerSocketChannel变量。


以上[代码行 6]就是熟悉的jdk nio编程设置ServerSocketChannel非阻塞方式。这里还有super父类构造方法,点击进去看到:


 protected AbstractChannel(Channel parent) {     this.parent = parent;     id = newId();     unsafe = newUnsafe();     pipeline = newChannelPipeline();}
复制代码


以上构造方法中:


  • parent 属性,代表父 Channel 对象。对于 NioServerSocketChannel parentnull

  • id 属性,Channel 编号对象。在构造方法中,通过调用 #newId() 方法进行创建。(<u>这里不细展开 Problem-1</u>)

  • unsafe 属性,Unsafe 对象。因为Channel 真正的具体操作,是通过调用对应的 Unsafe 对象实施。所以需要在构造方法中,通过调用 #newUnsafe() 方法进行创建。这里的 Unsafe 并不是我们常说的 jdk自带的sun.misc.Unsafe ,而是 io.netty.channel.Channel#Unsafe。(<u>这里不细展开 Problem-2</u>)

  • pipeline属性默认是DefaultChannelPipeline对象,赋值后在后面为 channel 绑定端口的时候会用到


通过以上创建channel源码过程分析,总结的流程时序图如下:



2. 初始化 Channel



<div align="center", style="color:#A8A8A8;font-size:14px">初始化 channel</div>


回到一开始创建ChannelinitAndRegister()入口方法,在创建Channel后紧接着init(channel)进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel),代码如下:


@Overridevoid init(Channel channel) throws Exception {    final Map<ChannelOption<?>, Object> options = options0();    synchronized (options) {        setChannelOptions(channel, options, logger);    }
final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } }
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); }
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); }
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
复制代码


  • [代码 3 - 6 行]: options0()方法返回的options保存了用户在EchoServer中设置自定义的可选项集合,这样ServerBootstrap将配置的选项集合,设置到了 Channel 的可选项集合中。

  • [代码 8 - 15 行]: attrs0()方法返回的attrs保存了用户在EchoServer中设置自定义的属性集合,这样ServerBootstrap将配置的属性集合,设置到了 Channel 的属性集合中。

  • [代码 21-28 行]:通过局部变量currentChildOptionscurrentChildAttrs保存了用户自定义的childOptionschildAttrs,用于[代码 43 行] ServerBootstrapAcceptor 构造方法。

  • [代码 30-47]]:创建ChannelInitializer 对象,添加到 pipeline 中,用于后续初始化 ChannelHandler pipeline 中,包括用户在EchoServer配置的LoggingHandler和创建的创建 ServerBootstrapAcceptor 对象。

- [代码行 34-37]:添加启动器配置的 LoggingHandlerpipeline 中。

- [代码行 39-45]:创建 ServerBootstrapAcceptor 对象,添加到 pipeline 中。从名字上就可以看出来,ServerBootstrapAcceptor 也是一个 ChannelHandler 实现类,专门用于接受客户端的新连接请求,把新的请求扔给某个事件循环器,我们先不做过多分析。我们发现是使用EventLoop.execute 执行添加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3)

- 需要说明的是pipeline 在之前介绍 Netty 核心组件的时候提到是一个包含ChannelHandlerContext的双向链表,每一个context对于唯一一个ChannelHandler,这里初始化后,ChannelPipeline里就是如下一个结构:


<div align="center", style="color:#A8A8A8;font-size:14px">ChannelPipeline 内部结构</div>


3. 注册 Channel


<img src="https://s1.ax1x.com/2020/09/13/wwvXi8.png" alt="wwvXi8.png" style="zoom:80%;" />


<div align="center", style="color:#A8A8A8;font-size:14px">注册 channel</div>


初始化Channel一些基本配置和属性完毕后,回到一开始创建ChannelinitAndRegister()入口方法,在初始化Channel后紧接着[代码行 17] ChannelFuture regFuture = config().group().register(channel);明显这里是通过EventLoopGroup进入注册流程(EventLoopGroup体系将在后续文章讲解)


EchoServer中启动器同样通过ServerBootstrap#group()设置了NioEventLoopGroup,它继承自MultithreadEventLoopGroup,所以注册流程会进入MultithreadEventLoopGroup重载的register(Channel channel)方法,代码如下:


@Overridepublic ChannelFuture register(Channel channel) {    return next().register(channel);}
复制代码


这里会调用 next() 方法选择出来一个 EventLoop 来注册 Channel,里面实际上使用的是一个叫做 EventExecutorChooser 的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooserGenericEventExecutorChooser,本质上就是从 EventExecutor 数组中选择一个 EventExecutor,我们这里就是 NioEventLoop,那么,它们有什么区别呢?(<u>Problem-4:在介绍EventLoopGroup体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。</u>)


接着,来到 NioEventLoopregister(channel) 方法,你会不会问找不到该方法?提示NioEventLoop 继承SingleThreadEventLoop,所以父类方法:


@Overridepublic ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}
复制代码


可以看到,先创建了一个叫做 ChannelPromise 的东西,它是 ChannelFuture 的子类。[代码行 9]又调回了 ChannelUnsaferegister () 方法,这里第一个参数是 this,也就是 NioEventLoop,第二个参数是刚创建的 ChannelPromise


点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 方法进去,代码如下:


 public final void register(EventLoop eventLoop, final ChannelPromise promise) {     if (eventLoop == null) {         throw new NullPointerException("eventLoop");     }     if (isRegistered()) {         promise.setFailure(new IllegalStateException("registered to an event loop already"));         return;     }     if (!isCompatible(eventLoop)) {         promise.setFailure(             new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));         return;     }
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }}
复制代码


[代码行 15]这行代码是设置 ChanneleventLoop 属性。这行前面的代码主要是在校验传入的 eventLoop 参数非空,校验是否有注册过以及校验 ChanneleventLoop 类型是否匹配。


[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise) 方法中:


private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}
复制代码


[代码行 9]进入 AbstractNioChannel#doRegister() 方法:


protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}
复制代码


[代码行 5]关键一行代码,将 Java 原生 NIO Selector 与 Java 原生 NIOChannel 对象(ServerSocketChannel) 绑定在一起,并将当前 Netty 的 Channel 通过 attachment 的形式绑定到 SelectionKey 上:


  • 调用 #unwrappedSelector() 方法,返回 Java 原生 NIO Selector 对象,而且每个 NioEventLoop Selector 唯一一对应。

  • 调用 SelectableChannel#register(Selector sel, int ops, Object att) 方法,注册 Java 原生NIOChannel 对象到 NIO Selector 对象上。


通过以上注册 channel 源码分析,总结流程的时序图如下:



4. 绑定端口



<div align="center", style="color:#A8A8A8;font-size:14px">绑定端口</div>


注册完Channel最后回到AbstractBootstrap#doBind() 方法,分析 Channel 的端口绑定逻辑。进入doBind0代码如下:


private static void doBind0(    final ChannelFuture regFuture, final Channel channel,    final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}
复制代码


  • [代码行 7]:在前面Channel 注册成功的条件下,调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语注释,这里作为一个问题记着(<u>Problem-5</u>)。

  • [代码行 11]:进入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同样立即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE监听事件。

  • [代码行 13]:如果绑定端口之前的操作并没有成功,自然也就不能进行端口绑定操作了,通过 promise 记录异常原因。


AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)方法如下:


 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return pipeline.bind(localAddress, promise); }
复制代码


pipeline是之前创建channel的时候创建的DefaultChannelPipeline,进入该方法:


 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return tail.bind(localAddress, promise); }
复制代码


[在分析初始化流程的时候最后画一个DefaultChannelPipeline内部的结构,能够便于分析后面进入DefaultChannelPipeline一系列bind方法。]


首先,tail代表TailContext,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法:


 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {     //省略部分代码     final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);     EventExecutor executor = next.executor();     if (executor.inEventLoop()) {         next.invokeBind(localAddress, promise);     } else {         safeExecute(executor, new Runnable() {             @Override             public void run() {                 next.invokeBind(localAddress, promise);             }         }, promise, null);     }     return promise;}
复制代码


[代码行 3]:findContextOutbound方法里主要是执行ctx = ctx.prev;那么得到的next就是绑定LoggingHandlercontext


[代码行 6]:进入invokeBind(localAddress, promise)方法并直接执行LoggingHandler#bind(this, localAddress, promise),进入后的方法如下:


 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {     if (logger.isEnabled(internalLevel)) {         logger.log(internalLevel, format(ctx, "BIND", localAddress));     }     ctx.bind(localAddress, promise); }
复制代码


设置了LoggingHandler的日志基本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法执行ctx = ctx.prev取出HeadContext进入到 bind 方法:


 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {     unsafe.bind(localAddress, promise); }
复制代码


兜兜转转,最终跳出了pipeline轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 方法,Channel 的端口绑定逻辑。代码如下:


public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {    //此处有省略...    boolean wasActive = isActive();    try {        doBind(localAddress);    } catch (Throwable t) {        safeSetFailure(promise, t);        closeIfClosed();        return;    }    //此处有省略...}
复制代码


做实事方法doBind进入后如下:


@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {    if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}
复制代码


到了此处,服务端的 Java 原生 NIO ServerSocketChannel 终于绑定上了端口。


三、问题归纳


  • Problem-1: 创建Channel流程中AbstractChannel构造函数中为channel分配 ID 的算法如何实现?

  • Problem-2: AbstractChannel内部类AbstractUnsafe的作用?

  • Problem-3: 初始化channel流程中pipeline 添加ServerBootstrapAcceptor 是通过EventLoop.execute 执行添加的过程,这是为什么呢?

  • Problem-4:注册channel流程中PowerOfTwoEventExecutorChooserGenericEventExecutorChooser的区别和优化原理?

  • Problem-5:绑定端口流程中调用 EventLoop执行 Channel 的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop所在的线程了,为何还要这样操作呢?


小结


通过对 Netty 服务端启动流程源码分析,我们发现了在使用NIO的模式下,服务端启动流程其实就是封装了JDK NIO编程在服务端启动的流程。只不过对原生JDK NIO进行了增强和优化,同时从架构设计上简化了服务端流程的编写。


最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,能够让更多人学习源码的旅途少走很多弯路,谢谢!


欢迎关注:


发布于: 2020 年 09 月 15 日阅读数: 253
用户头像

专注Java生态,励志做最接地气的技术分享者 2018.05.05 加入

公众号:一枝花算不算浪漫 欢迎关注~

评论

发布
暂无评论
Netty之旅三:Netty服务端启动源码分析,一梭子带走!