写点什么

源码分析 Netty:核心组件及启动过程分析

发布于: 2021 年 03 月 22 日
源码分析Netty:核心组件及启动过程分析

系列文章:

源码分析 -Netty:开篇

源码分析 -Netty:多线程在 Netty 中的应用

源码分析-Netty: 并发编程的实践(二)

源码分析-Netty: 架构剖析

源码分析-Netty: 高性能之道


一 Netty 核心组件

1.1 Channel

Channel(通道)是 NIO 基本的结构。JDK 的 NIO 包中,有Channel接口的介绍:

A nexus for I/O operations.

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.

Channels are, in general, intended to be safe for multithreaded access as described in the specifications of the interfaces and classes that extend and implement this interface.

根据上述文档的描述,Channel 是 I/O 操作的连接(关系),表示与能够执行一个或多个不同 I/O 操作(例如读取或写入)的实体(例如硬件设备、文件、网络套接字或程序组件)的开放连接。

通道是“打开”或“关闭”的。通道在创建时是开放的,一旦关闭它就会保持关闭。一旦通道关闭,对其调用 I/O 操作的任何尝试都将导致引发 ClosedChannelException。通道是否打开可以通过调用其 isOpen 方法进行测试;通道通常是为了安全地进行多线程访问,如扩展和实现该接口的接口和类的规范所述。


1.2 Callback

回调。Callback 已经是一种非常常见的异步实现方法,用于通知调用方操作已完成。可以简单理解为一个方法,提供给另一种方法作为引用,这样后者就可以在某个合适的时间调用前者。

1.3 Future

Future 提供了另外一种通知应用操作已经完成的方式。这个对象作为一个异步操作结果的占位符,它将在将来的某个时候完成并提供结果。

1.3.1 JDK 的 Future 接口

JDK 提供了 java.util.concurrent.Future 接口,Futrue 是个接口。Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

Future代表异步计算的结果,接口提供方法用于检查计算是否已完成,等待计算完成,然后取回计算结果。计算结果只能通过get方法返回;如果有必要会堵塞直到它计算完成。可以通过cancel方法取消。附加的方法可用于判断任务是否正常完成或者被取消。一旦计算完成,那么它不能被取消。如果你想要使用Future 来取消,但是不提供一个可用的结果,你可以声明Futrue 的类型,但会返回null 作为一个基本任务的结果。FutureTask 类是Futrue类的一个实现类,实现了Runnable接口,可以被Executor 执行。
复制代码

1.3.2 Future 提供的方法

1.3.2.1 cancel

boolean cancel(boolean mayInterruptIfRunning) 调用该方法将试图取消对任务的执行。如果任务已经完成了、已取消、无法取消这种尝试会失败。当该方法调用时任务还没有开始,方法调用成功而且任务将不会再执行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否执行此任务的线程应该以试图停止任务被中断。此方法返回后调用 isDone 方法将返回 true 。后续调用 isCancelled 总是返回第一次调用的返回值。

1.3.2.2 isCancelled

boolean isCancelled() 如果任务在完成前被取消,将返回 true。

请注意任务取消是一种主动的行为。

1.3.2.3 isDone

boolean isDone() 任务已经结束,在任务完成、任务取消、任务异常的情况下都返回 true 。

1.3.2.4 get

V get() throws InterruptedException, ExecutionException 调用此方法会在获取计算结果前等待。一但计算完毕将立刻返回结果。它还有一个重载方法 V get(long timeout, TimeUnit unit) 在单位时间内没有返回任务计算结果将超时,任务将立即结束。

1.3.2 ChannelFuture

JDK 的 Future 提供了一个异步获取执行结果的机制,但提供的方法有限,所以 Netty 自定义了 Future 接口。另外,Netty 中所有的 I/O 操作都是异步的,因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了 ChannelFuture 接口,其 addListener()方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。

ChannelFuture 提供多个方法来允许一个或者多个 ChannelFutureListener 实例。这个回调方法 operationComplete() 会在操作完成时调用。事件监听者能够确认这个操作是否成功或者是错误。如果是后者,我们可以检索到产生的 Throwable。简而言之, ChannelFutureListener 提供的通知机制不需要手动检查操作是否完成的。

