Skip to content

Commit

Permalink
Refactoring of ReferenceCountingSegment and FireHydrant (apache#4154)
Browse files Browse the repository at this point in the history
* Refactoring of ReferenceCountingSegment and FireHydrant

* Address comment

* Fix FireHydrant.closeSegment()

* Address comment

* Added comments to ReferenceCountingSegment
  • Loading branch information
leventov authored and himanshug committed Sep 12, 2017
1 parent c0be050 commit 832cc29
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

package io.druid.query;

import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.segment.ReferenceCountingSegment;

import java.io.Closeable;
import java.util.Map;

/**
Expand All @@ -49,16 +47,20 @@ public ReferenceCountingSegmentQueryRunner(
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
if (closeable != null) {
if (adapter.increment()) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext);

return Sequences.withBaggage(baseSequence, closeable);
return Sequences.withBaggage(baseSequence, adapter.decrementOnceCloseable());
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
catch (Throwable t) {
try {
adapter.decrement();
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;
}
} else {
// Segment was closed before we had a chance to increment the reference count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,48 @@

package io.druid.segment;

import com.google.common.base.Preconditions;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* ReferenceCountingSegment allows to call {@link #close()} before some other "users", which called {@link
* #increment()}, has not called {@link #decrement()} yet, and the wrapped {@link Segment} won't be actually closed
* until that. So ReferenceCountingSegment implements something like automatic reference count-based resource
* management.
*/
public class ReferenceCountingSegment extends AbstractSegment
{
private static final EmittingLogger log = new EmittingLogger(ReferenceCountingSegment.class);

private final Segment baseSegment;

private final Object lock = new Object();

private volatile int numReferences = 0;
private volatile boolean isClosed = false;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
Preconditions.checkState(registeredParties == 0);
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
try {
baseSegment.close();
}
catch (Exception e) {
try {
log.error(e, "Exception while closing segment[%s]", baseSegment.getIdentifier());
}
catch (Exception e2) {
// ignore
}
}
// Always terminate.
return true;
}
};

public ReferenceCountingSegment(Segment baseSegment)
{
Expand All @@ -44,141 +69,78 @@ public ReferenceCountingSegment(Segment baseSegment)

public Segment getBaseSegment()
{
synchronized (lock) {
if (isClosed) {
return null;
}

return baseSegment;
}
return !isClosed() ? baseSegment : null;
}

public int getNumReferences()
{
return numReferences;
return Math.max(referents.getRegisteredParties() - 1, 0);
}

public boolean isClosed()
{
return isClosed;
return referents.isTerminated();
}

@Override
public String getIdentifier()
{
synchronized (lock) {
if (isClosed) {
return null;
}

return baseSegment.getIdentifier();
}
return !isClosed() ? baseSegment.getIdentifier() : null;
}

@Override
public Interval getDataInterval()
{
synchronized (lock) {
if (isClosed) {
return null;
}

return baseSegment.getDataInterval();
}
return !isClosed() ? baseSegment.getDataInterval() : null;
}

@Override
public QueryableIndex asQueryableIndex()
{
synchronized (lock) {
if (isClosed) {
return null;
}

return baseSegment.asQueryableIndex();
}
return !isClosed() ? baseSegment.asQueryableIndex() : null;
}

@Override
public StorageAdapter asStorageAdapter()
{
synchronized (lock) {
if (isClosed) {
return null;
}

return baseSegment.asStorageAdapter();
}
return !isClosed() ? baseSegment.asStorageAdapter() : null;
}

@Override
public void close() throws IOException
public void close()
{
synchronized (lock) {
if (isClosed) {
log.info("Failed to close, %s is closed already", baseSegment.getIdentifier());
return;
}

if (numReferences > 0) {
log.info("%d references to %s still exist. Decrementing.", numReferences, baseSegment.getIdentifier());

decrement();
} else {
log.info("Closing %s", baseSegment.getIdentifier());
innerClose();
}
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingSegment");
}
}

public Closeable increment()
public boolean increment()
{
synchronized (lock) {
if (isClosed) {
return null;
}

numReferences++;
final AtomicBoolean decrementOnce = new AtomicBoolean(false);
return new Closeable()
{
@Override
public void close() throws IOException
{
if (decrementOnce.compareAndSet(false, true)) {
decrement();
}
}
};
}
// Negative return from referents.register() means the Phaser is terminated.
return referents.register() >= 0;
}

private void decrement()
/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
public Closeable decrementOnceCloseable()
{
synchronized (lock) {
if (isClosed) {
return;
}

if (--numReferences < 0) {
try {
innerClose();
}
catch (Exception e) {
log.error("Unable to close queryable index %s", getIdentifier());
}
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.warn("close() is called more than once on ReferenceCountingSegment.decrementOnceCloseable()");
}
}
};
}

private void innerClose() throws IOException
public void decrement()
{
synchronized (lock) {
log.info("Closing %s, numReferences: %d", baseSegment.getIdentifier(), numReferences);

isClosed = true;
baseSegment.close();
}
referents.arriveAndDeregister();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,56 +83,47 @@ public void close() throws IOException
public void testMultipleClose() throws Exception
{
Assert.assertFalse(segment.isClosed());
final Closeable closeable = segment.increment();
Assert.assertTrue(segment.getNumReferences() == 1);
Assert.assertTrue(segment.increment());
Assert.assertEquals(1, segment.getNumReferences());

Closeable closeable = segment.decrementOnceCloseable();
closeable.close();
closeable.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
closeable.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
() -> {
try {
closeable.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
);
Assert.assertTrue(segment.getNumReferences() == 0);
).get();
Assert.assertEquals(0, segment.getNumReferences());
Assert.assertFalse(segment.isClosed());

segment.close();
segment.close();
exec.submit(
new Runnable()
{
@Override
public void run()
{
try {
segment.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
() -> {
try {
segment.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
);
).get();

Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());
Assert.assertTrue(segment.isClosed());

segment.increment();
segment.increment();
segment.increment();
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());

segment.close();
Assert.assertTrue(segment.getNumReferences() == 0);
Assert.assertEquals(0, segment.getNumReferences());
}
}
Loading

0 comments on commit 832cc29

Please sign in to comment.