写点什么

详细图解 Netty Reactor 启动全流程

  • 2022 年 1 月 19 日
  • 本文字数:33048 字

    阅读完需:约 108 分钟

详细图解Netty Reactor启动全流程

本系列 Netty 源码解析文章基于 4.1.56.Final 版本



大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?



大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解 Netty Reactor 的启动全流程,包括其中涉及到的各种代码设计实现细节。



在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中我们详细介绍了 Netty 服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的创建过程。最终我们得到了 netty Reactor 模型的运行骨架如下:



现在 Netty 服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下 Netty 服务端的启动过程。


我们继续回到上篇文章提到的 Netty 服务端代码模板中,在创建完主从 Reactor 线程组:bossGroupworkerGroup后,接下来就开始配置 Netty 服务端的启动辅助类ServerBootstrap 了。


public final class EchoServer {    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception { // Configure the server. //创建主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项 .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
// Start the server. 绑定端口启动服务,开始监听accept事件 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
复制代码


在上篇文章中我们对代码模板中涉及到ServerBootstrap 的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。


ServerBootstrap类其实没有什么特别的逻辑,主要是对 Netty 启动过程中需要用到的一些核心信息进行配置管理,比如:



  • Netty 的核心引擎组件主从Reactor线程组: bossGroup,workerGroup。通过ServerBootstrap#group方法配置。

  • Netty 服务端使用到的 Channel 类型:NioServerSocketChannel ,通过ServerBootstrap#channel方法配置。以及配置NioServerSocketChannel时用到的SocketOptionSocketOption用于设置底层 JDK NIO Socket 的一些选项。通过ServerBootstrap#option方法进行配置。


主 ReactorGroup 中的 MainReactor 管理的 Channel 类型为NioServerSocketChannel,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel,然后采用round-robin轮询的方式从图中从 ReactorGroup 中选择一个 SubReactor 与该客户端NioSocketChannel进行绑定。


从 ReactorGroup 中的 SubReactor 管理的 Channel 类型为NioSocketChannel,它是 netty 中定义客户端连接的一个模型,每个连接对应一个。如图所示 SubReactor 负责监听处理绑定在其上的所有NioSocketChannel上的 IO 事件。


  • 保存服务端NioServerSocketChannel和客户端NioSocketChannel对应pipeline中指定的ChannelHandler。用于后续 Channel 向 Reactor 注册成功之后,初始化 Channel 里的 pipeline。


不管是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel,每个Channel实例都会有一个PipelinePipeline中有多个ChannelHandler用于编排处理对应Channel上感兴趣的IO事件


ServerBootstrap结构中包含了 netty 服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap的源码结构:

ServerBootstrap



ServerBootstrap的继承结构比较简单,继承层次的职责分工也比较明确。


ServerBootstrap主要负责对主从Reactor线程组相关的配置进行管理,其中带child前缀的配置方法是对从Reactor线程组的相关配置管理。从Reactor线程组中的Sub Reactor负责管理的客户端NioSocketChannel相关配置存储在ServerBootstrap结构中。


父类AbstractBootstrap则是主要负责对主Reactor线程组相关的配置进行管理,以及主Reactor线程组中的Main Reactor负责处理的服务端ServerSocketChannel相关的配置管理。

1. 配置主从 Reactor 线程组

ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//配置主从Reactor
复制代码


public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//Main Reactor线程组 volatile EventLoopGroup group; //Sub Reactor线程组 private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //父类管理主Reactor线程组 super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }
}
复制代码

2. 配置服务端 ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();b.channel(NioServerSocketChannel.class);
复制代码


public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//用于创建ServerSocketChannel ReflectiveChannelFactory private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
@Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); }
this.channelFactory = channelFactory; return self(); }
}
复制代码


在向ServerBootstrap配置服务端ServerSocketChannelchannel 方法中,其实是创建了一个ChannelFactory工厂实例ReflectiveChannelFactory,在 Netty 服务端启动的过程中,会通过这个ChannelFactory去创建相应的Channel实例。


我们可以通过这个方法来配置 netty 的 IO 模型,下面为ServerSocketChannel在不同 IO 模型下的实现:


EventLoopGroup Reactor 线程组在不同 IO 模型下的实现:



我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在 Netty 中完成对IO模型的切换。


2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {    //NioServerSocketChannelde 构造器    private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { //反射获取NioServerSocketChannel的构造器 this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } }
@Override public T newChannel() { try { //创建NioServerSocketChannel实例 return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }}
复制代码


从类的签名我们可以看出,这个工厂类是通过泛型反射的方式来创建对应的Channel实例。


  • 泛型参数T extends Channel表示的是要通过工厂类创建的Channel类型,这里我们初始化的是NioServerSocketChannel

  • ReflectiveChannelFactory 的构造器中通过反射的方式获取NioServerSocketChannel的构造器。

  • newChannel 方法中通过构造器反射创建NioServerSocketChannel实例。


注意这时只是配置阶段,NioServerSocketChannel此时并未被创建。它是在启动的时候才会被创建出来。

3. 为 NioServerSocketChannel 配置 ChannelOption

ServerBootstrap b = new ServerBootstrap();//设置被MainReactor管理的NioServerSocketChannel的Socket选项b.option(ChannelOption.SO_BACKLOG, 100)
复制代码


public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption配置 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) { ObjectUtil.checkNotNull(option, "option"); synchronized (options) { if (value == null) { options.remove(option); } else { options.put(option, value); } } return self(); }}
复制代码


无论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel它们的相关底层 Socket 选项ChannelOption配置全部存放于一个Map类型的数据结构中。


由于客户端NioSocketChannel是由从Reactor线程组中的Sub Reactor来负责处理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前缀开头。


ServerBootstrap b = new ServerBootstrap();.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
复制代码


public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//客户端SocketChannel对应的ChannelOption配置 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) { ObjectUtil.checkNotNull(childOption, "childOption"); synchronized (childOptions) { if (value == null) { childOptions.remove(childOption); } else { childOptions.put(childOption, value); } } return this; }}
复制代码


相关的底层 Socket 选项,netty 全部枚举在 ChannelOption 类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。


public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
..................省略..............
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST"); public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE"); public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF"); public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF"); public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR"); public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER"); public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG"); public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}
复制代码

