写点什么

netty 系列之:NIO 和 netty 详解

作者:程序那些事
  • 2022 年 2 月 24 日
  • 本文字数:8787 字

    阅读完需:约 29 分钟

netty系列之:NIO和netty详解

简介

netty 为什么快呢?这是因为 netty 底层使用了 JAVA 的 NIO 技术,并在其基础上进行了性能的优化,虽然 netty 不是单纯的 JAVA nio,但是 netty 的底层还是基于的是 nio 技术。


nio 是 JDK1.4 中引入的,用于区别于传统的 IO,所以 nio 也可以称之为 new io。


nio 的三大核心是 Selector,channel 和 Buffer,本文我们将会深入探究 NIO 和 netty 之间的关系。

NIO 常用用法

在讲解 netty 中的 NIO 实现之前,我们先来回顾一下 JDK 中 NIO 的 selector,channel 是怎么工作的。对于 NIO 来说 selector 主要用来接受客户端的连接,所以一般用在 server 端。我们以一个 NIO 的服务器端和客户端聊天室为例来讲解 NIO 在 JDK 中是怎么使用的。


因为是一个简单的聊天室,我们选择 Socket 协议为基础的 ServerSocketChannel,首先就是 open 这个 Server channel:


ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress("localhost", 9527));serverSocketChannel.configureBlocking(false);
复制代码


然后向 server channel 中注册 selector:


Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码


虽然是 NIO,但是对于 Selector 来说,它的 select 方法是阻塞方法,只有找到匹配的 channel 之后才会返回,为了多次进行 select 操作,我们需要在一个 while 循环里面进行 selector 的 select 操作:


while (true) {            selector.select();            Set<SelectionKey> selectedKeys = selector.selectedKeys();            Iterator<SelectionKey> iter = selectedKeys.iterator();            while (iter.hasNext()) {                SelectionKey selectionKey = iter.next();                if (selectionKey.isAcceptable()) {                    register(selector, serverSocketChannel);                }                if (selectionKey.isReadable()) {                    serverResponse(byteBuffer, selectionKey);                }                iter.remove();            }            Thread.sleep(1000);        }
复制代码


selector 中会有一些 SelectionKey,SelectionKey 中有一些表示操作状态的 OP Status,根据这个 OP Status 的不同,selectionKey 可以有四种状态,分别是 isReadable,isWritable,isConnectable 和 isAcceptable。


当 SelectionKey 处于 isAcceptable 状态的时候,表示 ServerSocketChannel 可以接受连接了,我们需要调用 register 方法将 serverSocketChannel accept 生成的 socketChannel 注册到 selector 中,以监听它的 OP READ 状态,后续可以从中读取数据:


    private static void register(Selector selector, ServerSocketChannel serverSocketChannel)            throws IOException {        SocketChannel socketChannel = serverSocketChannel.accept();        socketChannel.configureBlocking(false);        socketChannel.register(selector, SelectionKey.OP_READ);    }
复制代码


当 selectionKey 处于 isReadable 状态的时候,表示可以从 socketChannel 中读取数据然后进行处理:


    private static void serverResponse(ByteBuffer byteBuffer, SelectionKey selectionKey)            throws IOException {        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();        socketChannel.read(byteBuffer);        byteBuffer.flip();        byte[] bytes= new byte[byteBuffer.limit()];        byteBuffer.get(bytes);        log.info(new String(bytes).trim());        if(new String(bytes).trim().equals(BYE_BYE)){            log.info("说再见不如不见!");            socketChannel.write(ByteBuffer.wrap("再见".getBytes()));            socketChannel.close();        }else {            socketChannel.write(ByteBuffer.wrap("你是个好人".getBytes()));        }        byteBuffer.clear();    }
复制代码


上面的 serverResponse 方法中,从 selectionKey 中拿到对应的 SocketChannel,然后调用 SocketChannel 的 read 方法,将 channel 中的数据读取到 byteBuffer 中,要想回复消息到 channel 中,还是使用同一个 socketChannel,然后调用 write 方法回写消息给 client 端,到这里一个简单的回写客户端消息的 server 端就完成了。


接下来就是对应的 NIO 客户端,在 NIO 客户端需要使用 SocketChannel,首先建立和服务器的连接:


socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9527));
复制代码


