写点什么

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture

作者:sidiot
  • 2023-06-15
    浙江
  • 本文字数:3679 字

    阅读完需:约 12 分钟

前言


本篇博文是《从 0 到 1 学习 Netty》中入门系列的第三篇博文,主要内容是介绍 Netty 中 ChannelFuture 与 CloseFuture 的使用,解决连接问题与关闭问题,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


连接问题与 ChannelFuture


在 Netty 中,所有的 I/O 操作都是异步的,因此当你发起一个 I/O 操作时,它会立即返回一个 ChannelFuture 对象,该对象代表了尚未完成的操作。ChannelFuture 提供了一种在操作完成时通知应用程序的机制,以便应用程序可以执行某些操作或检索操作的结果。


例如,在写入数据到 Channel 时,调用 write() 方法将立即返回一个 ChannelFuture 对象,而不是等待数据实际被写入。通过添加侦听器(Listener)到 ChannelFuture,当写操作完成时,侦听器将被通知,从而使应用程序能够对写入数据的结果做出响应。


sync


sync() 方法是 ChannelFuture 接口中的一个同步方法,它将阻塞当前线程,直到这个 ChannelFuture 执行完毕。调用 sync() 方法后会等待对应的 I/O 操作完成,如果操作失败则会抛出异常。


复用上篇博文 从0到1(七):入门-EventLoop 中的服务端代码,略微调整一下客户端代码如下:


@Slf4jpublic class ChannelFutureClient {    public static void main(String[] args) throws InterruptedException {        ChannelFuture channelFuture = new Bootstrap()                .group(new NioEventLoopGroup())                .channel(NioSocketChannel.class)                .handler(new ChannelInitializer<NioSocketChannel>() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        ch.pipeline().addLast(new StringEncoder());                    }                })                .connect(new InetSocketAddress(7999));
channelFuture.sync(); Channel channel = channelFuture.channel(); log.debug(channel.toString()); channel.writeAndFlush("sidiot."); }}
复制代码


服务端运行结果:


20:24:04 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: sidiot..20:24:09 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: sidiot..
复制代码


但如果将 channelFuture.sync(); 注释掉后,会发现客户端运行之后,服务端并没有像之前一样接收到消息。


客户端运行结果:


# 存在 sync()20:24:04 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x473d8e1a, L:/169.254.80.84:57837 - R:IDIOT/169.254.80.84:7999]
# 注释 sync()20:24:14 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x871ab919]
复制代码


这是因为 ChannelFuture 是用于异步操作结果通知的类。调用 sync() 将会阻塞当前线程,等待异步操作完成并获取其结果。如果注释掉了 sync() 方法,则程序不会等到连接建立成功后再向服务端发送消息,而是直接执行 writeAndFlush() 方法,此时连接还没有建立成功,所以服务端收不到客户端发的消息。


使用 sync() 方法可以保证在后续代码执行之前,完成当前的操作,这样可以避免一些并发问题。但是需要注意的是,由于 sync() 方法会阻塞当前线程,因此应该尽可能地避免在 I/O 线程中调用 sync() 方法,以免影响整个系统的性能表现。




addListener


除了 sync() 方法之外,我们还可以使用 addListener() 方法来处理结果。


在 Netty 中,addListener() 方法是异步方法,其作用是向 ChannelFuture 添加一个或多个 GenericFutureListener 监听器,用于监听异步操作(例如网络 I/O 操作)执行完成时的事件。当异步操作完成后,这些监听器会被通知,并且可以获取到操作的结果。


channelFuture.addListener(new ChannelFutureListener() {    @Override    public void operationComplete(ChannelFuture future) throws Exception {        Channel channel = future.channel();        log.debug(channel.toString());        channel.writeAndFlush("sidiot.");    }});
复制代码


运行结果:


# 服务端21:21:03 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: sidiot..21:21:08 [DEBUG] [defaultEventLoopGroup-2-6] c.s.n.c.EventLoopServer - h2: sidiot..
# 客户端21:21:03 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.ChannelFutureClient - [id: 0xc4465d09, L:/169.254.80.84:58393 - R:IDIOT/169.254.80.84:7999]
复制代码


