写点什么

netty 入门之服务端启动过程分析

作者:Hex
  • 2022 年 7 月 26 日
  • 本文字数:11606 字

    阅读完需:约 38 分钟

Netty 是一款用于高效开发网络应用的 NIO 网络框架,具有高性能、易使用、安全稳定、高可扩展等特点,并且被广泛的应用于各类中间件框架中,比如 Dubbo,gRPC,ES,RocketMQ 等。

本文以 netty 的服务端启动的 Demo 代码为入口,对 netty 的源码进行学习并加以分析,下面对 server 的启动过程进行记录。

以下 netty 代码为 4.1.49。

一、基础知识

Future 与 Promise 在 netty 的代码中会大量使用到 Futrue 与 Promise,理解它们是阅读 netty 代码的基本门槛之一,下面对它们进行简单的回顾学习。

Future

Runnable vs Callable 在 Java 1.5 之前,创建线程有两种方式,一种直接通过 new Thread 创建,另一种是通过实现 Runnable 接口创建,当然 Thread 类也是 Runnable 接口的实现类,也可以是同一种方式。这种方式最大的缺点就是不能拿到线程执行的结果,为了弥补这个缺点,从 Java 1.5 开始引入接口 Callable、Future 来实现多线程中取到线程的执行结果。如下为 Callable 的实用案例:


public class FactorialTask implements Callable<Integer> {    int number;
// standard constructors
public Integer call() throws InvalidParamaterException { int fact = 1; // ... for(int count = number; count > 1; count--) { fact = fact * count; }
return fact; }}
@Testpublic void whenTaskSubmitted_ThenFutureResultObtained(){ FactorialTask task = new FactorialTask(5); ExecutorService executorService = Executors.newFixedThreadPool(1); Future<Integer> future = executorService.submit(task); assertEquals(120, future.get().intValue());}
复制代码


在 JDK 的并发包 J.U.C 中定义了一个接口 Future,先从这个 Future 说起,它用于代表异步操作的结果。Future 提供了检查线程是否执行完成的方法 isDone(),提供了取消的方法 cancel()以及检查是否已取消的 isCanceled(),还提供了获取线程执行结果的方法 get()。Future 的 get 方法获取结果时会阻塞。


既然 JDK 已提供了 Future 接口,为何 netty 要再定义一个 Future 接口且继承 JDK 的 Future 接口,看下 netty 的 Future 接口的定义:


/** * The result of an asynchronous operation. */@SuppressWarnings("ClassNameSameAsAncestorName")public interface Future<V> extends java.util.concurrent.Future<V> {
/** * Returns {@code true} if and only if the I/O operation was completed * successfully. */ boolean isSuccess();
/** * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}. */ boolean isCancellable();
/** * Returns the cause of the failed I/O operation if the I/O operation has * failed. * * @return the cause of the failure. * {@code null} if succeeded or this future is not * completed yet. */ Throwable cause();
/** * Adds the specified listener to this future. The * specified listener is notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listener is notified immediately. */ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/** * Adds the specified listeners to this future. The * specified listeners are notified when this future is * {@linkplain #isDone() done}. If this future is already * completed, the specified listeners are notified immediately. */ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/** * Removes the first occurrence of the specified listener from this future. * The specified listener is no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listener is not associated with this future, this method * does nothing and returns silently. */ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/** * Removes the first occurrence for each of the listeners from this future. * The specified listeners are no longer notified when this * future is {@linkplain #isDone() done}. If the specified * listeners are not associated with this future, this method * does nothing and returns silently. */ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/** * Waits for this future until it is done, and rethrows the cause of the failure if this future * failed. */ Future<V> sync() throws InterruptedException;
/** * Waits for this future until it is done, and rethrows the cause of the failure if this future * failed. */ Future<V> syncUninterruptibly();
/** * Waits for this future to be completed. * * @throws InterruptedException * if the current thread was interrupted */ Future<V> await() throws InterruptedException;
/** * Waits for this future to be completed without * interruption. This method catches an {@link InterruptedException} and * discards it silently. */ Future<V> awaitUninterruptibly();
/** * Waits for this future to be completed within the * specified time limit. * * @return {@code true} if and only if the future was completed within * the specified time limit * * @throws InterruptedException * if the current thread was interrupted */ boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/** * Waits for this future to be completed within the * specified time limit. * * @return {@code true} if and only if the future was completed within * the specified time limit * * @throws InterruptedException * if the current thread was interrupted */ boolean await(long timeoutMillis) throws InterruptedException;
/** * Waits for this future to be completed within the * specified time limit without interruption. This method catches an * {@link InterruptedException} and discards it silently. * * @return {@code true} if and only if the future was completed within * the specified time limit */ boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/** * Waits for this future to be completed within the * specified time limit without interruption. This method catches an * {@link InterruptedException} and discards it silently. * * @return {@code true} if and only if the future was completed within * the specified time limit */ boolean awaitUninterruptibly(long timeoutMillis);
/** * Return the result without blocking. If the future is not done yet this will return {@code null}. * * As it is possible that a {@code null} value is used to mark the future as successful you also need to check * if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value. */ V getNow();
/** * {@inheritDoc} * * If the cancellation was successful it will fail the future with a {@link CancellationException}. */ @Override boolean cancel(boolean mayInterruptIfRunning);}
复制代码


