Skip to content

Commit

Permalink
Fixes issue dain#7 "RuntimeException: Could not open table" was occur…
Browse files Browse the repository at this point in the history
…ring when you run the DbBenchmark

Active version objects are held in a weak hash map.  Once they are no longer in the map, then their associated file are candidates to be deleted.  The problem is that open Snapshots and Iterators were not holding to a reference of the Version object which they were associated with.  This resulted in files associated with the version being deleted while it was still being iterated.

Also added Version reference counting so that if the end user is properly closing the snapshots and iterators, then Version will be evicted from active versions map.  This should allow faster reclaiming of disk space even in situations where the JVM is not eagerly GCing.
  • Loading branch information
chirino committed Jan 6, 2012
1 parent 3188717 commit 51732e6
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 39 deletions.
21 changes: 12 additions & 9 deletions leveldb/src/main/java/org/iq80/leveldb/impl/DbImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ public String getProperty(String name)

private void deleteObsoleteFiles()
{
Preconditions.checkState(mutex.isHeldByCurrentThread());

// Make a set of all of the live files
List<Long> live = newArrayList(this.pendingOutputs);
for (FileMetaData fileMetaData : versions.getLiveFiles()) {
Expand Down Expand Up @@ -530,8 +532,8 @@ public byte[] get(byte[] key, ReadOptions options)
LookupKey lookupKey;
mutex.lock();
try {
long snapshot = getSnapshotNumber(options);
lookupKey = new LookupKey(Slices.wrappedBuffer(key), snapshot);
SnapshotImpl snapshot = getSnapshot(options);
lookupKey = new LookupKey(Slices.wrappedBuffer(key), snapshot.getLastSequence());

// First look in the memtable, then in the immutable memtable (if any).
LookupResult lookupResult = memTable.get(lookupKey);
Expand Down Expand Up @@ -644,7 +646,7 @@ public Snapshot writeInternal(WriteBatchImpl updates, WriteOptions options)
// Update memtable
updates.forEach(new InsertIntoHandler(memTable, sequenceBegin));

return new SnapshotImpl(sequenceEnd);
return new SnapshotImpl(versions.getCurrent(), sequenceEnd);
}
finally {
mutex.unlock();
Expand All @@ -671,7 +673,7 @@ public SeekingIteratorAdapter iterator(ReadOptions options)


// filter any entries not visible in our snapshot
long snapshot = getSnapshotNumber(options);
SnapshotImpl snapshot = getSnapshot(options);
SnapshotSeekingIterator snapshotIterator = new SnapshotSeekingIterator(rawIterator, snapshot, internalKeyComparator.getUserComparator());
return new SeekingIteratorAdapter(snapshotIterator);
}
Expand Down Expand Up @@ -713,21 +715,22 @@ public Snapshot getSnapshot()
{
mutex.lock();
try {
return new SnapshotImpl(versions.getLastSequence());
return new SnapshotImpl(versions.getCurrent(), versions.getLastSequence());
}
finally {
mutex.unlock();
}
}

private long getSnapshotNumber(ReadOptions options)
private SnapshotImpl getSnapshot(ReadOptions options)
{
long snapshot;
SnapshotImpl snapshot;
if (options.snapshot() != null) {
snapshot = ((SnapshotImpl) options.snapshot()).snapshot;
snapshot = (SnapshotImpl) options.snapshot();
}
else {
snapshot = versions.getLastSequence();
snapshot = new SnapshotImpl(versions.getCurrent(), versions.getLastSequence());
snapshot.close(); // To avoid holding the snapshot active..
}
return snapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import org.iq80.leveldb.util.Slices;

import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

public class SeekingIteratorAdapter implements DBIterator
{
private final SnapshotSeekingIterator seekingIterator;
private final AtomicBoolean closed = new AtomicBoolean(false);

public SeekingIteratorAdapter(SnapshotSeekingIterator seekingIterator)
{
Expand Down Expand Up @@ -49,6 +51,11 @@ public DbEntry peekNext()
@Override
public void close()
{
// This is an end user API.. he might screw up and close multiple times.
// but we don't want the close multiple times as reference counts go bad.
if(closed.compareAndSet(false, true)) {
seekingIterator.close();
}
}

@Override
Expand Down
53 changes: 47 additions & 6 deletions leveldb/src/main/java/org/iq80/leveldb/impl/SnapshotImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,61 @@

import org.iq80.leveldb.Snapshot;

// todo implement snapshot tracking and cleanup
import java.util.concurrent.atomic.AtomicBoolean;

public class SnapshotImpl implements Snapshot
{
final long snapshot;
private final AtomicBoolean closed = new AtomicBoolean();
private final Version version;
private final long lastSequence;

SnapshotImpl(long snapshot)
SnapshotImpl(Version version, long lastSequence)
{
this.snapshot = snapshot;
this.version = version;
this.lastSequence = lastSequence;
this.version.retain();
}

@Override
public void close()
{
// todo
// throw new UnsupportedOperationException();
// This is an end user API.. he might screw up and close multiple times.
// but we don't want the version reference count going bad.
if(closed.compareAndSet(false, true)) {
this.version.release();
}
}

public long getLastSequence() {
return lastSequence;
}

public Version getVersion() {
return version;
}

@Override
public String toString() {
return Long.toString(lastSequence);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SnapshotImpl snapshot = (SnapshotImpl) o;

if (lastSequence != snapshot.lastSequence) return false;
if (!version.equals(snapshot.version)) return false;

return true;
}

@Override
public int hashCode() {
int result = version.hashCode();
result = 31 * result + (int) (lastSequence ^ (lastSequence >>> 32));
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@
public final class SnapshotSeekingIterator extends AbstractSeekingIterator<Slice, Slice>
{
private final DbIterator iterator;
private final long snapshot;
private final SnapshotImpl snapshot;
private final Comparator<Slice> userComparator;

public SnapshotSeekingIterator(DbIterator iterator, long snapshot, Comparator<Slice> userComparator)
public SnapshotSeekingIterator(DbIterator iterator, SnapshotImpl snapshot, Comparator<Slice> userComparator)
{
this.iterator = iterator;
this.snapshot = snapshot;
this.userComparator = userComparator;
this.snapshot.getVersion().retain();
}

public void close() {
this.snapshot.getVersion().release();
}

@Override
Expand All @@ -48,7 +53,7 @@ protected void seekToFirstInternal()
@Override
protected void seekInternal(Slice targetKey)
{
iterator.seek(new InternalKey(targetKey, snapshot, ValueType.VALUE));
iterator.seek(new InternalKey(targetKey, snapshot.getLastSequence(), ValueType.VALUE));
findNextUserEntry(null);
}

Expand Down Expand Up @@ -79,7 +84,7 @@ private void findNextUserEntry(Slice deletedKey)
InternalKey internalKey = iterator.peek().getKey();

// skip entries created after our snapshot
if (internalKey.getSequenceNumber() > snapshot) {
if (internalKey.getSequenceNumber() > snapshot.getLastSequence()) {
iterator.next();
continue;
}
Expand Down Expand Up @@ -108,4 +113,5 @@ public String toString()
sb.append('}');
return sb.toString();
}

}
31 changes: 24 additions & 7 deletions leveldb/src/main/java/org/iq80/leveldb/impl/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import org.iq80.leveldb.table.UserComparator;
import org.iq80.leveldb.util.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Ordering.natural;
import static org.iq80.leveldb.impl.DbConstants.*;
import static org.iq80.leveldb.impl.DbConstants.MAX_MEM_COMPACT_LEVEL;
import static org.iq80.leveldb.impl.DbConstants.NUM_LEVELS;
import static org.iq80.leveldb.impl.SequenceNumber.MAX_SEQUENCE_NUMBER;
import static org.iq80.leveldb.impl.VersionSet.MAX_GRAND_PARENT_OVERLAP_BYTES;

// todo this class should be immutable
public class Version implements SeekingIterable<InternalKey, Slice>
{
private final AtomicInteger retained = new AtomicInteger(1);
private final VersionSet versionSet;
private final Level0 level0;
private final List<Level> levels;

Expand All @@ -49,8 +49,6 @@ public class Version implements SeekingIterable<InternalKey, Slice>
private FileMetaData fileToCompact;
private int fileToCompactLevel;

private final VersionSet versionSet;

public Version(VersionSet versionSet)
{
this.versionSet = versionSet;
Expand Down Expand Up @@ -275,4 +273,23 @@ else if (getInternalKeyComparator().compare(fileMetaData.getSmallest(), key) > 0
return result;
}

public void retain() {
int was = retained.getAndIncrement();
assert was>0 : "Version was retain after it was disposed.";
}

public void release() {
int now = retained.decrementAndGet();
assert now >= 0 : "Version was released after it was disposed.";
if( now == 0 ) {
// The version is now disposed.
versionSet.removeVersion(this);
}
}

public boolean isDisposed() {
return retained.get() <= 0;
}


}
26 changes: 13 additions & 13 deletions leveldb/src/main/java/org/iq80/leveldb/impl/VersionSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,8 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.Lists.newArrayList;
Expand Down Expand Up @@ -124,10 +115,20 @@ public void destroy()
private void appendVersion(Version version)
{
Preconditions.checkNotNull(version, "version is null");
Preconditions.checkArgument(version != current, "version is null");

Preconditions.checkArgument(version != current, "version is the current version");
Version previous = current;
current = version;
activeVersions.put(version, new Object());
if(previous!=null) {
previous.release();
}
}

public void removeVersion(Version version) {
Preconditions.checkNotNull(version, "version is null");
Preconditions.checkArgument(version != current, "version is the current version");
boolean removed = activeVersions.remove(version)!=null;
assert removed : "Expected the version to still be in the active set";
}

public InternalKeyComparator getInternalKeyComparator() {
Expand Down Expand Up @@ -174,7 +175,6 @@ public MergingIterator makeInputIterator(Compaction c)
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap
int space = (c.getLevel() == 0 ? c.getLevelInputs().size() + 1 : 2);
List<InternalIterator> list = newArrayList();
for (int which = 0; which < 2; which++) {
if (!c.getInputs()[which].isEmpty()) {
Expand Down

0 comments on commit 51732e6

Please sign in to comment.