问题的起因来自线上一次 Tcp Proxy 代理逻辑处理的错误处理导致慢速的内存泄漏,现象是 Netty 服务所在的进程 RSS 缓慢增长到高点后维持在高点。根据现有的应用转发数据统计确实每天的上下行消息交互次数很高,当时一个错误的想法是 Netty 使用堆外内存池的方式会导致 RSS 升高。错误的判断会导致错误的处理结果所以还要找到导致 RSS 升高的真正原因。
1、增加 jvm 参数
-XX:NativeMemoryTracking=detail
-Dio.netty.leakDetectionLevel=advanced
(1)参数 NativeMemoryTracking 是用来追踪内存使用基于内存报告打点的方式查看前后内存增长的值
jcmd <pid> VM.native_memory baseline
jcmd <pid> VM.native_memory
对于jvm 内存跟踪的报告详细解释网上有很多这里不再进行重复说明,通过对两次时间点的分析发现Internal区使用内存很大可以判断是由于堆外内存分配导致的,目前只能初略判断是由于堆外内存增长导致的不能确定具体原因。
复制代码
(2)io.netty.leakDetectionLevel 用来打印打印 Netty 堆外内存泄漏的报告。
通过开启Netty内存泄漏报告来分析内存泄漏点即使用allocate分配的内存在哪里没有释放会有详细的堆栈信息打印。
复制代码
通常通过上述两种方式就能判断 Netty 内存泄漏的点,但是有点时候我们还需要判断一下堆外内存具体存储的内容来再次分析一下原因。
2、使用 pmap 分析堆外内存泄漏
pmap分析内存泄漏的方法网上有很多文章介绍了详细的使用教程,这里只说一下分析思路和试用场景,通过基于pmap的分析是基于内存段找到RSS最大的内存段后再使用gdb dump导出最大内存段来分析内存存储内容,这种办法我们对于常规的分析还是有所帮助的能让我们通过关键信息找到RSS最大内存段里面存储的关键信息。如果网络数据包使用的是加密方式传输会无法通过常规的strings查看十六进制内容来分析存储的具体数据。
复制代码
上面提供了内存泄漏排查的办法,下面我说一下由于使用错误的逻辑处理过程导致的 Netty 堆外内存泄漏
协议格式
在 Netty 中使用
ProtobufVarint32FrameDecoder 处理 PB 协议(协议长度+PB 序列化内容)
Netty 处理过程
继承 ChannelInboundHandlerAdapter 实现心跳过滤
ProtobufVarint32FrameDecoder
这个看似没有问题的逻辑处理在用法上的错误导致了堆外内存的泄漏,我们看一下继承 ChannelInboundHandlerAdapter 的处理过程
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer=null;
if(msg instanceof ByteBuf){
buffer=(ByteBuf) msg;
int size = buffer.readableBytes();
if(size>=2){
byte b1 = buffer.getByte(0);
byte b2 = buffer.getByte(1);
if (b1 == 0x00 && b2 == 0x00) {
ByteBuf heartBeat=buffer.readBytes(2);
heartBeat.release();
int remSize=buffer.readableBytes();
if(remSize>0){
super.channelRead(ctx, buffer);
}
return;
}
}else{
return;
}
}
}
复制代码
上面的处理过程看似是没有问题的但是 heartBeat.release()之后为什么还导致 RSS 上升呢?
buffer.readBytes() 实际是在 buf 中又重新分配了一块内存虽然使用了 release 进行释放但是这块内存是新分配的原有的 Buffer readerindex 移动了但是没有对原来的数据进行清理。
我们看一下 ByteToMessageDecoder 是如何处理的
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
selfFiredChannelRead = true;
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
try {
cumulation.release();
} catch (IllegalReferenceCountException e) {
//noinspection ThrowFromFinallyBlock
throw new IllegalReferenceCountException(
getClass().getSimpleName() + "#decode() might have released its input buffer, " +
"or passed it down the pipeline without a retain() call, " +
"which is not allowed.", e);
}
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes, so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
复制代码
这里关键的点在于 discardSomeReadBytes();在很多资料中介绍了 discardSomeReadBytes()和 discardReadBytes()的区别,这里我只简单说一下区别在于性能 discardReadBytes 对于连续的内存每次都要进行内存压缩而 discardSomeReadBytes()处理是根据特定条件做内存压缩,连续的内存压缩需要重新移动数组所以在性能上是有区别的。
当我们使用 Netty 开发应用时它为我们提供了方便强大的底层支撑,但是我们要对 Netty 的 api 进行深入了解才不会在编写代码上出现问题。
评论