netty 的 Future 接口提供了更多的方法,与 JDK 最主要的区别是增加了用于监听器相关的方法,可用于当 future 执行完成之后立即执行监听器。(观察者模式)

Promise

Promise,中文翻译为承诺或者许诺,含义是人与人之间,一个人对另一个人所说的具有一定憧憬的话,一般是可以实现的。


netty 定义的 Promise 接口继承自其 Future,是可以写操作的 Future,通过 setSuccess,setFailure 等方法来设置返回对象。


public class PromiseDemo {
public static void main(String[] args) {
EventExecutor executor = GlobalEventExecutor.INSTANCE; Promise<Map<String, String>> promise = new DefaultPromise<>(executor); promise.addListener(new MyListener());
Thread myThread = new Thread(new MyThread(promise)); myThread.start();
executor.shutdownGracefully(); }

static class MyThread implements Runnable { Promise promise;
public MyThread(Promise promise) { this.promise = promise; }
@Override public void run() { // 模拟处理业务逻辑 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } Map<String, String> result = new HashMap<>(); result.put("completeTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));
promise.setSuccess(result); } }
static class MyListener implements GenericFutureListener<Future<Map<String, String>>> {
@Override public void operationComplete(Future<Map<String, String>> future) throws Exception { if (future.isSuccess()) { Map<String, String> result = future.getNow(); System.out.println("执行完成,输出结果:"); for (Map.Entry<String, String> entry : result.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue()); } } } }}
复制代码

二、理解 EventLoopGroup、EventLoop

直接以 netty 的入门使用代码作为入口,如下:


ServerBootstrap server = new ServerBootstrap();EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();server.group(bossGroup, workerGroup);
复制代码


这里看到里 EventLoopGroup,直接从 NioEventLoopGroup 的构造函数跟进。由于这里涉及到的类较多,命名看起来有点绕,所以先给出一张类图,方便在跟进代码的时候更清晰,如下:


NioEventLoopGroup 初始化

NioEventLoopGroup 最常用的两个构造函数为:


public NioEventLoopGroup() {        this(0);}
public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null);}
复制代码


线程的数量决定了 EventExecutor[] children 属性,也就是 EventLoopGroup 中用于的 EventLoop 数组的长度。


** NioEventLoopGroup 初始化过程中,其属性 EventExecutor[] children 初始化的过程?**1)数组的大小是 NioEventLoopGroup 的构造函数传入的数组,当没传的时候(无参构造函数)会取默认数量,如下:


int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
复制代码