然后就可以使用这个 channel 来发送和接受消息了:


    public String sendMessage(String msg) throws IOException {        byteBuffer = ByteBuffer.wrap(msg.getBytes());        String response = null;        socketChannel.write(byteBuffer);        byteBuffer.clear();        socketChannel.read(byteBuffer);        byteBuffer.flip();        byte[] bytes= new byte[byteBuffer.limit()];        byteBuffer.get(bytes);        response =new String(bytes).trim();        byteBuffer.clear();        return response;    }
复制代码


向 channel 中写入消息可以使用 write 方法,从 channel 中读取消息可以使用 read 方法。


这样一个 NIO 的客户端就完成了。


虽然以上是 NIO 的 server 和 client 的基本使用,但是基本上涵盖了 NIO 的所有要点。接下来我们来详细了解一下 netty 中 NIO 到底是怎么使用的。

NIO 和 EventLoopGroup

以 netty 的 ServerBootstrap 为例,启动的时候需要指定它的 group,先来看一下 ServerBootstrap 的 group 方法:


public ServerBootstrap group(EventLoopGroup group) {        return group(group, group);    }
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { ...}
复制代码


ServerBootstrap 可以接受一个 EventLoopGroup 或者两个 EventLoopGroup,EventLoopGroup 被用来处理所有的 event 和 IO,对于 ServerBootstrap 来说,可以有两个 EventLoopGroup,对于 Bootstrap 来说只有一个 EventLoopGroup。两个 EventLoopGroup 表示 acceptor group 和 worker group。


EventLoopGroup 只是一个接口,我们常用的一个实现就是 NioEventLoopGroup,如下所示是一个常用的 netty 服务器端代码:


        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new FirstServerHandler());                        }                    })                    .option(ChannelOption.SO_BACKLOG, 128)                    .childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口并开始接收连接 ChannelFuture f = b.bind(port).sync(); // 等待server socket关闭 f.channel().closeFuture().sync();
复制代码


这里和 NIO 相关的有两个类,分别是 NioEventLoopGroup 和 NioServerSocketChannel,事实上在他们的底层还有两个类似的类分别叫做 NioEventLoop 和 NioSocketChannel,接下来我们分别讲解一些他们的底层实现和逻辑关系。

NioEventLoopGroup

NioEventLoopGroup 和 DefaultEventLoopGroup 一样都是继承自 MultithreadEventLoopGroup:


public class NioEventLoopGroup extends MultithreadEventLoopGroup 
复制代码


他们的不同之处在于 newChild 方法的不同,newChild 用来构建 Group 中的实际对象,NioEventLoopGroup 来说,newChild 返回的是一个 NioEventLoop 对象,先来看下 NioEventLoopGroup 的 newChild 方法:


    protected EventLoop newChild(Executor executor, Object... args) throws Exception {        SelectorProvider selectorProvider = (SelectorProvider) args[0];        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];        EventLoopTaskQueueFactory taskQueueFactory = null;        EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length; if (argsLength > 3) { taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } if (argsLength > 4) { tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; } return new NioEventLoop(this, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); }
复制代码


这个 newChild 方法除了固定的 executor 参数之外,还可以根据 NioEventLoopGroup 的构造函数传入的参数来实现更多的功能。


这里参数中传入了 SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory 和 tailTaskQueueFactory 这几个参数,其中后面的两个 EventLoopTaskQueueFactory 并不是必须的。


最后所有的参数都会传递给 NioEventLoop 的构造函数用来构造出一个新的 NioEventLoop。


在详细讲解 NioEventLoop 之前,我们来研读一下传入的这几个参数类型的实际作用。

SelectorProvider

SelectorProvider 是 JDK 中的类,它提供了一个静态的 provider()方法可以从 Property 或者 ServiceLoader 中加载对应的 SelectorProvider 类并实例化。


另外还提供了 openDatagramChannel、openPipe、openSelector、openServerSocketChannel 和 openSocketChannel 等实用的 NIO 操作方法。

SelectStrategyFactory

SelectStrategyFactory 是一个接口,里面只定义了一个方法,用来返回 SelectStrategy:


public interface SelectStrategyFactory {
SelectStrategy newSelectStrategy();}
复制代码


什么是 SelectStrategy 呢?


先看下 SelectStrategy 中定义了哪些 Strategy:


    int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;
复制代码