对比使用 sync()addListener() 两个方法的客户端结果可以发现,使用 sync() 的客户端的处理线程是当前线程,即 main 线程,而 addListener() 因为是异步方法的关系,其客户端的处理线程就不是当前线程,而是 NIO 线程 nioEventLoopGroup-2-1




小结


sync()addListener() 都是用于在不同组件之间进行通信的方法,但它们的实现方式略有不同。


sync() 是一种通过将属性绑定到一个共享状态来实现组件之间通信的方法。当某个组件更改该绑定的属性时,其他所有使用该属性的组件都会自动更新。这种方法的优点是简单直接,能够快速实现组件之间的数据同步,但缺点是对于大型应用程序,使用全局状态管理可能会变得复杂和混乱。


相比之下,addListener() 则是一种更加灵活的方法,它允许组件之间精确地控制何时以及如何进行通信。addListener() 可以被用于创建事件监听器,使得一个组件可以注册到另一个组件中发生的事件的通知。当事件发生时,触发监听器并向其传递相应的数据。这种方法的优点是,更容易实现针对特定事件的精细控制,并且可以减少对全局状态的依赖。


因此,总的来说,addListener() 更灵活,并且可以更好地适应复杂的应用程序需求,而 sync() 则更适合简单的应用场景。

关闭问题与 CloseFuture


在前面的博文中,博主都是以 DEBUG 的形式来操作客户端的,但这时的客户端都不是被正常关闭的,因此,接下来修改一下代码,使得客户端能够不断向服务端发送消息,并在某一时刻能够被关闭:


Channel channel = channelFuture.sync().channel();log.debug(channel.toString());
new Thread(() -> { Scanner scanner = new Scanner(System.in); while (true) { String line = scanner.nextLine(); if ("quit".equals(line)) { channel.close(); break; } channel.writeAndFlush(line); }}, "input").start();
log.debug("处理 channel 关闭之后的操作");
复制代码


运行结果:



可以发现 “处理 channel 关闭之后的操作” 并没有等 channel 关闭之后再进行,这是因为在 input 线程运行过程中并没有阻塞主线程,因此,主线程就会继续向下运行,造成了上面的情况;


那如果将 “处理 channel 关闭之后的操作” 移动到 channel.close(); 后面是不是就可以了呢?


if ("quit".equals(line)) {      channel.close();      log.debug("处理 channel 关闭之后的操作");      break;  }
复制代码


接下来我们进行验证,在 pipeline 中新增一个 handler:


ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
复制代码


同时需要在配置文件 logback.xml 中增加下述代码:


<logger name="io.netty.handler.logging.LoggingHandler" level="DEBUG" additivity="false">      <appender-ref ref="STDOUT" />  </logger>
复制代码


运行结果:



根据运行结果可以发现,将 “处理 channel 关闭之后的操作” 移动到 channel.close(); 后面的方法也是行不通的,因为这两个操作不属于同一个线程;


“处理 channel 关闭之后的操作” 是在 input 线程中执行的,而 channel.close(); 则是在 NIO 线程 nioEventLoopGroup-2-1 中所执行的,因此两个线程谁先谁后是不一定的,这是由 CPU 调度器决定的;




这里,我们可以使用 closeFuture() 来解决问题,closeFuture() 方法可以让我们监听 Channel 关闭事件,从而在 Channel 关闭后执行一些特定的逻辑。例如,在处理连接断开的情况下,我们可以等待 closeFuture() 的完成,并在其完成后释放资源或清理状态。


closeFuture()ChannelFuture() 相似,同样是有同步方法 sync 和异步方法 addaddListener 两种方式;


sync


ChannelFuture closeFuture = channel.closeFuture();System.out.println("Waiting Close...");closeFuture.sync();log.debug("处理 channel 关闭之后的操作");
复制代码


运行结果:



addaddListener


ChannelFuture closeFuture = channel.closeFuture();System.out.println("Waiting Close...");closeFuture.addListener((ChannelFutureListener) future -> {      log.debug("处理 channel 关闭之后的操作");      group.shutdownGracefully();  });
复制代码


运行结果:



后记


以上就是 ChannelFuture 与 CloseFuture 的所有内容了,希望本篇博文对大家有所帮助!


参考:



📝 上篇精讲:「萌新入门」(二)剖析 EventLoop

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

发布于: 2023-06-15阅读数: 19
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture_Java_sidiot_InfoQ写作社区