9.ChannelPipeline 添加 ChannelHandler
(1)常见的客户端代码
首先用一个拆包器 Spliter 对二进制数据流进行拆包,然后解码器 Decoder 会将拆出来的包进行解码,接着业务处理器 BusinessHandler 会处理解码出来的 Java 对象,最后编码器 Encoder 会将业务处理完的结果编码成二进制数据进行输出。
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(newSpliter());
p.addLast(new Decoder());
p.addLast(new BusinessHandler());
p.addLast(new Encoder());
}
});
复制代码
整个 ChannelPipeline 的结构如下所示:
这里共有两种不同类型的结点,结点之间通过双向链表连接。一种是 ChannelInboundHandler,用来处理 Inbound 事件,比如读取数据流进行加工处理。一种是 ChannelOutboundHandler,用来处理 Outbound 事件,比如当调用 writeAndFlush()方法时就会经过这种类型的 Handler。
(2)ChannelPipeline 添加 ChannelHandler 入口
当服务端 Channel 的 Reactor 线程轮询到新连接接入的事件时,就会调用 AbstractNioChannel 的内部类 NioUnsafe 的 read()方法,也就是调用 AbstractNioMessageChannel 的内部类 NioMessageUnsafe 的 read()方法。
然后会触发执行代码 pipeline.fireChannelRead()传播 ChannelRead 事件,从而最终触发调用 ServerBootstrapAcceptor 接入器的 channelRead()方法。
在 ServerBootstrapAcceptor 的 channelRead()方法中,便会通过执行代码 channel.pipeline().addLast()添加 ChannelHandler,也就是通过调用 DefaultChannelPipeline 的 addLast()方法添加 ChannelHandler。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
needsToSelectAgain = false;
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//1.首先取出IO事件
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;//Help GC
//2.然后获取对应的Channel和处理该Channel
//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//网络事件的处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3.最后判断是否应该再进行一次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
//selectedKeys.flip()会返回一个数组
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
...
//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入
//此时将调用Channel的unsafe变量来进行实际操作
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//调用AbstractNioMessageChannel的NioMessageUnsafe.read()方法
//进行新连接接入处理
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
...
}
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//临时存放读到的连接NioSocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//断言确保该read()方法必须来自Reactor线程调用
assert eventLoop().inEventLoop();
//获得Channel对应的Pipeline
final ChannelPipeline pipeline = pipeline();
//获得Channel对应的RecvByteBufAllocator.Handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel
//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接
//2.设置并绑定NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
//调用DefaultChannelPipeline的fireChannelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法
readBuf.clear();
pipeline.fireChannelReadComplete();
}
}
...
}
//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
...
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
//从Pipeline的第一个HeadContext处理器开始调用
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandler handler() {
return this;
}
...
}
...
}
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
//初始化服务端Channel时,会向其Pipeline添加ServerBootstrapAcceptor处理器
@Override
void init(Channel channel) throws Exception {
//1.设置服务端Channel的Option与Attr
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//2.设置客户端Channel的Option与Attr
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//3.配置服务端启动逻辑
ChannelPipeline p = channel.pipeline();
//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
//一.添加用户自定义的Handler,注意这是handler,而不是childHandler
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) pipeline.addLast(handler);
//二.添加一个特殊的Handler用于接收新连接
//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//调用DefaultChannelPipeline的addLast()方法
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)
);
}
});
}
});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
...
//channelRead()方法在新连接接入时被调用
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//1.给新连接的Channel添加用户自定义的Handler处理器
//这里的childHandler其实是一个特殊的Handler: ChannelInitializer
child.pipeline().addLast(childHandler);
//2.设置ChannelOption,主要和TCP连接一些底层参数及Netty自身对一个连接的参数有关
for (Entry<ChannelOption<?>, Object> e: childOptions) {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
//3.设置新连接Channel的属性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//4.绑定Reactor线程
//childGroup是一个NioEventLoopGroup,所以下面会调用其父类的register()方法
childGroup.register(child);
}
...
}
...
}
复制代码
(3)DefaultChannelPipeline 的 addLast()方法
使用 synchronized 关键字是为了防止多线程并发操作 ChannelPipeline 底层的双向链表,添加 ChannelHandler 结点的过程主要分为 4 个步骤:
步骤一:判断 ChannelHandler 是否重复添加
步骤二:创建结点
步骤三:添加结点到链表
步骤四:回调添加完成事件
这个结点便是 ChannelHandlerContext,Pipeline 里每个结点都是一个 ChannelHandlerContext。addLast()方法便是把 ChannelHandler 包装成一个 ChannelHandlerContext,然后添加到链表。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) throw new NullPointerException("handlers");
for (ChannelHandler h: handlers) {
if (h == null) break;
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.检查是否有重复的ChannelHandler结点
checkMultiplicity(handler);
//2.创建ChannelHandlerContext结点
newCtx = newContext(group, filterName(name, handler), handler);
//3.添加ChannelHandlerContext结点
addLast0(newCtx);
...
}
//4.回调用户方法
//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
callHandlerAdded0(newCtx);
return this;
}
...
}
复制代码
(4)检查是否重复添加 ChannelHandler 结点
Netty 使用了一个成员变量 added 来表示一个 ChannelHandler 是否已经添加。如果当前要添加的 ChannelHandler 是非共享的并且已经添加过,那么抛出异常,否则标识该 ChannelHandler 已添加。
如果一个 ChannelHandler 支持共享,那么它就可以无限次被添加到 ChannelPipeline 中。如果要让一个 ChannelHandler 支持共享,只需要加一个 @Sharable 注解即可。而 ChannelHandlerAdapter 的 isSharable()方法正是通过判断该 ChannelHandler 对应的类是否标有 @Sharable 注解来实现的。
Netty 为了性能优化,还使用了 ThreadLocal 来缓存 ChannelHandler 是否共享的情况。在高并发海量连接下,每次有新连接添加 ChannelHandler 都会调用 isSharable()方法,从而优化性能。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
...
}
//Skeleton implementation of a ChannelHandler.
public abstract class ChannelHandlerAdapter implements ChannelHandler {
//Not using volatile because it's used only for a sanity check.
boolean added;
//Return true if the implementation is Sharable and so can be added to different ChannelPipelines.
public boolean isSharable() {
//Cache the result of Sharable annotation detection to workaround a condition.
//We use a ThreadLocal and WeakHashMap to eliminate the volatile write/reads.
//Using different WeakHashMap instances per Thread is good enough for us and the number of Threads are quite limited anyway.
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
...
}
复制代码
(5)创建 ChannelHandlerContext 结点
根据 ChannelHandler 创建 ChannelHandlerContext 类型的结点时,会将该 ChannelHandler 的引用保存到结点的成员变量中。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.检查是否有重复的ChannelHandler结点
checkMultiplicity(handler);
//2.创建ChannelHandlerContext结点
newCtx = newContext(group, filterName(name, handler), handler);
//3.添加ChannelHandlerContext结点
addLast0(newCtx);
...
}
//4.回调用户方法
//通过这个方法告诉用户这个ChannelHandler已添加完成,用户在回调方法里可以处理事情了
callHandlerAdded0(newCtx);
return this;
}
//给ChannelHandler创建一个唯一性的名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
//根据ChannelHandler创建一个ChannelHandlerContext结点
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
...
}
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
...
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
...
}
复制代码
(6)添加 ChannelHandlerContext 结点
使用尾插法向双向链表添加结点。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
}
复制代码
(7)回调 handerAdded()方法
向 ChannelPipeline 添加完新结点后,会使用 CAS 修改结点的状态为 ADD_COMPLETE 表示结点添加完成,然后执行 ctx.handler().handlerAdded(ctx),回调用户在这个要添加的 ChannelHandler 中实现的 handerAdded()方法。
public class DefaultChannelPipeline implements ChannelPipeline {
...
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
//使用CAS修改结点的状态为ADD_COMPLETE表示结点添加完成
ctx.setAddComplete();
//回调用户在这个要添加的ChannelHandler中实现的handerAdded()方法
ctx.handler().handlerAdded(ctx);
}
...
}
//DemoHandler是用户定义的ChannelHandler
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//这个DemoHandler结点被添加到ChannelPipeline之后,就会回调这里的方法
}
...
}
复制代码
最典型的一个回调就是用户代码的 ChannelInitializer 被添加完成后,会先调用其 initChannel()方法将用户自定义的 ChannelHandler 添加到 ChannelPipeline,然后再调用 pipeline.remove()方法将自身结点进行删除。
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置一个ChannelInitializer类型的childHandler
//新连接接入时,会执行ServerBootstrapAcceptor.channelRead()中的代码"child.pipeline().addLast(childHandler)"
//也就是会把这个ChannelInitializer类型的结点会被添加到新连接Channel的Pipeline中
//添加完这个结点后会回调ChannelInitializer的handlerAdded()方法
//其中会调用ChannelInitializer的initChannel()方法给Pipeline添加真正的结点
//执行完initChannel()方法后,就会移除ChannelInitializer这个结点
.childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new NettyServerHandler());//针对网络请求的处理逻辑
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口
channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
System.out.println("Starting Netty Server...");
int port = 8998;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new NettyServer(port).start();
}
}
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//ChannelPipeline删除ChannelHandler结点(ChannelInitializer)
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
...
}
复制代码
(8)ChannelPipeline 添加 ChannelHandler 总结
一.判断 ChannelHandler 是否重复添加的依据是:如果该 ChannelHandler 不是共享的且已被添加过,则拒绝添加。
二.否则就创建一个 ChannelHandlerContext 结点(ctx),并把这个 ChannelHandler 包装进去,也就是保存 ChannelHandler 的引用到 ChannelHandlerContext 的成员变量中。由于创建 ctx 时保存了 ChannelHandler 的引用、ChannelPipeline 的引用到成员变量,ChannelPipeline 又保存了 Channel 的引用,所以每个 ctx 都拥有一个 Channel 的所有信息。
三.接着通过双向链表的尾插法,将这个 ChannelHandlerContext 结点添加到 ChannelPipeline 中。
四.最后回调用户在这个要添加的 ChannelHandler 中实现的 handerAdded()方法。
10.ChannelPipeline 删除 ChannelHandler
Netty 最大的特征之一就是 ChannelHandler 是可插拔的,可以动态编织 ChannelPipeline。比如在客户端首次连接服务端时,需要进行权限认证,认证通过后就可以不用再认证了。下面的 AuthHandler 便实现了只对第一个传来的数据包进行认证校验。如果通过验证则删除此 AuthHandler,这样后续传来的数据包便不会再校验了。
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throw Exception {
if (verify(data)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
}
复制代码
DefaultChannelPipeline 的 remove()方法如下:
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
...
AbstractChannelHandlerContext ctx = head.next;
//遍历双向链表
for (;;) {
...
if (ctx.handler() == handler) return ctx;
ctx = ctx.next;
}
}
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
//Pipeline中的head和tail结点不能被删除
assert ctx != head && ctx != tail;
synchronized (this) {
//调整链表指针并删除
remove0(ctx);
...
}
//回调用户在这个要删除的ChannelHandler实现的handlerRemoved()方法
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
...
ctx.handler().handlerRemoved(ctx);
...
}
...
}
复制代码
ChannelPipeline 删除 ChannelHandler 的步骤:
一.遍历双向链表,根据 ChannelHandler 找到对应的 ChannelHandlerContext 结点。
二.通过调整 ChannelPipeline 中双向链表的指针来删除对应的 ChannelHandlerContext 结点。
三.回调用户在这个要删除的 ChannelHandler 实现的 handlerRemoved()方法,比如进行资源清理。
11.Inbound 事件的传播
(1)Unsafe 的介绍
Unsafe 和 ChannelPipeline 密切相关,ChannelPipeline 中有关 IO 的操作最终都会落地到 Unsafe 的。Unsafe 是不安全的意思,即不要在应用程序里直接使用 Unsafe 及它的衍生类对象。Unsafe 是在 Channel 中定义的,是属于 Channel 的内部类。Unsafe 中的接口操作都和 JDK 底层相关,包括:分配内存、Socket 四元组信息、注册事件循环、绑定端口、Socket 的连接和关闭、Socket 的读写。
//A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
//Returns the globally unique identifier of this Channel.
ChannelId id();
//Return the EventLoop this Channel was registered to.
EventLoop eventLoop();
//Returns the parent of this channel.
Channel parent();
//Returns the configuration of this channel.
ChannelConfig config();
//Returns true if the Channel is open and may get active later
boolean isOpen();
//Returns true if the Channel is registered with an EventLoop.
boolean isRegistered();
//Return true if the Channel is active and so connected.
boolean isActive();
//Return the ChannelMetadata of the Channel which describe the nature of the Channel.
ChannelMetadata metadata();
//Returns the local address where this channel is bound to.
//The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information.
SocketAddress localAddress();
//Returns the remote address where this channel is connected to.
//The returned SocketAddress is supposed to be down-cast into more concrete type such as InetSocketAddress to retrieve the detailed information.
SocketAddress remoteAddress();
//Returns the ChannelFuture which will be notified when this channel is closed.
//This method always returns the same future instance.
ChannelFuture closeFuture();
//Returns true if and only if the I/O thread will perform the requested write operation immediately.
//Any write requests made when this method returns false are queued until the I/O thread is ready to process the queued write requests.
boolean isWritable();
//Get how many bytes can be written until #isWritable() returns false.
//This quantity will always be non-negative. If #isWritable() is false then 0.
long bytesBeforeUnwritable();
//Get how many bytes must be drained from underlying buffers until #isWritable() returns true.
//This quantity will always be non-negative. If #isWritable() is true then 0.
long bytesBeforeWritable();
//Returns an <em>internal-use-only</em> object that provides unsafe operations.
Unsafe unsafe();
//Return the assigned ChannelPipeline.
ChannelPipeline pipeline();
//Return the assigned ByteBufAllocator which will be used to allocate ByteBufs.
ByteBufAllocator alloc();
@Override
Channel read();
@Override
Channel flush();
//Unsafe operations that should never be called from user-code.
//These methods are only provided to implement the actual transport, and must be invoked from an I/O thread except for the following methods:
//#invoker()
//#localAddress()
//#remoteAddress()
//#closeForcibly()
//#register(EventLoop, ChannelPromise)
//#deregister(ChannelPromise)
//#voidPromise()
interface Unsafe {
//Return the assigned RecvByteBufAllocator.Handle which will be used to allocate ByteBuf's when receiving data.
RecvByteBufAllocator.Handle recvBufAllocHandle();
//Return the SocketAddress to which is bound local or null if none.
SocketAddress localAddress();
//Return the SocketAddress to which is bound remote or null if none is bound yet.
SocketAddress remoteAddress();
//Register the Channel of the ChannelPromise and notify the ChannelFuture once the registration was complete.
void register(EventLoop eventLoop, ChannelPromise promise);
//Bind the SocketAddress to the Channel of the ChannelPromise and notify it once its done.
void bind(SocketAddress localAddress, ChannelPromise promise);
//Connect the Channel of the given ChannelFuture with the given remote SocketAddress.
//If a specific local SocketAddress should be used it need to be given as argument. Otherwise just pass null to it.
//he ChannelPromise will get notified once the connect operation was complete.
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//Disconnect the Channel of the ChannelFuture and notify the ChannelPromise once the operation was complete.
void disconnect(ChannelPromise promise);
//Close the Channel of the ChannelPromise and notify the ChannelPromise once the operation was complete.
void close(ChannelPromise promise);
//Closes the Channel immediately without firing any events. Probably only useful when registration attempt failed.
void closeForcibly();
//Deregister the Channel of the ChannelPromise from EventLoop and notify the ChannelPromise once the operation was complete.
void deregister(ChannelPromise promise);
//Schedules a read operation that fills the inbound buffer of the first ChannelInboundHandler in the ChannelPipeline.
//If there's already a pending read operation, this method does nothing.
void beginRead();
//Schedules a write operation.
void write(Object msg, ChannelPromise promise);
//Flush out all write operations scheduled via #write(Object, ChannelPromise).
void flush();
//Return a special ChannelPromise which can be reused and passed to the operations in Unsafe.
//It will never be notified of a success or error and so is only a placeholder for operations
//that take a ChannelPromise as argument but for which you not want to get notified.
ChannelPromise voidPromise();
//Returns the ChannelOutboundBuffer of the Channel where the pending write requests are stored.
ChannelOutboundBuffer outboundBuffer();
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
...
public interface NioUnsafe extends Unsafe {
//Return underlying SelectableChannel
SelectableChannel ch();
//Finish connect
void finishConnect();
//Read from underlying SelectableChannel
void read();
void forceFlush();
}
...
}
复制代码
(2)Unsafe 的继承结构
一.NioUnsafe 增加了可以访问底层 JDK 的 SelectableChannel 的功能,定义了从 SelectableChannel 读取数据的 read()方法。
二.AbstractUnsafe 实现了大部分 Unsafe 的功能。
三.AbstractNioUnsafe 主要是通过代理到其外部类 AbstractNioChannel 获得与 JDK NIO 相关的一些信息,比如 SelectableChannel、SelectionKey 等。
四.NioMessageUnsafe 和 NioByteUnsafe 是处在同一层次的抽象,Netty 将一个新连接的建立也当作一个 IO 操作来处理,这里 Message 的含义可以当作一个 SelectableChannel,读的意思就是接收一个 SelectableChannel。
(3)Unsafe 的分类
有两种类型的 Unsafe:一种是与连接的字节数据读写相关的 NioByteUnsafe,另一种是与新连接建立操作相关的 NioMessageUnsafe。
一.NioByteUnsafe 的读和写
NioByteUnsafe 的读会被委托到 NioByteChannel 的 doReadBytes()方法进行读取处理,doReadBytes()方法会将 JDK 的 SelectableChannel 的字节数据读取到 Netty 的 ByteBuf 中。
NioByteUnsafe 中的写有两个方法,一个是 write()方法,一个是 flush()方法。write()方法是将数据添加到 Netty 的缓冲区,flush()方法是将 Netty 缓冲区的字节流写到 TCP 缓冲区,并最终委托到 NioSocketChannel 的 doWrite()方法通过 JDK 底层 Channel 的 write()方法写数据。
//AbstractNioChannel base class for Channels that operate on bytes.
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected class NioByteUnsafe extends AbstractNioUnsafe {
...
//NioByteUnsafe的读
@Override
public final void read() {
...
doReadBytes(byteBuf);
...
}
}
}
public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
...
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
...
ByteBuffer[] nioBuffers = in.nioBuffers();
SocketChannel ch = javaChannel();
...
ByteBuffer nioBuffer = nioBuffers[0];
...
ch.write(nioBuffer)
...
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
...
//NioByteUnsafe的写
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addMessage(msg, size, promise);
}
//NioByteUnsafe的写
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
outboundBuffer.addFlush();
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
...
doWrite(outboundBuffer);
...
}
}
}
复制代码
二.NioMessageUnsafe 的读
NioMessageUnsafe 的读会委托到 NioServerSocketChannel 的 doReadMessages()方法进行处理。doReadMessages()方法会调用 JDK 的 accept()方法新建立一个连接,并将这个连接放到一个 List 里以方便后续进行批量处理。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
//NioMessageUnsafe的读
@Override
public void read() {
...
doReadMessages(readBuf)
...
}
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
...
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
}
复制代码
(4)ChannelPipeline 中 Inbound 事件传播
当新连接已准备接入或者已经存在的连接有数据可读时,会在 NioEventLoop 的 processSelectedKey()方法中执行 unsafe.read()。
如果是新连接已准备接入,执行的是 NioMessageUnsafe 的 read()方法。如果是已经存在的连接有数据可读,执行的是 NioByteUnsafe 的 read()方法。
最后都会执行 pipeline.fireChannelRead()引发 ChannelPipeline 的读事件传播。首先会从 HeadContext 结点开始,也就是调用 HeadContext 的 channelRead()方法。然后触发调用 AbstractChannelHandlerContext 的 fireChannelRead()方法,接着通过 findContextInbound()方法找到 HeadContext 的下一个结点,然后通过 invokeChannelRead()方法继续调用该结点的 channelRead()方法,直到最后一个结点 TailContext。
public final class NioEventLoop extends SingleThreadEventLoop {
...
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
//新连接已准备接入或者已经存在的连接有数据可读
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法
//如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
}
...
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//创建ByteBuf分配器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
...
do {
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
//调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
pipeline.fireChannelReadComplete();
...
}
...
}
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected class NioByteUnsafe extends AbstractNioUnsafe {
...
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//创建ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
do {
//1.分配一个ByteBuf
byteBuf = allocHandle.allocate(allocator);
//2.将数据读取到分配的ByteBuf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
...
//3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
pipeline.fireChannelReadComplete();
...
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
//ChannelPipeline的头结点
final AbstractChannelHandlerContext head;
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//调用AbstractChannelHandlerContext的invokeChannelRead()方法
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//比如调用HeadContext的channelRead()方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
//寻找下一个结点
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
}
复制代码
(5)ChannelPipeline 中的头结点和尾结点
HeadContext 是一个同时属于 Inbound 类型和 Outbound 类型的 ChannelHandler,TailContext 则只是一个属于 Inbound 类型的 ChannelHandler。
HeadContext 结点的作用就是作为头结点开始传递读写事件并调用 unsafe 进行实际的读写操作。比如 Channel 读完一次数据后,HeadContext 的 channelReadComplete()方法会被调用。然后继续执行如下的调用流程:readIfAutoRead() -> channel.read() -> pipeline.read() -> HeadContext.read() -> unsafe.beginRead() -> 再次注册读事件。所以 Channel 读完一次数据后,会继续向 Selector 注册读事件。这样只要 Channel 活跃就可以连续不断地读取数据,然后数据又会通过 ChannelPipeline 传递到 HeadContext 结点。
TailContext 结点的作用是通过让方法体为空来终止大部分事件的传播,它的 exceptionCaugh()方法和 channelRead()方法分别会发出告警日志以及释放到达该结点的对象。
public class DefaultChannelPipeline implements ChannelPipeline {
//ChannelPipeline的头结点
final AbstractChannelHandlerContext head;
//ChannelPipeline的尾结点
final AbstractChannelHandlerContext tail;
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//调用AbstractChannelHandlerContext的fireChannelRead()方法
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
...
}
//Called once a message hit the end of the ChannelPipeline without been handled by the user in ChannelInboundHandler#channelRead(ChannelHandlerContext, Object).
//This method is responsible to call ReferenceCountUtil#release(Object) on the given msg at some point.
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
复制代码
(6)Inbound 事件的传播总结
一般用户自定义的 ChannelInboundHandler 都继承自 ChannelInboundHandlerAdapter。如果用户代码没有覆盖 ChannelInboundHandlerAdapter 的 channelXXX()方法,那么 Inbound 事件会从 HeadContext 开始遍历 ChannelPipeline 的双向链表进行传播,并默认情况下传播到 TailContext 结点。
如果用户代码覆盖了 ChannelInboundHandlerAdapter 的 channelXXX()方法,那么事件传播就会在当前结点结束。所以如果此时这个 ChannelHandler 又忘记了手动释放业务对象 ByteBuf,则可能会造成内存泄露,而 SimpleChannelInboundHandler 则可以帮用户自动释放业务对象。
如果用户代码调用了 ChannelHandlerContext 的 fireXXX()方法来传播事件,那么该事件就从当前结点开始往下传播。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
//Calls ChannelHandlerContext#fireChannelRegistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
//Calls ChannelHandlerContext#fireChannelUnregistered() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
//Calls ChannelHandlerContext#fireChannelActive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
//Calls ChannelHandlerContext#fireChannelInactive() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
//Calls ChannelHandlerContext#fireChannelRead(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
//Calls ChannelHandlerContext#fireChannelReadComplete() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
//Calls ChannelHandlerContext#fireUserEventTriggered(Object) to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
//Calls ChannelHandlerContext#fireChannelWritabilityChanged() to forward to the next ChannelInboundHandler in the ChannelPipeline.
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
复制代码
12.Outbound 事件的传播
(1)触发 Outbound 事件传播的入口
在消息推送系统中,可能会有如下代码,意思是根据用户 ID 获得对应的 Channel,然后向用户推送消息。
Channel channel = ChannelManager.getChannel(userId);
channel.writeAndFlush(response);
复制代码
(2)Outbound 事件传播的源码
如果通过 Channel 来传播 Outbound 事件,则是从 TailContext 开始传播的。
和 Inbound 事件一样,Netty 为了保证程序的高效执行,所有核心操作都要在 Reactor 线程中处理。如果业务线程调用了 Channel 的方法,那么 Netty 会将该操作封装成一个 Task 任务添加到任务队列中,随后在 Reactor 线程的事件循环中执行。
findContextOutbound()方法找 Outbound 结点的过程和 findContextInbound()方法找 Inbound 结点类似,需要反向遍历 ChannelPipeline 中的双向链表,一直遍历到第一个 Outbound 结点 HeadCountext。
如果用户的 ChannelHandler 覆盖了 Outbound 类型的方法,但没有把事件在方法中继续传播下去,那么会导致该事件的传播中断。
最后一个 Inbound 结点是 TailContext,最后一个 Outbound 结点是 HeadContext,而数据最终会落到 HeadContext 的 write()方法上。
下面是 channel.writeAndFlush()方法的源码:
public interface ChannelOutboundInvoker {
...
//Shortcut for call #write(Object) and #flush().
ChannelFuture writeAndFlush(Object msg);
...
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
...
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
//从TailContext开始传播
//但TailContext没有重写writeAndFlush()方法
//所以会调用AbstractChannelHandlerContext的writeAndFlush()方法
return tail.writeAndFlush(msg);
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) throw new NullPointerException("msg");
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//反向遍历链表进行查找
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//最终都会由Reactor线程处理Channel的数据读写
if (executor.inEventLoop()) {
if (flush) {
//调用结点的invokeWriteAndFlush()方法
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传
//即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播
invokeWrite0(msg, promise);
//逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传
//即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//逐个调用,最终回到HeadContext的write()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
//逐个调用,最终回到HeadContext的flush()方法
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
...
}
//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
复制代码
(3)总结
Outbound 事件的传播机制和 Inbound 事件的传播机制类似。但 Outbound 事件是从链表尾部开始向前传播,而 Inbound 事件是从链表头部开始向后传播。Outbound 事件传播中的写数据,最终都会落到 HeadContext 结点中的 unsafe 进行处理。
13.ChannelPipeline 中异常的传播
Inbound 事件和 Outbound 事件在传播时发生异常都会调用 notifyHandlerExecption()方法,该方法会按 Inbound 事件的传播顺序找每个结点的异常处理方法 exceptionCaught()进行处理。
我们通常在自定义的 ChannelHandler 中实现一个处理异常的方法 exceptionCaught(),统一处理 ChannelPipeline 过程中的所有异常。这个自定义 ChannelHandler 一般继承自 ChannelDuplexHandler,表示该结点既是一个 Inbound 结点,又是一个 Outbound 结点。
如果我们在自定义的 ChannelHandler 中没有处理异常,由于 ChannelHandler 通常都继承了 ChannelInboundHandlerAdapter,通过其默认实现的 exceptionCaught()方法可知异常会一直往下传递,直到最后一个结点的异常处理方法 exceptionCaught()中结束。因此如果异常处理方法 exceptionCaught()在 ChannelPipeline 中间的结点实现,则该结点后面的 ChannelHandler 抛出的异常就没法处理了。所以一般会在 ChannelHandler 链表的末尾结点实现处理异常的方法 exceptionCaught()。
需要注意的是:在任何结点中发生的异常都会向下一个结点进行传递。
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
...
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("...", cause);
}
return;
}
invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
//调用ChannelHandler的exceptionCaught()
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug("...",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn("...", error, cause);
}
}
} else {
fireExceptionCaught(cause);
}
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
//调用下一个结点next的exceptionCaught()方法
invokeExceptionCaught(next, cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
...
}
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
...
//Calls ChannelHandlerContext#fireExceptionCaught(Throwable) to forward to the next ChannelHandler in the ChannelPipeline.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
复制代码
14.ChannelPipeline 总结
(1)ChannelPipeline 的初始化
ChannelPipeline 在服务端 Channel 和客户端 Channel 被创建时创建,创建 ChannelPipeline 的类是服务端 Channel 和客户端 Channel 的共同父类 AbstractChannel。
(2)ChannelPipeline 的数据结构
ChannelPipeline 中的数据结构是双向链表结构,每一个结点都是一个 ChannelHandlerContext 对象。ChannelHandlerContext 里包装了用户自定义的 ChannelHandler,即前者会保存后者的引用到其成员变量 handler 中。ChannelHandlerContext 中拥有 ChannelPipeline 和 Channel 的所有上下文信息。添加和删除 ChannelHandler 最终都是在 ChannelPipeline 的链表结构中添加和删除对应的 ChannelHandlerContext 结点。
(3)ChannelHandler 类型的判断
在旧版 Netty 中,会使用 instanceof 关键字来判断 ChannelHandler 的类型,并使用两个成员变量 inbound 和 outbound 来标识。在新版 Netty 中,会使用一个 16 位的二进制数 executionMask 来表示 ChannelHandler 具体实现的事件类型,若实现则给对应的位标 1。
(4)ChannelPipeline 的头尾结点
创建 ChannelPipeline 时会默认添加两个结点:HeadContext 结点和 TailContext 结点。HeadContext 结点的作用是作为头结点,开始传播读写事件,并且通过它的 unsafe 变量实现具体的读写操作。TailContext 结点的作用是起到终止事件传播(方法体为空)以及异常和对象未处理的告警。
(5)Channel 与 Unsafe
一个 Channel 对应一个 Unsafe,Unsafe 用于处理底层 IO 操作。NioServerSocketChannel 对应 NioMessageUnsafe,NioSocketChannel 对应 NioByteUnsafe。
(6)ChannelPipeline 的事件传播机制
ChannelPipeline 中的事件传播机制分为 3 种:Inbound 事件的传播、Outbound 事件的传播、异常事件的传播。
一.Inbound 事件的传播
如果通过 Channel 的 Pipeline 触发这类事件(默认情况下),那么触发的规则是从 head 结点开始不断寻找下一个 InboundHandler,最终落到 tail 结点。如果在当前 ChannelHandlerContext 上触发这类事件,那么事件只会从当前结点开始向下传播。
二.Outbound 事件的传播
如果通过 Channel 的 Pipeline 触发这类事件(默认情况下),那么触发的规则是从 tail 结点开始不断寻找上一个 InboundHandler,最终落到 head 结点。如果在当前 ChannelHandlerContext 上触发这类事件,那么事件只会从当前结点开始向上传播。
三.异常事件的传播
异常在 ChannelPipeline 中的双向链表传播时,无论 Inbound 结点还是 Outbound 结点,都是向下一个结点传播,直到 tail 结点为止。TailContext 结点会打印这些异常信息,最佳实践是在 ChannelPipeline 的最后实现异常处理方法 exceptionCaught()。
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18790452
体验地址:http://www.jnpfsoft.com/?from=001YH
评论