写点什么

netty 系列之:channel,ServerChannel 和 netty 中的实现

作者:程序那些事
  • 2022 年 2 月 11 日
  • 本文字数:6286 字

    阅读完需:约 21 分钟

netty系列之:channel,ServerChannel和netty中的实现

简介

我们知道 channel 是 netty 中用于沟通 ByteBuf 和 Event 的桥梁,在 netty 服务的创建过程中,不管是客户端的 Bootstrap 还是服务器端的 ServerBootstrap,都需要调用 channel 方法来指定对应的 channel 类型。


那么 netty 中 channel 到底有哪些类型呢?他们具体是如何工作的呢?一起来看看。

channel 和 ServerChannel

Channel 在 netty 中是一个 interface,在 Channel 中定义了很多非常有用的方法。通常来说如果是客户端的话,对应的 channel 就是普通的 channel。如果是服务器端的话,对应的 channel 就应该是 ServerChannel。


那么客户端 channel 和服务器端 channel 有什么区别呢?我们先来看下 ServerChannel 的定义:


public interface ServerChannel extends Channel {    // This is a tag interface.}
复制代码


可以看到 ServerChannel 继承自 Channel,表示服务端的 Channel 也是 Channel 的一种。


但是很奇怪的是,你可以看到 ServerChannel 中并没有新增任何新的方法。也就是说 ServerChannel 和 Channel 在定义上本质是一样的。你可以把 ServerChannel 看做是一个 tag interface 而已。


那么 channel 和 ServerChannel 有什么联系呢?


我们知道在 Channel 中定义了一个 parent 方法:


Channel parent();
复制代码


这个 parent 方法返回的是该 channel 的父 channel。我们以最简单的 LocalChannel 和 LocalServerChannel 为例,来查看一下他们的父子关系到底是怎么创建的。


首先 parent 的值是通过 LocalChannel 和 LocalServerChannel 的公共父类 AbstractChannel 来实现的:


    protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }
复制代码


对于 LocalChannel 来说,可以通过它的构造函数来设置 parent channel:


    protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {        super(parent);        config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));        this.peer = peer;        localAddress = parent.localAddress();        remoteAddress = peer.localAddress();    }
复制代码


我们知道当 client 端想要连接到 server 端的时候,需要调用 client channel 的 connect 方法,对于 LocalChannel 来说,它的 connect 方法实际上调用的是 pipeline 的 connect 方法:


public ChannelFuture connect(SocketAddress remoteAddress) {        return pipeline.connect(remoteAddress);    }
复制代码


最终会调用 LocalChannel 中的 LocalUnsafe.connect 方法。


而在 LocalUnsafe.connect 方法中又会调用 serverChannel.serve 方法。


serverChannel 的 newLocalChannel 方法会创建新的 LocalChannel 并返回:


    protected LocalChannel newLocalChannel(LocalChannel peer) {        return new LocalChannel(this, peer);    }
复制代码


这里使用 newLocalChannel 方法创建的 LocalChannel 就是 serverChannel 的子 channel。


最后返回的 LocalChannel 会作为 client 端 LocalChannel 的 peer channel 而存在。

netty 中 channel 的实现

在 netty 中 channel 和 Serverchannel 有很多个实现类,用来完成不同的业务功能。


为了循序渐进一步步了解 netty 中 channel 的秘密,这里我们先来探讨一下 netty 中 channel 的基本实现 LocalChannel 和 LocalServerChannel 的工作原理。


下图是 LocalChannel 和 LocalServerChannel 的主要继承和依赖关系:


<img src="https://img-blog.csdnimg.cn/1d9c19d567084c199dfade76c8a0d52a.png" style="zoom:67%;" />


从图中可以看到,LocalChannel 继承自 AbstractChannel 而 LocalServerChannel 则继承自 AbstractServerChannel。


因为 ServerChannel 继承自 Channel,所以很自然的 AbstractServerChannel 又继承自 AbstractChannel。


接下来,我们通过对比分析 AbstractChannel 和 AbstractServerChannel,LocalChannel 和 LocalServerChannel 来一探 netty 中 channel 实现的底层原理。

AbstractChannel 和 AbstractServerChannel

AbstractChannel 是对 Channel 的最基本的实现。先来看下 AbstractChannel 中都有那些功能。


首先 AbstractChannel 中定义了 Channel 接口中要返回的一些和 channel 相关的基本属性,包括父 channel,channel id,pipline,localAddress,remoteAddress,eventLoop 等,如下所示:


    private final Channel parent;    private final ChannelId id;    private final DefaultChannelPipeline pipeline;    private volatile SocketAddress localAddress;    private volatile SocketAddress remoteAddress;    private volatile EventLoop eventLoop;
private final Unsafe unsafe;
复制代码


要注意的是 AbstractChannel 中还有一个非常中要的 Unsafe 属性。


Unsafe 本身就是 Channel 接口中定义的一个内部接口,它的作用就是为各个不同类型的 transport 提供特定的实现。


