学不懂 Netty?看不懂源码?不存在的,手把手带你阅读 Netty 源码
提前准备好如下代码, 从服务端构建着手,深入分析 Netty 服务端的启动过程。
public class NettyBasicServerExample {
}public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;
}public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;
}在服务端启动之前,需要配置 ServerBootstrap 的相关参数,这一步大概分为以下几个步骤
配置 EventLoopGroup 线程组配置 Channel 类型设置 ServerSocketChannel 对应的 Handler 设置网络监听的端口设置 SocketChannel 对应的 Handler 配置 Channel 参数 Netty 会把我们配置的这些信息组装,发布服务监听。
ServerBootstrap 参数配置过程 #下面这段代码是我们配置 ServerBootStrap 相关参数,这个过程比较简单,就是把配置的参数值保存到 ServerBootstrap 定义的成员变量中就可以了。
bootstrap.group(bossGroup, workerGroup)//配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)) //设置 ServerSocketChannel 对应的 Handler//childHandler 表示给 worker 那些线程配置了一个处理器,// 这个就是上面 NIO 中说的,把处理业务的具体逻辑抽象出来,放到 Handler 里面.childHandler(new ChannelInitializer<SocketChannel>() {});我们来看一下 ServerBootstrap 的类关系图以及属性定义
ServerBootstrap 类关系图 #如图 8-1 所示,表示 ServerBootstrap 的类关系图。
AbstractBootstrap,定义了一个抽象类,作为抽象类,一定是抽离了 Bootstrap 相关的抽象逻辑,所以很显然可以推断出 Bootstrap 应该也继承了 AbstractBootstrapServerBootstrap,服务端的启动类,ServerBootstrapAcceptor,继承了 ChannelInboundHandlerAdapter,所以本身就是一个 Handler,当服务端启动后,客户端连接上来时,会先进入到 ServerBootstrapAccepter。学不懂 Netty?看不懂源码?不存在的,手把手带你阅读 Netty 源码图 8-1 ServerBootstrap 类关系图
AbstractBootstrap 属性定义 #public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {@SuppressWarnings("unchecked")private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];@SuppressWarnings("unchecked")private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];/*** 这里的 EventLoopGroup 作为服务端 Acceptor 线程,负责处理客户端的请求接入* 作为客户端 Connector 线程,负责注册监听连接操作位,用于判断异步连接结果。*/volatile EventLoopGroup group; //@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory; //channel 工厂,很明显应该是用来制造对应 Channel 的 private volatile SocketAddress localAddress; //SocketAddress 用来绑定一个服务端地址
}对于上述属性定义,整体总结如下:
提供了一个 ChannelFactory 对象用来创建 Channel,一个 Channel 会对应一个 EventLoop 用于 IO 的事件处理,在一个 Channel 的整个生命周期中 只会绑定一个 EventLoop,这里可理解给 Channel 分配一个线程进行 IO 事件处理,结束后回收该线程。AbstractBootstrap 没有提供 EventLoop 而是提供了一个 EventLoopGroup,其实我认为这里只用一个 EventLoop 就行了。不管是服务器还是客户端的 Channel 都需要绑定一个本地端口这就有了 SocketAddress 类的对象 localAddress。Channel 有很多选项所有有了 options 对象 LinkedHashMap<channeloption<?>, Object>怎么处理 Channel 的 IO 事件呢,我们添加一个事件处理器 ChannelHandler 对象。ServerBootstrap 属性定义 #ServerBootstrap 可以理解为服务器启动的工厂类,我们可以通过它来完成服务器端的 Netty 初始化。主要职责:|
EventLoop 初始化 channel 的注册 pipeline 的初始化 handler 的添加过程服务端连接处理。public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
}服务端启动过程分析 #了解了 ServerBootstrap 相关属性的配置之后,我们继续来看服务的启动过程,在开始往下分析的时候,先不妨来思考以下这些问题
Netty 自己实现的 Channel 与底层 JDK 提供的 Channel 是如何联系并且构建实现的 ChannelInitializer 这个特殊的 Handler 处理器的作用以及实现原理 Pipeline 是如何初始化以的 ServerBootstrap.bind#先来看 ServerBootstrap.bind()方法的定义,这里主要用来绑定一个端口并且发布服务端监听。
根据我们使用 NIO 相关 API 的理解,无非就是使用 JDK 底层的 API 来打开一个服务端监听并绑定一个端口。
ChannelFuture channelFuture=bootstrap.bind(port).sync();public ChannelFuture bind(SocketAddress localAddress) {validate();return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));}validate(), 验证 ServerBootstrap 核心成员属性的配置是否正确,比如 group、channelFactory、childHandler、childGroup 等,这些属性如果没配置,那么服务端启动会报错 localAddress,绑定一个本地端口地址 doBind#doBind 方法比较长,从大的代码结构,可以分为三个部分
initAndRegister 初始化并注册 Channel,并返回一个 ChannelFuture,说明初始化注册 Channel 是异步实现 regFuture.cause() 用来判断 initAndRegister()是否发生异常,如果发生异常,则直接返回 regFuture.isDone(), 判断 initAndRegister()方法是否执行完成。如果执行完成,则调用 doBind0()方法。如果未执行完成,regFuture 添加一个监听回调,在监听回调中再次判断执行结果进行相关处理。PendingRegistrationPromise 用来保存异步执行结果的状态从整体代码逻辑来看,逻辑结构还是非常清晰的, initAndRegister()方法负责 Channel 的初始化和注册、doBind0()方法用来绑定端口。这个无非就是我们使用 NIO 相关 API 发布服务所做的事情。
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}
}initAndRegister#这个方法顾名思义,就是初始化和注册,基于我们整个流程的分析可以猜测到
初始化,应该就是构建服务端的 Handler 处理链 register,应该就是把当前服务端的连接注册到 selector 上下面我们通过源码印证我们的猜想。
final ChannelFuture initAndRegister() {Channel channel = null;try {//通过 ChannelFactory 创建一个具体的 Channel 实现 channel = channelFactory.newChannel();init(channel); //初始化} catch (Throwable t) {//省略....}//这个代码应该和我们猜想是一致的,就是将当前初始化的 channel 注册到 selector 上,这个过程同样也是异步的 ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) { //获取 regFuture 的执行结果 if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}channelFactory.newChannel()#这个方法在分析之前,我们可以继续推测它的逻辑。
在最开始构建服务端的代码中,我们通过 channel 设置了一个 NioServerSocketChannel.class 类对象,这个对象表示当前 channel 的构建使用哪种具体的 API
bootstrap.group(bossGroup, workerGroup)//配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel.channel(NioServerSocketChannel.class)而在 initAndRegister 方法中,又用到了 channelFactory.newChannel()来生成一个具体的 Channel 实例,因此不难想到,这两者必然有一定的联系,我们也可以武断的认为,这个工厂会根据我们配置的 channel 来动态构建一个指定的 channel 实例。
channelFactory 有多个实现类,所以我们可以从配置方法中找到 channelFactory 的具体定义,代码如下。
public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}channelFactory 对应的具体实现是:ReflectiveChannelFactory,因此我们定位到 newChannel()方法的实现。
ReflectiveChannelFactory.newChannel#在该方法中,使用 constructor 构建了一个实例。
@Overridepublic T newChannel() {try {return constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}}construtor 的初始化代码如下, 用到了传递进来的 clazz 类,获得该类的构造器,该构造器后续可以通过 newInstance 创建一个实例对象
而此时的 clazz 其实就是:NioServerSocketChannel
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
}NioServerSocketChannel#NioServerSocketChannel 的构造方法定义如下。
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {private static ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}}public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}}当 NioServerSocketChannel 实例化后,调用 newSocket 方法创建了一个服务端实例。
newSocket 方法中调用了 provider.openServerSocketChannel(),来完成 ServerSocketChannel 的创建,ServerSocketChannel 就是 Java 中 NIO 中的服务端 API。
public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);}通过层层推演,最终看到了 Netty 是如何一步步封装,完成 ServerSocketChannel 的创建。
设置非阻塞 #在 NioServerSocketChannel 中的构造方法中,先通过 super 调用父类做一些配置操作
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}最终,super 会调用 AbstractNioChannel 中的构造方法,
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp; //设置关心事件,此时是一个连接事件,所以是 OP_ACCEPTtry {ch.configureBlocking(false); //设置非阻塞} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}
}继续分析 initAndRegister#分析完成 channel 的初始化后,接下来就是要将当前 channel 注册到 Selector 上,所以继续回到 initAndRegister 方法。
final ChannelFuture initAndRegister() {//省略....//这个代码应该和我们猜想是一致的,就是将当前初始化的 channel 注册到 selector 上,这个过程同样也是异步的 ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) { //获取 regFuture 的执行结果 if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}注册到某个 Selector 上,其实就是注册到某个 EventLoopGroup 中,如果大家能有这个猜想,说明前面的内容是听懂了的。
config().group().register(channel)这段代码,其实就是获取在 ServerBootstrap 中配置的 bossEventLoopGroup,然后把当前的服务端 channel 注册到该 group 中。
此时,我们通过快捷键想去看一下 register 的实现时,发现 EventLoopGroup 又有多个实现,我们来看一下类关系图如图 8-2 所示。
学不懂 Netty?看不懂源码?不存在的,手把手带你阅读 Netty 源码图 8-3 EventLoopGroup 类关系图
而我们在前面配置的 EventLoopGroup 的实现类是 NioEventLoopGroup,而 NioEventLoopGroup 继承自 MultithreadEventLoopGroup,所以在 register()方法中,我们直接找到父类的实现方法即可。
MultithreadEventLoopGroup.register#这段代码大家都熟了,从 NioEventLoopGroup 中选择一个 NioEventLoop,将当前 channel 注册上去
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}next()方法返回的是 NioEventLoop,而 NioEventLoop 又有多个实现类,我们来看图 8-4 所示的类关系图。
学不懂 Netty?看不懂源码?不存在的,手把手带你阅读 Netty 源码图 8-4 NioEventLoop 类关系图
从类关系图中发现,发现 NioEventLoop 派生自 SingleThreadEventLoop,所以 next().register(channel);方法,执行的是 SingleThreadEventLoop 中的 register
SingleThreadEventLoop.register#@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}ChannelPromise, 派生自 Future,用来实现异步任务处理回调功能。简单来说就是把注册的动作异步化,当异步执行结束后会把执行结果回填到 ChannelPromise 中
AbstractChannel.register#抽象类一般就是公共逻辑的处理,而这里的处理主要就是针对一些参数的判断,判断完了之后再调用 register0()方法。
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) { //判断是否已经注册过 promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) { //判断 eventLoop 类型是否是 EventLoop 对象类型,如果不是则抛出异常 promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}
}AbstractChannel.register0#Netty 从 EventLoopGroup 线程组中选择一个 EventLoop 和当前的 Channel 绑定,之后该 Channel 生命周期中的所有 I/O 事件都由这个 EventLoop 负责。
register0 方法主要做四件事:
调用 JDK 层面的 API 对当前 Channel 进行注册触发 HandlerAdded 事件触发 channelRegistered 事件 Channel 状态为活跃时,触发 channelActive 事件在当前的 ServerSocketChannel 连接注册的逻辑中,我们只需要关注下面的 doRegister 方法即可。
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister(); //调用 JDK 层面的 register()方法进行注册 neverRegistered = false;registered = true;
}AbstractNioChannel.doRegister#进入到 AbstractNioChannel.doRegister 方法。
javaChannel().register()负责调用 JDK 层面的方法,把 channel 注册到 eventLoop().unwrappedSelector()上,其中第三个参数传入的是 Netty 自己实现的 Channel 对象,也就是把该对象绑定到 attachment 中。
这样做的目的是,后续每次调 Selector 对象进行事件轮询时,当触发事件时,Netty 都可以获取自己的 Channe 对象。
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}服务注册总结 #上述代码比较绕,但是整体总结下来并不难理解
初始化指定的 Channel 实例把该 Channel 分配给某一个 EventLoop 然后把 Channel 注册到该 EventLoop 的 Selector 中 AbstractBootstrap.doBind0#分析完了注册的逻辑后,再回到 AbstractBootstrap 类中的 doBind0 方法,这个方法不用看也能知道,ServerSocketChannel 初始化了之后,接下来要做的就是绑定一个 ip 和端口地址。
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {
}channel.bind 方法,会根据 ServerSocketChannel 中的 handler 链配置,逐个进行调用,由于在本次案例中,我们给 ServerSocketChannel 配置了一个 LoggingHandler 的处理器,所以 bind 方法会先调用 LoggingHandler,然后再调用 DefaultChannelPipeline 中的 bind 方法,调用链路
-> DefaultChannelPipeline.ind
-> AbstractChannel.bind
-> NioServerSocketChannel.doBind
最终就是调用前面初始化好的 ServerSocketChannel 中的 bind 方法绑定本地地址和端口。
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}构建 SocketChannel 的 Pipeline#在 ServerBootstrap 的配置中,我们针对 SocketChannel,配置了入站和出站的 Handler,也就是当某个 SocketChannel 的 IO 事件就绪时,就会按照我们配置的处理器链表进行逐一处理,那么这个链表是什么时候构建的,又是什么样的结构呢?下面我们来分析这部分的内容
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NormalInBoundHandler("NormalInBoundA",false)).addLast(new NormalInBoundHandler("NormalInBoundB",false)).addLast(new NormalInBoundHandler("NormalInBoundC",true));socketChannel.pipeline().addLast(new NormalOutBoundHandler("NormalOutBoundA")).addLast(new NormalOutBoundHandler("NormalOutBoundB")).addLast(new NormalOutBoundHandler("NormalOutBoundC")).addLast(new ExceptionHandler());}});childHandler 的构建 #childHandler 的构建过程,在 AbstractChannel.register0 方法中实现
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel(); //这是是创建 channelinit(channel); //这里是初始化} catch (Throwable t) {//省略....}ChannelFuture regFuture = config().group().register(channel); //这是是注册 if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}
ServerBootstrap.init#init 方法,调用的是 ServerBootstrap 中的 init(),代码如下。
@Overridevoid init(Channel channel) {setChannelOptions(channel, newOptionsArray(), logger);setAttributes(channel, newAttributesArray());
}其中,对于上述代码的核心部分说明如下
ChannelPipeline 是在 AbstractChannel 中的构造方法中初始化的一个 DefaultChannelPipelineprotected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }p.addLast 是为 NioServerSocketChannel 添加 handler 处理器链,这里添加了一个 ChannelInitializer 回调函数,该回调是异步触发的,在回调方法中做了两件事如果 ServerBootstrap.handler 添加了处理器,则会把相关处理器添加到该 pipeline 中,在本次演示的案例中,我们添加了 LoggerHandler 异步执行添加了 ServerBootstrapAcceptor,从名字来看,它是专门用来接收新的连接处理的。我们在这里思考一个问题,为什么 NioServerSocketChannel 需要通过 ChannelInitializer 回调处理器呢? ServerBootstrapAcceptor 为什么通过异步任务添加到 pipeline 中呢?
原因是,NioServerSocketChannel 在初始化的时候,还没有开始将该 Channel 注册到 Selector 对象上,也就是没办法把 ACCEPT 事件注册到 Selector 上,所以事先添加了 ChannelInitializer 处理器,等待 Channel 注册完成后,再向 Pipeline 中添加 ServerBootstrapAcceptor。
ServerBootstrapAcceptor#按照下面的方法演示一下 SocketChannel 中的 Pipeline 的构建过程
启动服务端监听在 ServerBootstrapAcceptor 的 channelRead 方法中打上断点通过 telnet 连接,此时会触发 debug。public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;
}ServerBootstrapAcceptor 是服务端 NioServerSocketChannel 中的一个特殊处理器,该处理器的 channelRead 事件只会在新连接产生时触发,所以这里通过 final Channel child = (Channel) msg;可以直接拿到客户端的链接 SocketChannel。
ServerBootstrapAcceptor 接着通过 childGroup.register()方法,把当前 NioSocketChannel 注册到工作线程中。
事件触发机制的流程 #在 ServerBootstrapAcceptor 中,收到客户端连接时,会调用 childGroup.register(child)把当前客户端连接注册到指定 NioEventLoop 的 Selector 中。
这个注册流程和前面讲解的 NioServerSocketChannel 注册流程完全一样,最终都会进入到 AbstractChannel.register0 方法。
AbstractChannel.register0#private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;
}pipeline.fireChannelRegistered()#@Overridepublic final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}下面的事件触发,分为两个逻辑
如果当前的任务是在 eventLoop 中触发的,则直接调用 invokeChannelRegistered 否则,异步执行 invokeChannelRegistered。static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}invokeChannelRegistered#触发下一个 handler 的 channelRegistered 方法。
private void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {invokeExceptionCaught(t);}} else {fireChannelRegistered();}}Netty 服务端启动总结 #到此为止,整个服务端启动的过程,我们就已经分析完成了,主要的逻辑如下
创建服务端 Channel,本质上是根据用户配置的实现,调用 JDK 原生的 Channel 初始化 Channel 的核心属性,unsafe、pipeline 初始化 Channel 的 Pipeline,主要是添加两个特殊的处理器,ChannelInitializer 和 ServerBootstrapAcceptor 注册服务端的 Channel,添加 OP_ACCEPT 事件,这里底层调用的是 JDK 层面的实现,讲 Channel 注册到 BossEventLoop 中的 Selector 上绑定端口,调用 JDK 层面的 API,绑定端口。
评论