优先取配置 io.netty.eventLoopThreads,没有这个配置时会使用系统可用的处理器的两倍。


2)确定了数组的大小,然后就是进行初始化,即初始化 NioEventLoop,在其构造函数中 executor 为 new ThreadPerTaskExecutor(newDefaultThreadFactory())


3)NioEventLoopGroup 的属性 chooser,用于选择下一个 EventLoop,也就是在 children 数组中选择下一个 NioEventLoop,chooser 会根据数组的值是否为 2 的幂次方有两种方式,如果是 2 的幂次方会使用 &运算来计算下标(PowerOfTwoEventExecutorChooser),否则使用 %运算来计算下标(GenericEventExecutorChooser)


关于 EventLoop 的内部实现这里不再展开,包括内部的任务队列、执行方式等。

三、server 端启动过程

先看下简单的 server 端启动的代码:


    ServerBootstrap server = new ServerBootstrap();        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();
server.group(bossGroup, workerGroup); server.channel(NioServerSocketChannel.class); server.localAddress(8080); server.option(ChannelOption.SO_KEEPALIVE, true); server.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
server.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("codec", new HttpServerCodec()) .addLast("compressor", new HttpContentCompressor()) .addLast("aggregator", new HttpObjectAggregator(65536)) .addLast("handler", new HttpServerHandler()) ; } });
ChannelFuture future = server.bind().sync();
System.out.println("服务器启动成功,监听端口:" + future.channel().localAddress());
future.channel().closeFuture().sync();
复制代码


代码中包含的内容分为两部分,一部分是初始化并设置一些属性,比如设置 channel 类型、端口号、连接的参数、处理类、流水线等;另一部分是启动服务,也就是 server.bind().sync()这行代码。下面主要看下启动服务这一行代码,netty 做了哪些事情,分为 bind 和 sync 两部分来看。

bind()

在使用 Java NIO 时,启动服务端的过程需要将 Selector 注册到 Channel 上,代码实现如下:


    Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 绑定端口号 serverSocketChannel.bind(new InetSocketAddress(9090)); // 设置为非阻塞 serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(serverSocketChannel, new SubReactor()));
