写点什么

netty 系列之:channel 和 channelGroup

作者:程序那些事
  • 2022 年 1 月 19 日
  • 本文字数:4332 字

    阅读完需:约 14 分钟

netty系列之:channel和channelGroup

简介

channel 是 netty 中数据传输和数据处理的渠道,也是 netty 程序中不可或缺的一环。在 netty 中 channel 是一个接口,针对不同的数据类型或者协议 channel 会有具体的不同实现。


虽然 channel 很重要,但是在代码中确实很神秘,基本上我们很少能够看到直接使用 channel 的情况,那么事实真的如此吗?和 channel 相关的 ChannelGroup 又有什么作用呢?一起来看看吧。

神龙见首不见尾的 channel

其实 netty 的代码是有固定的模板的,首先根据是 server 端还是 client 端,然后创建对应的 Bootstrap 和 ServerBootstrap。然后给这个 Bootstrap 配置对应的 group 方法。然后为 Bootstrap 配置 channel 和 handler,最后启动 Bootstrap 即可。


这样一个标准的 netty 程序就完成了。你需要做的就是为其挑选合适的 group、channel 和 handler。


我们先看一个最简单的 NioServerSocketChannel 的情况:


EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ChatServerInitializer());
b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
复制代码


这里,我们将 NioServerSocketChannel 设置为 ServerBootstrap 的 channel。


这样就完了吗?channel 到底是在哪里用到的呢?


别急,我们仔细看一下 try block 中的最后一句:


b.bind(PORT).sync().channel().closeFuture().sync();
复制代码


b.bind(PORT).sync()实际上返回了一个 ChannelFuture 对象,通过调用它的 channel 方法,就返回了和它关联的 Channel 对象。


然后我们调用了 channel.closeFuture()方法。closeFuture 方法会返回一个 ChannelFuture 对象,这个对象将会在 channel 关闭的时候收到通知。


而 sync 方法会实现同步阻塞,一直等到 channel 关闭为止,从而进行后续的 eventGroup 的 shutdown 操作。


在 ServerBootstrap 中构建模板中,channel 其实有两个作用,第一个作用是指定 ServerBootstrap 的 channel,第二个作用就是通过 channel 获取到 channel 关闭的事件,最终关闭整个 netty 程序。


虽然我们基本上看不到 channel 的直接方法调用,但是 channel 毋庸置疑,它就是 netty 的灵魂。


接下来我们再看一下具体消息处理的 handler 的基本操作:


    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // channel活跃        ctx.write("Channel Active状态!\r\n");        ctx.flush();    }
复制代码


通常如果需要在 handler 中向 channel 写入数据,我们调用的是 ChannelHandlerContext 的 write 方法。这个方法和 channel 有什么关系呢?


首先 write 方法是 ChannelOutboundInvoker 接口中的方法,而 ChannelHandlerContext 和 Channel 都继承了 ChannelOutboundInvoker 接口,也就是说,ChannelHandlerContext 和 Channel 都有 write 方法:


ChannelFuture write(Object msg);
复制代码


因为这里我们使用的是 NioServerSocketChannel,所以我们来具体看一下 NioServerSocketChannel 中 write 的实现。


经过检查代码我们会发现 NioServerSocketChannel 继承自 AbstractNioMessageChannel,AbstractNioMessageChannel 继承自 AbstractNioChannel,AbstractNioChannel 继承自 AbstractChannel,而这个 write 方法就是 AbstractChannel 中实现的:


    public ChannelFuture write(Object msg) {        return pipeline.write(msg);    }
复制代码


Channel 的 write 方法,实际上调用了 pipeline 的 write 方法。下面是 pipeLine 中的 write 方法:


    public final ChannelFuture write(Object msg) {        return tail.write(msg);    }
复制代码


这里的 tail 是一个 AbstractChannelHandlerContext 对象。


这样我们就得出了这样的结论:channel 中的 write 方法最终实际上调用的是 ChannelHandlerContext 中的 write 方法。


所以上面的:


ctx.write("Channel Active状态!\r\n");
复制代码


实际上可以直接从 channel 中调用:


Channel ch = b.bind(0).sync().channel();
// 将消息写入channel中ch.writeAndFlush("Channel Active状态!\r\n").sync();
复制代码

channel 和 channelGroup

channel 是 netty 的灵魂,对于 Bootstrap 来说,要获取到对应的 channel,可以通过调用:


b.bind(PORT).sync().channel()
复制代码


来得到,从上面代码中我们也可以看到一个 Bootstrap 只会对应一个 channel。


channel 中有一个 parent()方法,用来返回它的父 channel,所以 channel 是有层级结构的,


我们再来看一下 channelGroup 的定义:


public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> 
复制代码


可以看到 ChannelGroup 实际上是 Channel 的集合。ChannelGroup 用来将类似的 Channel 构建成集合,从而可以对多个 channel 进行统一的管理。


可以能有小伙伴要问了,一个 Bootstrap 不是只对应一个 channel 吗?那么哪里来的 channel 的集合?


