Netty 源码—编解码原理(二)
- 2025-03-28 福建
本文字数:25562 字
阅读完需:约 84 分钟
5.Netty 里常见的开箱即用的解码器
(3)基于分隔符解码器
可以向基于分隔符解码器 DelimiterBasedFrameDecoder 传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的 decode()方法和基于行分隔符解码器的 decode()方法基本类似。
//A decoder that splits the received ByteBufs by one or more delimiters.
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteBuf[] delimiters;
private final int maxFrameLength;
private final boolean stripDelimiter;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
private final LineBasedFrameDecoder lineBasedDecoder;
...
//Creates a new instance.
//@param maxFrameLength,the maximum length of the decoded frame.
//A TooLongFrameException is thrown if the length of the frame exceeds this value.
//@param stripDelimiter,whether the decoded frame should strip out the delimiter or not
//@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder
//notices the length of the frame will exceed maxFrameLength regardless of
//whether the entire frame has been read.
//If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
//@param delimiters the delimiters
public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
if (delimiters == null) {
throw new NullPointerException("delimiters");
}
if (delimiters.length == 0) {
throw new IllegalArgumentException("empty delimiters");
}
if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
//Returns true if the delimiters are "\n" and "\r\n".
private static boolean isLineBased(final ByteBuf[] delimiters) {
if (delimiters.length != 2) {
return false;
}
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}
//Return true if the current instance is a subclass of DelimiterBasedFrameDecoder
private boolean isSubclass() {
return getClass() != DelimiterBasedFrameDecoder.class;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
//Create a frame out of the {@link ByteBuf} and return it.
//@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
//@param buffer,the ByteBuf from which to read data
//@return frame,the ByteBuf which represent the frame or null if no frame could be created.
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
//Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
//We've just finished discarding a very large frame.
//Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
//Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
//Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
//Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
private void fail(long frameLength) {
if (frameLength > 0) {
throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
} else {
throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");
}
}
//Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack.
//-1 is returned if no needle is found in the haystack.
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
break;
} else {
haystackIndex ++;
if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {
return -1;
}
}
}
if (needleIndex == needle.capacity()) {
//Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
private static void validateDelimiter(ByteBuf delimiter) {
if (delimiter == null) {
throw new NullPointerException("delimiter");
}
if (!delimiter.isReadable()) {
throw new IllegalArgumentException("empty delimiter");
}
}
private static void validateMaxFrameLength(int maxFrameLength) {
if (maxFrameLength <= 0) {
throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength);
}
}
...
}
(4)基于长度域解码器
主要的逻辑步骤如下:
一.丢弃模式的处理
二.获取待拆数据包的大小
三.对数据包进行长度校验
四.跳过指定字节长度
五.抽取数据包
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteOrder byteOrder;//表示字节流表示的数据是大端还是小端,用于长度域的读取
private final int maxFrameLength;//表示数据包的最大长度
private final int lengthFieldOffset;//表示长度域的偏移量
private final int lengthFieldLength;//表示长度域的长度
private final int lengthFieldEndOffset;//表示紧跟长度域字段后面的第一字节在整个数据包中的偏移量
private final int lengthAdjustment;//表示数据包体长度调整大小,长度域只表示数据包体的长度
private final int initialBytesToStrip;//表示拿到完整的数据包之后,向业务解码器传递之前,应该跳过多少字节
private final boolean failFast;//默认为true,否则可能会OOM
private boolean discardingTooLongFrame;
private long tooLongFrameLength;
private long bytesToDiscard;
...
//Creates a new instance.
//@param byteOrder,the ByteOrder of the length field
//@param maxFrameLength,the maximum length of the frame.
//If the length of the frame is greater than this value, TooLongFrameException will be thrown.
//@param lengthFieldOffset,the offset of the length field
//@param lengthFieldLength,the length of the length field
//@param lengthAdjustment,the compensation value to add to the value of the length field
//@param initialBytesToStrip,the number of first bytes to strip out from the decoded frame
//@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder notices the length of the frame
//will exceed maxFrameLength regardless of whether the entire frame has been read.
//If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
...
this.byteOrder = byteOrder;
this.maxFrameLength = maxFrameLength;
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
this.lengthAdjustment = lengthAdjustment;
lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
this.initialBytesToStrip = initialBytesToStrip;
this.failFast = failFast;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
//Create a frame out of the {@link ByteBuf} and return it.
//@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
//@param in,the ByteBuf from which to read data
//@return frame,the ByteBuf which represent the frame or null if no frame could be created.
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//步骤一开始:丢弃模式的处理
if (discardingTooLongFrame) {
//如果当前处于丢弃模式,则先计算需要丢弃多少字节,取当前还需可丢弃字节和可读字节的最小值
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
in.skipBytes(localBytesToDiscard);//进行丢弃
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
//步骤一结束
//步骤二开始:获取待拆数据包的大小
//如果当前可读字节还没达到长度域的偏移,说明肯定是读不到长度域的,则直接不读
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
//计算长度域的实际字节偏移
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//拿到实际的未调整过的数据包长度
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
//如果拿到的长度为负数,则直接跳过长度域并抛出异常
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
}
//调整数据包的长度,后面统一做拆分
frameLength += lengthAdjustment + lengthFieldEndOffset;
//步骤二结束
//步骤三开始:对数据包进行长度校验
//整个数据包的长度还没有长度域长,则直接抛出异常
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
//数据包长度超出最大数据包长度,进入丢弃模式
if (frameLength > maxFrameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
//当前可读字节已达到frameLength,直接跳过frameLength字节
//丢弃之后,后面又可能就是一个合法的数据包了
in.skipBytes((int) frameLength);
} else {
//当前可读字节未达到frameLength,说明后面未读到的字节也需要丢弃,进入丢弃模式,先把当前累积的字节全部丢弃
discardingTooLongFrame = true;
//bytesToDiscard表示还需要丢弃多少字节
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
//调用failIfNecessary判断是否需要抛出异常
failIfNecessary(true);
return null;
}
//步骤三结束
//步骤四开始:跳过指定字节长度
//never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
//如果可读字节还是小于数据包的长度,则返回,下次继续读取
return null;
}
if (initialBytesToStrip > frameLengthInt) {
//如果跳过的字节大于数据包的长度,则抛异常
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
//步骤四结束
//步骤五开始:抽取数据包
//拿到当前累积数据的读指针
int readerIndex = in.readerIndex();
//拿到待抽取数据包的实际长度进行抽取
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//进行抽取数据
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
//移动读指针
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
return buffer.retainedSlice(index, length);
}
//拿到实际的未调整过的数据包长度
//如果长度域代表的值表达的含义不是正常的int、short等类型,则可以重写这个方法
//比如有的长度域虽然是4字节,比如0x1234,但是它的含义是十进制的,即长度就是十进制的1234
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
buf = buf.order(order);
long frameLength;
switch (length) {
case 1:
frameLength = buf.getUnsignedByte(offset);
break;
case 2:
frameLength = buf.getUnsignedShort(offset);
break;
case 3:
frameLength = buf.getUnsignedMedium(offset);
break;
case 4:
frameLength = buf.getUnsignedInt(offset);
break;
case 8:
frameLength = buf.getLong(offset);
break;
default:
throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
}
return frameLength;
}
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
//不需要再丢弃后面的未读字节,就开始重置丢弃状态
if (bytesToDiscard == 0) {
//Reset to the initial state and tell the handlers that the frame was too large.
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
//如果没有设置快速失败,或者设置了快速失败并且是第一次检测到大包错误,则抛出异常,让Handler处理
if (!failFast || failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
//如果设置了快速失败,并且是第一次检测到打包错误,则抛出异常,让Handler处理
if (failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
}
}
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
return buffer.retainedSlice(index, length);
}
private void fail(long frameLength) {
if (frameLength > 0) {
throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
} else {
throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + " - discarding");
}
}
...
}
6.writeAndFlush()方法的大体步骤
(1)writeAndFlush()方法的调用入口
入口通常是:ctx.channel().writeAndFlush()。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//网络连接tcp三次握手后,就会建立和封装一个Channel(网络连接的通信管道)
//此时这个Channel就可以实现一个激活
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Active......");
ctx.channel().writeAndFlush("test5");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Channel Read: " + (String)msg);
String response = "Hello World......";
ByteBuf responseByteBuf = Unpooled.buffer();
responseByteBuf.writeBytes(response.getBytes());
ctx.channel().writeAndFlush(responseByteBuf);
System.out.println("Channel Write: " + response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Read Complete......");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
(2)writeAndFlush()方法的执行流程
首先从 tail 结点开始往前传播。然后逐个调用 ChannelHandler 的 write()方法,直到某个 ChannelHandler 不再往前传播 write 事件。接着逐个调用 ChannelHandler 的 flush()方法,直到某个 ChannelHandler 不再往前传播 flush 事件。
一般而言,只要每个 ChannelHandler 都往下传播 write 事件和 flush 事件,那么最后都会传播到 HeadContext 结点的 write()方法和 flush()方法,然后分别执行 unsafe.write()和 unsafe.flush()将数据通过底层的 unsafe 写到 JDK 底层的 Channel。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final DefaultChannelPipeline pipeline;
...
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
//从TailContext开始传播
//但TailContext没有重写writeAndFlush()方法
//所以会调用AbstractChannelHandlerContext的writeAndFlush()方法
return tail.writeAndFlush(msg);
}
...
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) throw new NullPointerException("msg");
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//反向遍历链表进行查找
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
//最终都会由Reactor线程处理Channel的数据读写
if (executor.inEventLoop()) {
if (flush) {
//调用结点的invokeWriteAndFlush()方法
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//逐个调用ChannelHandler结点的write()方法,但前提是当前ChannelHandler可以往下传
//即write()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.write()往下传播
invokeWrite0(msg, promise);
//逐个调用ChannelHandler结点的flush()方法,但前提是当前ChannelHandler可以往下传
//即flush()方法在最后也像ChannelOutboundHandlerAdapter那样,调用了ctx.flush()往下传播
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//逐个调用,最终回到HeadContext的write()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
//逐个调用,最终回到HeadContext的flush()方法
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
...
}
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
...
}
//Skeleton implementation of a ChannelOutboundHandler. This implementation just forwards each method call via the ChannelHandlerContext.
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
//Calls ChannelHandlerContext#bind(SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
//Calls ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
//Calls ChannelHandlerContext#disconnect(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
//Calls ChannelHandlerContext#close(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
//Calls ChannelHandlerContext#deregister(ChannelPromise) to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
//Calls ChannelHandlerContext#read() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
//Calls ChannelHandlerContext#write(Object, ChannelPromise)} to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
//Calls ChannelHandlerContext#flush() to forward to the next ChannelOutboundHandler in the ChannelPipeline.
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
7.MessageToByteEncoder 的编码步骤
(1)编码的具体步骤
步骤一:判断对象
判断当前 ChannelHandler 结点能否处理写入传入的 Java 对象。如果能处理,则往下执行,否则直接传递给下一个 ChannelHandler 结点进行处理。
步骤二:分配内存
给新创建的 ByteBuf 对象分配一块内存空间,这块内存空间将会存放由 Java 对象转换来的字节数据。
步骤三:调用 encode
子类会实现 MessageToByteEncoder 的抽象方法 encode()来定义自己的编码协议,子类的 encode()方法会将 Java 对象转换来的字节数据写入 ByteBuf。
步骤四:释放对象
由于传入的 Java 对象已经转换成 ByteBuf 字节流了,所以传入的 Java 对象已不再使用可进行释放。
步骤五:传播数据
当子类的 encode()方法将数据写入了 ByteBuf 对象以及释放完对象之后,则会往前一个 ChannelHandler 结点传播该 ByteBuf 对象,否则往前一个 ChannelHandler 结点传播空对象。
步骤六:释放内存
如果出现异常或者 ByteBuf 没有写入数据或者 ByteBuf 在 pipeline 中已处理完,则释放分配给 ByteBuf 对象的内存。
//ChannelOutboundHandlerAdapter which encodes message in a stream-like fashion from one message to an ByteBuf.
//Example implementation which encodes Integers to a ByteBuf.
//public class IntegerEncoder extends MessageToByteEncoder<Integer> {
// @code @Override
// public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
// out.writeInt(msg);
// }
//}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
protected MessageToByteEncoder() {
this(true);
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
this(outboundMessageType, true);
}
//Create a new instance which will try to detect the types to match out of the type parameter of the class.
//@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages.
//If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
protected MessageToByteEncoder(boolean preferDirect) {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
//Create a new instance
//@param outboundMessageType,The tpye of messages to match
//@param preferDirect,true if a direct ByteBuf should be tried to be used as target for the encoded messages.
//If false is used it will allocate a heap ByteBuf, which is backed by an byte array.
protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
matcher = TypeParameterMatcher.get(outboundMessageType);
this.preferDirect = preferDirect;
}
//Returns true if the given message should be handled.
//If false it will be passed to the next ChannelOutboundHandler in the ChannelPipeline.
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
//步骤一:判断当前ChannelHandler能否处理写入的消息
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;//强制转换
//步骤二:给ByteBuf对象分配内存
buf = allocateBuffer(ctx, cast, preferDirect);
try {
//步骤三:调用子类实现的encode()方法
encode(ctx, cast, buf);
} finally {
//步骤四:释放对象
//既然自定义的Java对象msg已经转换为ByteBuf对象了,那么该对象已经没有用,需要释放掉了
//注意:当传入的msg的类型是ByteBuf类型时,则不需要释放
ReferenceCountUtil.release(cast);
}
//步骤五:如果buf中写入了数据,就把buf传到下一个ChannelHandler结点
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
//步骤六:如果buf中没有写入数据,则释放buf,并将一个空数据传到下一个ChannelHandler结点
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();//当buf在pipeline中处理完了,需要进行释放
}
}
}
//Allocate a ByteBuf which will be used as argument of #encode(ChannelHandlerContext, I, ByteBuf).
//Sub-classes may override this method to returna ByteBuf with a perfect matching initialCapacity.
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception {
if (preferDirect) {
return ctx.alloc().ioBuffer();
} else {
return ctx.alloc().heapBuffer();
}
}
//Encode a message into a ByteBuf.
//This method will be called for each written message that can be handled by this encoder.
//@param ctx,the ChannelHandlerContext which this MessageToByteEncoder belongs to
//@param msg,the message to encode
//@param out,the ByteBuf into which the encoded message will be written
//@throws Exception,is thrown if an error accour
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}
(2)编码步骤的总结
在 MessageToByteEncoder 的编码过程中,首先会判断当前 ChannelHandler 能否处理传入的 Java 对象,如果能处理就对新创建的 ByteBuf 对象分配一块内存空间。然后由子类的 encode()方法实现具体的编码协议,并且把编码后的数据存放到分配给 ByteBuf 对象的内存空间中。最后把 ByteBuf 对象往前一个 ChannelHandler 结点进行传播。
如果在编码的过程中出现异常,那么就把已申请出来的、分配给 ByteBuf 对象的内存空间进行释放。
如果传入的 Java 对象就是一个 ByteBuf 对象,那么 Netty 在自定义编码结束后,会自动帮忙释放该对象,不需要在子类中对该对象进行释放。
(3)子类实现编码的例子
下面的 Encoder 便实现了将自定义的 Response 对象转换为字节流并写到 Socket 底层的效果。
public class Encoder extends MessageToByteEncoder<Response> {
protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {
out.writeByte(response.getVersion());
out.writeInt(4+ response.getData().length);
out.writeBytes(response.getData());
}
}
8.unsafe.write()将数据添加到写缓冲区
(1)unsafe.write()的入口
不管是 ctx.channel().write()还是 ctx.write(),最终都会来到 pipeline 中的 head 结点。
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
...
}
(2)unsafe.write()的主要逻辑
unsafe.write()方法将数据添加到写缓冲区(写队列)的主要逻辑如下。
一.Direct 化 ByteBuf 对象
如果传进来的 ByteBuf 对象不是堆外内存,那么就直接转换成堆外内存,并且估算出其大小。
二.添加到写缓冲区
转换成堆外内存的 ByteBuf 对象首先会被封装成一个 Entry 对象,然后再将该 Entry 对象添加到写缓冲区,其中会通过几个指针来标识写缓冲区的状态。
三.设置写状态
如果内存不足,那么是不可以一直往写缓冲区里添加 ByteBuf 对象的。如果写缓冲区已经大于默认的 64KB 的大小,则会通过自旋 + CAS 设置当前 Channel 为不可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final DefaultChannelPipeline pipeline;
...
protected abstract class AbstractUnsafe implements Unsafe {
//写缓冲区(写队列)
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
...
@Override
public final void write(Object msg, ChannelPromise promise) {
//确保该方法的调用是在Reactor线程中
assertEventLoop();
//写缓冲区
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
int size;
try {
//转换成堆外内存
msg = filterOutboundMessage(msg);
//估算出需要写入的ByteBuf的size
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//将转换成堆外内存的msg添加到写缓冲区outboundBuffer
outboundBuffer.addMessage(msg, size, promise);
}
...
}
...
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
...
}
(3)写缓冲区(写队列)的数据结构
ChannelOutboundBuffer 里的数据结构是一个单向链表,单向链表的每个结点都是一个 Entry 对象。在一个 Entry 对象中会包含着待写出的 ByteBuf 对象及消息回调 promise。flushedEntry 指针表示第一个被写入 Socket 缓冲区的结点,unflushedEntry 指针表示第一个未被写入 Socket 缓冲区的结点,tailEntry 指针表示 ChannelOutboundBuffer 缓冲区的最后一个结点。
初次调用 ChannelOutboundBuffer 的 addMessage()方法后,flushedEntry 指针指向 NULL,unflushedEntry 指针和 tailEntry 指针都指向新添加的结点。调用多次 ChannelOutboundBuffer 的 addMessage()方法后,如果 flushedEntry 指针一直指向 NULL,则表示现在还没有结点的 ByteBuf 对象写出到 Socket 缓冲区。如果 unflushedEntry 指针之后有 n 个结点,则表示当前还有 n 个结点的 ByteBuf 对象还没写出到 Socket 缓冲区。
public final class ChannelOutboundBuffer {
private final Channel channel;
//Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
//The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
//The Entry which represents the tail of the buffer
private Entry tailEntry;
...
ChannelOutboundBuffer(AbstractChannel channel) {
this.channel = channel;
}
//Add given message to this ChannelOutboundBuffer.
//The given {@link ChannelPromise} will be notified once the message was written.
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
//increment pending bytes after adding message to the unflushed arrays.
incrementPendingOutboundBytes(size, false);
}
static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle handle) {
return new Entry(handle);
}
};
private final Handle<Entry> handle;
Entry next;
Object msg;
ByteBuffer[] bufs;
ByteBuffer buf;
ChannelPromise promise;
long progress;
long total;
int pendingSize;
int count = -1;
boolean cancelled;
private Entry(Handle<Entry> handle) {
this.handle = handle;
}
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size;
entry.total = total;
entry.promise = promise;
return entry;
}
...
}
}
9.unsafe.flush()刷新写缓冲区的数据
(1)unsafe.flush()的入口
不管是 ctx.channel().flush()还是 ctx.flush(),最终都会来到 pipeline 中的 head 结点。
public class DefaultChannelPipeline implements ChannelPipeline {
...
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
...
}
(2)unsafe.flush()的主要逻辑
步骤一:设置 flushedEntry 指针指向 unflushedEntry 指针所指向的 Entry 结点,并统计需要刷新的 Entry 结点的数量。
步骤二:遍历写缓冲区的 Entry 结点把对应的 ByteBuf 对象写到 Socket,然后移除 Entry 结点。如果写缓冲区大小已经小于 32KB,则通过自旋 + CAS 设置 Channel 为可写状态。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final DefaultChannelPipeline pipeline;
...
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
...
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//步骤一
outboundBuffer.addFlush();
//步骤二
flush0();
}
protected void flush0() {
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
doWrite(outboundBuffer);
...
}
}
//Flush the content of the given buffer to the remote peer.
protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//默认自旋16次,以提高内存使用率和写的吞吐量
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
if (msg == null) {
//重新注册,不关注OP_WRITE事件
clearOpWrite();
return;
}
writeSpinCount -= doWriteInternal(in, msg);
} while(writeSpinCount > 0);
incompleteWrite(setOpWrite);
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) {
...
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
//从写缓冲区(写队列)中移除结点
in.remove();
return 0;
}
//把ByteBuf对象写到Socket里
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
//从写缓冲区(写队列)中移除结点
in.remove();
}
return 1;
}
...
}
protected final void clearOpWrite() {
final SelectionKey key = selectionKey();
//Check first if the key is still valid as it may be canceled as part of the deregistration from the EventLoop.
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
...
}
public final class ChannelOutboundBuffer {
private final Channel channel;
//Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
//The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
//The Entry which represents the tail of the buffer
private Entry tailEntry;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
//The number of flushed entries that are not written yet
private int flushed;
...
//设置flushedEntry指针指向unflushedEntry指针所指向的Entry结点,
//并统计需要刷新的Entry结点的数量
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;//所要flush的结点数
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
public boolean remove() {
//获取当前正在被flush的结点
Entry e = flushedEntry;
Object msg = e.msg;
//获取该结点的回调对象
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//从写缓冲队列中移除结点
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
//如果写缓冲区大小小于32KB,就通过自旋+CAS设置Channel状态为可写
decrementPendingOutboundBytes(size, false, true);
}
//回收实体
e.recycle();
return true;
}
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
//Return the current message to write or null if nothing was flushed before and so is ready to be written.
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
//Notify the ChannelPromise of the current message about writing progress.
public void progress(long amount) {
Entry e = flushedEntry;
assert e != null;
ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) {
long progress = e.progress + amount;
e.progress = progress;
((ChannelProgressivePromise) p).tryProgress(progress, e.total);
}
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
...
}
10.如何把对象变成字节流写到 unsafe 底层
当调用 ctx.channel().writeAndFlush(user)将自定义的 User 对象沿着整个 Pipeline 进行传播时:
首先会调用 tail 结点的 write()方法开始往前传播,传播到一个继承自 MessageToByteEncoder 的结点。该结点会实现 MessageToByteEncoder 的 encode()方法来把自定义的 User 对象转换成一个 ByteBuf 对象。转换的过程首先会由 MessageToByteEncoder 分配一个 ByteBuf 对象,然后再调用其子类实现的抽象方法 encode()将 User 对象填充到 ByteBuf 对象中。填充完之后继续调用 write()方法把该 ByteBuf 对象往前进行传播,默认下最终会传播到 head 结点。
其中 head 结点的 write()方法会通过底层的 unsafe 进行如下处理:把当前的 ByteBuf 对象添加到 unsafe 维护的一个写缓冲区里,同时计算写缓冲区大小是否超过 64KB。如果写缓冲区大小超过了 64KB,则设置当前 Channel 不可写。完成 write()方法的传播后,head 结点的 unsafe 对象维护的写缓冲区便对应着一个 ByteBuf 队列,它是一个单向链表。
然后会调用 tail 结点的 flush()方法开始往前传播,默认下最终会传播到 head 结点。head 结点在接收到 flush 事件时会通过底层的 unsafe 进行如下处理:首先进行指针调整,然后通过循环遍历从写缓冲区里把 ByteBuf 对象取出来。每拿出一个 ByteBuf 对象都会把它转化为 JDK 底层可以接受的 ByteBuffer 对象,最终通过 JDK 的 Channel 把该 ByteBuffer 对象写出去。每写完一个 ByteBuffer 对象都会把写缓冲区里的当前 ByteBuf 所在的 Entry 结点进行删除,并且判断如果当前写缓冲区里的大小已经小于 32KB 就通过自旋 + CAS 重新设置 Channel 为可写。
文章转载自:东阳马生架构

不在线第一只蜗牛
还未添加个人签名 2023-06-19 加入
还未添加个人简介
评论