SelectStrategy 中定义了 3 个 strategy,分别是 SELECT、CONTINUE 和 BUSY_WAIT。


我们知道一般情况下,在 NIO 中 select 操作本身是一个阻塞操作,也就是 block 操作,这个操作对应的 strategy 是 SELECT,也就是 select block 状态。


如果我们想跳过这个 block,重新进入下一个 event loop,那么对应的 strategy 就是 CONTINUE。


BUSY_WAIT 是一个特殊的 strategy,是指 IO 循环轮询新事件而不阻塞,这个 strategy 只有在 epoll 模式下才支持,NIO 和 Kqueue 模式并不支持这个 strategy。

RejectedExecutionHandler

RejectedExecutionHandler 是 netty 自己的类,和 java.util.concurrent.RejectedExecutionHandler 类似,但是是特别针对 SingleThreadEventExecutor 来说的。这个接口定义了一个 rejected 方法,用来表示因为 SingleThreadEventExecutor 容量限制导致的任务添加失败而被拒绝的情况:


void rejected(Runnable task, SingleThreadEventExecutor executor);
复制代码

EventLoopTaskQueueFactory

EventLoopTaskQueueFactory 是一个接口,用来创建存储提交给 EventLoop 的 taskQueue:


Queue<Runnable> newTaskQueue(int maxCapacity);
复制代码


这个 Queue 必须是线程安全的,并且继承自 java.util.concurrent.BlockingQueue.


讲解完这几个参数,接下来我们就可以详细查看 NioEventLoop 的具体 NIO 实现了。

NioEventLoop

首先 NioEventLoop 和 DefaultEventLoop 一样,都是继承自 SingleThreadEventLoop:


public final class NioEventLoop extends SingleThreadEventLoop
复制代码


表示的是使用单一线程来执行任务的 EventLoop。


首先作为一个 NIO 的实现,必须要有 selector,在 NioEventLoop 中定义了两个 selector,分别是 selector 和 unwrappedSelector:


    private Selector selector;    private Selector unwrappedSelector;
复制代码


在 NioEventLoop 的构造函数中,他们是这样定义的:


        final SelectorTuple selectorTuple = openSelector();        this.selector = selectorTuple.selector;        this.unwrappedSelector = selectorTuple.unwrappedSelector;
复制代码


首先调用 openSelector 方法,然后通过返回的 SelectorTuple 来获取对应的 selector 和 unwrappedSelector。


这两个 selector 有什么区别呢?


在 openSelector 方法中,首先通过调用 provider 的 openSelector 方法返回一个 Selector,这个 Selector 就是 unwrappedSelector:


final Selector unwrappedSelector;unwrappedSelector = provider.openSelector();
复制代码


然后检查 DISABLE_KEY_SET_OPTIMIZATION 是否设置,如果没有设置那么 unwrappedSelector 和 selector 实际上是同一个 Selector:


DISABLE_KEY_SET_OPTIMIZATION 表示的是是否对 select key set 进行优化:


if (DISABLE_KEY_SET_OPTIMIZATION) {      return new SelectorTuple(unwrappedSelector);   }
SelectorTuple(Selector unwrappedSelector) { this.unwrappedSelector = unwrappedSelector; this.selector = unwrappedSelector; }
复制代码


如果 DISABLE_KEY_SET_OPTIMIZATION 被设置为 false,那么意味着我们需要对 select key set 进行优化,具体是怎么进行优化的呢?


先来看下最后的返回:


return new SelectorTuple(unwrappedSelector,                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
复制代码


最后返回的 SelectorTuple 第二个参数就是 selector,这里的 selector 是一个 SelectedSelectionKeySetSelector 对象。


SelectedSelectionKeySetSelector 继承自 selector,构造函数传入的第一个参数是一个 delegate,所有的 Selector 中定义的方法都是通过调用 delegate 来实现的,不同的是对于 select 方法来说,会首先调用 selectedKeySet 的 reset 方法,下面是以 isOpen 和 select 方法为例观察一下代码的实现:


    public boolean isOpen() {        return delegate.isOpen();    }
public int select(long timeout) throws IOException { selectionKeys.reset(); return delegate.select(timeout); }
复制代码


selectedKeySet 是一个 SelectedSelectionKeySet 对象,是一个 set 集合,用来存储 SelectionKey,在 openSelector()方法中,使用 new 来实例化这个对象:


final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
复制代码


netty 实际是想用这个 SelectedSelectionKeySet 类来管理 Selector 中的 selectedKeys,所以接下来 netty 用了一个高技巧性的对象替换操作。


首先判断系统中有没有 sun.nio.ch.SelectorImpl 的实现:


        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                try {                    return Class.forName(                            "sun.nio.ch.SelectorImpl",                            false,                            PlatformDependent.getSystemClassLoader());                } catch (Throwable cause) {                    return cause;                }            }        });
