全网讲解最透彻:高性能网络应用框架 Netty,仅此一篇
void Reactor : : handle_events()
{
/*
通过同步事件多路选择器提供的
select() 方法监听网络事件
*/
select( handlers );
/* 处理网络事件 */
for ( h in handlers )
{
h.handle_event();
}
}
/* 在主程序中启动事件循环 */
while ( true )
{
handle_events();
Netty 中的线程模型
============
Netty 的实现虽然参考了 Reactor 模式,但是并没有完全照搬,Netty 中最核心的概念是事件循环(EventLoop),其实也就是 Reactor 模式中的 Reactor,负责监听网络事件并调用事件处理器进行处理。在 4.x 版本的 Netty 中,网络连接和 EventLoop 是稳定的多对 1 关系,而 EventLoop 和 Java 线程是 1 对 1 关系,这里的稳定指的是关系一旦确定就不再发生变化。也就是说一个网络连接只会对应唯一的一个 EventLoop,而一个 EventLoop 也只会对应到一个 Java 线程,所以一个网络连接只会对应到一个 Java 线程。
一个网络连接对应到一个 Java 线程上,有什么好处呢?最大的好处就是对于一个网络连接的事件处理是单线程的,这样就避免了各种并发问题。
Netty 中的线程模型可以参考下图,这个图和前面我们提到的理想的线程模型图非常相似,核心目标都是用一个线程处理多个网络连接。
Netty 中的线程模型
Netty 中还有一个核心概念是 EventLoopGroup,顾名思义,一个 EventLoopGroup 由一组 EventLoop 组成。实际使用中,一般都会创建两个 EventLoopGroup,一个称为 bossGroup,一个称为 workerGroup。为什么会有两个 EventLoopGroup 呢?
这个和 socket 处理网络请求的机制有关,socket 处理 TCP 网络连接请求,是在一个独立的 socket 中,每当有一个 TCP 连接成功建立,都会创建一个新的 socket,之后对 TCP 连接的读写都是由新创建处理的 socket 完成的。也就是说处理 TCP 连接请求和读写请求是通过两个不同的 socket 完成的。上面我们在讨论网络请求的时候,为了简化模型,只是讨论了读写请求,而没有讨论连接请求。
在 Netty 中,bossGroup 就用来处理连接请求的,而 workerGroup 是用来处理读写请求的。bossGroup 处理完连接请求后,会将这个连接提交给 workerGroup 来处理,workerGroup 里面有多个 EventLoop,那新的连接会交给哪个 EventLoop 来处理呢?这就需要一个负载均衡算法,Netty 中目前使用的是轮询算法。
下面我们用 Netty 重新实现以下 echo 程序的服务端,近距离感受一下 Netty。
用 Netty 实现 Echo 程序服务端
=====================
下面的示例代码基于 Netty 实现了 echo 程序服务端:首先创建了一个事件处理器(等同于 Reactor 模式中的事件处理器),然后创建了 bossGroup 和 workerGroup,再之后创建并初始化了 ServerBootstrap,代码还是很简单的,不过有两个地方需要注意一下。
第一个,如果 NettybossGroup 只监听一个端口,那 bossGroup 只需要 1 个 EventLoop 就可以了,多了纯属浪费。
第二个,默认情况下,Netty 会创建“2*CPU 核数”个 EventLoop,由于网络连接与 EventLoop 有稳定的关系,所以事件处理器在处理网络事件的时候是不能有阻塞操作的,否则很容易导致请求大面积超时。如果实在无法避免使用阻塞操作,那可以通过线程池来异步处理。
/* 事件处理器 */
final EchoServerHandler serverHandler
= new EchoServerHandler();
/* boss 线程组 */
EventLoopGroup bossGroup
= new NioEventLoopGrou
p( 1 );
/* worker 线程组 */
EventLoopGroup workerGroup
= new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group( bossGroup, workerGroup )
.channel( NioServerSocketChannel.class )
.childHandler( new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel( SocketChannel ch )
{
ch.pipeline().addLast( serverHandler );
}
} );
/* bind 服务端端口 */
ChannelFuture f = b.bind( 9090 ).sync();
f.channel().closeFuture().sync();
} finally {
/* 终止工作线程组 */
workerGroup.shutdownGracefully();
/* 终止 boss 线程组 */
bossGroup.shutdownGracefully();
}
/* socket 连接处理器 */
class EchoServerHandler extends
ChannelInboundHandlerAdapter {
/* 处理读事件 */
@Override
public void channelRead(
ChannelHandlerContext ctx, Object msg )
{
评论