Netty 服务端启动流程源码分析
前记
哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启今天的 Netty 源码系列。源码分析的第一篇文章,将由<吴小飞>分享 《Netty 服务端启动流程源码分析》,下一篇会由<王猛>分享客户端的启动过程源码分析。通过源码的阅读,我们将会知道,Netty 服务端启动的调用链是非常长的,同时肯定也会发现一些新的问题,随着我们源码阅读的不断深入,相信这些问题我们也会一一攻破。
废话不多说,直接上号!
一、从`EchoServer `示例入手
<div align="center", style="color:#A8A8A8;font-size:14px">netty-example:EchoServer</div>
示例从哪里来?任何开源框架都会有自己的示例代码,Netty 源码也不例外,如模块netty-example
中就包括了最常见的EchoServer
示例,下面通过这个示例进入服务端启动流程篇章。
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建 EchoServerHandler 对象
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 2. 声明服务端启动引导器,并设置相关属性
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 3. 绑定端口即启动服务端,并同步等待
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// 4. 监听服务端关闭,并阻塞等待
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// 5. 优雅地关闭两个EventLoopGroup线程池
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
[代码行 18、19]声明 Main-Sub Reactor 模式线程池:EventLoopGroup
创建两个 EventLoopGroup
对象。其中,bossGroup
用于服务端接受客户端的连接,workerGroup
用于进行客户端的 SocketChannel
的数据读写。
(<u>关于EventLoopGroup
不是本文重点所以在后续文章中进行分析</u>)
[代码行 23-39]声明服务端启动引导器,并设置相关属性
AbstractBootstrap
是一个帮助类,通过方法链(method chaining
)的方式,提供了一个简单易用的方式来配置启动一个Channel
。io.netty.bootstrap.ServerBootstrap
,实现 AbstractBootstrap
抽象类,用于 Server
的启动器实现类。io.netty.bootstrap.Bootstrap
,实现 AbstractBootstrap
抽象类,用于 Client
的启动器实现类。如下类图所示:
[
](https://imgchr.com/i/wwvVKA)
<div align="center", style="color:#A8A8A8;font-size:14px">AbstractBootstrap 类继承</div>
(<u>在EchoServer
示例代码中,我们看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等属性链式设置都放到关于AbstractBootstrap
体系代码中详细介绍。</u>)
[代码行 43]绑定端口即启动服务端,并同步等待
先调用 #bind(int port)
方法,绑定端口,后调用 ChannelFuture#sync()
方法,阻塞等待成功。对于bind
操作就是本文要详细介绍的"服务端启动流程"。
[代码行 47]监听服务端关闭,并阻塞等待
先调用 #closeFuture()
方法,监听服务器关闭,后调用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel
的监听关闭。
[代码行 51、52]优雅地关闭两个EventLoopGroup
线程池
finally
代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully()
方法,分别关闭两个 EventLoopGroup
对象,终止所有线程。
二、服务启动过程
在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO
编程在服务端启动初始的代码:
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码
这 5 行代码标示一个最为熟悉的过程:
后面等分析完 Netty 的启动过程后,会对这些步骤有一个新的认识。在EchoServer
示例中,进入 #bind(int port)
方法,AbstractBootstrap#bind()
其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
[代码行 2] :调用 #initAndRegister()
方法,初始化并注册一个 Channel
对象。因为注册是异步的过程,所以返回一个 ChannelFuture
对象。详细解析,见 「initAndRegister()
」。
[代码行 4-6]]:若发生异常,直接进行返回。
[代码行 9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。
- 核心在[第 11 、29 行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey
中。
- 如果异步注册对应的 ChanelFuture
未完成,则调用 ChannelFuture#addListener(ChannelFutureListener)
方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...)
方法的逻辑。
通过doBind
方法可以知道服务端启动流程大致如下几个步骤:
[
](https://imgchr.com/i/wwvl8g)
<div align="center", style="color:#A8A8A8;font-size:14px">服务端启动流程</div>
1. 创建 Channel
<div align="center", style="color:#A8A8A8;font-size:14px">创建服务端 channel</div>
从#doBind(final SocketAddress localAddress)
进入到initAndRegister()
:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
复制代码
[代码行 4]调用 ChannelFactory#newChannel()
方法,创建 Channel
对象。 ChannelFactory
类继承如下:
<div align="center", style="color:#A8A8A8;font-size:14px">ChannelFactroy 类继承</div>
可以在ChannelFactory
注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,这里只是包名的调整,对于继承结构不变。netty
默认使用ReflectiveChannelFactory
,我们可以看到重载方法:
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
复制代码
很明显,正如其名是通过反射机制构造Channel
对象实例的。constructor
是在其构造方法初始化的:this.constructor = clazz.getConstructor();
这个clazz
按理说应该是我们要创建的Channel
的 Class 对象。那Class
对象是什么呢?我们接着看channelFactory
是怎么初始化的。
首先在AbstractBootstrap
找到如下代码:
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
复制代码
调用这个方法的递推向上看到:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
复制代码
这个方法正是在EchoServer
中ServerBootstrap
链式设置时调用.channel(NioServerSocketChannel.class)
的方法。我们看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
作为具体实例,并且将NioServerSocketChannel.class
作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel
对象。
继续看NioServerSocketChannel
构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel
类继承关系:
<div align="center", style="color:#A8A8A8;font-size:14px">Channel 类继承</div>
NioServerSocketChannel
与NioSocketChannel
分别对应服务端和客户端,公共父类都是AbstractNioChannel
和AbstractChannel
,下面介绍创建过程可以参照这个Channel
类继承图。进入NioServerSocketChannel
构造方法:
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
复制代码
点击newSocket
进去:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
复制代码
以上传进来的provider
是DEFAULT_SELECTOR_PROVIDER
即默认的java.nio.channels.spi.SelectorProvider
,[代码行 9]就是熟悉的jdk nio
创建ServerSocketChannel
。这样newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回了结果ServerSocketChannel
,回到NioServerSocketChannel()#this()
点进去:
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
以上super
代表父类AbstractNioMessageChannel
构造方法,点进去看到:
/**
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
复制代码
以上super
代表父类AbstractNioChannel
构造方法,点进去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
复制代码
以上[代码行 3]将ServerSocketChannel
保存到了AbstractNioChannel#ch
成员变量,在上面提到的NioServerSocketChannel
构造方法的[代码行 6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
变量。
以上[代码行 6]就是熟悉的jdk nio
编程设置ServerSocketChannel
非阻塞方式。这里还有super
父类构造方法,点击进去看到:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
复制代码
以上构造方法中:
parent
属性,代表父 Channel
对象。对于 NioServerSocketChannel
的 parent
为null
。
id
属性,Channel
编号对象。在构造方法中,通过调用 #newId()
方法进行创建。(<u>这里不细展开 Problem-1</u>)
unsafe
属性,Unsafe
对象。因为Channel
真正的具体操作,是通过调用对应的 Unsafe
对象实施。所以需要在构造方法中,通过调用 #newUnsafe()
方法进行创建。这里的 Unsafe
并不是我们常说的 jdk
自带的sun.misc.Unsafe
,而是 io.netty.channel.Channel#Unsafe
。(<u>这里不细展开 Problem-2</u>)
pipeline
属性默认是DefaultChannelPipeline
对象,赋值后在后面为 channel 绑定端口的时候会用到
通过以上创建channel
源码过程分析,总结的流程时序图如下:
2. 初始化 Channel
<div align="center", style="color:#A8A8A8;font-size:14px">初始化 channel</div>
回到一开始创建Channel
的initAndRegister()
入口方法,在创建Channel
后紧接着init(channel)
进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel)
,代码如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
[代码 3 - 6 行]: options0()
方法返回的options
保存了用户在EchoServer
中设置自定义的可选项集合,这样ServerBootstrap
将配置的选项集合,设置到了 Channel
的可选项集合中。
[代码 8 - 15 行]: attrs0()
方法返回的attrs
保存了用户在EchoServer
中设置自定义的属性集合,这样ServerBootstrap
将配置的属性集合,设置到了 Channel
的属性集合中。
[代码 21-28 行]:通过局部变量currentChildOptions
和currentChildAttrs
保存了用户自定义的childOptions
和childAttrs
,用于[代码 43 行] ServerBootstrapAcceptor
构造方法。
[代码 30-47]]:创建ChannelInitializer
对象,添加到 pipeline
中,用于后续初始化 ChannelHandler
到 pipeline
中,包括用户在EchoServer
配置的LoggingHandler
和创建的创建 ServerBootstrapAcceptor
对象。
- [代码行 34-37]:添加启动器配置的 LoggingHandler
到pipeline
中。
- [代码行 39-45]:创建 ServerBootstrapAcceptor
对象,添加到 pipeline
中。从名字上就可以看出来,ServerBootstrapAcceptor
也是一个 ChannelHandler
实现类,专门用于接受客户端的新连接请求,把新的请求扔给某个事件循环器,我们先不做过多分析。我们发现是使用EventLoop.execute
执行添加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3)
- 需要说明的是pipeline
在之前介绍 Netty 核心组件的时候提到是一个包含ChannelHandlerContext
的双向链表,每一个context
对于唯一一个ChannelHandler
,这里初始化后,ChannelPipeline
里就是如下一个结构:
<div align="center", style="color:#A8A8A8;font-size:14px">ChannelPipeline 内部结构</div>
3. 注册 Channel
<img src="https://s1.ax1x.com/2020/09/13/wwvXi8.png" alt="wwvXi8.png" style="zoom:80%;" />
<div align="center", style="color:#A8A8A8;font-size:14px">注册 channel</div>
初始化Channel
一些基本配置和属性完毕后,回到一开始创建Channel
的initAndRegister()
入口方法,在初始化Channel
后紧接着[代码行 17] ChannelFuture regFuture = config().group().register(channel);
明显这里是通过EventLoopGroup
进入注册流程(EventLoopGroup
体系将在后续文章讲解)
在EchoServer
中启动器同样通过ServerBootstrap#group()
设置了NioEventLoopGroup
,它继承自MultithreadEventLoopGroup
,所以注册流程会进入MultithreadEventLoopGroup
重载的register(Channel channel)
方法,代码如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
这里会调用 next()
方法选择出来一个 EventLoop
来注册 Channel
,里面实际上使用的是一个叫做 EventExecutorChooser
的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本质上就是从 EventExecutor
数组中选择一个 EventExecutor
,我们这里就是 NioEventLoop
,那么,它们有什么区别呢?(<u>Problem-4:在介绍EventLoopGroup
体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。</u>)
接着,来到 NioEventLoop
的 register(channel)
方法,你会不会问找不到该方法?提示NioEventLoop
继承SingleThreadEventLoop
,所以父类方法:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
可以看到,先创建了一个叫做 ChannelPromise
的东西,它是 ChannelFuture
的子类。[代码行 9]又调回了 Channel
的 Unsafe
的 register ()
方法,这里第一个参数是 this
,也就是 NioEventLoop
,第二个参数是刚创建的 ChannelPromise
。
点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法进去,代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
[代码行 15]这行代码是设置 Channel
的 eventLoop
属性。这行前面的代码主要是在校验传入的 eventLoop
参数非空,校验是否有注册过以及校验 Channel
和 eventLoop
类型是否匹配。
[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise)
方法中:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
复制代码
[代码行 9]进入 AbstractNioChannel#doRegister()
方法:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
复制代码
[代码行 5]关键一行代码,将 Java 原生 NIO Selector
与 Java 原生 NIO
的 Channel
对象(ServerSocketChannel
) 绑定在一起,并将当前 Netty 的 Channel
通过 attachment
的形式绑定到 SelectionKey
上:
调用 #unwrappedSelector()
方法,返回 Java 原生 NIO Selector
对象,而且每个 NioEventLoop
与 Selector
唯一一对应。
调用 SelectableChannel#register(Selector sel, int ops, Object att)
方法,注册 Java 原生NIO
的 Channel
对象到 NIO Selector
对象上。
通过以上注册 channel 源码分析,总结流程的时序图如下:
4. 绑定端口
<div align="center", style="color:#A8A8A8;font-size:14px">绑定端口</div>
注册完Channel
最后回到AbstractBootstrap#doBind()
方法,分析 Channel
的端口绑定逻辑。进入doBind0
代码如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
复制代码
[代码行 7]:在前面Channel
注册成功的条件下,调用 EventLoop
执行 Channel
的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop
所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语注释,这里作为一个问题记着(<u>Problem-5</u>)。
[代码行 11]:进入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,同样立即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE
监听事件。
[代码行 13]:如果绑定端口之前的操作并没有成功,自然也就不能进行端口绑定操作了,通过 promise 记录异常原因。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
方法如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
复制代码
pipeline
是之前创建channel
的时候创建的DefaultChannelPipeline
,进入该方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
复制代码
[在分析初始化流程的时候最后画一个DefaultChannelPipeline
内部的结构,能够便于分析后面进入DefaultChannelPipeline
一系列bind
方法。]
首先,tail
代表TailContext
,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//省略部分代码
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
复制代码
[代码行 3]:findContextOutbound
方法里主要是执行ctx = ctx.prev;
那么得到的next
就是绑定LoggingHandler
的context
[代码行 6]:进入invokeBind(localAddress, promise)
方法并直接执行LoggingHandler#bind(this, localAddress, promise)
,进入后的方法如下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
ctx.bind(localAddress, promise);
}
复制代码
设置了LoggingHandler
的日志基本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法执行ctx = ctx.prev
取出HeadContext
进入到 bind 方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
复制代码
兜兜转转,最终跳出了pipeline
轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
方法,Channel 的端口绑定逻辑。代码如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//此处有省略...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//此处有省略...
}
复制代码
做实事方法doBind
进入后如下:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
到了此处,服务端的 Java 原生 NIO ServerSocketChannel
终于绑定上了端口。
三、问题归纳
Problem-1: 创建Channel
流程中AbstractChannel
构造函数中为channel
分配 ID 的算法如何实现?
Problem-2: AbstractChannel
内部类AbstractUnsafe
的作用?
Problem-3: 初始化channel
流程中pipeline
添加ServerBootstrapAcceptor
是通过EventLoop.execute
执行添加的过程,这是为什么呢?
Problem-4:注册channel
流程中PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
的区别和优化原理?
Problem-5:绑定端口流程中调用 EventLoop
执行 Channel
的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop
所在的线程了,为何还要这样操作呢?
小结
通过对 Netty 服务端启动流程源码分析,我们发现了在使用NIO
的模式下,服务端启动流程其实就是封装了JDK NIO
编程在服务端启动的流程。只不过对原生JDK NIO
进行了增强和优化,同时从架构设计上简化了服务端流程的编写。
最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,能够让更多人学习源码的旅途少走很多弯路,谢谢!
Netty 服务端启动流程源码分析
前记
哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启今天的 Netty 源码系列。源码分析的第一篇文章,将由<吴小飞>分享 《Netty 服务端启动流程源码分析》,下一篇会由<王猛>分享客户端的启动过程源码分析。通过源码的阅读,我们将会知道,Netty 服务端启动的调用链是非常长的,同时肯定也会发现一些新的问题,随着我们源码阅读的不断深入,相信这些问题我们也会一一攻破。
废话不多说,直接上号!
一、从`EchoServer `示例入手
<div align="center", style="color:#A8A8A8;font-size:14px">netty-example:EchoServer</div>
示例从哪里来?任何开源框架都会有自己的示例代码,Netty 源码也不例外,如模块netty-example
中就包括了最常见的EchoServer
示例,下面通过这个示例进入服务端启动流程篇章。
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建 EchoServerHandler 对象
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 2. 声明服务端启动引导器,并设置相关属性
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 3. 绑定端口即启动服务端,并同步等待
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// 4. 监听服务端关闭,并阻塞等待
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// 5. 优雅地关闭两个EventLoopGroup线程池
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
[代码行 18、19]声明 Main-Sub Reactor 模式线程池:EventLoopGroup
创建两个 EventLoopGroup
对象。其中,bossGroup
用于服务端接受客户端的连接,workerGroup
用于进行客户端的 SocketChannel
的数据读写。
(<u>关于EventLoopGroup
不是本文重点所以在后续文章中进行分析</u>)
[代码行 23-39]声明服务端启动引导器,并设置相关属性
AbstractBootstrap
是一个帮助类,通过方法链(method chaining
)的方式,提供了一个简单易用的方式来配置启动一个Channel
。io.netty.bootstrap.ServerBootstrap
,实现 AbstractBootstrap
抽象类,用于 Server
的启动器实现类。io.netty.bootstrap.Bootstrap
,实现 AbstractBootstrap
抽象类,用于 Client
的启动器实现类。如下类图所示:
[
](https://imgchr.com/i/wwvVKA)
<div align="center", style="color:#A8A8A8;font-size:14px">AbstractBootstrap 类继承</div>
(<u>在EchoServer
示例代码中,我们看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等属性链式设置都放到关于AbstractBootstrap
体系代码中详细介绍。</u>)
[代码行 43]绑定端口即启动服务端,并同步等待
先调用 #bind(int port)
方法,绑定端口,后调用 ChannelFuture#sync()
方法,阻塞等待成功。对于bind
操作就是本文要详细介绍的"服务端启动流程"。
[代码行 47]监听服务端关闭,并阻塞等待
先调用 #closeFuture()
方法,监听服务器关闭,后调用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel
的监听关闭。
[代码行 51、52]优雅地关闭两个EventLoopGroup
线程池
finally
代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully()
方法,分别关闭两个 EventLoopGroup
对象,终止所有线程。
二、服务启动过程
在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO
编程在服务端启动初始的代码:
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码
这 5 行代码标示一个最为熟悉的过程:
后面等分析完 Netty 的启动过程后,会对这些步骤有一个新的认识。在EchoServer
示例中,进入 #bind(int port)
方法,AbstractBootstrap#bind()
其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
[代码行 2] :调用 #initAndRegister()
方法,初始化并注册一个 Channel
对象。因为注册是异步的过程,所以返回一个 ChannelFuture
对象。详细解析,见 「initAndRegister()
」。
[代码行 4-6]]:若发生异常,直接进行返回。
[代码行 9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。
- 核心在[第 11 、29 行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey
中。
- 如果异步注册对应的 ChanelFuture
未完成,则调用 ChannelFuture#addListener(ChannelFutureListener)
方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...)
方法的逻辑。
通过doBind
方法可以知道服务端启动流程大致如下几个步骤:
[
](https://imgchr.com/i/wwvl8g)
<div align="center", style="color:#A8A8A8;font-size:14px">服务端启动流程</div>
1. 创建 Channel
<div align="center", style="color:#A8A8A8;font-size:14px">创建服务端 channel</div>
从#doBind(final SocketAddress localAddress)
进入到initAndRegister()
:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
复制代码
[代码行 4]调用 ChannelFactory#newChannel()
方法,创建 Channel
对象。 ChannelFactory
类继承如下:
<div align="center", style="color:#A8A8A8;font-size:14px">ChannelFactroy 类继承</div>
可以在ChannelFactory
注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,这里只是包名的调整,对于继承结构不变。netty
默认使用ReflectiveChannelFactory
,我们可以看到重载方法:
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
复制代码
很明显,正如其名是通过反射机制构造Channel
对象实例的。constructor
是在其构造方法初始化的:this.constructor = clazz.getConstructor();
这个clazz
按理说应该是我们要创建的Channel
的 Class 对象。那Class
对象是什么呢?我们接着看channelFactory
是怎么初始化的。
首先在AbstractBootstrap
找到如下代码:
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
复制代码
调用这个方法的递推向上看到:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
复制代码
这个方法正是在EchoServer
中ServerBootstrap
链式设置时调用.channel(NioServerSocketChannel.class)
的方法。我们看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
作为具体实例,并且将NioServerSocketChannel.class
作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel
对象。
继续看NioServerSocketChannel
构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel
类继承关系:
<div align="center", style="color:#A8A8A8;font-size:14px">Channel 类继承</div>
NioServerSocketChannel
与NioSocketChannel
分别对应服务端和客户端,公共父类都是AbstractNioChannel
和AbstractChannel
,下面介绍创建过程可以参照这个Channel
类继承图。进入NioServerSocketChannel
构造方法:
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
复制代码
点击newSocket
进去:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
复制代码
以上传进来的provider
是DEFAULT_SELECTOR_PROVIDER
即默认的java.nio.channels.spi.SelectorProvider
,[代码行 9]就是熟悉的jdk nio
创建ServerSocketChannel
。这样newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回了结果ServerSocketChannel
,回到NioServerSocketChannel()#this()
点进去:
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
以上super
代表父类AbstractNioMessageChannel
构造方法,点进去看到:
/**
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
复制代码
以上super
代表父类AbstractNioChannel
构造方法,点进去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
复制代码
以上[代码行 3]将ServerSocketChannel
保存到了AbstractNioChannel#ch
成员变量,在上面提到的NioServerSocketChannel
构造方法的[代码行 6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
变量。
以上[代码行 6]就是熟悉的jdk nio
编程设置ServerSocketChannel
非阻塞方式。这里还有super
父类构造方法,点击进去看到:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
复制代码
以上构造方法中:
parent
属性,代表父 Channel
对象。对于 NioServerSocketChannel
的 parent
为null
。
id
属性,Channel
编号对象。在构造方法中,通过调用 #newId()
方法进行创建。(<u>这里不细展开 Problem-1</u>)
unsafe
属性,Unsafe
对象。因为Channel
真正的具体操作,是通过调用对应的 Unsafe
对象实施。所以需要在构造方法中,通过调用 #newUnsafe()
方法进行创建。这里的 Unsafe
并不是我们常说的 jdk
自带的sun.misc.Unsafe
,而是 io.netty.channel.Channel#Unsafe
。(<u>这里不细展开 Problem-2</u>)
pipeline
属性默认是DefaultChannelPipeline
对象,赋值后在后面为 channel 绑定端口的时候会用到
通过以上创建channel
源码过程分析,总结的流程时序图如下:
2. 初始化 Channel
<div align="center", style="color:#A8A8A8;font-size:14px">初始化 channel</div>
回到一开始创建Channel
的initAndRegister()
入口方法,在创建Channel
后紧接着init(channel)
进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel)
,代码如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
复制代码
[代码 3 - 6 行]: options0()
方法返回的options
保存了用户在EchoServer
中设置自定义的可选项集合,这样ServerBootstrap
将配置的选项集合,设置到了 Channel
的可选项集合中。
[代码 8 - 15 行]: attrs0()
方法返回的attrs
保存了用户在EchoServer
中设置自定义的属性集合,这样ServerBootstrap
将配置的属性集合,设置到了 Channel
的属性集合中。
[代码 21-28 行]:通过局部变量currentChildOptions
和currentChildAttrs
保存了用户自定义的childOptions
和childAttrs
,用于[代码 43 行] ServerBootstrapAcceptor
构造方法。
[代码 30-47]]:创建ChannelInitializer
对象,添加到 pipeline
中,用于后续初始化 ChannelHandler
到 pipeline
中,包括用户在EchoServer
配置的LoggingHandler
和创建的创建 ServerBootstrapAcceptor
对象。
- [代码行 34-37]:添加启动器配置的 LoggingHandler
到pipeline
中。
- [代码行 39-45]:创建 ServerBootstrapAcceptor
对象,添加到 pipeline
中。从名字上就可以看出来,ServerBootstrapAcceptor
也是一个 ChannelHandler
实现类,专门用于接受客户端的新连接请求,把新的请求扔给某个事件循环器,我们先不做过多分析。我们发现是使用EventLoop.execute
执行添加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3)
- 需要说明的是pipeline
在之前介绍 Netty 核心组件的时候提到是一个包含ChannelHandlerContext
的双向链表,每一个context
对于唯一一个ChannelHandler
,这里初始化后,ChannelPipeline
里就是如下一个结构:
<div align="center", style="color:#A8A8A8;font-size:14px">ChannelPipeline 内部结构</div>
3. 注册 Channel
<img src="https://s1.ax1x.com/2020/09/13/wwvXi8.png" alt="wwvXi8.png" style="zoom:80%;" />
<div align="center", style="color:#A8A8A8;font-size:14px">注册 channel</div>
初始化Channel
一些基本配置和属性完毕后,回到一开始创建Channel
的initAndRegister()
入口方法,在初始化Channel
后紧接着[代码行 17] ChannelFuture regFuture = config().group().register(channel);
明显这里是通过EventLoopGroup
进入注册流程(EventLoopGroup
体系将在后续文章讲解)
在EchoServer
中启动器同样通过ServerBootstrap#group()
设置了NioEventLoopGroup
,它继承自MultithreadEventLoopGroup
,所以注册流程会进入MultithreadEventLoopGroup
重载的register(Channel channel)
方法,代码如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
这里会调用 next()
方法选择出来一个 EventLoop
来注册 Channel
,里面实际上使用的是一个叫做 EventExecutorChooser
的东西来选择,它实际上又有两种实现方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本质上就是从 EventExecutor
数组中选择一个 EventExecutor
,我们这里就是 NioEventLoop
,那么,它们有什么区别呢?(<u>Problem-4:在介绍EventLoopGroup
体系的后续文章中将会详细讲解,这里简单地提一下,本质都是按数组长度取余数 ,不过,2 的 N 次方的形式更高效。</u>)
接着,来到 NioEventLoop
的 register(channel)
方法,你会不会问找不到该方法?提示NioEventLoop
继承SingleThreadEventLoop
,所以父类方法:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
可以看到,先创建了一个叫做 ChannelPromise
的东西,它是 ChannelFuture
的子类。[代码行 9]又调回了 Channel
的 Unsafe
的 register ()
方法,这里第一个参数是 this
,也就是 NioEventLoop
,第二个参数是刚创建的 ChannelPromise
。
点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法进去,代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
[代码行 15]这行代码是设置 Channel
的 eventLoop
属性。这行前面的代码主要是在校验传入的 eventLoop
参数非空,校验是否有注册过以及校验 Channel
和 eventLoop
类型是否匹配。
[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise)
方法中:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
复制代码
[代码行 9]进入 AbstractNioChannel#doRegister()
方法:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
复制代码
[代码行 5]关键一行代码,将 Java 原生 NIO Selector
与 Java 原生 NIO
的 Channel
对象(ServerSocketChannel
) 绑定在一起,并将当前 Netty 的 Channel
通过 attachment
的形式绑定到 SelectionKey
上:
调用 #unwrappedSelector()
方法,返回 Java 原生 NIO Selector
对象,而且每个 NioEventLoop
与 Selector
唯一一对应。
调用 SelectableChannel#register(Selector sel, int ops, Object att)
方法,注册 Java 原生NIO
的 Channel
对象到 NIO Selector
对象上。
通过以上注册 channel 源码分析,总结流程的时序图如下:
4. 绑定端口
<div align="center", style="color:#A8A8A8;font-size:14px">绑定端口</div>
注册完Channel
最后回到AbstractBootstrap#doBind()
方法,分析 Channel
的端口绑定逻辑。进入doBind0
代码如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
复制代码
[代码行 7]:在前面Channel
注册成功的条件下,调用 EventLoop
执行 Channel
的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop
所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语注释,这里作为一个问题记着(<u>Problem-5</u>)。
[代码行 11]:进入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,同样立即异步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE
监听事件。
[代码行 13]:如果绑定端口之前的操作并没有成功,自然也就不能进行端口绑定操作了,通过 promise 记录异常原因。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
方法如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
复制代码
pipeline
是之前创建channel
的时候创建的DefaultChannelPipeline
,进入该方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
复制代码
[在分析初始化流程的时候最后画一个DefaultChannelPipeline
内部的结构,能够便于分析后面进入DefaultChannelPipeline
一系列bind
方法。]
首先,tail
代表TailContext
,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//省略部分代码
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
复制代码
[代码行 3]:findContextOutbound
方法里主要是执行ctx = ctx.prev;
那么得到的next
就是绑定LoggingHandler
的context
[代码行 6]:进入invokeBind(localAddress, promise)
方法并直接执行LoggingHandler#bind(this, localAddress, promise)
,进入后的方法如下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
ctx.bind(localAddress, promise);
}
复制代码
设置了LoggingHandler
的日志基本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,继续循环到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法执行ctx = ctx.prev
取出HeadContext
进入到 bind 方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
复制代码
兜兜转转,最终跳出了pipeline
轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
方法,Channel 的端口绑定逻辑。代码如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//此处有省略...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//此处有省略...
}
复制代码
做实事方法doBind
进入后如下:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
到了此处,服务端的 Java 原生 NIO ServerSocketChannel
终于绑定上了端口。
三、问题归纳
Problem-1: 创建Channel
流程中AbstractChannel
构造函数中为channel
分配 ID 的算法如何实现?
Problem-2: AbstractChannel
内部类AbstractUnsafe
的作用?
Problem-3: 初始化channel
流程中pipeline
添加ServerBootstrapAcceptor
是通过EventLoop.execute
执行添加的过程,这是为什么呢?
Problem-4:注册channel
流程中PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
的区别和优化原理?
Problem-5:绑定端口流程中调用 EventLoop
执行 Channel
的端口绑定逻辑。但是,实际上当前线程已经是 EventLoop
所在的线程了,为何还要这样操作呢?
小结
通过对 Netty 服务端启动流程源码分析,我们发现了在使用NIO
的模式下,服务端启动流程其实就是封装了JDK NIO
编程在服务端启动的流程。只不过对原生JDK NIO
进行了增强和优化,同时从架构设计上简化了服务端流程的编写。
最重要的是感谢彤哥、艿艿和俞超-闪电侠这些大佬前期的分享,能够让更多人学习源码的旅途少走很多弯路,谢谢!
欢迎关注:
评论