从名字可以看出 Unsafe 是一个不安全的实现,它只是在 netty 的源代码中使用,它是不能出现在用户代码中的。或者你可以将 Unsafe 看做是底层的实现,而包裹他的 AbstractChannel 或者其他的 Channel 是对底层实现的封装,对于普通用户来说,他们只需要使用 Channel 就可以了,并不需要深入到更底层的内容。


另外,对于 Unsafe 来说,除了下面几个方法之外,剩余的方法必须从 I/O thread 中调用:


localAddress()remoteAddress()closeForcibly()register(EventLoop, ChannelPromise)deregister(ChannelPromise)voidPromise()
复制代码


和一些基本的状态相关的数据:


private volatile boolean registered;private boolean closeInitiated;
复制代码


除了基本的属性设置和读取之外,我们 channel 中最终要的方法主要有下面几个:


  1. 用于建立服务器端服务的 bind 方法:


public ChannelFuture bind(SocketAddress localAddress) {        return pipeline.bind(localAddress);    }
复制代码


  1. 用于客户端建立和服务器端连接的 connect 方法:


public ChannelFuture connect(SocketAddress remoteAddress) {        return pipeline.connect(remoteAddress);    }
复制代码


  1. 断开连接的 disconnect 方法:


public ChannelFuture disconnect() {        return pipeline.disconnect();    }
复制代码


  1. 关闭 channel 的 close 方法:


public ChannelFuture close() {        return pipeline.close();    }
复制代码


  1. 取消注册的 deregister 方法:


public ChannelFuture deregister() {        return pipeline.deregister();    }
复制代码


  1. 刷新数据的 flush 方法:


    public Channel flush() {        pipeline.flush();        return this;    }
复制代码


  1. 读取数据的 read 方法:


    public Channel read() {        pipeline.read();        return this;    }
复制代码


  1. 写入数据的方法:


    public ChannelFuture write(Object msg) {        return pipeline.write(msg);    }
复制代码


可以看到这些 channel 中的读写和绑定工作都是由和 channel 相关的 pipeline 来执行的。


其实也很好理解,channel 只是一个通道,和数据相关的操作,还是需要在管道中执行。


我们以 bind 方法为例子,看一下 AbstractChannel 中的 pipline 是怎么实现的。


在 AbstractChannel 中,默认的 pipeline 是 DefaultChannelPipeline,它的 bind 方法如下:


        public void bind(                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {            unsafe.bind(localAddress, promise);        }
复制代码


这里的 unsafe 实际上就是 AbstractChannel 中的 unsafe,unsafe 中的 bind 方法最终会调用 AbstractChannel 中的 dobind 方法:


protected abstract void doBind(SocketAddress localAddress) throws Exception;
复制代码


所以归根到底,如果是基于 AbstractChannel 的各种实现,那么只需要实现它的这些 do*方法即可。


好了,AbstractChannel 的介绍完毕了。 我们再来看一下 AbstractServerChannel。AbstractServerChannel 继承自 AbstractChannel 并且实现了 ServerChannel 接口。


public abstract class AbstractServerChannel extends AbstractChannel implements ServerChannel 
复制代码


我们知道 ServerChannel 和 Channel 实际上是相同的,所以 AbstractServerChannel 只是在 AbstractChannel 的实现上进行了一些调整。


在 AbstractServerChannel 中,我们一起来观察一下 AbstractServerChannel 和 AbstractChannel 到底有什么不同。


首先是 AbstractServerChannel 的构造函数:


protected AbstractServerChannel() {        super(null);    }
复制代码


构造函数中,super 的 parent channel 是 null,表示 ServerChannel 本身并不存在父 channel,这是 ServerChannel 和 client channel 的第一个不同之处。因为 server channel 可以通过 worker event loop 来接受 client channel,所以 server channel 是 client channel 的父 channel。


另外,我们还观察几个方法的实现:


public SocketAddress remoteAddress() {        return null;    }
复制代码


对于 ServerChannel 来说不需要主动连接到远程的 Server,所以并没有 remoteAddress。


另外,因为断开连接是由 client 端主动调用的,所以 server channel 的 doDisconnect 会抛出不支持该操作的异常:


    protected void doDisconnect() throws Exception {        throw new UnsupportedOperationException();    }
复制代码


同时 ServerChannel 只是用来负责 accept 和 client channel 建立关联关系,所以 server channel 本身并不支持向 channel 内进行的 write 操作,所以这个 doWrite 方法也是不支持的:


    protected void doWrite(ChannelOutboundBuffer in) throws Exception {        throw new UnsupportedOperationException();    }
复制代码


最后 ServerChannel 只支持 bind 操作,所以 DefaultServerUnsafe 中的 connect 方法也会抛出 UnsupportedOperationException.

LocalChannel 和 LocalServerChannel

LocalChannel 和 LocalServerChannel 是 AbstractChannel 和 AbstractServerChannel 的最基本的实现。从名字就可以看出来,这两个 Channel 是本地 channel,我们来看一下这两个 Channel 的具体实现。


首先我们来看一下 LocalChannel,LocalChannel 有几点对 AbstractChannel 的扩展。


第一个扩展点是 LocalChannel 中添加了 channel 的几个状态:


private enum State { OPEN, BOUND, CONNECTED, CLOSED }
复制代码


通过不同的状态,可以对 channel 进行更加细粒度的控制。


另外 LocalChannel 中添加了一个非常重要的属性:


private volatile LocalChannel peer;
复制代码


因为 LocalChannel 表示的是客户端 channel,所以这个 peer 表示的是 client channel 对等的 server channel。接下来我们看一下具体的实现。


首先是 LocalChannel 的构造函数:


    protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {        super(parent);        config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));        this.peer = peer;        localAddress = parent.localAddress();        remoteAddress = peer.localAddress();    }