4. 为服务端 NioServerSocketChannel 中的 Pipeline 配置 ChannelHandler

    //serverSocketChannel中pipeline里的handler(主要是acceptor)    private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); }
复制代码


NioServerSocketChannel中的Pipeline添加ChannelHandler分为两种方式:


  • 显式添加: 显式添加的方式是由用户在 main 线程中通过ServerBootstrap#handler的方式添加。如果需要添加多个ChannelHandler,则可以通过ChannelInitializerpipeline中进行添加。


关于ChannelInitializer后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer是一种特殊的ChannelHandler,用于初始化pipeline。适用于向 pipeline 中添加多个 ChannelHandler 的场景。


            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)//配置主从Reactor             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型             .handler(new ChannelInitializer<NioServerSocketChannel>() {                 @Override                 protected void initChannel(NioServerSocketChannel ch) throws Exception {                     ChannelPipeline p = ch.pipeline();                     p.addLast(channelhandler1)                      .addLast(channelHandler2)                                            ......                                           .addLast(channelHandler3);                 }             })
复制代码


  • 隐式添加:隐式添加主要添加的就是主ReactorGroup的核心组件也就是下图中的acceptor,Netty 中的实现为ServerBootstrapAcceptor,本质上也是一种ChannelHandler,主要负责在客户端连接建立好后,初始化客户端NioSocketChannel,在从Reactor线程组中选取一个Sub Reactor,将客户端NioSocketChannel 注册到Sub Reactor中的selector上。


隐式添加ServerBootstrapAcceptor是由 Netty 框架在启动的时候负责添加,用户无需关心。



在本例中,NioServerSocketChannelPipeLine中只有两个ChannelHandler,一个由用户在外部显式添加的LoggingHandler,另一个是由 Netty 框架隐式添加的ServerBootstrapAcceptor


其实我们在实际项目使用的过程中,不会向 netty 服务端NioServerSocketChannel添加额外的 ChannelHandler,NioServerSocketChannel只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler只是为了向大家展示ServerBootstrap的配置方法。

5. 为客户端 NioSocketChannel 中的 Pipeline 配置 ChannelHandler

            final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
复制代码


    //socketChannel中pipeline中的处理handler    private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }
复制代码


向客户端NioSocketChannel中的Pipeline里添加ChannelHandler完全是由用户自己控制显式添加,添加的数量不受限制。


由于在 Netty 的IO线程模型中,是由单个Sub Reactor线程负责执行客户端NioSocketChannel中的Pipeline,一个Sub Reactor线程负责处理多个NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就会影响Sub Reactor线程执行其他NioSocketChannel上的Pipeline,从而降低IO处理效率,降低吞吐量。



所以Pipeline中的ChannelHandler不易添加过多,并且不能再ChannelHandler中执行耗时的业务处理任务。


在我们通过ServerBootstrap配置 netty 服务端启动信息的时候,无论是向服务端NioServerSocketChannel的 pipeline 中添加 ChannelHandler,还是向客户端NioSocketChannel的 pipeline 中添加 ChannelHandler,当涉及到多个 ChannelHandler 添加的时候,我们都会用到ChannelInitializer,那么这个ChannelInitializer究竟是何方圣神,为什么要这样做呢?我们接着往下看~~

ChannelInitializer



首先ChannelInitializer它继承于ChannelHandler,它自己本身就是一个 ChannelHandler,所以它可以添加到childHandler中。


其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。


那为什么不直接添加ChannelHandler而是选择用ChannelInitializer呢?


这里主要有两点原因:


  • 前边我们提到,客户端NioSocketChannel是在服务端 accept 连接后,在服务端NioServerSocketChannel中被创建出来的。但是此时我们正处于配置ServerBootStrap阶段,服务端还没有启动,更没有客户端连接上来,此时客户端NioSocketChannel还没有被创建出来,所以也就没办法向客户端NioSocketChannel的 pipeline 中添加ChannelHandler

  • 客户端NioSocketChannelPipeline里可以添加任意多个ChannelHandler,但是 Netty 框架无法预知用户到底需要添加多少个ChannelHandler,所以 Netty 框架提供了回调函数ChannelInitializer#initChannel,使用户可以自定义ChannelHandler的添加行为。


当客户端NioSocketChannel注册到对应的Sub Reactor上后,紧接着就会初始化NioSocketChannel中的Pipeline,此时 Netty 框架会回调ChannelInitializer#initChannel执行用户自定义的添加逻辑。


public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { //当channelRegister事件发生时,调用initChannel初始化pipeline if (initChannel(ctx)) { .................省略............... } else { .................省略............... } }
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { //此时客户单NioSocketChannel已经创建并初始化好了 initChannel((C) ctx.channel()); } catch (Throwable cause) { .................省略............... } finally { .................省略............... } return true; } return false; }
protected abstract void initChannel(C ch) throws Exception; .................省略...............}
复制代码


这里由 netty 框架回调的ChannelInitializer#initChannel方法正是我们自定义的添加逻辑。


            final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
复制代码




到此为止,Netty 服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap启动辅助类中。


接下来要做的事情就是服务端的启动了。


// Start the server. 绑定端口启动服务,开始监听accept事件ChannelFuture f = serverBootStrap.bind(PORT).sync();
复制代码

Netty 服务端的启动

经过前面的铺垫终于来到了本文的核心内容----Netty 服务端的启动过程。


如代码模板中的示例所示,Netty 服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)函数中。


接下来我们看一下 Netty 服务端在启动过程中究竟干了哪些事情?



大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。


我们先来从 netty 服务端启动的入口函数开始我们今天的源码解析旅程:


    public ChannelFuture bind(int inetPort) {        return bind(new InetSocketAddress(inetPort));    }
public ChannelFuture bind(SocketAddress localAddress) { //校验Netty核心组件是否配置齐全 validate(); //服务端开始启动,绑定端口地址,接收客户端连接 return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); }
private ChannelFuture doBind(final SocketAddress localAddress) { //异步创建,初始化,注册ServerSocketChannel到main reactor上 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor注册成功后开始绑定端口...., } else { //如果此时注册操作没有完成,则向regFuture添加operationComplete回调函数,注册成功后回调。 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {
........serverSocketChannel向Main Reactor注册成功后开始绑定端口...., }); return promise; } }
复制代码


