1.读数据入口
当客户端 Channel 的 Reactor 线程 NioEventLoop 检测到有读事件时,会执行 NioByteUnsafe 的 read()方法。该方法会调用 doReadBytes()方法将 TCP 缓冲区的数据读到由 ByteBufAllocator 分配的一个 ByteBuf 对象中,然后通过 pipeline.fireChannelRead()方法带上这个 ByteBuf 对象向下传播 ChannelRead 事件。
在传播的过程中,首先会来到 pipeline 的 head 结点的 channelRead()方法。该方法会继续带着那个 ByteBuf 对象向下传播 ChannelRead 事件,比如会来到 ByteToMessageDecoder 结点的 channelRead()方法。
注意:服务端 Channel 的 unsafe 变量是一个 NioMessageUnsafe 对象,客户端 Channel 的 unsafe 变量是一个 NioByteUnsafe 对象。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.public final class NioEventLoop extends SingleThreadEventLoop { Selector selector; private SelectedSelectionKeySet selectedKeys; private boolean needsToSelectAgain; private int cancelledKeys; ... @Override protected void run() { for (;;) { ... //1.调用select()方法执行一次事件轮询 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } ... //2.处理产生IO事件的Channel needsToSelectAgain = false; processSelectedKeys(); ... //3.执行外部线程放入TaskQueue的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); ... //新连接已准备接入或者已经存在的连接有数据可读 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法 //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法 unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ...}
public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... protected class NioByteUnsafe extends AbstractNioUnsafe { ... @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //创建ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config);
ByteBuf byteBuf = null; do { //1.分配一个ByteBuf byteBuf = allocHandle.allocate(allocator); //2.将数据读取到分配的ByteBuf中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } ... //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());
allocHandle.readComplete(); //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件 pipeline.fireChannelReadComplete(); ... } }}
复制代码
NioByteUnsafe 主要会进行如下处理:
一.通过客户端 Channel 的 ChannelConfig 获取内存分配器 ByteBufAllocator,然后用内存分配器来分配一个 ByteBuf 对象
二.将客户端 Channel 中的 TCP 缓冲区的数据读取到 ByteBuf 对象
三.读取完数据后便调用 DefaultChannelPipeline 的 fireChannelReadComplete()方法,从 HeadContext 结点开始在整个 ChannelPipeline 中传播 ChannelRead 事件
2.拆包原理
一.不用 Netty 的拆包原理
不断地从 TCP 缓冲区中读取数据,每次读完都判断是否为一个完整的数据包。如果当前读取的数据不足以拼接成一个完整的数据包,就保留数据,继续从 TCP 缓冲区中读。如果当前读取的数据加上已读取的数据足够拼成完整的数据包,就将拼好的数据包往业务传递,而多余的数据则保留。
二.Netty 的拆包原理
Netty 拆包基类内部会有一个字节容器,每次读取到数据就添加到字节容器中。然后尝试对累加的字节数据进行拆包,拆成一个完整的业务数据包,这个拆包基类叫 ByteToMessageDecoder。
3.ByteToMessageDecoder 解码步骤
(1)解码的整体介绍
一.累加字节流
Netty 会通过一个 ByteBuf 字节容器 cumulation,把所有读取到的字节流累加到该字节容器。
二.调用子类的 decode()方法进行解析
把累加字节容器里的字节流通过子类的 decode()方法进行解析。
三.将解析到的 ByteBuf 向下传播
如果调用子类的 decode()方法可以解析到一个 ByteBuf 对象,则将这个 ByteBuf 对象向下传播。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //只基于ByteBuf对象进行解码 if (msg instanceof ByteBuf) { //1.累加字节流 //2.调用子类的decode()方法进行解析 //3.清理累加字节容器 //4.将解析到的ByteBuf向下传播 } else { ctx.fireChannelRead(msg); } } ...}
复制代码
(2)首先累加字节流
如果当前字节容器中没有数据,那么就将字节容器的指针指向新读取的数据。如果当前字节容器中有数据,那么就调用累加器的 cumulate()方法将数据累加到字节容器。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ByteBuf cumulation;//字节容器 private Cumulator cumulator = MERGE_CUMULATOR;//默认的累加器 private boolean decodeWasNull; private boolean first; private int discardAfterReads = 16; private int numReads; ... //Cumulate ByteBufs by merge them into one ByteBuf's, using memory copies. public static final Cumulator MERGE_CUMULATOR = new Cumulator() { //累加器的累加方法,会传入一个字节容器cumulation @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer;//一个大的字节容器,用来copy传入的字节容器 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } buffer.writeBytes(in);//将当前数据累加到copy的字节容器中 in.release(); return buffer;//返回copy的字节容器 } };
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { //1.累加字节流 ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) {//如果当前字节容器中没有数据 //就将字节容器的指针指向新读取的数据 cumulation = data; } else {//如果当前字节容器中有数据 //则调用累加器的cumulate()方法将数据累加到字节容器 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //2.将字节容器里的数据传递给业务拆包器进行拆包 //调用callDecode()方法对数据进行拆包 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //3.清理字节容器 if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } //4.将解析到的ByteBuf向下传播 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } } static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation; cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation); oldCumulation.release(); return cumulation; } ...}
复制代码
(3)然后调用子类的 decode()方法进行解析
将数据累加到字节容器之后,就会调用 callDecode()方法,这个方法会尝试将字节容器的数据拆分成业务数据包并将业务数据包放入业务数据容器 out 中。
Netty 对各种用户协议的支持就体现在 ByteToMessageDecoder 的抽象方法 decode()中,decode()方法的入参是当前读取到的未被处理的所有数据 in 和业务数据包容器 out,所有拆包器都需要实现 ByteToMessageDecoder 的 decoed()方法。
拆包器完成一次拆包后:如果没有拆到一个完整的数据包,此时若拆包器未读取任何数据则跳出循环,否则继续拆包。如果已经拆到一个完整的数据包,但此时拆包器未读取任何数据,则抛出一个异常 DecodeException。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { ... //Called once data should be decoded from the given ByteBuf. //This method will call #decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@param out,the List to which decoded messages should be added protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); //Check if this handler was removed before continuing with decoding. //If it was removed, it is not safe to continue to operate on the buffer. if (ctx.isRemoved()) { break; } outSize = 0; }
int oldInputLength = in.readableBytes(); //调用拆包器实现的decode()方法 decode(ctx, in, out); //拆包器完成一次拆包后: //Check if this handler was removed before continuing the loop. //If it was removed, it is not safe to continue to operate on the buffer. if (ctx.isRemoved()) { break; } //outSize == out.size()表示没有拆到一个完整的数据包 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { //此时拆包器未读取任何数据则跳出循环 break; } else { //此时拆包器已读取到数据则继续拆包 continue; } } //执行到此处表明已经拆到一个完整的数据包 if (oldInputLength == in.readableBytes()) { //此时拆包器未读取任何数据,于是抛出一个异常DecodeException throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } //Decode the from one ByteBuf to an other. //This method will be called till either the input ByteBuf has nothing to read //when return from this method or till nothing was read from the input ByteBuf. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@param out,the List to which decoded messages should be added //@throws Exception,is thrown if an error accour protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; ...}
复制代码
(4)接着清理字节容器
拆包器完成拆包后,只是从字节容器中取走了数据,但这部分空间对字节容器来说还依然被保留。而字节容器每次累加字节数据时都是将字节数据追加到尾部,如果不对字节容器进行清理,那么时间一长可能就会 OOM。
正常情况下,每次读取完数据之后,ByteToMessageDecoder 解码器都会在 channelReadComplete()方法里清理字节容器。但是如果发送端发送数据过快,那么解码器的 channelReadComplete()方法可能会很久才被调用一次。
所以为了防止发送端发送数据过快,ByteToMessageDecoder 会在读取完一次数据并完成业务拆包后,清理字节容器。如果字节容器当前已无数据可读,则调用字节容器的 release()方法释放字节容器。如果字节容器当前还有数据可读,并且已经连续读取了 16 次还有未拆包的数据,那么就进行压缩处理。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { private int discardAfterReads = 16; ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { //1.累加字节流 ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) {//如果当前字节容器中没有数据 //就将字节容器的指针指向新读取的数据 cumulation = data; } else {//如果当前字节容器中有数据 //则调用累加器的cumulate()方法将数据累加到字节容器 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //2.将字节容器里的数据传递给业务拆包器进行拆包 //调用callDecode()方法对数据进行拆包 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //3.清理字节容器 if (cumulation != null && !cumulation.isReadable()) { //如果字节容器当前已无数据可读,则设置numReads为0,并释放字节容器cumulation numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) {//numReads >= 16 //如果当前还有数据可读,并且已经连续读取了16次即numReads >= 16, //此时字节容器中仍有未被业务拆包器拆包的数据,那么就做一次压缩处理; numReads = 0; discardSomeReadBytes(); } //4.将解析到的ByteBuf向下传播 int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } } //Get numElements out of the CodecOutputList and forward these through the pipeline. static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { //遍历业务数据包容器 for(int i = 0; i < numElements; i ++) { //将一个个完整的业务数据包ByteBuf传递到后续的ChannelHandler进行处理 ctx.fireChannelRead(msgs.getUnsafe(i)); } } ...}
复制代码
(5)最后将解析到的 ByteBuf 向下传播
也就是调用 fireChannelRead()方法,遍历业务数据包容器,将一个个完整的业务数据包 ByteBuf 传递到后续的 ChannelHandler 中进行处理。
4.解码器抽象的解码过程总结
解码过程是通过一个叫 ByteToMessageDecoder 的抽象解码器来实现的,ByteToMessageDecoder 实现的解码过程分为如下四步。
步骤一:累加字节流
也就是把当前读到的字节流累加到一个字节容器里。
步骤二:调用子类的 decode()方法进行解析
ByteToMessageDecoder 的 decode()方法是一个抽象方法,不同种类的解码器会有自己的 decode()方法逻辑。该 decode()方法被调用时会传入两个关键参数:一个是 ByteBuf 对象表示当前累加的字节流,一个是 List 列表用来存放被成功解码的业务数据包。
步骤三:清理字节容器
为了防止发送端发送数据过快,ByteToMessageDecoder 会在读取完一次数据并完成业务拆包后,清理字节容器。
步骤四:传播已解码的业务数据包
如果 List 列表里有解析出来的业务数据包,那么就通过 pipeline 的事件传播机制往下进行传播。
5.Netty 里常见的开箱即用的解码器
(1)基于固定长度解码器
判断当前字节容器可读字节是否小于固定长度。
//A decoder that splits the received ByteBufs by the fixed number of bytes. //For example, if you received the following four fragmented packets://+---+----+------+----+//| A | BC | DEFG | HI |//+---+----+------+----+//A FixedLengthFrameDecoder (3) will decode them into the following three packets with the fixed length://+-----+-----+-----+//| ABC | DEF | GHI |//+-----+-----+-----+public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; //Creates a new instance. public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; }
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
//Create a frame out of the ByteBuf and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param in,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); } }}
复制代码
(2)基于行分隔符解码器
基于行分隔符的拆包器可以同时处理\n 和\r\n 两种类型的行分隔符,其处理逻辑分为非丢弃模式和丢弃模式、找到行分隔符和未找到行分隔符的情况。
一.非丢弃模式时找到行分隔符
首先新建一个帧,也就是 ByteBuf frame。然后计算需要解码的数据包的长度和分隔符的长度。接着判断需要拆包的长度是否大于该拆包器允许的最大长度,如果大于,则丢弃这段数据,返回 null。然后将一个完整的数据包取出,如果 stripDelimiter 在构造方法中被设置为 false,则数据包含分隔符。
二.非丢弃模式时未找到行分隔符
首先取得当前字节容器的可读字节数,然后判断是否超出允许的最大长度。如果没超过最大长度,则直接返回 null,字节容器的数据没有改变。如果已超过最大长度,则进入丢弃模式,设置 discarding 为 true。
三.丢弃模式下找到行分隔符
这种情况下需要将分隔符之前的数据都丢弃。在计算出分隔符的长度之后,会通过移动字节容器的 readerIndex 指针把分隔符之前的数据全部丢弃,当然丢弃的数据也包括分隔符。经过这么一次丢弃后,后面就有可能是正常的数据包。于是设置 discarding 为 false 进入非丢弃模式,这样下次解码数据包时就会进入正常的解码流程。
四.丢弃模式下未找到行分隔符
由于当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,所以当前数据继续丢弃,移动字节容器的 readerIndex 指针。
//A decoder that splits the received {@link ByteBuf}s on line endings.//Both "\n" and "\r\n" are handled.//For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder.public class LineBasedFrameDecoder extends ByteToMessageDecoder { //Maximum length of a frame we're willing to decode. private final int maxLength; //Whether or not to throw an exception as soon as we exceed maxLength. private final boolean failFast; private final boolean stripDelimiter; //True if we're discarding input because we're already over maxLength. private boolean discarding; private int discardedBytes;
public LineBasedFrameDecoder(final int maxLength) { this(maxLength, true, false); }
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { this.maxLength = maxLength; this.failFast = failFast; this.stripDelimiter = stripDelimiter; }
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
//Create a frame out of the ByteBuf and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param buffer,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (!discarding) {//非丢弃模式 if (eol >= 0) {//找到行分隔符 //新建一个帧,也就是ByteBuf frame final ByteBuf frame; //计算需要解码的数据包的长度 final int length = eol - buffer.readerIndex(); //计算分隔符的长度 final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; //判断需要拆包的长度是否大于该拆包器允许的最大长度 if (length > maxLength) { //如果大于,则丢弃这段数据,返回null buffer.readerIndex(eol + delimLength); fail(ctx, length); return null; } //将一个完整的数据包取出 if (stripDelimiter) { frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { //如果stripDelimiter在构造方法中被设置为false,则数据包含分隔符 frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else {//未找到行分隔符 //首先取得当前字节容器的可读字节数 final int length = buffer.readableBytes(); //然后判断是否超出允许的最大长度 if (length > maxLength) { //如果已超过最大长度,则进入丢弃模式,设置discarding为true discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true; if (failFast) { fail(ctx, "over " + discardedBytes); } } //如果没超过最大长度,则直接返回null,字节容器的数据没有改变 return null; } } else {//丢弃模式 if (eol >= 0) {//找到行分隔符 final int length = discardedBytes + eol - buffer.readerIndex(); //计算出分隔符的长度 final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; //把分隔符之前的数据全部丢弃,移动字节容器的readerIndex指针 buffer.readerIndex(eol + delimLength); discardedBytes = 0; //经过这么一次丢弃后,后面就有可能是正常的数据包 //于是设置discarding为false,这样下次解码数据包时就会进入正常的解码流程 discarding = false; if (!failFast) { fail(ctx, length); } } else {//未找到行分隔符 //当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完 //所以当前数据继续丢弃,移动字节容器的readerIndex指针 discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); } return null; } }
private void fail(final ChannelHandlerContext ctx, int length) { fail(ctx, String.valueOf(length)); }
private void fail(final ChannelHandlerContext ctx, String length) { ctx.fireExceptionCaught(new TooLongFrameException("frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')')); }
//Returns the index in the buffer of the end of line found. //Returns -1 if no end of line was found in the buffer. private static int findEndOfLine(final ByteBuf buffer) { int i = buffer.forEachByte(ByteProcessor.FIND_LF); if (i > 0 && buffer.getByte(i - 1) == '\r') { i--; } return i; }}
复制代码
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18796941
体验地址:http://www.jnpfsoft.com/?from=001YH
评论