复制代码


LocalChannel 可以接受一个 LocalServerChannel 作为它的 parent,还有一个 LocalChannel 作为它的对等 channel。


那么这个 peer 是怎么创建的呢?


我们来看一下 LocalUnsafe 中 connect 的逻辑。


            if (state != State.BOUND) {                // Not bound yet and no localAddress specified - get one.                if (localAddress == null) {                    localAddress = new LocalAddress(LocalChannel.this);                }            }
if (localAddress != null) { try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); close(voidPromise()); return; } }
复制代码


首先判断当前 channel 的状态,如果是非绑定状态,那么需要进行绑定操作。首先根据传入的 LocalChannel 创建对应的 LocalAddress。


这个 LocalAddress 只是 LocalChannel 的一种表现形式,并没有什么特别的功能。


我们来看一下这个 doBind 方法:


    protected void doBind(SocketAddress localAddress) throws Exception {        this.localAddress =                LocalChannelRegistry.register(this, this.localAddress,                        localAddress);        state = State.BOUND;    }
复制代码


LocalChannelRegistry 中维护了一个 static 的 map,这个 map 中存放的就是注册过的 Channel.


这里注册是为了在后面方便的拿到对应的 channel。


注册好 localChannel 之后,接下来就是根据注册好的 remoteAddress 来获取对应的 LocalServerChannel,最后调用 LocalServerChannel 的 serve 方法创建一个新的 peer channel:


Channel boundChannel = LocalChannelRegistry.get(remoteAddress);            if (!(boundChannel instanceof LocalServerChannel)) {                Exception cause = new ConnectException("connection refused: " + remoteAddress);                safeSetFailure(promise, cause);                close(voidPromise());                return;            }
LocalServerChannel serverChannel = (LocalServerChannel) boundChannel; peer = serverChannel.serve(LocalChannel.this);
复制代码


serve 方法首先会创建一个新的 LocalChannel:


    protected LocalChannel newLocalChannel(LocalChannel peer) {        return new LocalChannel(this, peer);    }
复制代码


如果我们把之前的 Localchannel 称为 channelA,这里创建的新的 LocalChannel 称为 channelB。那么最后的结果就是 channelA 的 peer 是 channelB,而 channelB 的 parent 是 LocalServerChannel,channelB 的 peer 是 channelA。


这样就构成了一个对等 channel 之间的关系。


接下来我们看下 localChannel 的 read 和 write 到底是怎么工作的。


首先看一下 LocalChannel 的 doWrite 方法:


Object msg = in.current();...peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));in.remove();...finishPeerRead(peer);
复制代码


首先从 ChannelOutboundBuffer 拿到要写入的 msg,将其加入 peer 的 inboundBuffer 中,最后调用 finishPeerRead 方法。


从方法名字可以看出 finishPeerRead 就是调用 peer 的 read 方法。


事实上该方法会调用 peer 的 readInbound 方法,从刚刚写入的 inboundBuffer 中读取消息:


    private void readInbound() {        RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();        handle.reset(config());        ChannelPipeline pipeline = pipeline();        do {            Object received = inboundBuffer.poll();            if (received == null) {                break;            }            pipeline.fireChannelRead(received);        } while (handle.continueReading());
pipeline.fireChannelReadComplete(); }
复制代码


所以,对于 localChannel 来说,它的写实际上写入到 peer 的 inboundBuffer 中。然后再调用 peer 的读方法,从 inboundBuffer 中读取数据。


相较于 localChannel 来说,localServerChannel 多了一个 serve 方法,用来创建 peer channel,并调用 readInbound 开始从 inboundBuffer 中读取数据。

总结

本章详细介绍了 channel 和 serverChannel 的区别,和他们的最简单的本地实现。希望大家对 channel 和 serverChannel 的工作原理有了最基本的了解。


本文已收录于 http://www.flydean.com/04-2-netty-channel-vs-serverchannel-md/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 刚刚阅读数: 2
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:channel,ServerChannel和netty中的实现