Netty 服务端的启动流程总体如下:


  • 创建服务端NioServerSocketChannel并初始化。

  • 将服务端NioServerSocketChannel注册到主Reactor线程组中。

  • 注册成功后,开始初始化NioServerSocketChannel中的 pipeline,然后在 pipeline 中触发 channelRegister 事件。

  • 随后由NioServerSocketChannel绑定端口地址。

  • 绑定端口地址成功后,向NioServerSocketChannel对应的Pipeline中触发传播ChannelActive事件,在ChannelActive事件回调中向Main Reactor注册OP_ACCEPT事件,开始等待客户端连接。服务端启动完成。


当 netty 服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor 开始运转。



接下来,我们就来看下 Netty 源码是如何实现以上步骤的~~

1. initAndRegister



    final ChannelFuture initAndRegister() {        Channel channel = null;        try {            //创建NioServerSocketChannel            //ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel            channel = channelFactory.newChannel();            //初始化NioServerSocketChannel            init(channel);        } catch (Throwable t) {            ..............省略.................        }
//向MainReactor注册ServerSocketChannel ChannelFuture regFuture = config().group().register(channel);
..............省略.................
return regFuture; }
复制代码


从函数命名中我们可以看出,这个函数主要做的事情就是首先创建NioServerSocketChannel ,并对NioServerSocketChannel 进行初始化,最后将NioServerSocketChannel 注册到Main Reactor中。

1.1 创建 NioServerSocketChannel

还记得我们在介绍ServerBootstrap启动辅助类配置服务端ServerSocketChannel类型的时候提到的工厂类ReflectiveChannelFactory 吗?


因为当时我们在配置ServerBootstrap启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体ServerSocketChannel的时机。


所以 Netty 通过工厂模式将要创建的ServerSocketChannel的类型(通过泛型指定)以及 创建的过程(封装在newChannel函数中)统统先封装在工厂类ReflectiveChannelFactory中。


ReflectiveChannelFactory通过泛型反射工厂的方式灵活创建不同类型的channel


等待创建时机来临,我们调用保存在ServerBootstrap中的channelFactory直接进行创建。


public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
@Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }}
复制代码


下面我们来看下NioServerSocketChannel的构建过程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel                             implements io.netty.channel.socket.ServerSocketChannel {
//SelectorProvider(用于创建Selector和Selectable Channels) private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
//创建JDK NIO ServerSocketChannel private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
//ServerSocketChannel相关的配置 private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) { //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT super(null, channel, SelectionKey.OP_ACCEPT); //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
}
复制代码


  • 首先调用newSocket 创建 JDK NIO 原生ServerSocketChannel,这里调用了SelectorProvider#openServerSocketChannel 来创建 JDK NIO 原生ServerSocketChannel,我们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中详细的介绍了SelectorProvider相关内容,当时是用SelectorProvider来创建Reactor中的Selector。大家还记得吗??

  • 通过父类构造器设置NioServerSocketChannel感兴趣的IO事件,这里设置的是SelectionKey.OP_ACCEPT事件。并将 JDK NIO 原生ServerSocketChannel封装起来。

  • 创建Channel的配置类NioServerSocketChannelConfig,在配置类中封装了对Channel底层的一些配置行为,以及 JDK 中的ServerSocket。以及创建NioServerSocketChannel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator


NioServerSocketChannelConfig没什么重要的东西,我们这里也不必深究,它就是管理NioServerSocketChannel相关的配置,这里唯一需要大家注意的是这个用于Channel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator,我们后面在介绍 Netty 如何接收连接的时候还会提到。


NioServerSocketChannel 的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下NioServerSocketChannel 的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是Channel的核心信息,所以有必要了解一下。



NioServerSocketChannel 的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。


其中AbstractNioMessageChannel类主要是对NioServerSocketChannel底层读写行为的封装和定义,比如 accept 接收客户端连接。这个我们后续会介绍到,这里我们并不展开。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {   //JDK NIO原生Selectable Channel    private final SelectableChannel ch;    // Channel监听事件集合 这里是SelectionKey.OP_ACCEPT事件    protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { //设置Channel为非阻塞 配合IO多路复用模型 ch.configureBlocking(false); } catch (IOException e) { .............省略................ } }}
复制代码


  • 封装由SelectorProvider创建出来的 JDK NIO 原生ServerSocketChannel

  • 封装Channel在创建时指定感兴趣的IO事件,对于NioServerSocketChannel来说感兴趣的IO事件OP_ACCEPT事件

  • 设置 JDK NIO 原生ServerSocketChannel为非阻塞模式, 配合 IO 多路复用模型。

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//channel是由创建层次的,比如ServerSocketChannel 是 SocketChannel的 parent private final Channel parent; //channel全局唯一ID machineId+processId+sequence+timestamp+random private final ChannelId id; //unsafe用于封装对底层socket的相关操作 private final Unsafe unsafe; //为channel分配独立的pipeline用于IO事件编排 private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) { this.parent = parent; //channel全局唯一ID machineId+processId+sequence+timestamp+random id = newId(); //unsafe用于定义实现对Channel的底层操作 unsafe = newUnsafe(); //为channel分配独立的pipeline用于IO事件编排 pipeline = newChannelPipeline(); }}
复制代码


  • Netty 中的Channel创建是有层次的,这里的parent属性用来保存上一级的Channel,比如这里的NioServerSocketChannel是顶级Channel,所以它的parent = null。客户端NioSocketChannel是由NioServerSocketChannel创建的,所以它的parent = NioServerSocketChannel

  • Channel分配全局唯一的ChannelIdChannelId由机器 Id(machineId),进程 Id(processId),序列号(sequence),时间戳(timestamp),随机数(random)构成


   private DefaultChannelId() {        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];        int i = 0;
// machineId System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length); i += MACHINE_ID.length;
// processId i = writeInt(i, PROCESS_ID);
// sequence i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of) i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random int random = PlatformDependent.threadLocalRandom().nextInt(); i = writeInt(i, random); assert i == data.length;
hashCode = Arrays.hashCode(data); }
复制代码


  • 创建NioServerSocketChannel的底层操作类Unsafe 。这里创建的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe


UnsafeChannel接口的一个内部接口,用于定义实现对 Channel 底层的各种操作,Unsafe接口定义的操作行为只能由 Netty 框架的Reactor线程调用,用户线程禁止调用。


interface Unsafe {                //分配接收数据用的Buffer        RecvByteBufAllocator.Handle recvBufAllocHandle();
//服务端绑定的端口地址 SocketAddress localAddress(); //远端地址 SocketAddress remoteAddress(); //channel向Reactor注册 void register(EventLoop eventLoop, ChannelPromise promise);
//服务端绑定端口地址 void bind(SocketAddress localAddress, ChannelPromise promise); //客户端连接服务端 void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); //关闭channle void close(ChannelPromise promise); //读数据 void beginRead(); //写数据 void write(Object msg, ChannelPromise promise);
}
复制代码


  • NioServerSocketChannel分配独立的pipeline用于 IO 事件编排。pipeline其实是一个ChannelHandlerContext类型的双向链表。头结点HeadContext,尾结点TailContextChannelHandlerContext中包装着ChannelHandler


ChannelHandlerContext 保存 ChannelHandler 上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。


这里只是为了让大家简单理解pipeline的一个大致的结构,后面会写一篇文章专门详细讲解pipeline


    protected DefaultChannelPipeline(Channel channel) {        this.channel = ObjectUtil.checkNotNull(channel, "channel");        succeededFuture = new SucceededChannelFuture(channel, null);        voidPromise =  new VoidChannelPromise(channel, true);
tail = new TailContext(this); head = new HeadContext(this);
head.next = tail; tail.prev = head; }
复制代码





到了这里NioServerSocketChannel就创建完毕了,我们来回顾下它到底包含了哪些核心信息。


1.2 初始化 NioServerSocketChannel

   void init(Channel channel) {        //向NioServerSocketChannelConfig设置ServerSocketChannelOption        setChannelOptions(channel, newOptionsArray(), logger);        //向netty自定义的NioServerSocketChannel设置attributes        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline(); //获取从Reactor线程组 final EventLoopGroup currentChildGroup = childGroup; //获取用于初始化客户端NioSocketChannel的ChannelInitializer final ChannelHandler currentChildHandler = childHandler; //获取用户配置的客户端SocketChannel的channelOption以及attributes final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的逻辑 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); //ServerBootstrap中用户指定的channelHandler ChannelHandler handler = config.handler(); if (handler != null) { //LoggingHandler pipeline.addLast(handler); } //添加用于接收客户端连接的acceptor ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
复制代码


  • NioServerSocketChannelConfig设置ServerSocketChannelOption

  • 向 netty 自定义的NioServerSocketChannel设置ChannelAttributes


Netty 自定义的SocketChannel类型均继承AttributeMap接口以及DefaultAttributeMap类,正是它们定义了ChannelAttributes。用于向Channel添加用户自定义的一些信息。



这个ChannelAttributes的用处大有可为,Netty 后边的许多特性都是依靠这个ChannelAttributes来实现的。这里先卖个关子,大家可以自己先想一下可以用这个ChannelAttributes做哪些事情?


  • 获取从 Reactor 线程组childGroup,以及用于初始化客户端NioSocketChannelChannelInitializer,ChannelOption,ChannelAttributes,这些信息均是由用户在启动的时候向ServerBootstrap添加的客户端NioServerChannel配置信息。这里用这些信息来初始化ServerBootstrapAcceptor。因为后续会在ServerBootstrapAcceptor中接收客户端连接以及创建NioServerChannel

  • NioServerSocketChannel中的pipeline添加用于初始化pipelineChannelInitializer


问题来了,这里为什么不干脆直接将ChannelHandler添加到pipeline中,而是又使用到了ChannelInitializer呢?


其实原因有两点:


  • 为了保证线程安全地初始化pipeline,所以初始化的动作需要由Reactor线程进行,而当前线程是用户程序启动Main线程不是Reactor 线程。这里不能立即初始化。

  • 初始化Channelpipeline的动作,需要等到Channel注册到对应的Reactor中才可以进行初始化,当前只是创建好了NioServerSocketChannel,但并未注册到Main Reactor上。


初始化NioServerSocketChannelpipeline的时机是:当NioServerSocketChannel注册到Main Reactor之后,绑定端口地址之前。


前边在介绍ServerBootstrap配置childHandler时也用到了ChannelInitializer,还记得吗??


问题又来了,大家注意下ChannelInitializer#initChannel方法,在该初始化回调方法中,添加 LoggingHandler 是直接向 pipeline 中添加,而添加 Acceptor 为什么不是直接添加而是封装成异步任务呢?


这里先给大家卖个关子,笔者会在后续流程中为大家解答~~~~~



此时NioServerSocketChannel中的pipeline结构如下图所示:


1.3 向 Main Reactor 注册 NioServerSocketChannel

ServerBootstrap获取主 Reactor 线程组NioEventLoopGroup,将NioServerSocketChannel注册到NioEventLoopGroup中。


ChannelFuture regFuture = config().group().register(channel);
复制代码


下面我们来看下具体的注册过程:

1.3.1 主 Reactor 线程组中选取一个 Main Reactor 进行注册



    @Override    public ChannelFuture register(Channel channel) {        return next().register(channel);    }
@Override public EventExecutor next() { return chooser.next(); }
//获取绑定策略 @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } //采用轮询round-robin的方式选择Reactor @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; }
复制代码


Netty 通过next()方法根据上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》提到的channel到reactor的绑定策略,从ReactorGroup中选取一个 Reactor 进行注册绑定。之后Channel生命周期内的所有 IO 事件都由这个 Reactor 负责处理,如 accept、connect、read、write 等 IO 事件。


一个channel只能绑定到一个Reactor上,一个Reactor负责监听多个channel



由于这里是NioServerSocketChannleMain Reactor进行注册绑定,所以Main Reactor主要负责处理的IO事件OP_ACCEPT事件。

1.3.2 向绑定后的 Main Reactor 进行注册



Reactor进行注册的行为定义在NioEventLoop的父类SingleThreadEventLoop中,印象模糊的同学可以在回看下上篇文章中的NioEventLoop继承结构小节内容。


public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override public ChannelFuture register(Channel channel) { //注册channel到绑定的Reactor上 return register(new DefaultChannelPromise(channel, this)); }
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //unsafe负责channel底层的各种操作 promise.channel().unsafe().register(this, promise); return promise; }}
复制代码


通过NioServerSocketChannel中的Unsafe类执行底层具体的注册动作。


protected abstract class AbstractUnsafe implements Unsafe {
/** * 注册Channel到绑定的Reactor上 * */ @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } //EventLoop的类型要与Channel的类型一样 Nio Oio Aio if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }
//在channel上设置绑定的Reactor AbstractChannel.this.eventLoop = eventLoop;
/** * 执行channel注册的操作必须是Reactor线程来完成 * * 1: 如果当前执行线程是Reactor线程,则直接执行register0进行注册 * 2:如果当前执行线程是外部线程,则需要将register0注册操作 封装程异步Task 由Reactor线程执行 * */ if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ...............省略............... } } }}
复制代码


  • 首先检查NioServerSocketChannel是否已经完成注册。如果以完成注册,则直接设置代表注册操作结果的ChannelPromisefail状态

  • 通过isCompatible方法验证 Reactor 模型EventLoop是否与Channel的类型匹配。NioEventLoop对应于NioServerSocketChannel