while (true) { int select = selector.select(); System.out.println("select收到事件数:" + select); if (select > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) { SelectionKey next = iterator.next(); dispatch(next); // 移除 iterator.remove(); } } }
复制代码


上面的代码省略 dispatch 方法,Acceptor 的代码,通过 Java NIO 的实现服务端的方式包括了以下步骤:


  1. 创建 Channel,新建 ServerSocketChannel 并绑定端口号,设置相关的属性;

  2. 创建 Selector

  3. 将 Selector 注册到 Channel 上,并设置监听的类型、设置附件

  4. 开启循环通过 Selectot.select 判断是否有连接,并依次处理各连接


下面与 netty 的实现过程进行对比与参照,来看 netty 对 bind 的过程:


  1. 初始化 Channel

  2. Channel 与 EventLoop 的关联

  3. 执行绑定

初始化 Channel

初始化 Channel,在服务端就是实例化 NioServerSocketChannel,包含设置 Channel 的参数(也就是 server.option 中设置的)、为 Channel 创建流水线 Pipline。调用链路:


AbstarctBootstrap#bind( )AbstarctBootstrap#doBind( )AbstarctBootstrap#initAndRegister( )


关键代码:


  final ChannelFuture initAndRegister() {        Channel channel = null;        try {      // 通过反射实例化,在Bootstrap.channel(Class<? extends C> channelClass)方法设置要实例化的类            channel = channelFactory.newChannel();            // 设置Channel的属性Option,关联新的流水线Pipline      init(channel);        } catch (Throwable t) {           // ....        }    // ....    }
复制代码


在创建 Channel 实例时做了两件事情:


  1. 通过反射实例化 NioServerSocketChannel

  2. 设置 Channel 的属性 Option(也就是 server.option 中设置的),关联新的流水线 Pipline

Channel 与 EventLoop 关联

换句话说,也就是讲 Channel 通过 EventLoopGroup 注册到某个 EventLoop 上,也就是 AbstractBootstrap 中 initAndRegister 方法中的下面一行代码:


    final ChannelFuture initAndRegister() {        // 省略Channel的初始化及相关设置    // ....
ChannelFuture regFuture = config().group().register(channel); // 省略返回等代码 // .... }
复制代码


首先会从 EventLoopGroup 中通过 next()选择一个 EventLoop(选择的方式在上面的 chooser 有说明),然后将 Channel 注册到 EventLoop,此处也就是 NioEventLoop。


  // SingleThreadEventLoop是NioEventLoop的父类,注册channel的方法如下
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
复制代码


如上面的代码所示,会将 EventLoop 和 Promise 注册到 Channel 的 unsafe 实例上,对于 netty 的 unsafe 后面找机会再进行具体说明,这里 NioServerSocketChannel 的 unsafe 可以定位到其父类抽象类 AbstractNioMessageChannel 的私有内部类 NioMessageUnsafe。也就是说,注册的过程要查看 NioMessageUnsafe 的 register 方法,该方法在其父类 AbstractChannel 的私有内部类 AbstractUnsafe 中,代码如下:


    // AbstractChannel.java中的内部类AbstractUnsafe类    
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
复制代码


调用 EventLoop 的 execute 方法,如上代码会最终会调用 eventLoop.execute,并将一个执行 register0 的线程(任务 task)传入。


到此时,将提交一个 register 任务到 EventLoop 中(EventLoop 包含任务队列,具体执行方法此处不展开),然后返回一个 ChannelFuture,再次回到 AbstractBootstrap 的 initAndRegister 方法的代码中:


  final ChannelFuture initAndRegister() {    // 省略Channel的初始化及相关设置    // ....                // 返回DefaultChannelPromise        ChannelFuture regFuture = config().group().register(channel);        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
return regFuture; }
复制代码


netty 源码中的注释显示,这个方法返回 regFuture 时,有两个情况,一种是注册的过程已经处理成功,另一种是提交给异步的线程,也就是添加到 EventLoop 的任务队列中等待被执行。所以,在这里返回的 ChannelFuture(Promise)可以等待注册完成后的一个回调处理。接下来就在具体的 bind 过程中继续查看。

执行绑定:doBind()

最后在执行 doBind 操作时会进入到 AbstractChannel 的 AbstractUnsafe 中的 bind 方法,如下:


    @Override        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {            assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }
// See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); }
boolean wasActive = isActive(); try { // 执行绑定,类似serverSocketChannel.bind(new InetSocketAddress(9090)); doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; }
if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } // 绑定成功后,设置promise为成功状态 safeSetSuccess(promise); }
复制代码


最终返回的 ChannelFuture 是 PendingRegistrationPromise 的实例。

sync()

sync 的作用通过查看 DefaultPromise 的 sync 方法可知,主要作用是调用 wait()进入休眠,等待上面的 doBind()执行完成之后,通过调用 safeSetSuccess(promise)来唤醒。其作用防止执行 main 方法的主线程在调用 server.bind()之后就退出,具体代码细节就不再进行分析。

四、总结

netty 服务端的启动过程简单概括为,首先初始化 Channel,包括 EventLoopGroup 初始化设置、Channel 属性设置、Pipline 初始化设置,然后通过 EventLoopGroup 分配 EventLoop 与 Channel 进行关联注册,并开启绑定 Selector 和循环监听。从上面的过程中来看,netty 大量的使用异步的方式执行,并通过 Promise 来完成回调,所以理解 Promise 对理解 netty 的代码。

发布于: 12 小时前阅读数: 15
用户头像

Hex

关注

还未添加个人签名 2018.05.24 加入

还未添加个人简介

评论

发布
暂无评论
netty入门之服务端启动过程分析_Java_Hex_InfoQ写作社区