复制代码


SelectorImpl 中有两个 Set 字段:


    private Set<SelectionKey> publicKeys;    private Set<SelectionKey> publicSelectedKeys;
复制代码


这两个字段就是我们需要替换的对象。如果有 SelectorImpl 的话,首先使用 Unsafe 类,调用 PlatformDependent 中的 objectFieldOffset 方法拿到这两个字段相对于对象示例的偏移量,然后调用 putObject 将这两个字段替换成为前面初始化的 selectedKeySet 对象:


Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; }
复制代码


如果系统设置不支持 Unsafe,那么就用反射再做一次:


 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) {     return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) {     return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
复制代码


在 NioEventLoop 中我们需要关注的一个非常重要的重写方法就是 run 方法,在 run 方法中实现了如何执行 task 的逻辑。


还记得前面我们提到的 selectStrategy 吗?run 方法通过调用 selectStrategy.calculateStrategy 返回了 select 的 strategy,然后通过判断 strategy 的值来进行对应的处理。


如果 strategy 是 CONTINUE,这跳过这次循环,进入到下一个 loop 中。


BUSY_WAIT 在 NIO 中是不支持的,如果是 SELECT 状态,那么会在 curDeadlineNanos 之后再次进行 select 操作:


strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());  switch (strategy) {  case SelectStrategy.CONTINUE:      continue;  case SelectStrategy.BUSY_WAIT:      // fall-through to SELECT since the busy-wait is not supported with NIO  case SelectStrategy.SELECT:      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();      if (curDeadlineNanos == -1L) {          curDeadlineNanos = NONE; // nothing on the calendar      }      nextWakeupNanos.set(curDeadlineNanos);      try {          if (!hasTasks()) {              strategy = select(curDeadlineNanos);          }      } finally {          // This update is just to help block unnecessary selector wakeups          // so use of lazySet is ok (no race condition)          nextWakeupNanos.lazySet(AWAKE);      }      // fall through  default:
复制代码


如果 strategy > 0,表示有拿到了 SelectedKeys,那么需要调用 processSelectedKeys 方法对 SelectedKeys 进行处理:


    private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }
复制代码


上面提到了 NioEventLoop 中有两个 selector,还有一个 selectedKeys 属性,这个 selectedKeys 存储的就是 Optimized SelectedKeys,如果这个值不为空,就调用 processSelectedKeysOptimized 方法,否则就调用 processSelectedKeysPlain 方法。


processSelectedKeysOptimized 和 processSelectedKeysPlain 这两个方法差别不大,只是传入的要处理的 selectedKeys 不同。


处理的逻辑是首先拿到 selectedKeys 的 key,然后调用它的 attachment 方法拿到 attach 的对象:


final SelectionKey k = selectedKeys.keys[i];            selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
复制代码


如果 channel 还没有建立连接,那么这个对象可能是一个 NioTask,用来处理 channelReady 和 channelUnregistered 的事件。


如果 channel 已经建立好连接了,那么这个对象可能是一个 AbstractNioChannel。


针对两种不同的对象,会去分别调用不同的 processSelectedKey 方法。


对第一种情况,会调用 task 的 channelReady 方法:


task.channelReady(k.channel(), k);
复制代码


对第二种情况,会根据 SelectionKey 的 readyOps()的各种状态调用 ch.unsafe()中的各种方法,去进行 read 或者 close 等操作。

总结

NioEventLoop 虽然也是一个 SingleThreadEventLoop,但是通过使用 NIO 技术,可以更好的利用现有资源实现更好的效率,这也就是为什么我们在项目中使用 NioEventLoopGroup 而不是 DefaultEventLoopGroup 的原因。


本文已收录于 http://www.flydean.com/05-2-netty-nioeventloop/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 刚刚阅读数: 2
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:NIO和netty详解