上篇文章我们介绍过 Netty 对三种IO模型Oio,Nio,Aio的支持,用户可以通过改变 Netty 核心类的前缀轻松切换IO模型isCompatible方法目的就是需要保证ReactorChannel使用的是同一种IO模型


  • Channel中保存其绑定的Reactor实例

  • 执行ChannelReactor注册的动作必须要确保是在Reactor线程中执行。

  • 如果当前线程是Reactor线程则直接执行注册动作register0

  • 如果当前线程不是Reactor线程,则需要将注册动作register0封装成异步任务,存放在Reactor中的taskQueue中,等待Reactor线程执行。


当前执行线程并不是Reactor线程,而是用户程序的启动线程Main线程

1.3.3 Reactor 线程的启动

上篇文章中我们在介绍NioEventLoopGroup的创建过程中提到了一个构造器参数executor,它用于启动Reactor线程,类型为ThreadPerTaskExecutor


当时笔者向大家卖了一个关子~~“Reactor线程是何时启动的?”



那么现在就到了为大家揭晓谜底的时候了~~


Reactor线程的启动是在向Reactor提交第一个异步任务的时候启动的。


Netty 中的主 Reactor 线程组NioEventLoopGroup中的 Main ReactorNioEventLoop是在用户程序Main线程Main Reactor提交用于注册NioServerSocketChannel的异步任务时开始启动。


   eventLoop.execute(new Runnable() {                        @Override                        public void run() {                            register0(promise);                        }                    });
复制代码


接下来我们关注下NioEventLoopexecute方法


public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); }
private void execute(Runnable task, boolean immediate) { //当前线程是否为Reactor线程 boolean inEventLoop = inEventLoop(); //addTaskWakesUp = true addTask唤醒Reactor线程执行任务 addTask(task); if (!inEventLoop) { //如果当前线程不是Reactor线程,则启动Reactor线程 //这里可以看出Reactor线程的启动是通过 向NioEventLoop添加异步任务时启动的 startThread();
.....................省略..................... } .....................省略..................... }
}
复制代码


  • 首先将异步任务task添加到Reactor中的taskQueue中。

  • 判断当前线程是否为Reactor线程,此时当前执行线程为用户程序启动线程,所以这里调用startThread 启动Reactor线程

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {    //定义Reactor线程状态    private static final int ST_NOT_STARTED = 1;    private static final int ST_STARTED = 2;    private static final int ST_SHUTTING_DOWN = 3;    private static final int ST_SHUTDOWN = 4;    private static final int ST_TERMINATED = 5;
//Reactor线程状态 初始为 未启动状态 private volatile int state = ST_NOT_STARTED;
//Reactor线程状态字段state 原子更新器 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
}
复制代码


  • Reactor线程初始化状态为ST_NOT_STARTED ,首先CAS更新状态为ST_STARTED

  • doStartThread 启动Reactor线程

  • 启动失败的话,需要将Reactor线程状态改回ST_NOT_STARTED


    //ThreadPerTaskExecutor 用于启动Reactor线程    private final Executor executor;
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }
boolean success = false; updateLastExecutionTime(); try { //Reactor线程开始启动 SingleThreadEventExecutor.this.run(); success = true; } ................省略.............. }
复制代码


这里就来到了ThreadPerTaskExecutor 类型的executor的用武之地了。


  • Reactor线程的核心工作之前介绍过:轮询所有注册其上的Channel中的IO就绪事件处理对应Channel上的IO事件执行异步任务。Netty 将这些核心工作封装在io.netty.channel.nio.NioEventLoop#run方法中。



  • NioEventLoop#run封装在异步任务中,提交给executor执行,Reactor线程至此开始工作了就。


public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;
@Override public void execute(Runnable command) { //启动Reactor线程 threadFactory.newThread(command).start(); }}
复制代码


此时Reactor线程已经启动,后面的工作全部都由这个Reactor线程来负责执行了。


而用户启动线程在向Reactor提交完NioServerSocketChannel的注册任务register0后,就逐步退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()


此时Reactor中的任务队列中只有一个任务register0Reactor线程启动后,会从任务队列中取出任务执行。



至此NioServerSocketChannel的注册工作正式拉开帷幕~~


1.3.5 register0

       //true if the channel has never been registered, false otherwise         private boolean neverRegistered = true;
private void register0(ChannelPromise promise) { try { //查看注册操作是否已经取消,或者对应channel已经关闭 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //执行真正的注册操作 doRegister(); //修改注册状态 neverRegistered = false; registered = true; //回调pipeline中添加的ChannelInitializer的handlerAdded方法,在这里初始化channelPipeline pipeline.invokeHandlerAddedIfNeeded(); //设置regFuture为success,触发operationComplete回调,将bind操作放入Reactor的任务队列中,等待Reactor线程执行。 safeSetSuccess(promise); //触发channelRegister事件 pipeline.fireChannelRegistered(); //对于服务端ServerSocketChannel来说 只有绑定端口地址成功后 channel的状态才是active的。 //此时绑定操作作为异步任务在Reactor的任务队列中,绑定操作还没开始,所以这里的isActive()是false if (isActive()) { if (firstRegistration) { //触发channelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { ............省略............. } }
复制代码


register0是驱动整个Channel注册绑定流程的关键方法,下面我们来看下它的核心逻辑:


  • 首先需要检查Channel的注册动作是否在Reactor线程外被取消了已经!promise.setUncancellable()。检查要注册的Channel是否已经关闭!ensureOpen(promise)。如果Channel已经关闭或者注册操作已经被取消,那么就直接返回,停止注册流程。

  • 调用doRegister()方法,执行真正的注册操作。最终实现在AbstractChannel的子类AbstractNioChannel中,这个我们一会在介绍,先关注整体流程。


public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/** * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process. * * Sub-classes may override this method */ protected void doRegister() throws Exception { // NOOP }
}
复制代码


  • ChannelReactor注册完毕后,调用pipeline.invokeHandlerAddedIfNeeded()方法,触发回调 pipeline 中添加的 ChannelInitializer 的 handlerAdded 方法,在 handlerAdded 方法中利用前面提到的ChannelInitializer初始化ChannelPipeline



初始化ChannelPipeline的时机是当Channel向对应的Reactor注册成功后,在handlerAdded事件回调中利用ChannelInitializer进行初始化。


  • 设置regFutureSuccess,并回调注册在regFuture上的ChannelFutureListener#operationComplete方法,在operationComplete回调方法中将绑定操作封装成异步任务,提交到ReactortaskQueue中。等待Reactor的执行。


还记得这个regFuture在哪里出现的吗?它是在哪里被创建,又是在哪里添加的ChannelFutureListener呢? 大家还有印象吗?回忆不起来也没关系,笔者后面还会提到


  • 通过pipeline.fireChannelRegistered()pipeline中触发channelRegister事件


pipelinechannelHandlerchannelRegistered方法被回调。


  • 对于 Netty 服务端NioServerSocketChannel来说, 只有绑定端口地址成功后 channel 的状态才是active的。此时绑定操作regFuture上注册的ChannelFutureListener#operationComplete回调方法中被作为异步任务提交到了Reactor的任务队列中,Reactor线程没开始执行绑定任务。所以这里的isActive()false


Reactor线程执行完register0方法后,才会去执行绑定任务


下面我们来看下register0方法中这些核心步骤的具体实现:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {
//channel注册到Selector后获得的SelectKey volatile SelectionKey selectionKey;
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ...............省略.................... } } }
}
复制代码


调用底层JDK NIO Channel方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),将 NettyNioServerSocketChannel中包装的JDK NIO ServerSocketChannel注册到Reactor中的JDK NIO Selector上。


