Skip to content

Commit

Permalink
Ensure all leaks are detected in TestingPooledByteBufAllocator
Browse files Browse the repository at this point in the history
Use hard instead of weak references to ensure that all leaks are detected
during testing. When a leak is detected force a GC in an attempt to trigger
Netty leak debug information logging.
  • Loading branch information
ivmaykov authored and dain committed Oct 23, 2018
1 parent 9c98d1d commit 3603372
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
6 changes: 6 additions & 0 deletions drift-transport-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<optional>true</optional>
</dependency>

<!-- for testing -->
<dependency>
<groupId>io.airlift</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,56 @@
*/
package io.airlift.drift.transport.netty.buffer;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ResourceLeakDetector;

import javax.annotation.concurrent.GuardedBy;

import java.io.Closeable;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.netty.util.ResourceLeakDetector.Level.PARANOID;

/**
* This is a custom {@link ByteBufAllocator} that tracks outstanding allocations and
* throws from the {@link #close)} method if it detects any leaked buffers.
* <p>
* Never use this class in production, it will cause your server to run out
* of memory! This is because it holds strong references to all allocated
* buffers and doesn't release them until {@link #close)} is called at the end of a
* unit test.
*/
public class TestingPooledByteBufAllocator
extends PooledByteBufAllocator
implements Closeable
{
@GuardedBy("this")
private final List<ByteBuf> trackedBuffers = new ArrayList<>();

private final ResourceLeakDetector.Level oldLevel;

public TestingPooledByteBufAllocator()
{
super(false);
this(getAndSetResourceLeakDetectorLevel(PARANOID));
}

@GuardedBy("this")
private final List<WeakReference<ByteBuf>> trackedBuffers = new ArrayList<>();
private static ResourceLeakDetector.Level getAndSetResourceLeakDetectorLevel(ResourceLeakDetector.Level newLevel)
{
ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
ResourceLeakDetector.setLevel(newLevel);
return oldLevel;
}

private TestingPooledByteBufAllocator(ResourceLeakDetector.Level oldLevel)
{
super(false);
this.oldLevel = oldLevel;
}

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
Expand All @@ -65,35 +90,40 @@ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
return track(super.compositeDirectBuffer(maxNumComponents));
}

public synchronized List<ByteBuf> getReferencedBuffers()
{
return trackedBuffers.stream()
.map(WeakReference::get)
.filter(Objects::nonNull)
.filter(byteBuf -> byteBuf.refCnt() > 0)
.collect(toImmutableList());
}

private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
{
trackedBuffers.add(new WeakReference<>(byteBuf));
trackedBuffers.removeIf(byteBufWeakReference -> byteBufWeakReference.get() == null);
trackedBuffers.add(byteBuf);
return byteBuf;
}

private synchronized ByteBuf track(ByteBuf byteBuf)
{
trackedBuffers.add(new WeakReference<>(byteBuf));
trackedBuffers.removeIf(byteBufWeakReference -> byteBufWeakReference.get() == null);
trackedBuffers.add(byteBuf);
return byteBuf;
}

@Override
@SuppressFBWarnings(value = "DM_GC", justification = "Netty's leak detection only works if buffer is garbage collected")
public void close()
{
List<ByteBuf> referencedBuffers = getReferencedBuffers();
if (!referencedBuffers.isEmpty()) {
throw new AssertionError("LEAK");
try {
boolean leaked;
synchronized (this) {
leaked = trackedBuffers.stream()
.anyMatch(byteBuf -> byteBuf.refCnt() > 0);
trackedBuffers.clear();
}
// Throw an error if there were any leaked buffers
if (leaked) {
// Trigger a GC. This will hopefully (but not necessarily) print
// details about detected leaks to standard error before the error
// is thrown.
System.gc();
throw new AssertionError("LEAK");
}
}
finally {
ResourceLeakDetector.setLevel(oldLevel);
}
}
}

0 comments on commit 3603372

Please sign in to comment.