Netty 浅析
之前第一次接触就被这种处理 I/O 的方式惊艳到了,写 Netty 总有一种不是在写业务;而是真的在写代码的感觉!而且写过不少 Reactor 模型的代码,但基本都是 Echo 版本的,于是很想知道 Netty 到底封装了什么,为什么是很多高性能框架(SpringWebFlux,Dubbo,Lettuce)的选择?于是找个时间准备深究一下源码。
其实这源码也不是我强烈想看的,因为网上没找到什么文章和资源,官方文档也没啥好看的。除了之前看过的《Netty 实战》这本书之外,就没了。于是决定看看实现来进一步理解它。
P.S.这篇文章是我在掘金编辑的,所以图片有水印,不是我盗的!
组织结构
Netty 的源码组织结构如下:
(还好不复杂!(:p
Bootstrap
bootstrap 包主要负责启动服务,比如 BootStrap 负责启动客户端连接服务;ServerBootStrap 负责启动服务端监听服务;和我们传统 Socket 差不多的意思,所以并没有什么好谈的。源码相对简单:
Buffer
buffer 则是对于数据的操作,包括了对于直接内存的操作,对于缓冲数据的操作等。Netty 把网络 I/O 的 buffer 和直接内存的 buffer 整合到了一起,这样我们在 处理数据时,可以比较好的利用这种抽象编写方便的代码。
Channel
channel 则是定义了一些核心组件,比如对于连接的封装 Channel,对于请求响应链的组合 ChannelPipeline,以及对于 Channel 的轮询管理 EventLoop/EventLoopGroup 等。
Handler
为了方便我们的使用,Netty 提供了一些开箱即用的 Handler,比如 Http 编解码器,日志记录器等。
Resolver
一些解析器
Util
常用的工具类。但是有一个需要我们注意,就是 concurrent 包,他是对于 java.util.concurrent 的封装和扩展,方便我们在 Netty 使用线程来进行一些并发操作,以及对于线程的管理(包括对于轮询的管理)。Netty 的高连接的一个很重要的原因之一就是它对于线程的活用,使之满足一个线程管理多个连接(如果是小连接的话甚至可以一个线程管理成千上万个连接)。
核心组件
EventLoop
如果你写过 Reactor 模型的 Java NIO 网络通信,你肯定知道,在 NIO 中,是一个线程对应一个 Selector,一个 Selector 管理 N 个远程连接。
在 NIO 中,我们想要知道当前 Selector 管理的 N 个连接有没有就绪事件,需要调用 select()方法,这个方法底层调用操作系统的 select/poll/epoll 来实现,当发生了就绪事件,方法从阻塞状态返回;然后轮询就绪列表,所以称为"Loop"。对于感兴趣的时间集合进行轮询就是 EventLoop 名称的由来。
NioEventLoop
在 Netty 中,实现了 EventLoop 接口的类有很多,这正是 Netty 封装底层差异的表现之一。除了特定于操作系统的,比如 Linux 的 EpollEventLoop 和 macOS 的 Kqueue;我们一般使用 NIO。是因为 NIO 屏蔽了系统差异,会自动根据当前程序系统选择特定于系统的最好的方式。
上三层都是 JUC 标准库提供的,下面都是 Netty 进行了封装。通过观察我们很容易发现,
🍔EventExecutorGroup
首先我们分析 EventExecutorGroup。它是一个扩展了 JUC 线程池操作的接口,主要做了三件事:
1⃣️添加了“优雅关闭”线程池的方法。
2⃣️实现了迭代器接口,并添加了 next()方法,实现返回下一个 EventExecutor 的功能(就是迭代功能)。
3⃣️把 submit()和 execute()方法的返回值改写为 Netty 自己的 Future(扩展了 JUC 的 Future,添加了 Netty 自己的功能)。
🍟EventExecutor
刚刚我们提到了 EventExecutor 接口,它是 Netty 中一个事件执行接口,主要提供判断当前执行线程是否是 EventLoop 线程的方法,还有 parent()方法,返回管理它的 EventExecutorGroup;通过重写 next()使之返回自己。EventExecutorGroup 负责对多个 EventExecutor 进行管理。
🍕EventLoopGroup
此时我们发现了 EventLoopGroup 也是继承自 EventExecutorLoop 的,而它重写了 next()方法来获得由他管理的 EventLoop。此外,还添加了注册 Channel 的方法,当我们把 Channel 注册给 EventLoopGroup 时,EventLoopGroup 会选取一个由它管理的 EventLoop 来负责对这个 Channel 进行事件轮询操作。
🥪AbstractEventExecutor
那 AbstractEventExecutor 是干啥的?从继承关系看,我们知道它是 EventExecutor 的一个抽象类实现,其实查看源码可以看到它仅仅实现了部分功能。
比如说,它实现了 next()方法,这个方法返回一个 EventExecutor,也就是它自己;注意,我们要明确一件事,EventExecutor 不一定要有 Group 对它管理。所以它的 parent(EventExecutorGroup)可以是空的。
🥙AbstractScheduledEventExecutor
则是为 EventExecutor 提供了定时调度的执行功能。
但是迄今为止,我们还是没有实现线程运行功能。
🌮OrderedEventExecuto
来我们继续哈!OrderedEventExecutor 只是一个标记接口,指明任务会按照提交顺序执行。
🥘SingleThreadEventExecutor
而 SingleThreadEventExecutor 虽然实现了线程池的 execute()方法,但是它的实现行为和我们预料的线程池实现有很大的不同,核心实现在 execute()方法,我们来看看:
然后看看 startThread():
发现它调用了 doStartThread()方法:
查看源码知道这个方法做的事情很单一,就是运行自定义的 run()方法,并在运行之后关闭线程池。而且注意到这里设置了 thread = Thread.currentThread();所以上述的 execute()只会进入 startThread()一次!
注意一下,这里还有一个小细节,就是 SingleThreadEventExecutor.this.run()跑在了一个 Runnable 里,而这个 Runnable 提交到了 executor 来执行,这个 executor 是作为参数传递过来的;通过源码我们可以知道这个 executor 是 MultithreadEventExecutorGroup 提供的。这个 executor 的 execute(task)会开辟新的线程去跑 task。所以这个 Runnable 是在新的线程执行的。那么处于这个 Runnable 里面的 run()方法最终也是跑在了一个全新的线程中,但是因为 run()自始至终只会被调用一次,所以每一个 EventExecutor 都只有一个新开辟的线程来执行逻辑操作。而这就实现了每个 EventLoop 对应一个线程的逻辑。
总结一下就是:
传参到 EventExecutor 实现类的 executor(java.util.concurrent.Executor)的 execute(Runnable)方法每次都会新建一个线程去执行 Runnable。
为了实现一个线程管理一个 Selector 和任务执行的目标,我们就不能放任任务(Runnable 对象)每次都被提交给属性 executor 去执行(否则每次都会开辟线程),所以我们我们把任务放到了任务队列。
但是任务队列的任务怎么被执行呢?答案是放在子类的 run()方法去执行,里面有一个死循环,会不断重复:轮询网络 I/O 操作+执行全部任务队列的任务;这么一个过程。
所以既然每次都会开辟一个新的线程,而 run()恰好可以不停地处理 I/O+任务队列,那就索性只把 run()提交给 executor 去做,这样就只有一个线程,同时又实现了 I/O+任务队列的逻辑。
注意啦!运行的这个 run()这个可不是通过实现 Runnable 接口实现的,是 SingleThreadEventExecutor 自定义的抽象方法:
子类,比如 NioEventLoop 则负责实现这个抽象方法。通过 debug 我们确实看到了,这个 SingleThreadEventExecutor.this.run()确实被 NioEventLoop 实现了,所以最终调用的是 NioEventLoop 的 run()方法。
🌯EventLoop
现在来看看 EventLoop。EventLoop 之于 EventLoopGroup 就像 EventExecutor 之于 EventExecutorGroup。EventLoop 重写了 EventExecutor 的 parent(),返回管理自己的 Group。
🥗SingleThreadEventLoop
SingleThreadEventLoop 就是倒数第二步了,它实现了注册逻辑 register。
🥫NioEventLoop
而我们需要的 select()操作,打开 Selector,设置连接感兴趣事件则是在 NioEventLoop 里面完成的。
前面我们提到,NioEventLoop 实现了 run()方法:
通过继承关系可以看到,EventLoop 的最终实现(包含那些特定于系统的)最终都是和 NioEventLoop 一样的继承层次。
这里需要明确一下,虽然 NioEventLoop 使用线程来执行所有的上述实际操作,但是这个线程不是它创建的,而是 NioEventLoopGroup 根据入参设定的线程数量生成如是数量的线程,然后绑定到每一个 NioEventLoop 上来实现的,详见下述源码。
EventLoopGroup
NioEventLoopGroup
同上面的原因,我们分析 NioEventLoopGroup。
在这里我们很容易注意到,NioEventLoop 的 AbstractEventExecutor 被替换成了 AbstractEventExecutorGroup。也就是管理 EventExecutor 的 Group 的抽象实现。
🍝AbstractEventExecutorGroup
AbstractEventExecutorGroup 实现比较简单粗暴,它主要实现了 submit()和 execute()方法(这两个方法来自 JUC)和优雅关闭操作。并且对于 submit()和 execute()的实现是通过简单的调用 next()方法得到 EventExecutor,然后通过 EventExecutor 来执行的。
通过查看源码,我们可以很容易了解这一点:
🍜MultithreadEventExecutorGroup
通过理解 NioEventLoop 我们知道,抽象类实现的扩展是通过单线程模式的 EventExecutor 来实现的,那么对于“Group 版”,既然它管理多个 EventExecutor 的话,那它肯定是多个单线程的集合,也就是多个线程。所以有了 MultithreadEventExecutorGroup 这个类。
MultithreadEventExecutorGroup 定义了一个新的方法,叫做 newChild()。这个方法作用是:返回一个和可以被 next()方法返回的 EventExecutor,同时这个方法会为每一个线程都调用一次。换句话说,对于 MultithreadEventExecutorGroup 管理的 N 个线程,我们通过 newChild()方法返回 N 个 EventExecutor,每一个对应一个线程。此方法需要子类重写。
同时,它还有一个 Chooser 属性,用来通过自定义/默认策略从可用 EventExecutor 数组中选择一个作为 next()方法的返回值。此外,它的迭代器方法返回一个不可更改的迭代器对象。
这个类的构造器实现很有意思。EventLoopGroup 很大程度上依赖了这个构造器构造出来的一些数据。
首先来看看它的字段吧!
然后来看看它的构造器:
EventExecutor 数组元素通过 newChild()方法进行填充,这和我们上面分析一致。然后就是构建 Chooser 和不可变的用来做迭代的集合。
🍲MultithreadEventLoopGroup
MultithreadEventLoopGroup 则是实现了注册逻辑,对应了 SingleThreadEventLoop 的注册逻辑,它的注册逻辑就是通过调用 next()方法获取 EventLoop,然后注册到 EventLoop 来实现。
🍛NioEventLoopGroup
NioEventLoopGroup 则是只实现了 newChild()方法,这就是我们前面说的创建 EventExecutor 并绑定一个线程的实现。本质上是由 MultithreadEventExecutorGroup 传递参数,由专门化的 EventLoopGroup 接收参数并生成特定于某一平台或方式 EventExecutor。
小结
至此我们的 EventLoop/EventLoopGroup 讲完了,我们发现,Netty 是这样实现的:
封装 JUC 的 Executors 得到 EventExecutor,它是事件执行的执行体,但是这个执行体实现略微特殊罢了。
对于管理多个 EventExecutor 的想法,诞生出了 EventExecutorGroup 系列;但是 Netty 选择让 EventExecutor 继承自 EventExecutorGroup 的方式来实现这种组合操作,在开始阅读时有点头昏。
但是目前我们只是封装了执行体,没有对其实现,而且我们知道 Netty 特定于网络操作,所以我们需要的事件应该是网络 I/O 事件才对,于是我们有了 EventLoop 和它对应的管理者 EventLoopGroup。
EventLoop 相比 EventExecutor 实现了多路复用轮询 I/O 事件的操作,使得我们可以拿 EventExecutor 去处理网络 I/O;说白了就是扩展了 EventExecutor。
虽然 EventExecutor 可以执行 Runnable,但是其实现是压入任务队列,然后在 XxxEventLoop 的 run()方法出队所有 tasks 来实现对 task 的执行的,且 XxxEventLoop 的 run()调用在新的线程,同时只会被调用一次,所以做到了一个 EventExecutor 拥有一个线程去执行所有给它的 tasks 的同时实现事件轮询。
ChannelInboundInvoker/ChannelOutboundInvoker
🍣ChannelInboundInvoker/ChannelOutboundInvoker
这两个接口其实没什么好说的,但是因为他们是很重要的基础接口:
所以我单独拎出来讲一下。ChannelInboundInvoker 定义了一堆入操作,比如 fireXxx,表示向入方向下一个触发(比如数据读取,异常捕获这种需要向下流传递的);而 ChannelOutboundInvoker 定义了一堆出操作,比如写操作,还有连接/绑定这种对外暴露的操作,以及要求从外界读的 read()操作等。
Channel
🍱Channel
Channel 是一个比较抽象的接口,直接分析不好入手,不妨我们先看看继承层次,试着理一下:
Channel 把 read 操作和 flush 操作自己保留,把 write 和 flush(空返回值版)扔给了 Unsafe 实现(Unsafe 是它的内部接口)。
🍫AbstractChannel
AbstractChannel 则把大部分实现扔给了 ChannelPipeline 去实现。但是 ChannelPipeline 是它构造的,它会构造一个 DefaultChannelPipeline 实例,入参就是它自己(Channel 实例)。
一般使用的构造器是这样的:
其中的 parent 一般指的是 ServerSocketChannel,也即在服务端创建这个 SocketChannel 的 ServerSocketChannel,在 Netty 就是封装的 NioServerSocketChannel。
🍩AbstractNioChannel
AbstractNioChannel 则是进一步封装成了 Nio 相关的操作,包括提供 NioUnsafe 接口。
🍪AbstractNioByteChannel
AbstractNioByteChannel 则是提供了把 I/O 特定于 ByteBuf 的操作,以字节形式读取写入数据,属于进一步具体化。所以它实现了读写操作,这两个最基础的功能。
🍯NioSocketChannel
NioSocketChannel 可以看成是最后的交互实现了,我们可以通过看看构造器猜到一些用法:
其中 SocketChannel 回到了 Java,它就是 java.nio.channels.SocketChannel。此类更多是实现了关闭管道的操作。
而整个 Channel 链需要交互的根本——java.nio.channels.SocketChannel 就是在这里提供的,我们来看看是谁调用了它的构造方法:
而这个 accept()方法我们就相当熟悉了:
Channel.Unsafe
🍿Channel.Unsafe
直接上继承层次,干脆明了:
首先 Unsafe 本身提供了一些直接与外界数据交互的方法,这些方法按理说只能被 Netty 内部调用,而不应该被用户代码调用。
🥛NioUnsafe
其中呢,NioUnsafe 扩展了 Unsafe,它最主要的方法就是,提供了访问 SelectableChannel 的能力:
🍵AbstractUnsafe
那 AbstractUnsafe 呢?它润色了很多主要的方法,比如刷新 flush,写出 write,注册 register。但是这些方法的实现还是委托给子类来做的,它仅仅把需要刷新/写出的消息加入到刷新缓冲区,需要注册的 Channel 包装一下,然后调用子类的 register 来完成注册逻辑。
☕️AbstractNioUnsafe
AbstractNioUnsafe 则是实现了大部分逻辑,包括返回 SelectableChannel 的逻辑(实现很简单,直接返回 javaChannel)。
🍺NioByteUnsafe 而 NioByteUnsafe 则实现了重要的 read()方法,负责把上一次读感兴趣的操作读到的数据放到 ByteBuf 中并 fire 到下一个 Handler。注意,doBeginRead()负责设置当前 Channel 为读感兴趣,read()负责把读到的数据放到 ByteBuf 供使用。
小结
Channel 系列,包括 Channel.Unsafe 都是一个很复杂的系列,因为它们封装了 JDK 的 SocketChannel,而且涉及到真正的数据写入写出,Netty 为了实现统一和方便,封装了太多的层次,每一个 Abstract 都有自己的职责和工作,每一个最终实现类都有自己被父类委托的方法,所以想要梳理清晰,最好结合继承层次一起分析。
ChannelPipeline
🥟ChannelPipeline
首先看结构层次:
ChannelPipeline 实现起来比较单一,因为它的实现类只有一个 DefaultChannelPipeline。而且它的方法可以分为以下四类:
1⃣️添加一个 ChannelHandler
2⃣️移除一个 ChannelHandler
3⃣️从头触发 ChannelHandler 链的 fireXxx()方法
4⃣️获取 ChannelHandler/ChannelHandlerContext/Channel
🦪DefaultChannelPipeline
我们直接查看 DefaultChannelPipeline 的源码,可以看到其实现是很简单的:
1⃣️对于 fireXxx 操作,则是调用 head 节点的 invokeXxx 方法实现的,所以如果调用 pipeline 的 fireXxx 会触发从头开始的操作,而调用 ChannelHandlerContext 的 fireXxx 则会触发下一个 Handler。
2⃣️对于读、写、bind、connect 等操作,则是调用 tail 节点的相关方法实现的,tail 的 write 很好理解,毕竟 tail 在最后,你想写出数据,从最后节点刷新当然是正确的。但是 tail 的 read 操作可能不是那么好理解,它不是 channelRead()操作,而是表示此时程序想要读,也就是注册读感兴趣事件;最终会变成调用 head 的 read 方法,而此时就会触发真正的读感兴趣事件的注册。
ChannelHandlerContext
🍤ChannelHandlerContext
我们直接看继承层次图会更好理解:
源码这么写到:ChannelHandlerContext 可以让当前 ChannelHandler 获得与它所在的 ChannelPipeline 和同属于同意 Pipeline 的其他 Handler 交互的能力。一般指的是与 Pipeline 中数据流方向上的下一个 Handler 交互的能力(其实就是那一堆 fireXxx 方法)。
其实我们可以看到,它提供的方法无非就是获取 ChannelPipeline、ChannelHandler、Channel、EventExecutor、和重写了 fireXxx 这五个操作。就没了。然后就是一些继承而来的操作。ChannelHandlerContext 如果单纯的看接口定义所能获得信息就是:提供了 ChannelHandler 与其他 ChannelHandler 和 Channel 与 ChannelPipeline 交互的操作。
而 ChannelHandlerContext 很多关键操作是有 AbstractChannelHandlerContext 来实现的:
🍙AbstractChannelHandlerContext
它把关键实现委托给 invokeXxx()方法,在 invokeXxx 中的逻辑基本是一样的:获取下一个 ChannelHandlerContext=>判断下一的 context 的 EventExecutor(其实是 ChannelHandler 的 EventExecutor)是否是轮询线程(Netty 可以为每一个 ChannelHandler 指定一个 EventExecutor)=>如果是,则直接调用下一个的 read 操作;如果不是,则在它(next)的 eventExecutor 中执行 read()操作。
这里我想说一下关于为某一 ChannelHandler 自定义执行上下文的事情,如果为某一个 ChannelHandler 指定了 EventExecutor 的话,AbstractChannelHandlerContext 在 invokeXxx 之前会加一个判断,因为 Singlethread.this.thread 在开辟 I/O 轮询线程时已经被设置为 I/O 轮询线程(且只会调用一次),所以调用 executor.inEventLoop()时会得到 false,然后就会在 executor 中执行,乍一看没毛病。但是如果你 debug 的话,会发现这个 Handler(自定义了执行上下文的 Handler)后面的所有 Handler 都会在新的线程上,而不会再切回到 I/O 线程,这是为什么呢?我明明没有为后面的开辟新的执行上下文啊?
答案还是 executor 的 inEventLoop()方法,此时后面的 Handler 因为没有指定 executor,所以它们的 executor 还是 I/O 线程,所以返回值为 true,所以会直接在当前线程运行,注意⚠️,当前线程不是 I/O 线程,而当前线程早就因为前一个 Handler 的自定义执行器切换到了新的线程,所以它也会在新的线程执行。
因为一开始的 HeadContext 是在 I/O 线程跑的,所以会导致后面的所有 Handler 都会在它的线程上跑,除非有谁显示地切换了执行上下文。所以 Netty 的 fireXxx 方法会导致后面的 Handler 运行在当前 Handler 所处的执行上下文中。
其他操作基本一样。
🍡DefaultChannelHandlerContext
而 AbstractChannelHandlerContext 是没有和 ChannelHandler 相关的实现的,它把 ChannelHandler 的相关实现委托给了它的实现类去做,而在 Netty 中,实现类只有一个,就是 DefaultChannelHandlerContext,所以它的实现也异常简单:
🍧HeadContext/TailContext
除此之外,我们还有两个比较特殊的 ChannelHandlerContext,我们之前说过,Netty 是通过链表结构来组织 ChannelHandler 的。所以肯定要有首尾节点作为边界来进行圈定。所以还有两个特殊的实现,就是 HeadContext 和 TailContext。
HeadContext 实现了 InboundHandler/OutboundHander 接口,也继承了 AbstractHandlerContext。其实我们可以猜猜,作为链表头,那肯定是要为读写负责的嘛,所以它肯定要作为双向 BoundHandler 存在,而且不出意外它还应该拥有真正从 Channel 读数据的能力,然后 fire 到后面的 Handler;而且还要可以写数据,把数据真的写入到 Channel。这是它身为头节点必须拥有的能力。
HeadContext 的实现的方法分为三种:
1⃣️直接忽略的,比如 handlerAdded/Removed 这种通知方法。
2⃣️调用 Channel.Unsafe 实现的,比如 bind,connect,read,write 这种实质性的对应于 JavaChannel 的方法。
3⃣️直接转发给下一个 Handler 的,因为头节点做界限和实际 Channel 作用,所以对于读到数据和写出数据,异常处理这种业务逻辑不应该属于它,所以直接 fire 给下一个。比如 channelXxx 方法。
TailContext 就更简单了,它仅仅作为尾界定符的作用,所有的方法都是空(其实有些方法仅仅打了日志,约等于为空)。
小结
ChannelHandlerContext 提供了 ChannelHandler 与同一 ChannelPipeline 下的后一个 ChannelHandler 交互的能力。
对于把请求转发给链中下一个 ChannelHandler 的实现,则是通过判断执行上下文再调用下一个 Handler 的相关方法实现的。
ChannelHandler
🍦ChannelHandler
这是一个 ChannelHandler 链的实际组成:
因为涉及逻辑组成,所以又可以看成下面这种方式:
直接看接口定义吧:
其实就俩方法,做回掉用的,一个是添加 Handler,一个是移除 Handler。
同时还有一个注解,@Shareable,指出这个 Handler 是否可以被共享,即,在多个 Pipeline 之间共享同一个实例。
🥧ChannelHandlerAdapter
而 ChannelHandlerAdapter 主要添加了通过解析注解判断这个 Handler 是否是可共享的方法,实现很简单,就不说了。
ChannelHander 我们不太好单独的拎出来说,因为实际使用时,一般要分入和出两个 Handler:
ChannelInboundHandler
🍰ChannelInboundHandler
如果说 ChannelHandler 提供了添加/移除 ChannelHandler 回掉的话,那么 ChannelInboundHandler 则提供了更细致的关于 Channel 的状态更改的回掉,说白了就是当 Channel 被注册,被激活,读到数据时,数据读取完时,遇到异常时,得到一个回调。
ChannelOutboundHandler
🍭ChannelOutboundHandler
ChannelOutboundHandler 则是会在 I/O 操作的出操作(bind,connect,write)可用时得到回调。
ChannelDuplexHandler
🍬ChannelDuplexHandler
一个双向的 Handler,同时拥有 In/Out 两种功能。
ByteBuf
ByteBuf 是 Netty 数据缓冲,操作,读写的核心类,先挖个坑,日后再填 ByteBuf 相关的源码。
参考
暂无
版权声明: 本文为 InfoQ 作者【CodeWithBuff】的原创文章。
原文链接:【http://xie.infoq.cn/article/46736e8fdf6ea300695b0d6e4】。文章转载请联系作者。
评论