每个 Netty 的 outbound I/O 操作都会返回一个 ChannelFuture;这样就不会阻塞。这就是 Netty 所谓的“自底向上的异步和事件驱动”。ChannelFuture 接口代码如下:

public interface ChannelFuture extends Future<Void> {    Channel channel();
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture addListeners(GenericFutureListener... var1);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener... var1);
ChannelFuture sync() throws InterruptedException;
ChannelFuture syncUninterruptibly();
ChannelFuture await() throws InterruptedException;
ChannelFuture awaitUninterruptibly();
boolean isVoid();}
复制代码

1.4 Event 与 Handler

事件机制的重要组成部分,通过不同的事件来通知我们更改的状态或操作的状态。这使我们能够根据发生的事件触发适当的“行为”。行为可能包括日志记录、数据转换、流控制、应用程序逻辑等。

Netty 是一个网络框架,事件很清晰的跟入站(inbound)或出站(outbound)的数据流相关。因为一些事件可能触发传入的数据或状态的变化包括:

  • 活动或非活动连接

  • 数据的读取

  • 用户事件

  • 错误

出站事件是由于在未来操作将触发一个动作。这些包括:

  • 打开或关闭一个连接到远程

  • 写或冲刷数据到 socket

每个事件都可以分配给用户实现处理程序类的方法。这说明了事件驱动的范例可直接转换为应用程序构建块。

上图是 Netty 的事流图(图片来自 w3c School)。

Netty 的 ChannelHandler 是各种处理程序的基本抽象。想象下,每个处理器实例就是一个回调,用于执行对各种事件的响应。

在此基础之上,Netty 也提供了一组丰富的预定义的处理程序方便开箱即用。比如,各种协议的编解码器包括 HTTP 和 SSL/TLS。在内部,ChannelHandler 使用事件和 future 本身,创建具有 Netty 特性抽象的消费者。

1.5 Eventloop

EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。下图说明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系。

1.6 Netty 异步模型

1.6.1 Future 和 Callback

Netty 的异步编程模型是建立在 Future 和 Callback 的概念上的。

拦截操作和转换入站/出站数据,只需要开发者提供回调或利用 Future 操作返回。这使得链操作简单、高效,促进编写可重用的、通用的代码。Netty 的一个主要设计目标是促进“关注点分离”,即把业务逻辑从网络基础设施程序中分离出来。

1.6.2 Selector, Event 和 Eventloop

Netty 通过触发事件从应用程序中抽象出 Selector,从而避免手写调度代码。EventLoop 分配给每个 Channel 来处理所有的事件,包括

  • 注册感兴趣的事件

  • 调度事件到 ChannelHandler

  • 安排进一步行动

EventLoop 本身是由一个线程驱动,它给一个 Channel 处理所有的 I/O 事件,并且在 EventLoop 的生命周期内不会改变(参考上篇介绍的串行化设计概念)。这个简单而强大的线程模型消除可能对 ChannelHandler 同步是否正确和合理性的关注,这样就可以专注于提供正确的回调逻辑。

二 Netty 启动过程分析

2.1 一个 Netty Server 示例

public class HttpServer {
public static int DEFAULT_PORT = 8080;
public static void main(String[] args) throws Exception { int port;
try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; }
// 多线程事件循环器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker try { ServerBootstrap b = new ServerBootstrap(); // 引导程序 b.group(bossGroup, workerGroup) // 设置EventLoopGroup .channel(NioServerSocketChannel.class) // 指明新的Channel的类型 .childHandler(new HttpServerChannelInitializer()) // 指定ChannelHandler .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项 .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项 // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync();
System.out.println("HttpServer已启动,端口:" + port);
// 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally { // 优雅关闭 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }
}}
复制代码

这里还有一个依赖,HttpServerChannelInitializer,代码如下:

