LevelDB 1.13
Fix issues 77, 87, 182, 190.

Additionally, fix the bug described in
https://groups.google.com/d/msg/leveldb/yL6h1mAOc20/vLU64RylIdMJ
where a large contiguous keyspace of deleted data was not getting
compacted.
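The mechanism, whose plumbing is visible in the diff below: each iterator handed out by DBImpl::NewIterator now carries a seed, counts the bytes it reads, and roughly once every config::kReadBytesPeriod bytes reports the key it is positioned at back to DBImpl::RecordReadSample, which may schedule a compaction. The sampling loop itself lives in db/db_iter.cc, which is not loaded in this view; the following standalone C++ sketch shows the byte-countdown idea, with the ReadSampler name, the Sample interface, and the period value all invented for illustration:

#include <cstdint>
#include <cstdlib>   // POSIX rand_r

static const int64_t kReadBytesPeriod = 1048576;  // assumed sampling period

class ReadSampler {
 public:
  explicit ReadSampler(unsigned int seed)
      : rnd_state_(seed), budget_(NextPeriod()) {}

  // Called for every key/value pair the iterator touches. Returns true
  // when this read should be reported via DBImpl::RecordReadSample(key).
  bool Sample(size_t bytes_read) {
    budget_ -= static_cast<int64_t>(bytes_read);
    if (budget_ >= 0) return false;
    budget_ += NextPeriod();
    return true;
  }

 private:
  // Randomized period with mean ~kReadBytesPeriod, so that concurrent
  // iterators do not sample the same keys in lockstep.
  int64_t NextPeriod() {
    return static_cast<int64_t>(rand_r(&rnd_state_)) % (2 * kReadBytesPeriod);
  }

  unsigned int rnd_state_;
  int64_t budget_;
};

An iterator would construct one such sampler from the seed handed out by NewInternalIterator (see db/db_impl.cc below) and report the current key whenever Sample() returns true.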

Also fix a bug where options.max_open_files was not getting clamped
properly.
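The clamp itself is applied in SanitizeOptions; the bug fixed in db/db_impl.cc below was that the constructor sized the table cache from the caller's raw options rather than the sanitized options_, so an out-of-range max_open_files bypassed the clamp. A condensed sketch of the clamp-then-use pattern; the Options struct is pared down and the exact bounds are illustrative, not necessarily the ones LevelDB ships:

// Pared-down illustration; the real SanitizeOptions is in db/db_impl.cc.
static const int kNumNonTableCacheFiles = 10;  // reserved for log, manifest, etc.

struct Options {
  int max_open_files;
  Options() : max_open_files(1000) {}
};

template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}

Options SanitizeOptions(const Options& src) {
  Options result = src;
  // Clamp so that (max_open_files - kNumNonTableCacheFiles) stays positive.
  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
  return result;
}

With that in place, the one-line constructor change below (options_.max_open_files instead of options.max_open_files) guarantees the table cache is sized from the clamped value.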
davidsgrogan committed Aug 21, 2013
1 parent 5bd76dc commit 748539c
Showing 13 changed files with 363 additions and 67 deletions.
6 changes: 5 additions & 1 deletion Makefile
@@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL)
 
 TESTS = \
 	arena_test \
+	autocompact_test \
 	bloom_test \
 	c_test \
 	cache_test \
@@ -70,7 +71,7 @@ SHARED = $(SHARED1)
 else
 # Update db.h if you change these.
 SHARED_MAJOR = 1
-SHARED_MINOR = 12
+SHARED_MINOR = 13
 SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
 SHARED2 = $(SHARED1).$(SHARED_MAJOR)
 SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@@ -114,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
 arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
 
+autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+
 bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
 
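With the new target wired in, running make autocompact_test followed by ./autocompact_test builds and executes the test added below.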
118 changes: 118 additions & 0 deletions db/autocompact_test.cc
@@ -0,0 +1,118 @@
+// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "leveldb/db.h"
+#include "db/db_impl.h"
+#include "leveldb/cache.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+namespace leveldb {
+
+class AutoCompactTest {
+ public:
+  std::string dbname_;
+  Cache* tiny_cache_;
+  Options options_;
+  DB* db_;
+
+  AutoCompactTest() {
+    dbname_ = test::TmpDir() + "/autocompact_test";
+    tiny_cache_ = NewLRUCache(100);
+    options_.block_cache = tiny_cache_;
+    DestroyDB(dbname_, options_);
+    options_.create_if_missing = true;
+    options_.compression = kNoCompression;
+    ASSERT_OK(DB::Open(options_, dbname_, &db_));
+  }
+
+  ~AutoCompactTest() {
+    delete db_;
+    DestroyDB(dbname_, Options());
+    delete tiny_cache_;
+  }
+
+  std::string Key(int i) {
+    char buf[100];
+    snprintf(buf, sizeof(buf), "key%06d", i);
+    return std::string(buf);
+  }
+
+  uint64_t Size(const Slice& start, const Slice& limit) {
+    Range r(start, limit);
+    uint64_t size;
+    db_->GetApproximateSizes(&r, 1, &size);
+    return size;
+  }
+
+  void DoReads(int n);
+};
+
+static const int kValueSize = 200 * 1024;
+static const int kTotalSize = 100 * 1024 * 1024;
+static const int kCount = kTotalSize / kValueSize;
+
+// Read through the first n keys repeatedly and check that they get
+// compacted (verified by checking the size of the key space).
+void AutoCompactTest::DoReads(int n) {
+  std::string value(kValueSize, 'x');
+  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+
+  // Fill database
+  for (int i = 0; i < kCount; i++) {
+    ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
+  }
+  ASSERT_OK(dbi->TEST_CompactMemTable());
+
+  // Delete everything
+  for (int i = 0; i < kCount; i++) {
+    ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
+  }
+  ASSERT_OK(dbi->TEST_CompactMemTable());
+
+  // Get initial measurement of the space we will be reading.
+  const int64_t initial_size = Size(Key(0), Key(n));
+  const int64_t initial_other_size = Size(Key(n), Key(kCount));
+
+  // Read until size drops significantly.
+  std::string limit_key = Key(n);
+  for (int read = 0; true; read++) {
+    ASSERT_LT(read, 100) << "Taking too long to compact";
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    for (iter->SeekToFirst();
+         iter->Valid() && iter->key().ToString() < limit_key;
+         iter->Next()) {
+      // Drop data
+    }
+    delete iter;
+    // Wait a little bit to allow any triggered compactions to complete.
+    Env::Default()->SleepForMicroseconds(1000000);
+    uint64_t size = Size(Key(0), Key(n));
+    fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
+            read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
+    if (size <= initial_size/10) {
+      break;
+    }
+  }
+
+  // Verify that the size of the key space not touched by the reads
+  // is pretty much unchanged.
+  const int64_t final_other_size = Size(Key(n), Key(kCount));
+  ASSERT_LE(final_other_size, initial_other_size + 1048576);
+  ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
+}
+
+TEST(AutoCompactTest, ReadAll) {
+  DoReads(kCount);
+}
+
+TEST(AutoCompactTest, ReadHalf) {
+  DoReads(kCount/2);
+}
+
+}  // namespace leveldb
+
+int main(int argc, char** argv) {
+  return leveldb::test::RunAllTests();
+}
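For scale: kCount works out to (100 * 1024 * 1024) / (200 * 1024) = 512 keys, so ReadAll scans all 512 deleted keys per pass and ReadHalf the first 256, while the untouched remainder of the key space serves as a control. With a one-second sleep per pass and the ASSERT_LT(read, 100) bound, the test gives the automatic compaction roughly 100 seconds to shrink the read range to a tenth of its initial size.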
51 changes: 22 additions & 29 deletions db/corruption_test.cc
@@ -35,6 +35,7 @@ class CorruptionTest {
   CorruptionTest() {
     tiny_cache_ = NewLRUCache(100);
     options_.env = &env_;
+    options_.block_cache = tiny_cache_;
     dbname_ = test::TmpDir() + "/db_test";
     DestroyDB(dbname_, options_);
 
@@ -50,17 +51,14 @@ class CorruptionTest {
     delete tiny_cache_;
   }
 
-  Status TryReopen(Options* options = NULL) {
+  Status TryReopen() {
     delete db_;
     db_ = NULL;
-    Options opt = (options ? *options : options_);
-    opt.env = &env_;
-    opt.block_cache = tiny_cache_;
-    return DB::Open(opt, dbname_, &db_);
+    return DB::Open(options_, dbname_, &db_);
   }
 
-  void Reopen(Options* options = NULL) {
-    ASSERT_OK(TryReopen(options));
+  void Reopen() {
+    ASSERT_OK(TryReopen());
   }
 
   void RepairDB() {
@@ -92,6 +90,10 @@ class CorruptionTest {
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
       uint64_t key;
       Slice in(iter->key());
+      if (in == "" || in == "~") {
+        // Ignore boundary keys.
+        continue;
+      }
       if (!ConsumeDecimalNumber(&in, &key) ||
           !in.empty() ||
           key < next_expected) {
@@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
   dbi->TEST_CompactRange(1, NULL, NULL);
 
   Corrupt(kTableFile, 100, 1);
-  Check(99, 99);
+  Check(90, 99);
 }
 
 TEST(CorruptionTest, TableFileIndexData) {
@@ -299,40 +301,31 @@ TEST(CorruptionTest, CompactionInputError) {
   ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
 
   Corrupt(kTableFile, 100, 1);
-  Check(9, 9);
+  Check(5, 9);
 
   // Force compactions by writing lots of values
   Build(10000);
   Check(10000, 10000);
 }
 
 TEST(CorruptionTest, CompactionInputErrorParanoid) {
-  Options options;
-  options.paranoid_checks = true;
-  options.write_buffer_size = 1048576;
-  Reopen(&options);
+  options_.paranoid_checks = true;
+  options_.write_buffer_size = 512 << 10;
+  Reopen();
   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
 
-  // Fill levels >= 1 so memtable compaction outputs to level 1
-  for (int level = 1; level < config::kNumLevels; level++) {
-    dbi->Put(WriteOptions(), "", "begin");
-    dbi->Put(WriteOptions(), "~", "end");
+  // Make multiple inputs so we need to compact.
+  for (int i = 0; i < 2; i++) {
+    Build(10);
     dbi->TEST_CompactMemTable();
+    Corrupt(kTableFile, 100, 1);
+    env_.SleepForMicroseconds(100000);
   }
+  dbi->CompactRange(NULL, NULL);
 
-  Build(10);
-  dbi->TEST_CompactMemTable();
-  ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
-
-  Corrupt(kTableFile, 100, 1);
-  Check(9, 9);
-
-  // Write must eventually fail because of corrupted table
-  Status s;
+  // Write must fail because of corrupted table
   std::string tmp1, tmp2;
-  for (int i = 0; i < 10000 && s.ok(); i++) {
-    s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
-  }
+  Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
   ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
 }

41 changes: 27 additions & 14 deletions db/db_impl.cc
@@ -113,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname,
   return result;
 }
 
-DBImpl::DBImpl(const Options& options, const std::string& dbname)
-    : env_(options.env),
-      internal_comparator_(options.comparator),
-      internal_filter_policy_(options.filter_policy),
-      options_(SanitizeOptions(
-          dbname, &internal_comparator_, &internal_filter_policy_, options)),
-      owns_info_log_(options_.info_log != options.info_log),
-      owns_cache_(options_.block_cache != options.block_cache),
+DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
+    : env_(raw_options.env),
+      internal_comparator_(raw_options.comparator),
+      internal_filter_policy_(raw_options.filter_policy),
+      options_(SanitizeOptions(dbname, &internal_comparator_,
+                               &internal_filter_policy_, raw_options)),
+      owns_info_log_(options_.info_log != raw_options.info_log),
+      owns_cache_(options_.block_cache != raw_options.block_cache),
       dbname_(dbname),
       db_lock_(NULL),
       shutting_down_(NULL),
@@ -130,6 +130,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       logfile_(NULL),
       logfile_number_(0),
       log_(NULL),
+      seed_(0),
       tmp_batch_(new WriteBatch),
       bg_compaction_scheduled_(false),
       manual_compaction_(NULL),
@@ -138,7 +139,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
   has_imm_.Release_Store(NULL);
 
   // Reserve ten files or so for other uses and give the rest to TableCache.
-  const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
+  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
   table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
 
   versions_ = new VersionSet(dbname_, &options_, table_cache_,
@@ -1027,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
 }  // namespace
 
 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
-                                      SequenceNumber* latest_snapshot) {
+                                      SequenceNumber* latest_snapshot,
+                                      uint32_t* seed) {
   IterState* cleanup = new IterState;
   mutex_.Lock();
   *latest_snapshot = versions_->LastSequence();
@@ -1051,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   cleanup->version = versions_->current();
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
 
+  *seed = ++seed_;
   mutex_.Unlock();
   return internal_iter;
 }
 
 Iterator* DBImpl::TEST_NewInternalIterator() {
   SequenceNumber ignored;
-  return NewInternalIterator(ReadOptions(), &ignored);
+  uint32_t ignored_seed;
+  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
 }
 
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@@ -1114,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options,
 
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
-  Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
+  uint32_t seed;
+  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
   return NewDBIterator(
-      &dbname_, env_, user_comparator(), internal_iter,
+      this, user_comparator(), iter,
       (options.snapshot != NULL
        ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
-       : latest_snapshot));
+       : latest_snapshot),
+      seed);
+}
+
+void DBImpl::RecordReadSample(Slice key) {
+  MutexLock l(&mutex_);
+  if (versions_->current()->RecordReadSample(key)) {
+    MaybeScheduleCompaction();
+  }
 }
 
 const Snapshot* DBImpl::GetSnapshot() {
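DBImpl::RecordReadSample defers to Version::RecordReadSample, which lives in db/version_set.cc and is not part of the loaded diff. The idea it implements: if a single sampled key is covered by two or more overlapping files, every read in that range pays for extra seeks, so the first overlapping file is charged, and once its budget runs out it becomes a compaction candidate. A simplified C++ sketch with stand-in types (FileState, the budget field, and the scan order are illustrative, not LevelDB's actual structures):

#include <string>
#include <vector>

// Stand-ins for FileMetaData and the level structure in db/version_set.cc.
struct FileState {
  std::string smallest, largest;  // user-key range covered by the file
  int allowed_seeks;              // budget left before compaction is due
};

// levels[0] holds the newest files; each level lists its files' key ranges.
// Returns true if the caller should schedule a compaction.
bool RecordReadSample(std::vector<std::vector<FileState*> >& levels,
                      const std::string& key) {
  FileState* first_match = NULL;
  int matches = 0;
  for (size_t level = 0; level < levels.size() && matches < 2; level++) {
    for (size_t i = 0; i < levels[level].size() && matches < 2; i++) {
      FileState* f = levels[level][i];
      if (key >= f->smallest && key <= f->largest) {
        if (++matches == 1) first_match = f;
      }
    }
  }
  // Only ranges covered by 2+ files are worth compacting: merging them
  // removes the extra seek that every read in the range currently pays.
  return matches >= 2 && --first_match->allowed_seeks <= 0;
}

When this returns true, RecordReadSample above calls MaybeScheduleCompaction, which is what finally reclaims a large deleted range that is only ever read, never written.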
9 changes: 8 additions & 1 deletion db/db_impl.h
@@ -59,13 +59,19 @@ class DBImpl : public DB {
   // file at a level >= 1.
   int64_t TEST_MaxNextLevelOverlappingBytes();
 
+  // Record a sample of bytes read at the specified internal key.
+  // Samples are taken approximately once every config::kReadBytesPeriod
+  // bytes.
+  void RecordReadSample(Slice key);
+
  private:
   friend class DB;
   struct CompactionState;
   struct Writer;
 
   Iterator* NewInternalIterator(const ReadOptions&,
-                                SequenceNumber* latest_snapshot);
+                                SequenceNumber* latest_snapshot,
+                                uint32_t* seed);
 
   Status NewDB();
 
@@ -135,6 +141,7 @@ class DBImpl : public DB {
   WritableFile* logfile_;
   uint64_t logfile_number_;
   log::Writer* log_;
+  uint32_t seed_;                // For sampling.
 
   // Queue of writers.
   std::deque<Writer*> writers_;
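One detail worth noting in the header: seed_ is only ever touched in NewInternalIterator while mutex_ is already held, so handing every iterator a distinct sampling phase costs a single increment and no additional synchronization.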
(Diffs for the remaining 8 changed files were not loaded in this view.)
