Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。
Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一种机制。
1.Reactor 三大组件
Reactor 包含以下三大组件:
其中:
Reactor(反应器):Reactor 负责监听和分发事件,它是整个 Reactor 模型的调度中心。Reactor 监视一个或多个输入通道,如监听套接字上的连接请求或读写事件。当检测到事件发生时,Reactor 会将其分发给预先注册的处理器(Handler)进行处理。在 Netty 中,这个角色经常是由 EventLoop 或其相关的 EventLoopGroup 来扮演,它们负责事件的循环处理、任务调度和 I/O 操作。
Acceptor(接收器):用于处理 IO 连接请求。当 Reactor 检测到有新的客户端连接请求时,会通知 Acceptor,后者通过 accept() 方法接受连接请求,并创建一个新的 SocketChannel(在 Netty 中是 Channel)来表示这个连接。随后,Acceptor 通常会将这个新连接的 Channel 注册到 Worker Reactor 或 EventLoop 中,以便进一步处理该连接上的读写事件。
Handlers(处理器):Handlers 负责具体的事件处理逻辑,即执行与事件相关的业务操作。在 Netty 中,Handler 是一个或多个 ChannelHandler 的实例,它们形成一个责任链(ChannelPipeline),每个 Handler 负责处理一种或一类特定的事件(如解码、编码、业务逻辑处理等)。数据或事件在 ChannelPipeline 中从一个 Handler 传递到下一个,直至处理完毕或被消费。Handler 可以分为入站(inbound)和出站(outbound)两种,分别处理流入的数据或流出的数据。
2.Reactor 三大模型
Reactor 模式支持以下三大模型:
单线程模型
多线程模型
主从多线程模型
具体内容如下。
2.1 单线程模型
在单线程模型中,所有的事件处理操作都由单个 Reactor 实例在单个线程下完成。Reactor 负责监控事件、分发事件和执行事件处理程序(Handlers),如下图所示:
单线程模型的实现 Demo 如下:
// 假设有一个单线程的Reactor,负责监听、接收连接、读写操作
class SingleThreadReactor {
EventLoop eventLoop; // 单个事件循环线程
SingleThreadReactor() {
eventLoop = new EventLoop(); // 初始化单个事件循环
}
void start(int port) {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 绑定端口
eventLoop.execute(() -> { // 在事件循环中执行
while (true) {
SocketChannel clientSocket = serverSocket.accept(); // 接受连接
if (clientSocket != null) {
handleConnection(clientSocket); // 处理连接
}
}
});
eventLoop.run(); // 启动事件循环
}
void handleConnection(SocketChannel clientSocket) {
// 读写操作,这里简化处理
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (clientSocket.read(buffer) > 0) {
// 处理读取的数据
buffer.flip();
// 假设处理数据逻辑...
buffer.clear();
}
// 写操作逻辑类似
}
}
复制代码
优缺点分析
2.2 多线程模型
在多线程模型中,连接 Acceptor 和业务处理(Handlers)是由不同线程分开执行的,其中 Handlers 是由线程池(多个线程)来执行的,如下图所示:
多线程模型的实现 Demo 如下:
// 假设有两个线程,一个用于监听连接,一个用于处理连接后的操作
class MultiThreadReactor {
EventLoop acceptLoop;
EventLoop workerLoop;
MultiThreadReactor() {
acceptLoop = new EventLoop(); // 接收连接的线程
workerLoop = new EventLoop(); // 处理连接的线程
}
void start(int port) {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
acceptLoop.execute(() -> { // 在接受线程中监听
while (true) {
SocketChannel clientSocket = serverSocket.accept();
if (clientSocket != null) {
workerLoop.execute(() -> handleConnection(clientSocket)); // 将新连接交给工作线程处理
}
}
});
acceptLoop.run(); // 启动接受线程
workerLoop.run(); // 启动工作线程
}
// handleConnection 方法与单线程模型中的相同
}
复制代码
优缺点分析
2.3 主从多线程模型
主从多线程模型是一个主 Reactor 线程加多个子 Reactor 子线程,以及多个工作线程池来处理业务的,如下图所示:
主从多线程模型的实现 Demo 如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MainReactorModel {
public static void main(String[] args) {
// 主Reactor,用于接受连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从Reactor,用于处理连接后的读写操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 在这里添加业务处理器,如解码器、编码器、业务逻辑处理器
ch.pipeline().addLast(new MyBusinessHandler());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Server started at port 8080");
future.channel().closeFuture().sync(); // 等待服务器关闭
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
优缺点分析
课后思考
NioEventLoop 是如何实现的?它能够保证 Channel 操作的线程安全吗?为什么?
本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。
评论