public class HttpServerChannelInitializer extends ChannelInitializer<SocketChannel> {
public HttpServerChannelInitializer() { } @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576)); ch.pipeline().addLast("serverHandler", new HttpServerHandler()); }
}
复制代码

HttpServerHandler:

public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { this.readRequest(msg);
String sendMsg; String uri = msg.uri();
switch (uri) { case "/": sendMsg = "<h3>Netty HTTP Server</h3><p>Welcome to <a href=\"https://waylau.com\">waylau.com</a>!</p>"; break; case "/hi": sendMsg = "<h3>Netty HTTP Server</h3><p>Hello Word!</p>"; break; case "/love": sendMsg = "<h3>Netty HTTP Server</h3><p>I Love You!</p>"; break; default: sendMsg = "<h3>Netty HTTP Server</h3><p>I was lost!</p>"; break; }
this.writeResponse(ctx, sendMsg); }
private void readRequest(FullHttpRequest msg) { //请求行信息 System.out.println(msg.method() + " " + msg.uri() + " " + msg.protocolVersion());
//请求头信息 for (String name : msg.headers().names()) { System.out.println(name + ": " + msg.headers().get(name));
}
//消息体 System.out.println(msg.content().toString(CharsetUtil.UTF_8));
}
private void writeResponse(ChannelHandlerContext ctx, String msg) { ByteBuf bf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, bf); res.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.length()); ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }
复制代码


我们可以直接执行这段代码,启动一个 HTTP 服务,端口作为参数输入,或者直接使用默认的 8080 端口。

2.2 配置分析

2.2.1 引导(BootStrap)

引导一个应用程序是指对它进行配置,并使它运行起来的过程——尽管该过程的具体细节可能并不如它的定义那样简单,尤其是对于一个网络应用程序来说。上面示例中的 ServerBootstrap 就是一个“引导”类,可以看出,ServerBootstrap 是负责串联章节一中介绍过的 Channel、EventLoopGroup、ChannelHandler、ChannelFuture 以及 参数配置等组件的。

引导类的层次结构:

2.2.2 引导过程

步骤如下:

1、设置 EventLoopGroup:提供了用于处理 Channel 事件的 EventLoop,在示例中我们定义了两个 EventLoopGroup,boss 和 work;

2、channel(NioServerSocketChannel.class):指定要使用的 Channel 实现;

3、childHandler(new HttpServerChannelInitializer()):设置用于处理已被接受的子 Channel 的 I/O 及数

据的 ChannelInbound-Handler;

4、b.bind(port):通过配置好的 ServerBootstrap 的实例绑定该 Channel

通过图片看引导过程如下:

2.2.3 ChannelInitializer

在 Server 启动时,在 childHandler()方法中设置了一个自定义的 HttpServerChannelInitializer()。在代码中,可以看到在 SockerChannel 中设置了以下三个内容:

ch.pipeline().addLast("codec", new HttpServerCodec());ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));ch.pipeline().addLast("serverHandler", new HttpServerHandler());
复制代码

2.2.3.1 codec

编解码的处理,HttpServerCodec 是 netty 针对 http 编解码的处理类,但是这些只能处理像 http get的请求,也就是数据带在url问号后面的 http 请求;

2.2.3.2 aggregator

用 POST 方式请求服务器的时候,对应的参数信息是保存在 message body 中的,如果只是单纯的用 HttpServerCodec 是无法完全的解析 Http POST 请求的,因为 HttpServerCodec 只能获取 uri 中参数,所以需要加上 HttpObjectAggregator。它把 HttpMessage 和 HttpContent 聚合成为一个 FullHttpRquest 或者 FullHttpRsponse,大致结构如下图所示:


2.2.3.3 serverHandler

示例使用了自定义的 HttpServerHandler,在 channelRead0 方法中针对请求的 uri 进行处理,在生成响应(writeResponse)时使用对应的信息。

三 总结

本篇从实例出发,了解 Netty 核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握 Netty 的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。


发布于: 2021 年 03 月 22 日阅读数: 15
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
源码分析Netty:核心组件及启动过程分析