事实上,在一些复杂的程序中,我们可能启动多个 Bootstrap 来处理不同的业务,所以相应的就会有多个 channel。


如果创建的 channel 过多,并且这些 channel 又是很同质化的时候,就有需求对这些 channel 进行统一管理。这时候就需要用到 channelGroup 了。

channelGroup 的基本使用

先看下 channelGroup 的基本使用,首先是创建一个 channelGroup:


ChannelGroup recipients =           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
复制代码


有了 channelGroup 之后,可以调用 add 方法,向其中添加不同的 channel:


   recipients.add(channelA);   recipients.add(channelB);
复制代码


并且还可以统一向这些 channel 中发送消息:


recipients.write(Unpooled.copiedBuffer(           "这是从channelGroup中发出的统一消息.",           CharsetUtil.UTF_8));
复制代码


基本上 channelGroup 提供了 write,flush,flushAndWrite,writeAndFlush,disconnect,close,newCloseFuture 等功能,用于对集合中的 channel 进行统一管理。


如果你有多个 channel,那么可以考虑使用 channelGroup。


另外 channelGroup 还有一些特性,我们来详细了解一下。

将关闭的 channel 自动移出

ChannelGroup 是一个 channel 的集合,当然我们只希望保存 open 状态的 channel,如果是 close 状态的 channel,还要手动从 ChannelGroup 中移出的话实在是太麻烦了。


所以在 ChannelGroup 中,如果一个 channel 被关闭了,那么它会自动从 ChannelGroup 中移出,这个功能是怎么实现的呢?


先来看下 channelGroup 的 add 方法:


   public boolean add(Channel channel) {        ConcurrentMap<ChannelId, Channel> map =            channel instanceof ServerChannel? serverChannels : nonServerChannels;
boolean added = map.putIfAbsent(channel.id(), channel) == null; if (added) { channel.closeFuture().addListener(remover); }
if (stayClosed && closed) { channel.close(); }
return added; }
复制代码


可以看到,在 add 方法中,为 channel 区分了是 server channel 还是非 server channel。然后根据 channel id 将其存入 ConcurrentMap 中。


如果添加成功,则给 channel 添加了一个 closeFuture 的回调。当 channel 被关闭的时候,会去调用这个 remover 方法:


private final ChannelFutureListener remover = new ChannelFutureListener() {        @Override        public void operationComplete(ChannelFuture future) throws Exception {            remove(future.channel());        }    };
复制代码


remover 方法会将 channel 从 serverChannels 或者 nonServerChannels 中移出。从而保证 ChannelGroup 中只保存 open 状态的 channel。

同时关闭 serverChannel 和 acceptedChannel

虽然 ServerBootstrap 的 bind 方法只会返回一个 channel,但是对于 server 来说,可以有多个 worker EventLoopGroup,所以当客户端和服务器端建立连接之后建立的 accepted Channel 是 server channel 的子 channel。


也就是说一个服务器端有一个 server channel 和多个 accepted channel。


那么如果我们想要同时关闭这些 channel 的话, 就可以使用 ChannelGroup 的 close 方法。


因为如果 Server channel 和非 Server channel 在同一个 ChannelGroup 的话,所有的 IO 命令都会先发给 server channel,然后才会发给非 server channel。


所以我们可以将 Server channel 和非 Server channel 统统加入同一个 ChannelGroup 中,在程序的最后,统一调用 ChannelGroup 的 close 方法,从而达到该目的:


   ChannelGroup allChannels =           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);     public static void main(String[] args) throws Exception {       ServerBootstrap b = new ServerBootstrap(..);       ...       b.childHandler(new MyHandler());         // 启动服务器       b.getPipeline().addLast("handler", new MyHandler());       Channel serverChannel = b.bind(..).sync();       allChannels.add(serverChannel);         ... 等待shutdown指令 ...         // 关闭serverChannel 和所有的 accepted connections.       allChannels.close().awaitUninterruptibly();   }     public class MyHandler extends ChannelInboundHandlerAdapter {        @Override       public void channelActive(ChannelHandlerContext ctx) {           // 将accepted channel添加到allChannels中           allChannels.add(ctx.channel());           super.channelActive(ctx);       }   }
复制代码

ChannelGroupFuture

另外,和 channel 一样,channelGroup 的操作都是异步的,它会返回一个 ChannelGroupFuture 对象。


我们看下 ChannelGroupFuture 的定义:


public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture>
复制代码


可以看到 ChannelGroupFuture 是一个 Future,同时它也是一个 ChannelFuture 的遍历器,可以遍历 ChannelGroup 中所有 channel 返回的 ChannelFuture。


同时 ChannelGroupFuture 提供了 isSuccess,isPartialSuccess,isPartialFailure 等方法判断命令是否部分成功。


ChannelGroupFuture 还提供了 addListener 方法用来监听具体的事件。

总结

channel 是 netty 的核心,当我们有多个 channel 不便进行管理的时候,就可以使用 channelGroup 进行统一管理。


本文已收录于 http://www.flydean.com/04-1-netty-channel-group/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 20 小时前阅读数: 19
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:channel和channelGroup