作者:京东物流 张弓言
一、背景
Netty 是一款优秀的高性能网络框架,内部通过 NIO 的方式来处理网络请求,在高负载下也能可靠和高效地处理 I/O 操作
作为较底层的网络通信框架,其被广泛应用在各种中间件的开发中,比如 RPC 框架、MQ、Elasticsearch 等,这些中间件框架的底层网络通信模块大都利用到了 Netty 强大的网络抽象
下面这篇文章将主要对 Netty 中的各个组件进行分析,并在介绍完了各个组件之后,通过 JSF 这个 RPC 框架为例来分析 Netty 的使用,希望让大家对 Netty 能有一个清晰的了解
二、Netty Server
通过 Netty 来构建一个简易服务端是比较简单的,代码如下:
public class NettyServer {
public static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("Handler Added");
}
})
.childHandler(new ServerChannelInitializer())
.bind(8100);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("Netty Server Start !");
}
}
});
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
复制代码
上面代码的主要逻辑如下:
新建服务端引导启动类 ServerBootstrap,内部封装了各个组件,用来进行服务端的启动
新建了两个 EventLoopGroup 用来进行连接处理,此时可以简单的将 EventLoopGroup 理解为多个线程的集合。bossGroup 中的线程用来处理新连接的建立,当新连接建立后,workerGroup 中的每个线程则都会和唯一的客户端 Channel 连接进行绑定,用来处理该 Channel 上的读、写事件
指定服务端创建的 Channel 类型为 NioServerSocketChannel
childOption 用来配置客户端连接的 NioSocketChannel 底层网络参数
handler 用来指定针对服务端 Channel 的处理器,内部定义了一系列的回调方法,会在服务端 Channel 发生指定事件时进行回调
childHandler 用来指定客户端 Channel 的处理器,当客户端 Channel 中发生指定事件时,会进行回调
bind 指定服务端监听端口号
三、Netty Client
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 1. 启动类
ChannelFuture channelFuture = new Bootstrap()
// 2. 添加 EventLoop
.group(workGroup)
// 3. 选择客户端 channel 实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ZAS ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost", 8100));
channelFuture.addListener(future -> {
if (future.isSuccess()) {
((ChannelFuture) future).channel().writeAndFlush("hello");
}
});
channelFuture.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
复制代码
上面代码的主要逻辑如下:
新建 Bootstrap 用来进行客户端启动
group() 指定一个 NioEventLoopGroup 实例,用来处理客户端连接的建立和后续事件处理
handler() 指定 Channel 处理器,
当将客户端启动类中的各个属性都设置完毕后,调用 connect() 方法进行服务端连接
从上面的的两个例子可以看出,如果想通过 Netty 实现一个简易的服务器其实是非常简单的,只需要在启动引导类中设置好对应属性,然后完成端口绑定就可以实现。但也正是因为这种简易的实现方式,导致很多人在学习 Netty 的过程中,发现代码是写的出来,但是对内部的组件有什么作用以及为什么这么写可能就不是很清楚了,因此希望通过这一系列文章来加深大家对 Netty 的理解
四、Netty 基本组件
Channel
Netty 中的 Channel 可以看成网络编程中的 Socket,其提供了一系列 IO 操作的 API,比如 read、write、bind、connect 等,大大降低了直接使用 Socket 类的复杂性
整体类继承关系如下:
从上面的继承关系可以看出,NioSocketChannel 和 NioServerSocketChannel 分别对应客户端和服务端的 Channel,两者的直接父类不一致,因此对外提供的功能也是不相同的。比如当发生 read 事件时,NioServerSocketChannel 的主要逻辑就是建立新的连接,而 NioSocketChannel 则是读取传输的字节进行业务处理
下面就以 NioServerSocketChannel 为例,带大家了解下该类的初始化过程,整体流程如下:
启动引导类中通过 channel() 指定底层创建的 Channel 类型
根据指定的 Channel 类型创建出 ChannelFactory,后续通过该工厂类进行 Channel 的实例化
实例化 Channel
channel() 指定 ChannelFactory 类型
在上面的服务端启动过程中,ServerBootstrap 调用 channel() 方法并传入 NioServerSocketChannel,其底层代码逻辑为:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
// ReflectiveChannelFactory 构造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
复制代码
整体逻辑很简单,通过传入的 Class 对象指定一个 Channel 反射工厂,后续调用工厂方法获取指定类型的 Channel 对象
channel 实例化
当服务端启动引导类 ServerBootstrap 调用 bind() 方法之后,内部会走到 Channel 的实例化过程,代码精简如下:
// channel 初始化流程,内部通过 channelFactory 构造
final ChannelFuture initAndRegister() {
channel = channelFactory.newChannel();
}
// channelFactory 的 newChannel 方法逻辑
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
复制代码
ChannelFactory 的整体逻辑就是通过反射的方式新建 Channel 对象,而 Channel 对象的类型则是在启动引导类中通过 channel() 方法进行指定的
在实例化 Channel 的过程中,会对其内部的一些属性进行初始化,而对这些属性的了解,可以使我们对 Netty 中各个组件的作用范围有一个更加清晰的理解,下面看下 NioServerSocketChannel 的构造函数源码
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
复制代码
上述源码就是一层一层的父类构造,可以对照前面的类关系图进行阅读
NioServerSocketChannel 实例化过程中主要完成了以下内部属性的初始化:
unsafe 属性进行赋值为 NioMessageUnsafe,后续 Channel 上事件处理的主要逻辑都是由该类完成
pipeline 属性进行初始化赋值,pipeline 是 Channel 中特别重要的一个属性,后续的所有业务处理器都是通过该 pipeline 组织的
指定当前 Channel 的 readInterestOp 属性为 SelectionKey.OP_ACCEPT,用于后续绑定到 Selector 时指定当前 Channel 监听的事件类型
指定当前 Channel 非阻塞,ch.configureBlocking(false)
总结
对于 Channel 的实例化流程可以总结如下:
启动引导类中通过 channel() 方法指定生成的 ChannelFactory 类型
通过 ChannelFactory 来构造对应 Channel,并在实例化的过程中初始化了一些重要属性,比如 pipeline
ChannelPipeline
ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性
ChannelPipeline 初始化
ChannelPipeline 底层初始化源码:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
从 ChannelPipeline 的构造函数可以看出,每一个 ChannelPipeline 底层都是一个双向链表结构,默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理,处理细节会在后续文章中展现
addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回调 ChannelHandler 中的 handlerAdded() 方法
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
复制代码
addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理
整个方法的逻辑为:
判断当前 ChannelHandler 是否已经添加
将当前 ChannelHandler 包装成 ChannelHandlerContext,并将其添加到 ChannelPipeline 的双向链表中
回调添加的 ChannelHandler 中的 handlerAdded() 方法
Channel、ChannelPipeline、ChannelHandler 关系
Channel、ChannelPipeline 和 ChannelHandler 三者的关系如图所示:
每一个 Channel 中都会包含一个 ChannelPipeline 属性
ChannelPipeline 是一个双向链表结构,默认会包含 HeadContext 和 TailContext 两个节点
当向 ChannelPipeline 中添加 ChannelHandler 时,会包装成 ChannelContext 插入到 ChannelPipeline 链表中
当 Channel 中发生指定事件时,该事件就会在 ChannelPipeline 中沿着双向链表进行传播,调用各个 ChannelHandler 中的指定方法,完成相应的业务处理
Netty 正是通过 ChannelPipeline 这一结构为用户提供了自定义业务逻辑的扩展点,用户只需要向 ChannelPipeline 中添加处理对应业务逻辑的 ChannelHandler,之后当指定事件发生时,该 ChannelHandler 中的对应方法就会进行回调,实现业务的处理
ChannelHandler
ChannelHandler 是 Netty 中业务处理的核心类,当有 IO 事件发生时,该事件会在 ChannelPipeline 中进行传播,并依次调用到 ChannelHandler 中的指定方法
ChannelHandler 的类继承关系如下:
从上面的类继承关系可以看出,ChannelHandler 大致可以分为 ChannelInboundHandler 和 ChannelOutboundHandler,分别用来处理读、写事件
ChannInboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码
在 ChannelInboundHandler 中定义了一系列的回调方法,用户可以实现该接口并重写相应的方法来自定义的业务逻辑。
重写方法逻辑是简单的,但很多人其实不清楚的是这些回调方法到底在什么场景下会被调用,如何调用,只有了解了这些回调方法的调用时机,才能在更适宜的地方完成相应功能
channelRegistered
channelRegistered() 从方法名理解是当 Channel 完成注册之后会被调用,那么何为 Channel 注册?
下面就以 Netty 服务端启动过程中的部分源码为例(详细源码分析会在后续文章中),看下 channelRegistered() 的调用时机
在 Netty 服务端启动时,会调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法,精简代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// neverRegistered 初始值为 true
boolean firstRegistration = neverRegistered;
// 将 Channel 绑定到对应 eventLoop 中的 Selector 上
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 调用 ChannelHandler 中的 ChannelRegistered()
pipeline.fireChannelRegistered();
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
复制代码
从 Netty 底层的 register() 方法可以看出,ChannelHandler 中的 ChannelRegistered() 调用时机是在调用 pipeline.fireChannelRegistered() 时触发的,此时已经完成的逻辑为:
通过传入的 EventLoopGroup 得到了该 Channel 对应的 EventLoop,并与 Channel 中的对应属性完成了绑定;AbstractChannel.this.eventLoop = eventLoop 逻辑
当前 Channel 已经绑定到了对应 EventLoop 中的 Selector 上;doRegister() 逻辑
ChannelHandler 中的 handlerAdded() 方法已经完成了回调;pipeline.invokeHandlerAddedIfNeeded() 逻辑
因此当 Channel 和对应的 Selector 完成了绑定,Channel 中 pipeline 上绑定的 ChannelHandler 的 channelRegisted() 方法就会进行回调
channelActive
上面已经分析了 channelRegistered() 方法的调用时机,也就是当 Channel 绑定到了对应 Selector 上之后就会进行回调,下面开始分析 channelActive() 方法的调用时机
对于服务端 Channel,前面还只是将 Channel 注册到了 Selector 上,还没有调用到 bind() 方法完成真正的底层端口绑定,那么有没有可能当服务端 Channel 完成端口监听之后,就会调用到 channelActive() 方法呢?
下面继续分析,在上面完成了 Channel 和 Selector 的注册之后,Netty 服务端启动过程中会继续调用到 io.netty.channel.AbstractChannel.AbstractUnsafe#bind 逻辑:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
在该方法中完成了以下逻辑:
总结:
当 Channel 调用了 bind() 方法完成端口绑定之后,channelActive() 方法会进行回调
channelRead
该方法的调用时机,服务端和客户端是不一致的
服务端 channelRead
服务端 Channel 绑定到 Selector 上时监听的是 Accept 事件,当客户端有新连接接入时,会回调 channelRead() 方法,完成新连接的接入
Netty 在服务端启动过程中,会默认添加一个 ChannelHandler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor 来处理新连接的接入
客户端 channelRead
当服务端处理完 Accept 事件后,会生成一个和客户端通信的 Channel,该 Channel 也会注册到对应的 Selector 上,并监听 read 事件
当客户端向该 Channel 中发送数据时就会触发 read 事件,调用到 channelRead() 方法(Netty 内部的源码处理会在后续的文章中进行分析)
exceptionCaught
当前 ChannelHandler 中各回调方法处理过程中如果发生了异常就会回调该方法
评论