您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-Netty内存泄露检测

恐龙弟旺仔 发布时间:2021-12-23 19:15:14 ,浏览量:0

前言:
在前一篇文章中,我们介绍了ByteBuf的引用计数器的使用,基本所有的ByteBuf都有相关计数的功能,那么这个计数有什么用呢。
实际主要就是做内存泄露检测用的。本文就其如何做检测来进行说明。
1.ByteBuf为什么会内存泄露?
在使用ByteBuf的时候,我们一般会使用Allocator来进行分配创建,如下所示:
// 创建一个非池化的HeapBuffer
ByteBuf heapBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
// 创建一个池化的DirectBuffer
ByteBuf directBuffer = PooledByteBufAllocator.DEFAULT.directBuffer();
在ChannelHandler使用完成之后,一般会主动调用ByteBuf.release()或被动调用
(比如实现SimpleChannelInboundHandler的相关处理器,会在channelRead()方法结束后主动调用release),源码如下所示
public abstract class SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                // 主动释放
                ReferenceCountUtil.release(msg);
            }
        }
    }
}

public final class ReferenceCountUtil {
	public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            // 如果ByteBuf属于ReferenceCounted,则调用其release()方法
            return ((ReferenceCounted) msg).release();
        }
        return false;
    }
}
上面是我们在正常使用ByteBuf时的操作,如果我们没有调用ByteBuf.release()会怎么样?
* 针对非池化的HeapByteBuf对象,因为是从JVM堆中获取的,所以,当内存不足时,会主动释放这块内存
* 针对非池化的DirectByteBuf对象,属于堆外内存,JVM无法释放这块内存
* 针对池化的HeapByteBuf和DirectByteBuf,针对已使用完成的这块内存,无法归还给pool,到最后便无法从pool中获取到ByteBuf
所以,不正确的使用ByteBuf,会造成内存无法释放,资源无法归还资源池。
针对这个问题,Netty便提供了可以检测内存泄露的ByteBuf的机制。
 
2.内存泄漏检测原理(简析) 
既然ByteBuf使用完成之后,需要主动调用release()方法来释放内存,那么我们的内存泄露检测,就可以从这个角度入手。
我们创建对ByteBuf的弱引用,并使用一个ReferenceQueue关联到该弱引用。若ByteBuf对象失去强引用时,会被自动添加到该ReferenceQueue,
我们从该队列中检测对应是否已经调用了
release()方法即可。貌似这个方案是可行的,那么Netty是如何做的呢?
 
3.被检测的ByteBuf对象创建 
我们从源头来看下ByteBuf的创建过程,从这个过程中能看到Netty对于内存泄漏检测的用心良苦
3.1 UnpooledByteBufAllocator创建非池化对象
// UnpooledByteBufAllocator.newDirectBuffer
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    // 是否使用内存检测,默认为true
    private final boolean disableLeakDetector;
    // 创建一个非池化的DirectByteBuf
	protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
    
    // toLeakAwareBuffer
    protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
        ResourceLeakTracker leak;
        // 根据ResourceLeakDetector.level创建不同的LeakAwareBuffer
        switch (ResourceLeakDetector.getLevel()) {
            case SIMPLE:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new SimpleLeakAwareByteBuf(buf, leak);
                }
                break;
            case ADVANCED:
            case PARANOID:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new AdvancedLeakAwareByteBuf(buf, leak);
                }
                break;
            default:
                break;
        }
        return buf;
    }
}
通过源码提供的方法可以看到,针对非池化的DirectByteBuf对象,才需要进行泄露检测。
看来,非池化的HeapByteBuf,完全交由JVM,不需要担心。
3.2 池化对象的创建
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
	// 创建池化的HeapByteBuf
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    // 创建池化的DirectByteBuf
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
    
    // 包装一层可供检测的ByteBuf
    // 也是根据不同的级别来创建的,具体见4.1
    protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
        ResourceLeakTracker leak;
        switch (ResourceLeakDetector.getLevel()) {
            case SIMPLE:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new SimpleLeakAwareByteBuf(buf, leak);
                }
                break;
            case ADVANCED:
            case PARANOID:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new AdvancedLeakAwareByteBuf(buf, leak);
                }
                break;
            default:
                break;
        }
        return buf;
    }
}
针对池化的对象,无论是Heap类型还是Direct类型,都需要进行内存泄露检测
 
4.ResourceLeakDetector结构解析 
4.1 ResourceLeakDetector.Level  
从3中创建供检测的ByteBuf看到,根据不同的Level会创建不同的包装类,那么这个级别是什么意思呢?
public class ResourceLeakDetector {
    public enum Level {
        /**
         * 禁止进行泄露检测
         */
        DISABLED,
        /**
         * 默认等级,只针对取样的1%的ByteBuf进行内存泄漏检测,只打印一次结果
         */
        SIMPLE,
        /**
         * 同样也只对1%的ByteBuf样本进行检测,但是每种类型的泄漏只打印一次
         */
        ADVANCED,
        /**
         * 对所有的ByteBuf样本都进行检测
         */
        PARANOID;
    }
}
根据这个注释的解释来看还是有点懵,没关系,后续我们通过源码来分析后就能明确了解其含义。
4.2 ResourceLeakDetector基本结构分析
public class ResourceLeakDetector {
    // 默认为SIMPLE级别的,通过static方法获取
    private static Level level;
    
    private final Set> allLeaks;
        
        // 在这里创建
        private DefaultResourceLeak track0(T obj) {
            Level level = ResourceLeakDetector.level;
            // 如果是DISABLED级别,说明不需要进行内存泄露检测
            if (level == Level.DISABLED) {
                return null;
            }

            // PARANOID级别以下的,只检测1%的样本
            if (level.ordinal() < Level.PARANOID.ordinal()) {
                // samplingInterval默认值为128,使用random的情况下,等于0的概率等于1/128
                if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
                    // 汇报内存泄露情况
                    reportLeak();
                    // 并返回一个DefaultResourceLeak
                    return new DefaultResourceLeak(obj, refQueue, allLeaks);
                }
                return null;
            }
            reportLeak();
            return new DefaultResourceLeak(obj, refQueue, allLeaks);
        }
        
        // 构造方法描述
        DefaultResourceLeak(
                Object referent,
                ReferenceQueue refQueue,
                Set            
关注
打赏
1655041699
查看更多评论
0.0376s