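Netty is an NIO network framework for building network applications efficiently. It is high-performance, easy to use, secure and stable, and highly extensible, and it is widely used in middleware such as Dubbo, gRPC, Elasticsearch (ES), and RocketMQ.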
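This article takes a Netty server-startup demo as its entry point for studying and analyzing the Netty source code, and records how the server starts up.
The Netty code below is from version 4.1.49.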
1. Basics
Future and Promise
Future and Promise are used extensively throughout Netty's code, and understanding them is one of the basic prerequisites for reading it, so let's briefly review them.
Future
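Runnable vs Callable
Before Java 1.5 there were two ways to create a thread: directly with new Thread, or by implementing the Runnable interface (Thread itself implements Runnable, so the two can also be seen as one approach). The biggest drawback of this approach is that the thread's result cannot be retrieved. To address this, Java 1.5 introduced the Callable and Future interfaces, which make it possible to obtain the result of a task executed on another thread. Here is an example of using Callable: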
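// FactorialTask.java
import java.util.concurrent.Callable;
public class FactorialTask implements Callable<Integer> {
    private final int number;
    public FactorialTask(int number) {
        this.number = number;
    }

    // Runs on a pool thread; the return value becomes the Future's result
    @Override
    public Integer call() {
        int fact = 1;
        // ...
        for (int count = number; count > 1; count--) {
            fact = fact * count;
        }
        return fact;
    }
}
// FactorialTaskTest.java (JUnit 4)
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;

public class FactorialTaskTest {
    @Test
    public void whenTaskSubmitted_ThenFutureResultObtained() throws Exception {
        FactorialTask task = new FactorialTask(5);
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<Integer> future = executorService.submit(task);
        assertEquals(120, future.get().intValue()); // get() blocks until call() returns
        executorService.shutdown();
    }
}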
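The JDK concurrency package (java.util.concurrent, J.U.C) defines a Future interface, which represents the result of an asynchronous operation. Future provides isDone() to check whether the task has finished, cancel() to cancel it, isCancelled() to check whether it was cancelled, and get() to obtain its result. Note that get() blocks until the result is available.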
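A small sketch of the behaviour just described, using only java.util.concurrent. The class name JdkFutureDemo and the timings are illustrative assumptions; the method returns a trace string so the observed behaviour (non-blocking isDone(), a bounded get() timing out, a blocking get(), and cancellation) is easy to check.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: polls, waits on and cancels JDK futures.
class JdkFutureDemo {
    // Returns a trace of what the caller observed.
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        StringBuilder trace = new StringBuilder();
        try {
            Future<Integer> slow = pool.submit(() -> {
                Thread.sleep(500); // simulate a slow computation
                return 42;
            });
            // isDone() never blocks; here the task is very likely still sleeping
            trace.append("doneEarly=").append(slow.isDone()).append(';');
            try {
                slow.get(10, TimeUnit.MILLISECONDS); // bounded wait
            } catch (TimeoutException e) {
                trace.append("timeout;");
            }
            // get() without a timeout blocks until the result is available
            trace.append("value=").append(slow.get()).append(';');

            Future<Integer> never = pool.submit(() -> {
                Thread.sleep(60_000);
                return -1;
            });
            never.cancel(true); // interrupts the task if it is already running
            trace.append("cancelled=").append(never.isCancelled()).append(';');
            try {
                never.get();
            } catch (CancellationException e) {
                trace.append("getAfterCancel=CancellationException");
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the doneEarly value depends on thread scheduling; the other entries follow from the Future contract.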
Given that the JDK already provides a Future interface, why does Netty define its own Future interface that extends the JDK one? Let's look at the definition of Netty's Future interface:
/**
* The result of an asynchronous operation.
*/
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
/**
* returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
*/
boolean isCancellable();
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Removes the first occurrence of the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Removes the first occurrence for each of the listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
Future<V> await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
Future<V> awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* Return the result without blocking. If the future is not done yet this will return {@code null}.
*
* As it is possible that a {@code null} value is used to mark the future as successful you also need to check
* if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
*/
V getNow();
/**
* {@inheritDoc}
*
* If the cancellation was successful it will fail the future with a {@link CancellationException}.
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
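Netty's Future interface adds more methods. The main difference from the JDK's Future is the set of listener-related methods: a listener is invoked as soon as the future completes, so the caller does not have to block on get() (the observer pattern).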
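To make the listener mechanism concrete, here is a deliberately simplified, hypothetical listenable future in plain Java (SimpleListenableFuture and ListenerDemo are illustrative names, not Netty classes). It follows the contract in the Javadoc above: listeners added before completion are notified when the future completes, and a listener added afterwards is notified immediately. Netty's real DefaultPromise also dispatches notifications to an EventExecutor, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified listenable future (observer pattern); NOT Netty's DefaultPromise.
class SimpleListenableFuture<V> {
    private final List<Consumer<SimpleListenableFuture<V>>> listeners = new ArrayList<>();
    private boolean done;
    private V result;

    // If the future is already done, the listener is notified immediately.
    SimpleListenableFuture<V> addListener(Consumer<SimpleListenableFuture<V>> listener) {
        boolean notifyNow;
        synchronized (this) {
            notifyNow = done;
            if (!notifyNow) {
                listeners.add(listener);
            }
        }
        if (notifyNow) {
            listener.accept(this);
        }
        return this;
    }

    // Completes the future once and pushes the event to every registered observer.
    boolean complete(V value) {
        List<Consumer<SimpleListenableFuture<V>>> toNotify;
        synchronized (this) {
            if (done) {
                return false;
            }
            done = true;
            result = value;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        // Listeners run outside the lock so they can safely call back into this future.
        for (Consumer<SimpleListenableFuture<V>> listener : toNotify) {
            listener.accept(this);
        }
        return true;
    }

    synchronized boolean isDone() {
        return done;
    }

    synchronized V getNow() {
        return result;
    }
}

class ListenerDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SimpleListenableFuture<String> future = new SimpleListenableFuture<>();
        future.addListener(f -> events.add("early:" + f.getNow())); // registered before completion
        future.complete("ok");
        future.addListener(f -> events.add("late:" + f.getNow()));  // registered after completion
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [early:ok, late:ok]
    }
}
```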
Promise
Promise literally means a commitment: something one person tells another about what will happen, which is generally expected to be fulfilled.
Netty's Promise interface extends Netty's Future and is a writable Future: its result is set through methods such as setSuccess and setFailure.
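The "writable Future" idea can be sketched in plain Java. SimplePromise is a hypothetical class whose method names (trySuccess, setSuccess, setFailure, getNow) mirror Netty's Promise, but the code is not Netty's. It shows the write-once behaviour described by Netty's Promise documentation: a result can be written only once, setSuccess/setFailure throw IllegalStateException on a second write, and trySuccess/tryFailure simply return false.

```java
// Hypothetical sketch of a write-once "promise"; method names mirror Netty's Promise,
// but this is NOT Netty's implementation (no listeners, no executor).
class SimplePromise<V> {
    private boolean done;
    private V result;
    private Throwable cause;

    synchronized boolean trySuccess(V value) {
        if (done) {
            return false; // already completed: the write is ignored
        }
        done = true;
        result = value;
        return true;
    }

    synchronized boolean tryFailure(Throwable t) {
        if (done) {
            return false;
        }
        done = true;
        cause = t;
        return true;
    }

    // Strict variants: writing twice is treated as a programming error.
    SimplePromise<V> setSuccess(V value) {
        if (!trySuccess(value)) {
            throw new IllegalStateException("complete already");
        }
        return this;
    }

    SimplePromise<V> setFailure(Throwable t) {
        if (!tryFailure(t)) {
            throw new IllegalStateException("complete already");
        }
        return this;
    }

    synchronized boolean isSuccess() {
        return done && cause == null;
    }

    synchronized V getNow() {
        return result;
    }
}

class SimplePromiseDemo {
    static String run() {
        SimplePromise<String> promise = new SimplePromise<>();
        boolean first = promise.trySuccess("first");
        boolean second = promise.trySuccess("second"); // ignored, already completed
        String thrown = "none";
        try {
            promise.setFailure(new RuntimeException("boom"));
        } catch (IllegalStateException e) {
            thrown = "IllegalStateException";
        }
        return first + "," + second + "," + promise.getNow() + "," + promise.isSuccess() + "," + thrown;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,first,true,IllegalStateException
    }
}
```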
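import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public class PromiseDemo {
    public static void main(String[] args) {
        // Executor on which the listeners are notified
        EventExecutor executor = GlobalEventExecutor.INSTANCE;
        Promise<Map<String, String>> promise = new DefaultPromise<>(executor);
        // Called once the promise is completed
        promise.addListener(new MyListener());
        // The promise is completed (written) from another thread
        Thread myThread = new Thread(new MyThread(promise));
        myThread.start();
        executor.shutdownGracefully();
    }

    static class MyThread implements Runnable {
        private final Promise<Map<String, String>> promise;

        public MyThread(Promise<Map<String, String>> promise) {
            this.promise = promise;
        }

        @Override
        public void run() {
            // Simulate business processing
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                promise.setFailure(e);
                return;
            }
            Map<String, String> result = new HashMap<>();
            // HH is the 24-hour clock; hh would be a 12-hour clock without an AM/PM marker
            result.put("completeTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            // Writing the result completes the promise and triggers the listeners
            promise.setSuccess(result);
        }
    }

    static class MyListener implements GenericFutureListener<Future<Map<String, String>>> {
        @Override
        public void operationComplete(Future<Map<String, String>> future) throws Exception {
            if (future.isSuccess()) {
                Map<String, String> result = future.getNow();
                System.out.println("Completed, result:");
                for (Map.Entry<String, String> entry : result.entrySet()) {
                    System.out.println(entry.getKey() + ":" + entry.getValue());
                }
            }
        }
    }
}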
2. Understanding EventLoopGroup and EventLoop
Let's start directly from Netty's getting-started code:
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
server.group(bossGroup, workerGroup);
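This is where EventLoopGroup appears, so let's follow the NioEventLoopGroup constructor. Many classes are involved and their names are easy to confuse, so a class diagram is given first to make the code easier to follow: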
NioEventLoopGroup initialization
The two most commonly used NioEventLoopGroup constructors are:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
The number of threads determines the EventExecutor[] children field, i.e., the length of the array of EventLoops held by the EventLoopGroup.
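How is the EventExecutor[] children field initialized while a NioEventLoopGroup is being constructed?
1) The array size is the thread count passed to the NioEventLoopGroup constructor. When none is given (the no-arg constructor passes 0), a default is used: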
int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
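The io.netty.eventLoopThreads system property takes precedence; when it is not set, twice the number of available processors is used (and never fewer than 1).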
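2) With the size fixed, the array is filled with NioEventLoop instances, each created via newChild(executor, args). When no executor was supplied, the group creates new ThreadPerTaskExecutor(newDefaultThreadFactory()) and passes it to every NioEventLoop.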
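3) The group's chooser field selects the next EventLoop, i.e., the next NioEventLoop in the children array. There are two strategies, depending on whether the array length is a power of two: if it is, the index is computed with the & operator (PowerOfTwoEventExecutorChooser); otherwise it is computed with the % operator (GenericEventExecutorChooser).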
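Points 1) and 3) can be illustrated together in plain Java. EventLoopGroupSketch is a hypothetical class, not Netty code: defaultThreads approximates the default above with System.getProperty and Runtime.availableProcessors (Netty uses its own SystemPropertyUtil and NettyRuntime, and ignoring a malformed value is an assumption of this sketch), and newChooser reproduces the two index strategies. For a power-of-two length, length - 1 is an all-ones bit mask, so & yields the same round-robin index as % without a division.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of how an event-loop group could size its children and pick the next one.
class EventLoopGroupSketch {
    // propertyValue: value of io.netty.eventLoopThreads, or null when unset
    static int defaultThreads(String propertyValue, int availableProcessors) {
        int value = availableProcessors * 2;
        if (propertyValue != null) {
            try {
                value = Integer.parseInt(propertyValue.trim());
            } catch (NumberFormatException e) {
                // assumption: a malformed value is ignored and the default is kept
            }
        }
        return Math.max(1, value); // never fewer than one event loop
    }

    static int defaultThreadsFromEnvironment() {
        return defaultThreads(System.getProperty("io.netty.eventLoopThreads"),
                Runtime.getRuntime().availableProcessors());
    }

    static boolean isPowerOfTwo(int n) {
        return (n & -n) == n; // exactly one bit set (callers pass n > 0)
    }

    // Round-robin chooser: & for power-of-two lengths, % otherwise.
    static <T> Supplier<T> newChooser(T[] children) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(children.length)) {
            // all-ones mask: stays in range even after the counter overflows
            return () -> children[idx.getAndIncrement() & (children.length - 1)];
        }
        // Math.abs keeps the index non-negative once the counter has overflowed
        return () -> children[Math.abs(idx.getAndIncrement() % children.length)];
    }

    public static void main(String[] args) {
        System.out.println("default threads: " + defaultThreadsFromEnvironment());
        Supplier<String> four = newChooser(new String[] {"a", "b", "c", "d"});
        Supplier<String> three = newChooser(new String[] {"x", "y", "z"});
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            picks.append(four.get());
        }
        picks.append(' ');
        for (int i = 0; i < 4; i++) {
            picks.append(three.get());
        }
        System.out.println(picks); // abcda xyzx
    }
}
```

For example, with 4 available processors and no property set, defaultThreads(null, 4) returns 8, while a property value of 0 is raised to 1.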
The internals of EventLoop, such as its task queue and how tasks are executed, are not covered here.
3. Server startup process
First, here is a simple server startup:
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
server.group(bossGroup, workerGroup);
server.channel(NioServerSocketChannel.class);
server.localAddress(8080);
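// SO_KEEPALIVE applies to the accepted connections, so it is set with childOption rather than option
server.childOption(ChannelOption.SO_KEEPALIVE, true);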
server.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
server.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("codec", new HttpServerCodec())
.addLast("compressor", new HttpContentCompressor())
.addLast("aggregator", new HttpObjectAggregator(65536))
.addLast("handler", new HttpServerHandler())
;
}
});
ChannelFuture future = server.bind().sync();
System.out.println("Server started, listening on: " + future.channel().localAddress());
future.channel().closeFuture().sync();
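The code has two parts. The first initializes the bootstrap and sets properties such as the channel type, port, connection options, handlers, and pipeline. The second starts the server, i.e., the line server.bind().sync(). The rest of this section looks at what Netty does in that line, in two parts: bind and sync.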
bind()
With plain Java NIO, starting a server requires registering the Channel with a Selector, as follows:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// Bind the port
serverSocketChannel.bind(new InetSocketAddress(9090));
// Switch to non-blocking mode (required before registering with a Selector)
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(serverSocketChannel, new SubReactor()));
while (true) {
int select = selector.select();
System.out.println("events returned by select: " + select);
if (select > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
dispatch(next);
// Remove the handled key from the selected-key set
iterator.remove();
}
}
}
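The code above omits the dispatch method and the Acceptor class. Implementing a server with Java NIO involves the following steps:
Create the Channel: open a ServerSocketChannel, bind the port, and set the relevant properties;
Create the Selector;
Register the Channel with the Selector, specifying the interest set and an attachment;
Loop on Selector.select to check for incoming connections, and handle each ready key in turn.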
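The four steps can be exercised end to end without the omitted Acceptor and dispatch code. The self-contained sketch below (NioAcceptOnce is a hypothetical name) binds to an ephemeral port on 127.0.0.1 instead of 9090, opens one local client connection, and runs a single iteration of the select loop that accepts it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo: one pass through the NIO accept loop.
class NioAcceptOnce {
    // Returns the number of connections accepted in a single select iteration.
    static int acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Step 1: create the channel, bind it (port 0 = any free port), make it non-blocking
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            // Steps 2 and 3: the selector exists; register the channel with it for OP_ACCEPT
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A local client, so that there is a pending connection to accept
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int accepted = 0;
                // Step 4: wait up to 5 s for ready keys, then handle and remove each one
                if (selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            SocketChannel conn = ((ServerSocketChannel) key.channel()).accept();
                            if (conn != null) {
                                accepted++;
                                conn.close();
                            }
                        }
                    }
                }
                return accepted;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptOnce());
    }
}
```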
Compared with this, Netty's bind process consists of:
Initializing the Channel
Associating the Channel with an EventLoop
Performing the bind
Initializing the Channel
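On the server side, initializing the Channel means instantiating NioServerSocketChannel, whose constructor also creates the channel's ChannelPipeline, and then applying the options set via server.option. Call chain:
AbstractBootstrap#bind() → AbstractBootstrap#doBind() → AbstractBootstrap#initAndRegister()
Key code: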
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Instantiated via reflection; the class is set with AbstractBootstrap.channel(Class<? extends C> channelClass)
channel = channelFactory.newChannel();
// Apply the Channel options and attributes, and add the initial handlers to its pipeline
init(channel);
} catch (Throwable t) {
// ....
}
// ....
}
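Creating the Channel instance involves two things:
Instantiating NioServerSocketChannel via reflection (its constructor also creates the channel's pipeline)
Setting the Channel options (those set via server.option) and adding the initial handlers to the pipeline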
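Creating the channel "via reflection" boils down to looking up a no-arg constructor for the class passed to server.channel(...) and calling it for every new channel. Below is a plain-Java sketch in the spirit of Netty's ReflectiveChannelFactory; ReflectiveFactorySketch is a hypothetical class, and StringBuilder stands in for NioServerSocketChannel so the example runs without Netty.

```java
import java.lang.reflect.Constructor;

// Hypothetical reflection-based factory; illustrates the idea, not Netty's exact code.
class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look up the public no-arg constructor once, when the factory is configured
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    // Called once per new instance (in Netty's case, once per new channel)
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        CharSequence a = factory.newInstance();
        CharSequence b = factory.newInstance();
        System.out.println(a.getClass().getSimpleName() + " " + (a != b)); // StringBuilder true
    }
}
```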
Associating the Channel with an EventLoop
In other words, the Channel is registered, through the EventLoopGroup, with one of its EventLoops. This happens in the following line of AbstractBootstrap's initAndRegister method:
final ChannelFuture initAndRegister() {
// Channel initialization and setup omitted
// ....
ChannelFuture regFuture = config().group().register(channel);
// Return statement and other code omitted
// ....
}
First, next() selects an EventLoop from the EventLoopGroup (using the chooser described above), and the Channel is then registered with that EventLoop, here a NioEventLoop.
// SingleThreadEventLoop, a superclass of NioEventLoop, registers the channel as follows
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
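As the code shows, the EventLoop and the promise are passed to register on the Channel's unsafe instance. Netty's Unsafe deserves a separate discussion; for now it is enough to know that the unsafe of NioServerSocketChannel is NioMessageUnsafe, a private inner class of its abstract superclass AbstractNioMessageChannel. So the registration logic is NioMessageUnsafe's register method, which it inherits from AbstractUnsafe, an inner class of AbstractChannel: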
// Inner class AbstractUnsafe in AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
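Because register() is called here from a thread other than the EventLoop's own (at startup, bind() runs on the main thread), the code above ends up calling eventLoop.execute and passes it a Runnable task that runs register0.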
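The inEventLoop()/execute() branch in register() is a general pattern: code already running on the loop's thread proceeds directly, while any other thread enqueues a task. The hypothetical MiniEventLoop below imitates it with a single-thread ExecutorService, which is not how NioEventLoop is implemented. Because all tasks run on one thread in FIFO order, a "bind" task submitted from another thread after a "register" task always runs after it, which is the guarantee Netty's initAndRegister relies on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "event loop" illustrating the inEventLoop()/execute() pattern.
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread counts as "in the event loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same shape as AbstractUnsafe#register: run inline on the loop thread, otherwise enqueue.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            worker.execute(task);
        }
    }

    void shutdownAndWait() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MiniEventLoopDemo {
    static List<String> run() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // Called from the main thread, so both tasks are queued in submission order
        loop.runOnLoop(() -> {
            log.add("register on " + Thread.currentThread().getName());
            // Already on the loop thread: runs inline, before the queued "bind" task
            loop.runOnLoop(() -> log.add("inline " + loop.inEventLoop()));
        });
        loop.runOnLoop(() -> log.add("bind on " + Thread.currentThread().getName()));
        loop.shutdownAndWait();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // [register on mini-event-loop, inline true, bind on mini-event-loop]
        System.out.println(run());
    }
}
```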
At this point a register task has been submitted to the EventLoop (the EventLoop has its own task queue; how it executes tasks is not covered here), and a ChannelFuture is returned. Back in AbstractBootstrap's initAndRegister method:
final ChannelFuture initAndRegister() {
// Channel initialization and setup omitted
// ....
// Returns the DefaultChannelPromise
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
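As the comments in the Netty source explain, when this method returns regFuture there are two possibilities: either registration has already completed, or it has been handed off for asynchronous execution, i.e., added to the EventLoop's task queue. Either way, the returned ChannelFuture (a Promise) can be used to run a callback once registration completes, which is what the bind process below relies on.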
Performing the bind: doBind()
Eventually, the doBind operation reaches the bind method of AbstractUnsafe, the inner class of AbstractChannel:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// Perform the actual bind, similar to serverSocketChannel.bind(new InetSocketAddress(9090));
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// Bind succeeded: mark the promise as successful
safeSetSuccess(promise);
}
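If registration has not completed yet when doBind checks regFuture (typically the case when bind() is called from the main thread), the ChannelFuture returned to the caller is a PendingRegistrationPromise; if registration has already finished, it is a regular promise created by channel.newPromise().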
sync()
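DefaultPromise's sync method shows how this works: sync() calls await(), which puts the calling thread to sleep with Object.wait() until the promise completes. When doBind() above finishes, safeSetSuccess(promise) completes the promise and wakes the waiting thread with notifyAll(); if the bind failed, sync() rethrows the cause. So bind().sync() makes the main thread wait until the port is actually bound (and fail fast otherwise), while the later closeFuture().sync() is what keeps the main thread blocked until the server channel is closed. The remaining code details are not analyzed here.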
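The wait/notify mechanism behind sync() can be sketched as below. SimpleSyncPromise and SyncDemo are hypothetical classes that only mirror the idea of DefaultPromise's await()/sync(); one difference is that Netty rethrows the original failure cause, whereas this sketch wraps it in a RuntimeException. The extra thread plays the role of the EventLoop calling safeSetSuccess(promise) after the bind.

```java
import java.net.BindException;

// Hypothetical sketch: blocking until completion with Object.wait()/notifyAll().
class SimpleSyncPromise<V> {
    private boolean done;
    private V result;
    private Throwable cause;

    boolean trySuccess(V value) {
        return complete(value, null);
    }

    boolean tryFailure(Throwable t) {
        return complete(null, t);
    }

    private synchronized boolean complete(V value, Throwable t) {
        if (done) {
            return false;
        }
        done = true;
        result = value;
        cause = t;
        notifyAll(); // wake up every thread blocked in await()
        return true;
    }

    synchronized SimpleSyncPromise<V> await() throws InterruptedException {
        while (!done) { // the loop guards against spurious wakeups
            wait();
        }
        return this;
    }

    // Like sync(): wait for completion, then surface the failure, if any.
    SimpleSyncPromise<V> sync() throws InterruptedException {
        await();
        Throwable t;
        synchronized (this) {
            t = cause;
        }
        if (t != null) {
            throw new RuntimeException(t); // Netty rethrows t itself; wrapped here for simplicity
        }
        return this;
    }

    synchronized V getNow() {
        return result;
    }
}

class SyncDemo {
    static String successfulBind() {
        SimpleSyncPromise<String> bindPromise = new SimpleSyncPromise<>();
        // Plays the role of the EventLoop finishing doBind() and calling safeSetSuccess(promise)
        Thread eventLoop = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            bindPromise.trySuccess("bound");
        });
        eventLoop.start();
        try {
            bindPromise.sync(); // blocks here until trySuccess() calls notifyAll()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return bindPromise.getNow();
    }

    static String failedBind() {
        SimpleSyncPromise<String> promise = new SimpleSyncPromise<>();
        promise.tryFailure(new BindException("Address already in use"));
        try {
            promise.sync();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(successfulBind() + " / " + failedBind()); // bound / BindException
    }
}
```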
4. Summary
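In short, Netty's server startup works as follows: first the EventLoopGroups and the Channel are initialized (Channel options and pipeline setup); then the EventLoopGroup assigns an EventLoop, with which the Channel is registered (the underlying channel is registered with that EventLoop's Selector); finally the port is bound and the EventLoop keeps polling its Selector in a loop. Throughout this process Netty relies heavily on asynchronous execution and uses Promise to deliver callbacks, so understanding Promise is key to understanding Netty's code.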