简单介绍下SelectableChannel#register方法参数的含义:


  • Selector:表示JDK NIO Channel将要向哪个Selector进行注册。

  • int ops: 表示Channel上感兴趣的IO事件,当对应的IO事件就绪时,Selector会返回Channel对应的SelectionKey


SelectionKey可以理解为ChannelSelector上的特殊表示形式, SelectionKey中封装了Channel感兴趣的IO事件集合~~~interestOps,以及IO就绪的事件集合~~readyOps, 同时也封装了对应的JDK NIO Channel以及注册的Selector。最后还有一个重要的属性attachment,可以允许我们在SelectionKey上附加一些自定义的对象。


  • Object attachment:SelectionKey中添加用户自定义的附加对象。


这里NioServerSocketChannelReactor中的Selector注册的IO事件0,这个操作的主要目的是先获取到ChannelSelector中对应的SelectionKey,完成注册。当绑定操作完成后,在去向SelectionKey添加感兴趣的IO事件~~~OP_ACCEPT事件


同时通过SelectableChannel#register方法将 Netty 自定义的NioServerSocketChannel(这里的this指针)附着在SelectionKeyattechment属性上,完成 Netty 自定义Channel与 JDK NIO Channel的关系绑定。这样在每次对Selector 进行IO就绪事件轮询时,Netty 都可以从 JDK NIO Selector返回的SelectionKey中获取到自定义的Channel对象(这里指的就是NioServerSocketChannel)。

1.3.7 HandlerAdded 事件回调中初始化 ChannelPipeline

NioServerSocketChannel注册到Main Reactor上的Selector后,Netty 通过调用pipeline.invokeHandlerAddedIfNeeded()开始回调NioServerSocketChannelpipeline里的 ChannelHandler 的handlerAdded方法


此时NioServerSocketChannelpipeline结构如下:



此时pipeline中只有在初始化NioServerSocketChannel时添加的ChannelInitializer


我们来看下ChannelInitializerhandlerAdded回调方法具体作了哪些事情~~


