本系列 Netty 源码解析文章基于 4.1.56.Final 版本
大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?
大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解 Netty Reactor 的启动全流程,包括其中涉及到的各种代码设计实现细节。
在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中我们详细介绍了 Netty 服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup
以及Reactor模型 NioEventLoop
的创建过程。最终我们得到了 netty Reactor 模型的运行骨架如下:
现在 Netty 服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下 Netty 服务端的启动过程。
我们继续回到上篇文章提到的 Netty 服务端代码模板中,在创建完主从 Reactor 线程组:bossGroup
,workerGroup
后,接下来就开始配置 Netty 服务端的启动辅助类ServerBootstrap
了。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
在上篇文章中我们对代码模板中涉及到ServerBootstrap
的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。
ServerBootstrap类
其实没有什么特别的逻辑,主要是对 Netty 启动过程中需要用到的一些核心信息进行配置管理,比如:
Netty 的核心引擎组件主从Reactor线程组: bossGroup,workerGroup
。通过ServerBootstrap#group方法
配置。
Netty 服务端使用到的 Channel 类型:NioServerSocketChannel
,通过ServerBootstrap#channel方法
配置。以及配置NioServerSocketChannel
时用到的SocketOption
。SocketOption
用于设置底层 JDK NIO Socket 的一些选项。通过ServerBootstrap#option方法
进行配置。
主 ReactorGroup 中的 MainReactor 管理的 Channel 类型为NioServerSocketChannel
,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel
,然后采用round-robin
轮询的方式从图中从 ReactorGroup 中选择一个 SubReactor 与该客户端NioSocketChannel
进行绑定。
从 ReactorGroup 中的 SubReactor 管理的 Channel 类型为NioSocketChannel
,它是 netty 中定义客户端连接的一个模型,每个连接对应一个。如图所示 SubReactor 负责监听处理绑定在其上的所有NioSocketChannel
上的 IO 事件。
不管是服务端用到的NioServerSocketChannel
还是客户端用到的NioSocketChannel
,每个Channel实例
都会有一个Pipeline
,Pipeline
中有多个ChannelHandler
用于编排处理对应Channel
上感兴趣的IO事件
。
ServerBootstrap
结构中包含了 netty 服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap
的源码结构:
ServerBootstrap
ServerBootstrap
的继承结构比较简单,继承层次的职责分工也比较明确。
ServerBootstrap
主要负责对主从Reactor线程组
相关的配置进行管理,其中带child前缀的配置方法
是对从Reactor线程组
的相关配置管理。从Reactor线程组
中的Sub Reactor
负责管理的客户端NioSocketChannel
相关配置存储在ServerBootstrap
结构中。
父类AbstractBootstrap
则是主要负责对主Reactor线程组
相关的配置进行管理,以及主Reactor线程组
中的Main Reactor
负责处理的服务端ServerSocketChannel
相关的配置管理。
1. 配置主从 Reactor 线程组
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
复制代码
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//Main Reactor线程组
volatile EventLoopGroup group;
//Sub Reactor线程组
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父类管理主Reactor线程组
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}
复制代码
2. 配置服务端 ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
复制代码
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//用于创建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}
复制代码
在向ServerBootstrap
配置服务端ServerSocketChannel
的channel
方法中,其实是创建了一个ChannelFactory
工厂实例ReflectiveChannelFactory
,在 Netty 服务端启动的过程中,会通过这个ChannelFactory
去创建相应的Channel
实例。
我们可以通过这个方法来配置 netty 的 IO 模型,下面为ServerSocketChannel
在不同 IO 模型下的实现:
EventLoopGroup
Reactor 线程组在不同 IO 模型下的实现:
我们只需要将IO模型
的这些核心接口对应的实现类前缀
改为对应IO模型
的前缀,就可以轻松在 Netty 中完成对IO模型
的切换。
2.1 ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
//NioServerSocketChannelde 构造器
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//反射获取NioServerSocketChannel的构造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//创建NioServerSocketChannel实例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
复制代码
从类的签名我们可以看出,这个工厂类是通过泛型
加反射
的方式来创建对应的Channel
实例。
泛型参数T extends Channel
表示的是要通过工厂类创建的Channel类型
,这里我们初始化的是NioServerSocketChannel
。
在ReflectiveChannelFactory
的构造器中通过反射
的方式获取NioServerSocketChannel
的构造器。
在newChannel
方法中通过构造器反射创建NioServerSocketChannel
实例。
注意这时只是配置阶段,NioServerSocketChannel
此时并未被创建。它是在启动的时候才会被创建出来。
3. 为 NioServerSocketChannel 配置 ChannelOption
ServerBootstrap b = new ServerBootstrap();
//设置被MainReactor管理的NioServerSocketChannel的Socket选项
b.option(ChannelOption.SO_BACKLOG, 100)
复制代码
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption配置
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}
复制代码
无论是服务端的NioServerSocketChannel
还是客户端的NioSocketChannel
它们的相关底层 Socket 选项ChannelOption
配置全部存放于一个Map
类型的数据结构中。
由于客户端NioSocketChannel
是由从Reactor线程组
中的Sub Reactor
来负责处理,所以涉及到客户端NioSocketChannel
所有的方法和配置全部是以child
前缀开头。
ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
复制代码
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//客户端SocketChannel对应的ChannelOption配置
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}
复制代码
相关的底层 Socket 选项,netty 全部枚举在 ChannelOption 类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
..................省略..............
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}
复制代码
4. 为服务端 NioServerSocketChannel 中的 Pipeline 配置 ChannelHandler
//serverSocketChannel中pipeline里的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
复制代码
向NioServerSocketChannel
中的Pipeline
添加ChannelHandler
分为两种方式:
关于ChannelInitializer
后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer
是一种特殊的ChannelHandler
,用于初始化pipeline
。适用于向 pipeline 中添加多个 ChannelHandler 的场景。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})
复制代码
隐式添加ServerBootstrapAcceptor
是由 Netty 框架在启动的时候负责添加,用户无需关心。
在本例中,NioServerSocketChannel
的PipeLine
中只有两个ChannelHandler
,一个由用户在外部显式添加的LoggingHandler
,另一个是由 Netty 框架隐式添加的ServerBootstrapAcceptor
。
其实我们在实际项目使用的过程中,不会向 netty 服务端NioServerSocketChannel
添加额外的 ChannelHandler,NioServerSocketChannel
只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler
只是为了向大家展示ServerBootstrap
的配置方法。
5. 为客户端 NioSocketChannel 中的 Pipeline 配置 ChannelHandler
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
复制代码
//socketChannel中pipeline中的处理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
复制代码
向客户端NioSocketChannel
中的Pipeline
里添加ChannelHandler
完全是由用户自己控制显式添加,添加的数量不受限制。
由于在 Netty 的IO线程模型
中,是由单个Sub Reactor线程
负责执行客户端NioSocketChannel
中的Pipeline
,一个Sub Reactor线程
负责处理多个NioSocketChannel
上的IO事件
,如果Pipeline
中的ChannelHandler
添加的太多,就会影响Sub Reactor线程
执行其他NioSocketChannel
上的Pipeline
,从而降低IO处理效率
,降低吞吐量。
所以Pipeline
中的ChannelHandler
不易添加过多,并且不能再ChannelHandler
中执行耗时的业务处理任务。
在我们通过ServerBootstrap
配置 netty 服务端启动信息的时候,无论是向服务端NioServerSocketChannel
的 pipeline 中添加 ChannelHandler,还是向客户端NioSocketChannel
的 pipeline 中添加 ChannelHandler,当涉及到多个 ChannelHandler 添加的时候,我们都会用到ChannelInitializer
,那么这个ChannelInitializer
究竟是何方圣神,为什么要这样做呢?我们接着往下看~~
ChannelInitializer
首先ChannelInitializer
它继承于ChannelHandler
,它自己本身就是一个 ChannelHandler,所以它可以添加到childHandler
中。
其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。
那为什么不直接添加ChannelHandler
而是选择用ChannelInitializer
呢?
这里主要有两点原因:
前边我们提到,客户端NioSocketChannel
是在服务端 accept 连接后,在服务端NioServerSocketChannel
中被创建出来的。但是此时我们正处于配置ServerBootStrap
阶段,服务端还没有启动,更没有客户端连接上来,此时客户端NioSocketChannel
还没有被创建出来,所以也就没办法向客户端NioSocketChannel
的 pipeline 中添加ChannelHandler
。
客户端NioSocketChannel
中Pipeline
里可以添加任意多个ChannelHandler
,但是 Netty 框架无法预知用户到底需要添加多少个ChannelHandler
,所以 Netty 框架提供了回调函数ChannelInitializer#initChannel
,使用户可以自定义ChannelHandler
的添加行为。
当客户端NioSocketChannel
注册到对应的Sub Reactor
上后,紧接着就会初始化NioSocketChannel
中的Pipeline
,此时 Netty 框架会回调ChannelInitializer#initChannel
执行用户自定义的添加逻辑。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//当channelRegister事件发生时,调用initChannel初始化pipeline
if (initChannel(ctx)) {
.................省略...............
} else {
.................省略...............
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//此时客户单NioSocketChannel已经创建并初始化好了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.................省略...............
} finally {
.................省略...............
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
}
复制代码
这里由 netty 框架回调的ChannelInitializer#initChannel方法
正是我们自定义的添加逻辑。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
复制代码
到此为止,Netty 服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap
启动辅助类中。
接下来要做的事情就是服务端的启动了。
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();
复制代码
Netty 服务端的启动
经过前面的铺垫终于来到了本文的核心内容----Netty 服务端的启动过程。
如代码模板中的示例所示,Netty 服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)
函数中。
接下来我们看一下 Netty 服务端在启动过程中究竟干了哪些事情?
大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。
我们先来从 netty 服务端启动的入口函数开始我们今天的源码解析旅程:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//校验Netty核心组件是否配置齐全
validate();
//服务端开始启动,绑定端口地址,接收客户端连接
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//异步创建,初始化,注册ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,
} else {
//如果此时注册操作没有完成,则向regFuture添加operationComplete回调函数,注册成功后回调。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,
});
return promise;
}
}
复制代码
Netty 服务端的启动流程总体如下:
创建服务端NioServerSocketChannel
并初始化。
将服务端NioServerSocketChannel
注册到主Reactor线程组
中。
注册成功后,开始初始化NioServerSocketChannel
中的 pipeline,然后在 pipeline 中触发 channelRegister 事件。
随后由NioServerSocketChannel
绑定端口地址。
绑定端口地址成功后,向NioServerSocketChannel
对应的Pipeline
中触发传播ChannelActive事件
,在ChannelActive事件回调
中向Main Reactor
注册OP_ACCEPT事件
,开始等待客户端连接。服务端启动完成。
当 netty 服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor 开始运转。
接下来,我们就来看下 Netty 源码是如何实现以上步骤的~~
1. initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//创建NioServerSocketChannel
//ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel
channel = channelFactory.newChannel();
//初始化NioServerSocketChannel
init(channel);
} catch (Throwable t) {
..............省略.................
}
//向MainReactor注册ServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
..............省略.................
return regFuture;
}
复制代码
从函数命名中我们可以看出,这个函数主要做的事情就是首先创建NioServerSocketChannel
,并对NioServerSocketChannel
进行初始化,最后将NioServerSocketChannel
注册到Main Reactor
中。
1.1 创建 NioServerSocketChannel
还记得我们在介绍ServerBootstrap
启动辅助类配置服务端ServerSocketChannel
类型的时候提到的工厂类ReflectiveChannelFactory
吗?
因为当时我们在配置ServerBootstrap
启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体ServerSocketChannel
的时机。
所以 Netty 通过工厂模式
将要创建的ServerSocketChannel
的类型(通过泛型指定)以及 创建的过程(封装在newChannel函数中
)统统先封装在工厂类ReflectiveChannelFactory
中。
ReflectiveChannelFactory
通过泛型
,反射
,工厂
的方式灵活
创建不同类型的channel
等待创建时机来临,我们调用保存在ServerBootstrap
中的channelFactory
直接进行创建。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
复制代码
下面我们来看下NioServerSocketChannel
的构建过程:
1.1.1 NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//SelectorProvider(用于创建Selector和Selectable Channels)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//创建JDK NIO ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//ServerSocketChannel相关的配置
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) {
//父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
复制代码
首先调用newSocket
创建 JDK NIO 原生ServerSocketChannel
,这里调用了SelectorProvider#openServerSocketChannel
来创建 JDK NIO 原生ServerSocketChannel
,我们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中详细的介绍了SelectorProvider
相关内容,当时是用SelectorProvider
来创建Reactor
中的Selector
。大家还记得吗??
通过父类构造器设置NioServerSocketChannel
感兴趣的IO事件
,这里设置的是SelectionKey.OP_ACCEPT
事件。并将 JDK NIO 原生ServerSocketChannel
封装起来。
创建Channel
的配置类NioServerSocketChannelConfig
,在配置类中封装了对Channel底层
的一些配置行为,以及 JDK 中的ServerSocket
。以及创建NioServerSocketChannel
接收数据用的Buffer
分配器AdaptiveRecvByteBufAllocator
。
NioServerSocketChannelConfig
没什么重要的东西,我们这里也不必深究,它就是管理NioServerSocketChannel
相关的配置,这里唯一需要大家注意的是这个用于Channel
接收数据用的Buffer分配器
AdaptiveRecvByteBufAllocator,我们后面在介绍 Netty 如何接收连接的时候还会提到。
NioServerSocketChannel
的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下NioServerSocketChannel
的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是Channel
的核心信息,所以有必要了解一下。
在NioServerSocketChannel
的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。
其中AbstractNioMessageChannel类
主要是对NioServerSocketChannel
底层读写行为的封装和定义,比如 accept 接收客户端连接。这个我们后续会介绍到,这里我们并不展开。
1.1.2 AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
//JDK NIO原生Selectable Channel
private final SelectableChannel ch;
// Channel监听事件集合 这里是SelectionKey.OP_ACCEPT事件
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//设置Channel为非阻塞 配合IO多路复用模型
ch.configureBlocking(false);
} catch (IOException e) {
.............省略................
}
}
}
复制代码
封装由SelectorProvider
创建出来的 JDK NIO 原生ServerSocketChannel
。
封装Channel
在创建时指定感兴趣的IO事件
,对于NioServerSocketChannel
来说感兴趣的IO事件
为OP_ACCEPT事件
。
设置 JDK NIO 原生ServerSocketChannel
为非阻塞模式, 配合 IO 多路复用模型。
1.1.3 AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//channel是由创建层次的,比如ServerSocketChannel 是 SocketChannel的 parent
private final Channel parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
private final ChannelId id;
//unsafe用于封装对底层socket的相关操作
private final Unsafe unsafe;
//为channel分配独立的pipeline用于IO事件编排
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用于定义实现对Channel的底层操作
unsafe = newUnsafe();
//为channel分配独立的pipeline用于IO事件编排
pipeline = newChannelPipeline();
}
}
复制代码
Netty 中的Channel创建
是有层次的,这里的parent属性
用来保存上一级的Channel
,比如这里的NioServerSocketChannel
是顶级Channel
,所以它的parent = null
。客户端NioSocketChannel
是由NioServerSocketChannel
创建的,所以它的parent = NioServerSocketChannel
。
为Channel
分配全局唯一的ChannelId
。ChannelId
由机器 Id(machineId
),进程 Id(processId
),序列号(sequence
),时间戳(timestamp
),随机数(random
)构成
private DefaultChannelId() {
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
}
复制代码
Unsafe
为Channel接口
的一个内部接口,用于定义实现对 Channel 底层的各种操作,Unsafe接口
定义的操作行为只能由 Netty 框架的Reactor线程
调用,用户线程禁止调用。
interface Unsafe {
//分配接收数据用的Buffer
RecvByteBufAllocator.Handle recvBufAllocHandle();
//服务端绑定的端口地址
SocketAddress localAddress();
//远端地址
SocketAddress remoteAddress();
//channel向Reactor注册
void register(EventLoop eventLoop, ChannelPromise promise);
//服务端绑定端口地址
void bind(SocketAddress localAddress, ChannelPromise promise);
//客户端连接服务端
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//关闭channle
void close(ChannelPromise promise);
//读数据
void beginRead();
//写数据
void write(Object msg, ChannelPromise promise);
}
复制代码
ChannelHandlerContext
保存 ChannelHandler 上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。
这里只是为了让大家简单理解pipeline
的一个大致的结构,后面会写一篇文章专门详细讲解pipeline
。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
到了这里NioServerSocketChannel
就创建完毕了,我们来回顾下它到底包含了哪些核心信息。
1.2 初始化 NioServerSocketChannel
void init(Channel channel) {
//向NioServerSocketChannelConfig设置ServerSocketChannelOption
setChannelOptions(channel, newOptionsArray(), logger);
//向netty自定义的NioServerSocketChannel设置attributes
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
//获取从Reactor线程组
final EventLoopGroup currentChildGroup = childGroup;
//获取用于初始化客户端NioSocketChannel的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
//获取用户配置的客户端SocketChannel的channelOption以及attributes
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的逻辑
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中用户指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//LoggingHandler
pipeline.addLast(handler);
}
//添加用于接收客户端连接的acceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
Netty 自定义的SocketChannel
类型均继承AttributeMap
接口以及DefaultAttributeMap
类,正是它们定义了ChannelAttributes
。用于向Channel
添加用户自定义的一些信息。
这个ChannelAttributes
的用处大有可为,Netty 后边的许多特性都是依靠这个ChannelAttributes
来实现的。这里先卖个关子,大家可以自己先想一下可以用这个ChannelAttributes
做哪些事情?
获取从 Reactor 线程组childGroup
,以及用于初始化客户端NioSocketChannel
的ChannelInitializer
,ChannelOption
,ChannelAttributes
,这些信息均是由用户在启动的时候向ServerBootstrap
添加的客户端NioServerChannel
配置信息。这里用这些信息来初始化ServerBootstrapAcceptor
。因为后续会在ServerBootstrapAcceptor
中接收客户端连接以及创建NioServerChannel
。
向NioServerSocketChannel
中的pipeline
添加用于初始化pipeline
的ChannelInitializer
。
问题来了,这里为什么不干脆直接将ChannelHandler
添加到pipeline
中,而是又使用到了ChannelInitializer
呢?
其实原因有两点:
初始化NioServerSocketChannel
中pipeline
的时机是:当NioServerSocketChannel
注册到Main Reactor
之后,绑定端口地址之前。
前边在介绍ServerBootstrap
配置childHandler
时也用到了ChannelInitializer
,还记得吗??
问题又来了,大家注意下ChannelInitializer#initChannel
方法,在该初始化回调方法中,添加 LoggingHandler 是直接向 pipeline 中添加,而添加 Acceptor 为什么不是直接添加而是封装成异步任务呢?
这里先给大家卖个关子,笔者会在后续流程中为大家解答~~~~~
此时NioServerSocketChannel
中的pipeline
结构如下图所示:
1.3 向 Main Reactor 注册 NioServerSocketChannel
从ServerBootstrap
获取主 Reactor 线程组NioEventLoopGroup
,将NioServerSocketChannel
注册到NioEventLoopGroup
中。
ChannelFuture regFuture = config().group().register(channel);
复制代码
下面我们来看下具体的注册过程:
1.3.1 主 Reactor 线程组中选取一个 Main Reactor 进行注册
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
//获取绑定策略
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//采用轮询round-robin的方式选择Reactor
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
复制代码
Netty 通过next()
方法根据上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》提到的channel到reactor的绑定策略
,从ReactorGroup
中选取一个 Reactor 进行注册绑定。之后Channel
生命周期内的所有 IO 事件
都由这个 Reactor
负责处理,如 accept、connect、read、write
等 IO 事件。
一个channel
只能绑定到一个Reactor
上,一个Reactor
负责监听多个channel
。
由于这里是NioServerSocketChannle
向Main Reactor
进行注册绑定,所以Main Reactor
主要负责处理的IO事件
是OP_ACCEPT
事件。
1.3.2 向绑定后的 Main Reactor 进行注册
向Reactor
进行注册的行为定义在NioEventLoop
的父类SingleThreadEventLoop
中,印象模糊的同学可以在回看下上篇文章中的NioEventLoop继承结构
小节内容。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//注册channel到绑定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe负责channel底层的各种操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
复制代码
通过NioServerSocketChannel
中的Unsafe类
执行底层具体的注册动作。
protected abstract class AbstractUnsafe implements Unsafe {
/**
* 注册Channel到绑定的Reactor上
* */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//EventLoop的类型要与Channel的类型一样 Nio Oio Aio
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//在channel上设置绑定的Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 执行channel注册的操作必须是Reactor线程来完成
*
* 1: 如果当前执行线程是Reactor线程,则直接执行register0进行注册
* 2:如果当前执行线程是外部线程,则需要将register0注册操作 封装程异步Task 由Reactor线程执行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...............省略...............
}
}
}
}
复制代码
上篇文章我们介绍过 Netty 对三种IO模型
:Oio,Nio,Aio
的支持,用户可以通过改变 Netty 核心类的前缀轻松切换IO模型
。isCompatible
方法目的就是需要保证Reactor
和Channel
使用的是同一种IO模型
。
在Channel
中保存其绑定的Reactor实例
。
执行Channel
向Reactor
注册的动作必须要确保是在Reactor线程
中执行。
如果当前线程是Reactor线程
则直接执行注册动作register0
如果当前线程不是Reactor线程
,则需要将注册动作register0
封装成异步任务,存放在Reactor
中的taskQueue
中,等待Reactor线程
执行。
当前执行线程并不是Reactor线程
,而是用户程序的启动线程Main线程
。
1.3.3 Reactor 线程的启动
上篇文章中我们在介绍NioEventLoopGroup
的创建过程中提到了一个构造器参数executor
,它用于启动Reactor线程
,类型为ThreadPerTaskExecutor
。
当时笔者向大家卖了一个关子~~“Reactor线程是何时启动的?”
那么现在就到了为大家揭晓谜底的时候了~~
Reactor线程
的启动是在向Reactor
提交第一个异步任务的时候启动的。
Netty 中的主 Reactor 线程组NioEventLoopGroup
中的 Main ReactorNioEventLoop
是在用户程序Main线程
向Main Reactor
提交用于注册NioServerSocketChannel
的异步任务时开始启动。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
复制代码
接下来我们关注下NioEventLoop
的execute方法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
//当前线程是否为Reactor线程
boolean inEventLoop = inEventLoop();
//addTaskWakesUp = true addTask唤醒Reactor线程执行任务
addTask(task);
if (!inEventLoop) {
//如果当前线程不是Reactor线程,则启动Reactor线程
//这里可以看出Reactor线程的启动是通过 向NioEventLoop添加异步任务时启动的
startThread();
.....................省略.....................
}
.....................省略.....................
}
}
复制代码
1.3.4 startThread
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//定义Reactor线程状态
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
//Reactor线程状态 初始为 未启动状态
private volatile int state = ST_NOT_STARTED;
//Reactor线程状态字段state 原子更新器
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
}
复制代码
Reactor线程
初始化状态为ST_NOT_STARTED
,首先CAS
更新状态为ST_STARTED
doStartThread
启动Reactor线程
启动失败的话,需要将Reactor线程
状态改回ST_NOT_STARTED
//ThreadPerTaskExecutor 用于启动Reactor线程
private final Executor executor;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//Reactor线程开始启动
SingleThreadEventExecutor.this.run();
success = true;
}
................省略..............
}
复制代码
这里就来到了ThreadPerTaskExecutor
类型的executor
的用武之地了。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
@Override
public void execute(Runnable command) {
//启动Reactor线程
threadFactory.newThread(command).start();
}
}
复制代码
此时Reactor线程
已经启动,后面的工作全部都由这个Reactor线程
来负责执行了。
而用户启动线程在向Reactor
提交完NioServerSocketChannel
的注册任务register0
后,就逐步退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()
。
此时Reactor
中的任务队列中只有一个任务register0
,Reactor线程
启动后,会从任务队列中取出任务执行。
至此NioServerSocketChannel
的注册工作正式拉开帷幕~~
1.3.5 register0
//true if the channel has never been registered, false otherwise
private boolean neverRegistered = true;
private void register0(ChannelPromise promise) {
try {
//查看注册操作是否已经取消,或者对应channel已经关闭
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//执行真正的注册操作
doRegister();
//修改注册状态
neverRegistered = false;
registered = true;
//回调pipeline中添加的ChannelInitializer的handlerAdded方法,在这里初始化channelPipeline
pipeline.invokeHandlerAddedIfNeeded();
//设置regFuture为success,触发operationComplete回调,将bind操作放入Reactor的任务队列中,等待Reactor线程执行。
safeSetSuccess(promise);
//触发channelRegister事件
pipeline.fireChannelRegistered();
//对于服务端ServerSocketChannel来说 只有绑定端口地址成功后 channel的状态才是active的。
//此时绑定操作作为异步任务在Reactor的任务队列中,绑定操作还没开始,所以这里的isActive()是false
if (isActive()) {
if (firstRegistration) {
//触发channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
............省略.............
}
}
复制代码
register0
是驱动整个Channel
注册绑定流程的关键方法,下面我们来看下它的核心逻辑:
首先需要检查Channel
的注册动作是否在Reactor线程
外被取消了已经!promise.setUncancellable()
。检查要注册的Channel
是否已经关闭!ensureOpen(promise)
。如果Channel
已经关闭或者注册操作已经被取消,那么就直接返回,停止注册流程。
调用doRegister()
方法,执行真正的注册操作。最终实现在AbstractChannel
的子类AbstractNioChannel
中,这个我们一会在介绍,先关注整体流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
*
* Sub-classes may override this method
*/
protected void doRegister() throws Exception {
// NOOP
}
}
复制代码
初始化ChannelPipeline
的时机是当Channel
向对应的Reactor
注册成功后,在handlerAdded事件回调
中利用ChannelInitializer
进行初始化。
还记得这个regFuture
在哪里出现的吗?它是在哪里被创建,又是在哪里添加的ChannelFutureListener
呢? 大家还有印象吗?回忆不起来也没关系,笔者后面还会提到
pipeline
中channelHandler
的channelRegistered方法
被回调。
当Reactor线程
执行完register0方法
后,才会去执行绑定任务
。
下面我们来看下register0
方法中这些核心步骤
的具体实现:
1.3.6 doRegister()
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注册到Selector后获得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略....................
}
}
}
}
复制代码
调用底层JDK NIO Channel
方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
,将 NettyNioServerSocketChannel
中包装的JDK NIO ServerSocketChannel
注册到Reactor
中的JDK NIO Selector
上。
简单介绍下SelectableChannel#register
方法参数的含义:
SelectionKey
可以理解为Channel
在Selector
上的特殊表示形式, SelectionKey
中封装了Channel
感兴趣的IO事件集合~~~interestOps
,以及IO就绪的事件集合~~readyOps
, 同时也封装了对应的JDK NIO Channel
以及注册的Selector
。最后还有一个重要的属性attachment
,可以允许我们在SelectionKey
上附加一些自定义的对象。
这里NioServerSocketChannel
向Reactor
中的Selector
注册的IO事件
为0
,这个操作的主要目的是先获取到Channel
在Selector
中对应的SelectionKey
,完成注册。当绑定操作完成后,在去向SelectionKey
添加感兴趣的IO事件
~~~OP_ACCEPT事件
。
同时通过SelectableChannel#register
方法将 Netty 自定义的NioServerSocketChannel
(这里的this
指针)附着在SelectionKey
的attechment
属性上,完成 Netty 自定义Channel
与 JDK NIO Channel
的关系绑定。这样在每次对Selector
进行IO就绪事件
轮询时,Netty 都可以从 JDK NIO Selector
返回的SelectionKey
中获取到自定义的Channel
对象(这里指的就是NioServerSocketChannel
)。
1.3.7 HandlerAdded 事件回调中初始化 ChannelPipeline
当NioServerSocketChannel
注册到Main Reactor
上的Selector
后,Netty 通过调用pipeline.invokeHandlerAddedIfNeeded()
开始回调NioServerSocketChannel
中pipeline
里的 ChannelHandler 的handlerAdded方法
。
此时NioServerSocketChannel
的pipeline
结构如下:
此时pipeline
中只有在初始化NioServerSocketChannel
时添加的ChannelInitializer
。
我们来看下ChannelInitializer
中handlerAdded回调方法
具体作了哪些事情~~
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成后,需要将自身从pipeline中移除
removeState(ctx);
}
}
}
//ChannelInitializer实例是被所有的Channel共享的,用于初始化ChannelPipeline
//通过Set集合保存已经初始化的ChannelPipeline,避免重复初始化同一ChannelPipeline
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//初始化完毕后,从pipeline中移除自身
pipeline.remove(this);
}
}
return true;
}
return false;
}
//匿名类实现,这里指定具体的初始化逻辑
protected abstract void initChannel(C ch) throws Exception;
private void removeState(final ChannelHandlerContext ctx) {
//从initMap防重Set集合中删除ChannelInitializer
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}
复制代码
ChannelInitializer
中的初始化逻辑比较简单明了:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中用户指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
复制代码
还记得在初始化NioServerSocketChannel
时。io.netty.bootstrap.ServerBootstrap#init
方法中向pipeline
中添加的ChannelInitializer
吗?
当初始化完pipeline
时,此时pipeline
的结构再次发生了变化:
此时Main Reactor
中的任务队列taskQueue
结构变化为:
添加ServerBootstrapAcceptor
的任务是在初始化NioServerSocketChannel
的时候向 main reactor 提交过去的。还记得吗?
1.3.8 回调 regFuture 的 ChannelFutureListener
在本小节《Netty 服务端的启动》的最开始,我们介绍了服务端启动的入口函数io.netty.bootstrap.AbstractBootstrap#doBind
,在函数的最开头调用了initAndRegister()
方法用来创建并初始化NioServerSocketChannel
,之后便会将NioServerSocketChannel
注册到Main Reactor
中。
注册的操作是一个异步的过程,所以在initAndRegister()
方法调用后返回一个代表注册结果的ChannelFuture regFuture
。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
//异步创建,初始化,注册ServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
//如果注册完成,则进行绑定操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//添加注册完成 回调函数
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...............省略...............
// 注册完成后,Reactor线程回调这里
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}
复制代码
之后会向ChannelFuture regFuture
添加一个注册完成后的回调函数~~~~ ChannelFutureListener
。在回调函数operationComplete
中开始发起绑端口地址流程
。
那么这个回调函数在什么时候?什么地方发起的呢??
让我们在回到本小节的主题register0
方法的流程中:
当调用doRegister()
方法完成NioServerSocketChannel
向Main Reactor
的注册后,紧接着会调用pipeline.invokeHandlerAddedIfNeeded()
方法中触发ChannelInitializer#handlerAdded
回调中对pipeline
进行初始化。
最后在safeSetSuccess
方法中,开始回调注册在regFuture
上的ChannelFutureListener
。
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
//回调注册在promise上的listeners
notifyListeners();
}
return true;
}
return false;
}
复制代码
safeSetSuccess
的逻辑比较简单,首先设置regFuture
结果为success
,并且回调注册在regFuture
上的ChannelFutureListener
。
需要提醒的是,执行safeSetSuccess
方法,以及后边回调regFuture
上的ChannelFutureListener
这些动作都是由Reactor线程
执行的。
关于 Netty 中的Promise模型
后边我会在写一篇专门的文章进行分析,这里大家只需清楚大体的流程即可。不必在意过多的细节。
下面我们把视角切换到regFuture
上的ChannelFutureListener
回调中,看看在Channel
注册完成后,Netty 又会做哪些事情?
2. doBind0
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}
复制代码
这里 Netty 又将绑定端口地址
的操作封装成异步任务,提交给Reactor
执行。
但是这里有一个问题,其实此时执行doBind0
方法的线程正是Reactor线程
,那为什么不直接在这里去执行bind操作
,而是再次封装成异步任务提交给Reactor
中的taskQueue
呢?
反正最终都是由Reactor线程
执行,这其中又有什么分别呢?
经过上小节的介绍我们知道,bind0
方法的调用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0
方法在将NioServerSocketChannel
注册到Main Reactor
之后,并且NioServerSocketChannel
的pipeline
已经初始化完毕后,通过safeSetSuccess
方法回调过来的。
这个过程全程是由Reactor线程
来负责执行的,但是此时register0
方法并没有执行完毕,还需要执行后面的逻辑。
而绑定逻辑需要在注册逻辑执行完之后执行,所以在doBind0
方法中Reactor线程
会将绑定操作
封装成异步任务先提交给taskQueue
中保存,这样可以使Reactor线程
立马从safeSetSuccess
中返回,继续执行剩下的register0
方法逻辑。
private void register0(ChannelPromise promise) {
try {
................省略............
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//触发channelRegister事件
pipeline.fireChannelRegistered();
if (isActive()) {
................省略............
}
} catch (Throwable t) {
................省略............
}
}
复制代码
当Reactor线程
执行完register0
方法后,就会从taskQueue
中取出异步任务执行。
此时Reactor线程
中的taskQueue
结构如下:
此时NioServerSocketChannel
中pipeline
的结构如下:
3. 绑定端口地址
对Channel
的操作行为全部定义在ChannelOutboundInvoker接口中
。
public interface ChannelOutboundInvoker {
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
*
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}
复制代码
bind
方法由子类AbstractChannel
实现。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}
复制代码
调用pipeline.bind(localAddress, promise)
在pipeline
中传播bind事件
,触发回调pipeline
中所有ChannelHandler
的bind方法
。
事件在pipeline
中的传播具有方向性:
inbound事件
只能被pipeline
中的ChannelInboundHandler
响应处理outbound事件
只能被pipeline
中的ChannelOutboundHandler
响应处理
然而这里的bind事件
在 Netty 中被定义为outbound事件
,所以它在pipeline
中是反向传播。先从TailContext
开始反向传播直到HeadContext
。
然而bind
的核心逻辑也正是实现在HeadContext
中。
3.1 HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//触发AbstractChannel->bind方法 执行JDK NIO SelectableChannel 执行底层绑定操作
unsafe.bind(localAddress, promise);
}
}
复制代码
在HeadContext#bind
回调方法中,调用Channel
里的unsafe
操作类执行真正的绑定操作。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
.................省略................
//这时channel还未激活 wasActive = false
boolean wasActive = isActive();
try {
//io.netty.channel.socket.nio.NioServerSocketChannel.doBind
//调用具体channel实现类
doBind(localAddress);
} catch (Throwable t) {
.................省略................
return;
}
//绑定成功后 channel激活 触发channelActive事件传播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//pipeline中触发channelActive事件
pipeline.fireChannelActive();
}
});
}
//回调注册在promise上的ChannelFutureListener
safeSetSuccess(promise);
}
protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
复制代码
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//调用JDK NIO 底层SelectableChannel 执行绑定操作
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
判断是否为首次绑定,如果是的话将触发pipeline中的ChannelActive事件
封装成异步任务放入Reactor
中的taskQueue
中。
执行safeSetSuccess(promise)
,回调注册在promise
上的ChannelFutureListener
。
还是同样的问题,当前执行线程已经是Reactor线程
了,那么为何不直接触发pipeline
中的ChannelActive
事件而是又封装成异步任务呢??
因为如果直接在这里触发ChannelActive事件
,那么Reactor线程
就会去执行pipeline
中的ChannelHandler
的channelActive事件回调
。
这样的话就影响了safeSetSuccess(promise)
的执行,延迟了
注册在promise
上的ChannelFutureListener
的回调。
到现在为止,Netty 服务端就已经完成了绑定端口地址的操作,NioServerSocketChannel
的状态现在变为Active
。
最后还有一件重要的事情要做,我们接着来看pipeline
中对channelActive事件
处理。
3.2 channelActive 事件处理
channelActive事件
在 Netty 中定义为inbound事件
,所以它在pipeline
中的传播为正向传播,从HeadContext
一直到TailContext
为止。
在channelActive事件
回调中需要触发向Selector
指定需要监听的IO事件
~~OP_ACCEPT事件
。
这块的逻辑主要在HeadContext
中实现。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中继续向后传播channelActive事件
ctx.fireChannelActive();
//如果是autoRead 则自动触发read事件传播
//在read回调函数中 触发OP_ACCEPT注册
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//如果是autoRead 则触发read事件传播
channel.read();
}
}
//AbstractChannel
public Channel read() {
//触发read事件
pipeline.read();
return this;
}
@Override
public void read(ChannelHandlerContext ctx) {
//触发注册OP_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
}
复制代码
3.3 beginRead
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void beginRead() {
assertEventLoop();
//channel必须是Active
if (!isActive()) {
return;
}
try {
// 触发在selector上注册channel感兴趣的监听事件
doBeginRead();
} catch (final Exception e) {
.............省略..............
}
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//子类负责继承实现
protected abstract void doBeginRead() throws Exception;
}
复制代码
断言判断执行该方法的线程必须是Reactor线程
。
此时NioServerSocketChannel
已经完成端口地址的绑定操作,isActive() = true
调用doBeginRead
实现向Selector
注册监听事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {
//channel注册到Selector后获得的SelectKey
volatile SelectionKey selectionKey;
// Channel监听事件集合
protected final int readInterestOp;
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
* */
if ((interestOps & readInterestOp) == 0) {
//添加OP_ACCEPT事件到interestOps集合中
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
复制代码
前边提到在NioServerSocketChannel
在向Main Reactor
中的Selector
注册后,会获得一个SelectionKey
。这里首先要获取这个SelectionKey
。
从SelectionKey
中获取NioServerSocketChannel
感兴趣的IO事件集合 interestOps
,当时在注册的时候interestOps
设置为0
。
将在NioServerSocketChannel
初始化时设置的readInterestOp = OP_ACCEPT
,设置到SelectionKey
中的interestOps
集合中。这样Reactor
中的Selector
就开始监听interestOps
集合中包含的IO事件
了。
Main Reactor
中主要监听的是OP_ACCEPT事件
。
流程走到这里,Netty 服务端就真正的启动起来了,下一步就开始等待接收客户端连接了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?
此时 Netty 的Reactor模型
结构如下:
总结
本文我们通过图解源码的方式完整地介绍了整个 Netty 服务端启动流程,并介绍了在启动过程中涉及到的ServerBootstrap
相关的属性以及配置方式。NioServerSocketChannel
的创建初始化过程以及类的继承结构。
其中重点介绍了NioServerSocketChannel
向Reactor
的注册过程以及Reactor线程
的启动时机和pipeline
的初始化时机。
最后介绍了NioServerSocketChannel
绑定端口地址的整个流程。
上述介绍的这些流程全部是异步操作,各种回调绕来绕去的,需要反复回想下,读异步代码就是这样,需要理清各种回调之间的关系,并且时刻提醒自己当前的执行线程是什么?
好了,现在 Netty 服务端已经启动起来,接着就该接收客户端连接了,我们下篇文章见~~~~
评论