Skip to content

Commit

Permalink
Don't return leaked Objects back to StupidPool, because this is dange…
Browse files Browse the repository at this point in the history
…rous. Reuse Cleaners in StupidPool. Make StupidPools named. Add StupidPool.leakedObjectCount(). Minor fixes (apache#3631)
  • Loading branch information
leventov authored and himanshug committed Dec 26, 2016
1 parent 76cb06a commit 3380012
Show file tree
Hide file tree
Showing 19 changed files with 241 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static class BenchmarkPool
{
private final AtomicLong numPools = new AtomicLong(0L);
private final StupidPool<Object> pool = new StupidPool<>(
"simpleObject pool",
new Supplier<Object>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ public void setup() throws IOException
}

StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,12 @@ public void setup() throws IOException
}

factory = new TopNQueryRunnerFactory(
new StupidPool<>(new OffheapBufferGenerator("compute", 250000000), 0, Integer.MAX_VALUE),
new StupidPool<>(
"TopNBenchmark-compute-bufferPool",
new OffheapBufferGenerator("compute", 250000000),
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
208 changes: 176 additions & 32 deletions common/src/main/java/io/druid/collections/StupidPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,77 +19,182 @@

package io.druid.collections;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import sun.misc.Cleaner;

import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
*/
public class StupidPool<T>
{
private static final Logger log = new Logger(StupidPool.class);

private final String name;
private final Supplier<T> generator;

private final Queue<T> objects = new ConcurrentLinkedQueue<>();
/**
* StupidPool Implementation Note
* It is assumed that StupidPools are never reclaimed by the GC, either stored in static fields or global singleton
* injector like Guice. Otherwise false positive "Not closed! Object leaked from..." could be reported. To avoid
* this, StupidPool should be made closeable (or implement {@link io.druid.java.util.common.lifecycle.LifecycleStop}
* and registered in the global lifecycle), in this close() method all {@link ObjectResourceHolder}s should be drained
* from the {@code objects} queue, and notifier.disable() called for them.
*/
private final Queue<ObjectResourceHolder> objects = new ConcurrentLinkedQueue<>();
/**
* {@link ConcurrentLinkedQueue}'s size() is O(n) queue traversal apparently for the sake of being 100%
* wait-free, that is not required by {@code StupidPool}. In {@code poolSize} we account the queue size
* ourselves, to avoid traversal of {@link #objects} in {@link #tryReturnToPool}.
*/
private final AtomicLong poolSize = new AtomicLong(0);
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);

//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
private final int objectsCacheMaxCount;

public StupidPool(
Supplier<T> generator
)
public StupidPool(String name, Supplier<T> generator)
{
this(generator, 0, Integer.MAX_VALUE);
this(name, generator, 0, Integer.MAX_VALUE);
}

public StupidPool(
Supplier<T> generator,
int initCount,
int objectsCacheMaxCount
)
public StupidPool(String name, Supplier<T> generator, int initCount, int objectsCacheMaxCount)
{
Preconditions.checkArgument(
initCount <= objectsCacheMaxCount,
"initCount[%s] must be less/equal to objectsCacheMaxCount[%s]",
initCount,
objectsCacheMaxCount
);
this.name = name;
this.generator = generator;
this.objectsCacheMaxCount = objectsCacheMaxCount;

for (int i = 0; i < initCount; i++) {
objects.add(generator.get());
objects.add(makeObjectWithHandler());
poolSize.incrementAndGet();
}
}

/**
 * Describes this pool by its name, its configured cache limit and the number of objects currently cached.
 */
@Override
public String toString()
{
  final StringBuilder description = new StringBuilder("StupidPool{");
  description.append("name=").append(name);
  description.append(", objectsCacheMaxCount=").append(objectsCacheMaxCount);
  description.append(", poolSize=").append(poolSize());
  description.append("}");
  return description.toString();
}

public ResourceHolder<T> take()
{
final T obj = objects.poll();
return obj == null ? new ObjectResourceHolder(generator.get()) : new ObjectResourceHolder(obj);
ObjectResourceHolder resourceHolder = objects.poll();
if (resourceHolder == null) {
return makeObjectWithHandler();
} else {
poolSize.decrementAndGet();
return resourceHolder;
}
}

/**
 * Obtains a new object from the generator and wraps it into a holder whose leak is tracked by a {@link Cleaner}.
 */
private ObjectResourceHolder makeObjectWithHandler()
{
  final T freshObject = generator.get();
  final ObjectId id = new ObjectId();
  final ObjectLeakNotifier leakNotifier = new ObjectLeakNotifier(this);
  // The objectId, not the object itself, is the Cleaner's referent: if the object (e. g. a ByteBuffer) is still
  // referenced by a caller after being taken from the pool while its ResourceHolder is dropped without being closed,
  // a Cleaner keyed on the object would not notify about the leak.
  final Cleaner leakCleaner = Cleaner.create(id, leakNotifier);
  return new ObjectResourceHolder(freshObject, id, leakCleaner, leakNotifier);
}

/**
 * Returns the current value of the {@code poolSize} counter, i. e. the pool's own accounting of how many holders
 * are cached, without traversing the queue.
 */
@VisibleForTesting
long poolSize()
{
  return poolSize.get();
}

/**
 * Returns the current value of {@code leakedObjectsCounter}, which {@link ObjectLeakNotifier#run()} increments each
 * time it fires for a holder whose notifier was not disabled.
 */
@VisibleForTesting
long leakedObjectsCount()
{
return leakedObjectsCounter.get();
}

/**
 * Attempts to put a released object back into the {@code objects} queue.
 *
 * A slot is first reserved by incrementing {@code poolSize} with a compare-and-set loop. If the counter has already
 * reached {@code objectsCacheMaxCount}, the object is not cached: its notifier is disabled, its cleaner is run and
 * the method returns. Otherwise the object is wrapped into a new {@link ObjectResourceHolder} that reuses the given
 * objectId, cleaner and notifier, and offered to the queue.
 *
 * @param object   the pooled object being released
 * @param objectId the Cleaner referent associated with the object
 * @param cleaner  the Cleaner registered for {@code objectId}
 * @param notifier the leak notifier run by {@code cleaner}
 */
private void tryReturnToPool(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier)
{
long currentPoolSize;
do {
currentPoolSize = poolSize.get();
if (currentPoolSize >= objectsCacheMaxCount) {
notifier.disable();
// Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the
// cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself.
cleaner.clean();
// Important to use the objectId after notifier.disable() (in the logging statement below), otherwise VM may
// already decide that the objectId is unreachable and run Cleaner before notifier.disable(), that would be
// reported as a false-positive "leak". Ideally reachabilityFence(objectId) should be inserted here.
log.debug("cache num entries is exceeding in [%s], objectId [%s]", this, objectId);
return;
}
// Retry if another thread changed poolSize between the read above and this compare-and-set.
} while (!poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1));
if (!objects.offer(new ObjectResourceHolder(object, objectId, cleaner, notifier))) {
impossibleOffsetFailed(object, objectId, cleaner, notifier);
}
}

/**
 * Handles a refused {@code objects.offer()}: rolls back the {@code poolSize} reservation, retires the cleaner and
 * logs the failure.
 *
 * This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} doesn't even have `return false;`
 * in its body in OpenJDK 8.
 *
 * @param object   the object that could not be returned to the queue
 * @param objectId the Cleaner referent associated with the object
 * @param cleaner  the Cleaner registered for {@code objectId}
 * @param notifier the leak notifier run by {@code cleaner}
 */
private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier)
{
  // Undo the poolSize increment that tryReturnToPool made before offering to the queue.
  poolSize.decrementAndGet();
  notifier.disable();
  // Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the
  // cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself.
  cleaner.clean();
  // The message has three placeholders: the object, the pool and the objectId. The pool (this) must be passed as
  // the second argument, otherwise the objectId is printed in the pool's slot and the last placeholder has no
  // argument at all.
  log.error(
      new ISE("Queue offer failed"),
      "Could not offer object [%s] back into the queue in [%s], objectId [%s]",
      object,
      this,
      objectId
  );
}

private class ObjectResourceHolder implements ResourceHolder<T>
{
private AtomicBoolean closed = new AtomicBoolean(false);
private final T object;
private final AtomicReference<T> objectRef;
private ObjectId objectId;
private Cleaner cleaner;
private ObjectLeakNotifier notifier;

public ObjectResourceHolder(final T object)
ObjectResourceHolder(
final T object,
final ObjectId objectId,
final Cleaner cleaner,
final ObjectLeakNotifier notifier
)
{
this.object = object;
this.objectRef = new AtomicReference<>(object);
this.objectId = objectId;
this.cleaner = cleaner;
this.notifier = notifier;
}

// WARNING: it is entirely possible for a caller to hold onto the object and call ObjectResourceHolder.close,
// Then still use that object even though it will be offered to someone else in StupidPool.take
@Override
public T get()
{
if (closed.get()) {
final T object = objectRef.get();
if (object == null) {
throw new ISE("Already Closed!");
}

Expand All @@ -99,30 +204,69 @@ public T get()
@Override
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (objects.size() < objectsCacheMaxCount) {
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
final T object = objectRef.get();
if (object != null && objectRef.compareAndSet(object, null)) {
try {
tryReturnToPool(object, objectId, cleaner, notifier);
}
finally {
// Need to null the reference to objectId, because if this ObjectResourceHolder is closed but itself leaked, the
// reference would prevent reporting leaks of later ResourceHolders that reuse this object and objectId after they
// are taken from the pool again.
objectId = null;
// Nulling cleaner and notifier is not strictly needed, but harmless for sure.
cleaner = null;
notifier = null;
}
} else {
log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount);
}
}
}

private static class ObjectLeakNotifier implements Runnable
{
/**
* Don't reference {@link StupidPool} directly to prevent its leak through the internal global chain of Cleaners.
*/
final WeakReference<StupidPool<?>> poolReference;
final AtomicLong leakedObjectsCounter;
final AtomicBoolean disabled = new AtomicBoolean(false);

ObjectLeakNotifier(StupidPool<?> pool)
{
poolReference = new WeakReference<StupidPool<?>>(pool);
leakedObjectsCounter = pool.leakedObjectsCounter;
}

@Override
protected void finalize() throws Throwable
public void run()
{
try {
if (!closed.get()) {
log.warn("Not closed! Object was[%s]. Allowing gc to prevent leak.", object);
if (!disabled.getAndSet(true)) {
leakedObjectsCounter.incrementAndGet();
log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", poolReference.get());
}
}
finally {
super.finalize();
// Exceptions must not be thrown in Cleaner.clean(), which calls this ObjectLeakNotifier.run() method
catch (Exception e) {
try {
log.error(e, "Exception in ObjectLeakNotifier.run()");
}
catch (Exception ignore) {
// ignore
}
}
}

public void disable()
{
disabled.set(true);
}
}

/**
 * Stateless marker object that plays the role of the referent for the Cleaner, in place of the pooled object itself;
 * see the comment in {@link #makeObjectWithHandler}.
 */
private static class ObjectId
{
}
}
29 changes: 22 additions & 7 deletions common/src/test/java/io/druid/collections/StupidPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package io.druid.collections;

import com.google.common.base.Supplier;

import io.druid.java.util.common.ISE;

import org.easymock.EasyMock;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
Expand All @@ -45,7 +43,7 @@ public void setUp()
generator = EasyMock.createMock(Supplier.class);
EasyMock.expect(generator.get()).andReturn(defaultString).anyTimes();
EasyMock.replay(generator);
poolOfString = new StupidPool<>(generator);
poolOfString = new StupidPool<>("poolOfString", generator);
resourceHolderObj = poolOfString.take();
}

Expand All @@ -72,10 +70,27 @@ public void testExceptionInResourceHolderGet() throws IOException
resourceHolderObj.get();
}

@Test
public void testFinalizeInResourceHolder()
@Test(timeout = 60_000)
public void testResourceHandlerClearedByJVM() throws InterruptedException
{
if (System.getProperty("java.version").startsWith("1.7")) {
// This test is unreliable on Java 7, probably because GC is not triggered by System.gc(). It is not a problem,
// because it is enough for this test to pass on any one version of Java to prove that StupidPool doesn't
// introduce leaks itself and actually cleans the leaked objects.
return;
}
String leakedString = createDanglingObjectHandler();
// Wait until the leak of the dangling object holder is detected and counted by the pool
for (int i = 0; i < 6000 && poolOfString.leakedObjectsCount() == 0; i++) {
System.gc();
byte[] garbage = new byte[10_000_000];
Thread.sleep(10);
}
Assert.assertEquals(leakedString, 1, poolOfString.leakedObjectsCount());
}

private String createDanglingObjectHandler()
{
resourceHolderObj = null;
System.runFinalization();
return poolOfString.take().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void testTopNWithDistinctCountAgg() throws Exception
{
TopNQueryEngine engine = new TopNQueryEngine(
new StupidPool<ByteBuffer>(
"TopNQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static Iterable<Object[]> constructorFeeder() throws IOException
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
Expand Down
Loading

0 comments on commit 3380012

Please sign in to comment.