public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { if (initChannel(ctx)) { //初始化工作完成后,需要将自身从pipeline中移除 removeState(ctx); } } }
//ChannelInitializer实例是被所有的Channel共享的,用于初始化ChannelPipeline //通过Set集合保存已经初始化的ChannelPipeline,避免重复初始化同一ChannelPipeline private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap( new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //初始化完毕后,从pipeline中移除自身 pipeline.remove(this); } } return true; } return false; }
//匿名类实现,这里指定具体的初始化逻辑 protected abstract void initChannel(C ch) throws Exception;
private void removeState(final ChannelHandlerContext ctx) { //从initMap防重Set集合中删除ChannelInitializer if (ctx.isRemoved()) { initMap.remove(ctx); } else { ctx.executor().execute(new Runnable() { @Override public void run() { initMap.remove(ctx); } }); } }}
复制代码


ChannelInitializer 中的初始化逻辑比较简单明了:


  • 首先要判断必须是当前Channel已经完成注册后,才可以进行pipeline的初始化。ctx.channel().isRegistered()

  • 调用ChannelInitializer 的匿名类指定的initChannel 执行自定义的初始化逻辑。


        p.addLast(new ChannelInitializer<Channel>() {            @Override            public void initChannel(final Channel ch) {                final ChannelPipeline pipeline = ch.pipeline();                //ServerBootstrap中用户指定的channelHandler                ChannelHandler handler = config.handler();                if (handler != null) {                    pipeline.addLast(handler);                }
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
复制代码


还记得在初始化NioServerSocketChannel时。io.netty.bootstrap.ServerBootstrap#init方法中向pipeline中添加的ChannelInitializer吗?


  • 当执行完initChannel 方法后,ChannelPipeline的初始化就结束了,此时ChannelInitializer 就没必要再继续呆在pipeline中了,所需要将ChannelInitializer pipeline中删除。pipeline.remove(this)


当初始化完pipeline时,此时pipeline的结构再次发生了变化:



此时Main Reactor中的任务队列taskQueue结构变化为:



添加ServerBootstrapAcceptor的任务是在初始化NioServerSocketChannel的时候向 main reactor 提交过去的。还记得吗?

1.3.8 回调 regFuture 的 ChannelFutureListener

在本小节《Netty 服务端的启动》的最开始,我们介绍了服务端启动的入口函数io.netty.bootstrap.AbstractBootstrap#doBind,在函数的最开头调用了initAndRegister()方法用来创建并初始化NioServerSocketChannel,之后便会将NioServerSocketChannel注册到Main Reactor中。


注册的操作是一个异步的过程,所以在initAndRegister()方法调用后返回一个代表注册结果的ChannelFuture regFuture


public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) { //异步创建,初始化,注册ServerSocketChannel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { //如果注册完成,则进行绑定操作 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //添加注册完成 回调函数 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {
...............省略............... // 注册完成后,Reactor线程回调这里 doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }}
复制代码


之后会向ChannelFuture regFuture添加一个注册完成后的回调函数~~~~ ChannelFutureListener 。在回调函数operationComplete 中开始发起绑端口地址流程



那么这个回调函数在什么时候?什么地方发起的呢??


让我们在回到本小节的主题register0 方法的流程中:


当调用doRegister()方法完成NioServerSocketChannelMain Reactor的注册后,紧接着会调用pipeline.invokeHandlerAddedIfNeeded()方法中触发ChannelInitializer#handlerAdded回调中对pipeline进行初始化。


最后在safeSetSuccess方法中,开始回调注册在regFuture 上的ChannelFutureListener


   protected final void safeSetSuccess(ChannelPromise promise) {        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {           logger.warn("Failed to mark a promise as success because it is done already: {}", promise);        }   }
@Override public boolean trySuccess() { return trySuccess(null); }
@Override public boolean trySuccess(V result) { return setSuccess0(result); }
private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); }
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { //回调注册在promise上的listeners notifyListeners(); } return true; } return false; }
复制代码


safeSetSuccess 的逻辑比较简单,首先设置regFuture结果为success,并且回调注册在regFuture上的ChannelFutureListener


需要提醒的是,执行safeSetSuccess 方法,以及后边回调regFuture上的ChannelFutureListener 这些动作都是由Reactor线程执行的。


关于 Netty 中的Promise模型后边我会在写一篇专门的文章进行分析,这里大家只需清楚大体的流程即可。不必在意过多的细节。


下面我们把视角切换到regFuture上的ChannelFutureListener 回调中,看看在Channel注册完成后,Netty 又会做哪些事情?

2. doBind0

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
}
复制代码


这里 Netty 又将绑定端口地址的操作封装成异步任务,提交给Reactor执行。


但是这里有一个问题,其实此时执行doBind0方法的线程正是Reactor线程,那为什么不直接在这里去执行bind操作,而是再次封装成异步任务提交给Reactor中的taskQueue呢?


反正最终都是由Reactor线程执行,这其中又有什么分别呢?


经过上小节的介绍我们知道,bind0方法的调用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法在将NioServerSocketChannel注册到Main Reactor之后,并且NioServerSocketChannelpipeline已经初始化完毕后,通过safeSetSuccess 方法回调过来的。


这个过程全程是由Reactor线程来负责执行的,但是此时register0方法并没有执行完毕,还需要执行后面的逻辑。


而绑定逻辑需要在注册逻辑执行完之后执行,所以在doBind0方法中Reactor线程会将绑定操作封装成异步任务先提交给taskQueue中保存,这样可以使Reactor线程立马从safeSetSuccess 中返回,继续执行剩下的register0方法逻辑。


        private void register0(ChannelPromise promise) {            try {                ................省略............
doRegister(); pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //触发channelRegister事件 pipeline.fireChannelRegistered();
if (isActive()) { ................省略............ } } catch (Throwable t) { ................省略............ } }
复制代码


Reactor线程执行完register0方法后,就会从taskQueue中取出异步任务执行。


此时Reactor线程中的taskQueue结构如下:



  • Reactor线程会先取出位于taskQueue队首的任务执行,这里是指向NioServerSocketChannelpipeline中添加ServerBootstrapAcceptor的异步任务。


此时NioServerSocketChannelpipeline的结构如下:



  • Reactor线程执行绑定任务。

3. 绑定端口地址

Channel的操作行为全部定义在ChannelOutboundInvoker接口中



public interface ChannelOutboundInvoker {
/** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. * */ ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);}
复制代码


bind方法由子类AbstractChannel实现。


public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
}
复制代码


调用pipeline.bind(localAddress, promise)pipeline中传播bind事件,触发回调pipeline中所有ChannelHandlerbind方法


事件在pipeline中的传播具有方向性:


  • inbound事件HeadContext开始逐个向后传播直到TailContext

  • outbound事件则是反向传播,从TailContext开始反向向前传播直到HeadContext


inbound事件只能被pipeline中的ChannelInboundHandler响应处理outbound事件只能被pipeline中的ChannelOutboundHandler响应处理



然而这里的bind事件在 Netty 中被定义为outbound事件,所以它在pipeline中是反向传播。先从TailContext开始反向传播直到HeadContext


然而bind的核心逻辑也正是实现在HeadContext中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext            implements ChannelOutboundHandler, ChannelInboundHandler {
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { //触发AbstractChannel->bind方法 执行JDK NIO SelectableChannel 执行底层绑定操作 unsafe.bind(localAddress, promise); }
}
复制代码


HeadContext#bind回调方法中,调用Channel里的unsafe操作类执行真正的绑定操作。


protected abstract class AbstractUnsafe implements Unsafe {
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { .................省略................
//这时channel还未激活 wasActive = false boolean wasActive = isActive(); try { //io.netty.channel.socket.nio.NioServerSocketChannel.doBind //调用具体channel实现类 doBind(localAddress); } catch (Throwable t) { .................省略................ return; }
//绑定成功后 channel激活 触发channelActive事件传播 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { //pipeline中触发channelActive事件 pipeline.fireChannelActive(); } }); } //回调注册在promise上的ChannelFutureListener safeSetSuccess(promise); }
protected abstract void doBind(SocketAddress localAddress) throws Exception;}
复制代码


  • 首先执行子类NioServerSocketChannel具体实现的doBind方法,通过JDK NIO 原生 ServerSocketChannel执行底层的绑定操作。


    @Override    protected void doBind(SocketAddress localAddress) throws Exception {        //调用JDK NIO 底层SelectableChannel 执行绑定操作        if (PlatformDependent.javaVersion() >= 7) {            javaChannel().bind(localAddress, config.getBacklog());        } else {            javaChannel().socket().bind(localAddress, config.getBacklog());        }    }
复制代码


  • 判断是否为首次绑定,如果是的话将触发pipeline中的ChannelActive事件封装成异步任务放入Reactor中的taskQueue中。

  • 执行safeSetSuccess(promise),回调注册在promise上的ChannelFutureListener


还是同样的问题,当前执行线程已经是Reactor线程了,那么为何不直接触发pipeline中的ChannelActive事件而是又封装成异步任务呢??


因为如果直接在这里触发ChannelActive事件,那么Reactor线程就会去执行pipeline中的ChannelHandlerchannelActive事件回调


这样的话就影响了safeSetSuccess(promise)的执行,延迟了注册在promise上的ChannelFutureListener的回调。


到现在为止,Netty 服务端就已经完成了绑定端口地址的操作,NioServerSocketChannel的状态现在变为Active


最后还有一件重要的事情要做,我们接着来看pipeline中对channelActive事件处理。

3.2 channelActive 事件处理

channelActive事件在 Netty 中定义为inbound事件,所以它在pipeline中的传播为正向传播,从HeadContext一直到TailContext为止。


channelActive事件回调中需要触发向Selector指定需要监听的IO事件~~OP_ACCEPT事件


这块的逻辑主要在HeadContext中实现。


    final class HeadContext extends AbstractChannelHandlerContext            implements ChannelOutboundHandler, ChannelInboundHandler {
@Override public void channelActive(ChannelHandlerContext ctx) { //pipeline中继续向后传播channelActive事件 ctx.fireChannelActive(); //如果是autoRead 则自动触发read事件传播 //在read回调函数中 触发OP_ACCEPT注册 readIfIsAutoRead(); }
private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //如果是autoRead 则触发read事件传播 channel.read(); } }
//AbstractChannel public Channel read() { //触发read事件 pipeline.read(); return this; }
@Override public void read(ChannelHandlerContext ctx) { //触发注册OP_ACCEPT或者OP_READ事件 unsafe.beginRead(); } }
复制代码


  • HeadContext中的channelActive回调中触发pipeline中的read事件

  • read事件再次传播到HeadContext时,触发HeadContext#read方法的回调。在read回调中调用channel底层操作类unsafebeginRead方法向selector注册监听OP_ACCEPT事件

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {
@Override public final void beginRead() { assertEventLoop(); //channel必须是Active if (!isActive()) { return; }
try { // 触发在selector上注册channel感兴趣的监听事件 doBeginRead(); } catch (final Exception e) { .............省略.............. } }}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //子类负责继承实现 protected abstract void doBeginRead() throws Exception;
}
复制代码


  • 断言判断执行该方法的线程必须是Reactor线程

  • 此时NioServerSocketChannel已经完成端口地址的绑定操作,isActive() = true

  • 调用doBeginRead实现向Selector注册监听事件OP_ACCEPT


public abstract class AbstractNioChannel extends AbstractChannel {
//channel注册到Selector后获得的SelectKey volatile SelectionKey selectionKey; // Channel监听事件集合 protected final int readInterestOp;
@Override protected void doBeginRead() throws Exception { final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; }
readPending = true;
final int interestOps = selectionKey.interestOps(); /** * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件 * */ if ((interestOps & readInterestOp) == 0) { //添加OP_ACCEPT事件到interestOps集合中 selectionKey.interestOps(interestOps | readInterestOp); } }}
复制代码


  • 前边提到在NioServerSocketChannel在向Main Reactor中的Selector注册后,会获得一个SelectionKey。这里首先要获取这个SelectionKey

  • SelectionKey中获取NioServerSocketChannel感兴趣的IO事件集合 interestOps ,当时在注册的时候interestOps设置为0

  • 将在NioServerSocketChannel初始化时设置的readInterestOp = OP_ACCEPT,设置到SelectionKey中的interestOps 集合中。这样Reactor中的Selector就开始监听interestOps 集合中包含的IO事件了。


Main Reactor中主要监听的是OP_ACCEPT事件


流程走到这里,Netty 服务端就真正的启动起来了,下一步就开始等待接收客户端连接了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?



此时 Netty 的Reactor模型结构如下:




总结

本文我们通过图解源码的方式完整地介绍了整个 Netty 服务端启动流程,并介绍了在启动过程中涉及到的ServerBootstrap 相关的属性以及配置方式。NioServerSocketChannel 的创建初始化过程以及类的继承结构。


其中重点介绍了NioServerSocketChannel Reactor的注册过程以及Reactor线程的启动时机和pipeline的初始化时机。


最后介绍了NioServerSocketChannel绑定端口地址的整个流程。


上述介绍的这些流程全部是异步操作,各种回调绕来绕去的,需要反复回想下,读异步代码就是这样,需要理清各种回调之间的关系,并且时刻提醒自己当前的执行线程是什么?


好了,现在 Netty 服务端已经启动起来,接着就该接收客户端连接了,我们下篇文章见~~~~

发布于: 刚刚阅读数: 2
用户头像

微信公众号:bin的技术小屋,专注源码解析 2018.01.31 加入

专注源码解析系列原创技术文章,分享自己的技术感悟

评论

发布
暂无评论
详细图解Netty Reactor启动全流程