如何封装原生的 -Java-NIO- 以及扩展?详细到让你分分钟搞定 Netty
接着是步骤6
,就是绑定本地端口然后启动服务。这是比较重要的一步,我们来分析 ServerBootstrap 的 bind 方法。
private ChannelFuture doBind(final SocketAddress localAddress) {//初始化一个 channel,final ChannelFuture regFuture = initAndRegister();//获取 channelfinal Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}//如果这个 channel 的注册事件完成了 if (regFuture.isDone()) {//再产生一个异步任务,进行端口监听 ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {//设置一个进度条的任务,等待注册事件完成后,就开始端口的监听 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
上面有两个比较着重讲的方法 initAndRegister()
和 doBind0()
。下面先将 initAndRegister()
方法。
final ChannelFuture initAndRegister() {
Channel channel = null;try {//通过 channel 工厂生成一个 channelchannel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {channel.unsafe().closeForcibly();return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}//将这个 channel 注册进 parentEventLoopChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}
在 ChannelFactory 生成一个 channel 后,就进行了 ServerBootstrap.init()
方法的调用。这个方法的主要作用是给 channel 继续宁一些参数和配置的设置。
void init(Channel channel) {setChannelOptions(channel, newOptionsArray(), logger); //设置 channel 的 optionsetAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); //设置属性
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);}final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline(); //获取 pipelineChannelHandler handler = config.handler(); //这里获取的 handler,对应的是 AbstractBootstrap 的 handler,这个是通过 ServerBootstrap.handler() 方法设置的 if (handler != null) {pipeline.addLast(handler); //添加进入 pipeline,这个是为了让每个处理的都能首先调用这个 handler}//执行任务,设置子 handler。这里对用的是 ServerBootstrap.childHandler() 方法设置的 handlerch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
然后到了 doBind0()
方法。
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {//执行到这里,说明任务已经被注册到 loopgroup//所以可以开始一个监听端口的任务 channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
上述代码是不断地添加 handler 进入 pipleline,所以我们可以来看看 NioServerSocketChannel 的 ChannelPipiline 的组成。
----> in streamHandler handler -> ServerBootstrapAccept -> Tail Handler<---- out stream
到此,Netty 服务器监听的相关资源初始化已经完毕了。但是上面我只是粗略地讲了简单的步骤,还有一步比较重要的我还想细讲一下,那就是 -- 注册 NioServerSocketChannel 到 Reactor 线程上的多路复用器上。
注册的代码在 initAndRegister()
方法里面的代码 ↓
ChannelFuture regFuture = config().group().register(channel);
config().group()
指的是 ServerBootstrap 的 parentLoopGroup,而 register()
其实是 parentLoopGroup 的父类 MultithreadEventLoopGroup 的 register()
。
public EventLoop next() {return (EventLoop) super.next();}
这里的 next() 方法调用的是其父类 MultithreadEventExecutorGroup 的 next()
public EventExecutor next() {return chooser.next();}
这里的 chooser 是 MultithreadEventExecutorGroup 的成员属性,它可以对根据目前 ExectuorGroup 中的 EventExecutor 的情况策略选择 EventExecutor。这里默认使用的是 DefaultEventExecutorChooserFactory,这个是基于轮询策略操作的。它里面有两个内部类,它们的区别在于轮询的方式不相同。例如 9 个 EventExecutor,第一个请求给第一个 EventExecutor,第二个请求给第二个 EventExecutor...直到第九个请求给第九个 EventExecutor,到了第十个请求,又从头再来,给第一个 EventExecutor。
chooserFactory 最后会选择出 EventExecutor 后,就可以将 Channel 进行注册了。在 Netty 的 NioEventLoopGroup 中 EventExecutor 都是 SingleThreadEventLoop 来承担的(如果你继续跟进代码的话,你会发现其实 EventExecutor 实际上就是一个 Java 原生的线程池,最后实现的是一个 ExecutorService )。
接下来,我们获取到了 EventExecutor 后,就可以让它帮忙注册了。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...省略非必要代码 AbstractChannel.this.eventLoop = eventLoop;//判断是否是自身发起的操作;//如果是,则不存在并发操作,直接注册//如果不是,封装成一个 Task 放入消息对列异步执行 if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {//...省略非必要代码}}}
我们发现了实际注册的是 register0()
,我们继续跟进
private void register0(ChannelPromise promise) {try {//检查是否在注册的过程中被取消了,同时确保 channel 是处于开启状态 if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister(); //进行注册 neverRegistered = false;registered = true;
//在被注册前首先调用 channel 的 handlerAdded 方法,这个算是生命周期方法吧 pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);pipeline.fireChannelRegistered();
//channel 只有在注册的时候才会调用一次 channelActive,后面都不会调用了。同时也是防止"取消注册"或"重新注册"的事件会反复调用 channelActiveif (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {//这里需要注意,如果之前 channel 被注册了而且设置了 autoRead 这意味着我们需要开始读取以便我们处理入站数据。beginRead();}}} catch (Throwable t) {// 关掉 channel 避免 FD 泄漏 closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
我们发现虽然上面的方法写着 register,但实际上只是调用了一下 Netty 定义的生命周期函数。实际将 Channel 挂到 Selector 的代码在 doRegister()
方法里面。
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {//...省略}}}
上面代码可能有让人疑惑的地方。为什么注册 OP_ACCEPT(16) 到多路复用器上,怎么注册 0 呢?0 表示已注册,但不进行任何操作。这样做的原因是
注册方法是多台的。它既可以被 NioServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或写操作。
通过 SelectionKey 的 interceptOps(int pos) 可以方便修改监听的操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNioChannel 的成员变量 selectionKey 赋值。
当注册成功后,触发了 ChannelRegistered 事件,这个事件也是整个 pipeline 都会触发的。
ChannelRegistered 触发完后,就会判断是否 ServerSocketChannel 监听是否成功,如果成功,需要出发 NioServerSocketChannel 的 ChannelActive 事件。
if(isAcitve()) {pipeline.fireChannelActive();}
isAcitve() 方法也是多态。如果服务端判断是否监听启动;如果是客户端查看 TCP 是否连接完成。channelActive() 事件在 ChannelPipeline 中传递,完成之后根据配置决定是否自动出发 Channel 读操作,下面是代码实现
public ChannlePipeline fireChannelActive() {head.fireChannelActive();if(channel.config().isAutoRead()) {channel.read();}
return this;}
AbstractChannel 的读操作出发了 ChannelPipeline 的读操作,最终调用到 HeadHandler 的读方法,代码如下
public void read(ChannelHandlerContext ctx){unsafe.beginRead();}
继续看 AbstractUnsafe 的 beginRead 方法,代码如下
public void beginRead() {if(!isAcitve()) {return;}
try {doBeginRead();}
//...省略代码}
由于不同类型的 Channel 对于读操作的处理是不同的,所以合格 beginRead 也算是多态方法。对于 NIO 的 channel,无论是客户端还是服务端,都是修改网络监听操作位为自身感兴趣的 shi
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}
readPending = true;
final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
JDK SelectionKey 有四种操作类型,分别为:
OP_READ = 1<<0
OP_WRITE = 1<<2
OP_CONNECT = 1<<3
OP_ACCEPT = 1<<4
每个操作位代表一种网络操作类型,分别为 0001,0010,0100,1000,这样做的好处是方便地通过位操作来进行网络操作位的状态判断和状态修改,从而提升操作性能。
OK! 我们服务端的源码基本上已经梳理完成了,下面继续看客户端的代码。
客户端接入
负责处理网络读写,连接和客户端情感求接入的 Reactor 线程是 NioEventLoop,我们分析一下客户端是怎么接入的。当多路复用器检测到准备就绪的 channel,默认执行 processSelectedKeysOptimized,代码如下
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}
由于 selectedKeys 不为空,所以执行 processSelectedKeysOptimized 方法。然后再看方法代码
if (a instanceof AbstractNioChannel) {(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}
由于 NioEventLoop 属于 AbstractNioChannel,所以执行 processSelectedKey 方法。processSelectedKey 顾名思义,就是处理所选择 selectionKey。我们看方法核心代码
int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// 删除 OP_CONNECT,否则 Selector.select(..)将始终不阻塞返回= int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);
unsafe.finishConnect();}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {// 调用 forceFlush,它还会在没有东西可写时清除 OP_WRITEch.unsafe().forceFlush();}
// 还要检查 readOps 是否为 0,以解决 JDK 中可能导致的错误到一个旋转循环 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}
我们完整看到了在 NioEventLoop 如何处理四种事件。来看看读事件。我们可以发现读事件是使用 unsafe 来实现的。unsafe 有两种实现,分别为 NioByteUnsafe 和 NioMessageUnsafe。由于是 NioEventLoop,所以使用 NioByteUnsafe。我们来看看它的 read() 方法。
public void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);
boolean closed = false;Throwable exception = null;try {try {//第一部分,读取 SocketChanneldo {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}
allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}//第二部分,开始触发 fireChannelRead 方法 int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();
if (exception != null) {closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);}
if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {
评论