Skip to content

Commit

Permalink
Speed up Snappy uncompression, new Logger interface.
Browse files Browse the repository at this point in the history
- Removed one copy of an uncompressed block contents changing
  the signature of Snappy_Uncompress() so it uncompresses into a
  flat array instead of a std::string.
        
  Speeds up readrandom ~10%.

- Instead of a combination of Env/WritableFile, we now have a
  Logger interface that can be easily overridden by applications
  that want to supply their own logging.

- Separated out the gcc and Sun Studio parts of atomic_pointer.h
  so we can use the 'asm' and 'volatile' keywords for Sun Studio.




git-svn-id: https://leveldb.googlecode.com/svn/trunk@39 62dab493-f737-651d-591e-8d6aee1b9529
  • Loading branch information
[email protected] committed Jul 21, 2011
1 parent 6872ace commit 60bd801
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 212 deletions.
7 changes: 4 additions & 3 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,14 @@ class Benchmark {
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
std::string uncompressed;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
&uncompressed);
bytes += uncompressed.size();
uncompressed);
bytes += input.size();
FinishedSingleOp();
}
delete[] uncompressed;

if (!ok) {
message_ = "(snappy failure)";
Expand Down
45 changes: 17 additions & 28 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,6 @@ struct DBImpl::CompactionState {
}
};

namespace {
// A WritableFile that silently discards everything written to it.
// Used as a fallback sink for informational logging when no real
// log file can be opened, so Log() calls remain safe no-ops.
// (File-local: lives in an anonymous namespace in this .cc file.)
class NullWritableFile : public WritableFile {
 public:
  virtual Status Append(const Slice& data) { return Status::OK(); }
  virtual Status Close() { return Status::OK(); }
  virtual Status Flush() { return Status::OK(); }
  virtual Status Sync() { return Status::OK(); }
};
}

// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
Expand All @@ -96,11 +86,10 @@ Options SanitizeOptions(const std::string& dbname,
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
Status s = src.env->NewWritableFile(InfoLogFileName(dbname),
&result.info_log);
Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
if (!s.ok()) {
// No place suitable for logging
result.info_log = new NullWritableFile;
result.info_log = NULL;
}
}
if (result.block_cache == NULL) {
Expand Down Expand Up @@ -201,7 +190,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) {
// No change needed
} else {
Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str());
Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
*s = Status::OK();
}
}
Expand Down Expand Up @@ -247,7 +236,7 @@ void DBImpl::DeleteObsoleteFiles() {
if (type == kTableFile) {
table_cache_->Evict(number);
}
Log(env_, options_.info_log, "Delete type=%d #%lld\n",
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
Expand Down Expand Up @@ -336,11 +325,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
WritableFile* info_log;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(env, info_log, "%s%s: dropping %d bytes; %s",
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
Expand Down Expand Up @@ -370,7 +359,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// large sequence numbers).
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(env_, options_.info_log, "Recovering log #%llu",
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);

// Read all the records and add to a memtable
Expand Down Expand Up @@ -434,7 +423,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(env_, options_.info_log, "Level-0 table #%llu: started",
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);

Status s;
Expand All @@ -444,7 +433,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.Lock();
}

Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s",
Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());
Expand Down Expand Up @@ -613,7 +602,7 @@ void DBImpl::BackgroundCompaction() {
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit());
VersionSet::LevelSummaryStorage tmp;
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
Expand All @@ -631,7 +620,7 @@ void DBImpl::BackgroundCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(env_, options_.info_log,
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
Expand Down Expand Up @@ -727,7 +716,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
s = iter->status();
delete iter;
if (s.ok()) {
Log(env_, options_.info_log,
Log(options_.info_log,
"Generated table #%llu: %lld keys, %lld bytes",
(unsigned long long) output_number,
(unsigned long long) current_entries,
Expand All @@ -740,7 +729,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(env_, options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
Expand Down Expand Up @@ -776,7 +765,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions

Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files",
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
Expand Down Expand Up @@ -859,7 +848,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = ikey.sequence;
}
#if 0
Log(env_, options_.info_log,
Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d",
ikey.user_key.ToString().c_str(),
Expand Down Expand Up @@ -925,7 +914,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Log(env_, options_.info_log,
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
Expand Down Expand Up @@ -1112,7 +1101,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(env_, options_.info_log, "waiting...\n");
Log(options_.info_log, "waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
Expand Down
20 changes: 10 additions & 10 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Repairer {
for (size_t i = 0; i < tables_.size(); i++) {
bytes += tables_[i].meta.file_size;
}
Log(env_, options_.info_log,
Log(options_.info_log,
"**** Repaired leveldb %s; "
"recovered %d files; %llu bytes. "
"Some data may have been lost. "
Expand Down Expand Up @@ -149,7 +149,7 @@ class Repairer {
std::string logname = LogFileName(dbname_, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
Log(env_, options_.info_log, "Log #%llu: ignoring conversion error: %s",
Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
(unsigned long long) logs_[i],
status.ToString().c_str());
}
Expand All @@ -160,11 +160,11 @@ class Repairer {
Status ConvertLogToTable(uint64_t log) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
WritableFile* info_log;
Logger* info_log;
uint64_t lognum;
virtual void Corruption(size_t bytes, const Status& s) {
// We print error messages for corruption, but continue repairing.
Log(env, info_log, "Log #%llu: dropping %d bytes; %s",
Log(info_log, "Log #%llu: dropping %d bytes; %s",
(unsigned long long) lognum,
static_cast<int>(bytes),
s.ToString().c_str());
Expand Down Expand Up @@ -209,7 +209,7 @@ class Repairer {
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
Log(env_, options_.info_log, "Log #%llu: ignoring %s",
Log(options_.info_log, "Log #%llu: ignoring %s",
(unsigned long long) log,
status.ToString().c_str());
status = Status::OK(); // Keep going with rest of file
Expand All @@ -231,7 +231,7 @@ class Repairer {
table_numbers_.push_back(meta.number);
}
}
Log(env_, options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long) log,
counter,
(unsigned long long) meta.number,
Expand All @@ -247,7 +247,7 @@ class Repairer {
Status status = ScanTable(&t);
if (!status.ok()) {
std::string fname = TableFileName(dbname_, table_numbers_[i]);
Log(env_, options_.info_log, "Table #%llu: ignoring %s",
Log(options_.info_log, "Table #%llu: ignoring %s",
(unsigned long long) table_numbers_[i],
status.ToString().c_str());
ArchiveFile(fname);
Expand All @@ -270,7 +270,7 @@ class Repairer {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(env_, options_.info_log, "Table #%llu: unparsable key %s",
Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long) t->meta.number,
EscapeString(key).c_str());
continue;
Expand All @@ -291,7 +291,7 @@ class Repairer {
}
delete iter;
}
Log(env_, options_.info_log, "Table #%llu: %d entries %s",
Log(options_.info_log, "Table #%llu: %d entries %s",
(unsigned long long) t->meta.number,
counter,
status.ToString().c_str());
Expand Down Expand Up @@ -373,7 +373,7 @@ class Repairer {
new_file.append("/");
new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
Status s = env_->RenameFile(fname, new_file);
Log(env_, options_.info_log, "Archiving %s: %s\n",
Log(options_.info_log, "Archiving %s: %s\n",
fname.c_str(), s.ToString().c_str());
}
};
Expand Down
4 changes: 2 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded1;
GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(env_, options_->info_log,
Log(options_->info_log,
"Expanding@%d %d+%d to %d+%d\n",
level,
int(c->inputs_[0].size()),
Expand All @@ -1147,7 +1147,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
}

if (false) {
Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
EscapeString(smallest.Encode()).c_str(),
EscapeString(largest.Encode()).c_str());
Expand Down
29 changes: 23 additions & 6 deletions include/leveldb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
namespace leveldb {

class FileLock;
class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
Expand Down Expand Up @@ -134,8 +135,8 @@ class Env {
// same directory.
virtual Status GetTestDirectory(std::string* path) = 0;

// Write an entry to the log file with the specified format.
virtual void Logv(WritableFile* log, const char* format, va_list ap) = 0;
// Create and return a log file for storing informational messages.
virtual Status NewLogger(const std::string& fname, Logger** result) = 0;

// Returns the number of micro-seconds since some fixed point in time. Only
// useful for computing deltas of time.
Expand Down Expand Up @@ -210,6 +211,22 @@ class WritableFile {
void operator=(const WritableFile&);
};

// An interface for writing log messages.
//
// Applications can subclass Logger and supply an instance through
// Options::info_log to capture the db's informational output
// themselves (e.g. route it to their own logging system) instead of
// relying on the Env-provided default created by NewLogger().
class Logger {
 public:
  Logger() { }
  virtual ~Logger();

  // Write an entry to the log file with the specified format.
  // 'format' and 'ap' follow printf()/vprintf() semantics;
  // implementations decide on buffering, flushing, and any
  // per-entry decoration (timestamps etc.).
  virtual void Logv(const char* format, va_list ap) = 0;

 private:
  // No copying allowed: declared private and left undefined
  // (the pre-C++11 idiom equivalent to "= delete").
  Logger(const Logger&);
  void operator=(const Logger&);
};


// Identifies a locked file.
class FileLock {
public:
Expand All @@ -222,9 +239,9 @@ class FileLock {
};

// Log the specified data to *info_log if info_log is non-NULL.
extern void Log(Env* env, WritableFile* info_log, const char* format, ...)
extern void Log(Logger* info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
__attribute__((__format__ (__printf__, 3, 4)))
__attribute__((__format__ (__printf__, 2, 3)))
# endif
;

Expand Down Expand Up @@ -284,8 +301,8 @@ class EnvWrapper : public Env {
virtual Status GetTestDirectory(std::string* path) {
return target_->GetTestDirectory(path);
}
virtual void Logv(WritableFile* log, const char* format, va_list ap) {
return target_->Logv(log, format, ap);
virtual Status NewLogger(const std::string& fname, Logger** result) {
return target_->NewLogger(fname, result);
}
uint64_t NowMicros() {
return target_->NowMicros();
Expand Down
6 changes: 3 additions & 3 deletions include/leveldb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace leveldb {
class Cache;
class Comparator;
class Env;
class Logger;
class Snapshot;
class WritableFile;

// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
Expand Down Expand Up @@ -61,10 +61,10 @@ struct Options {
Env* env;

// Any internal progress/error information generated by the db will
// be to written to info_log if it is non-NULL, or to a file stored
// be written to info_log if it is non-NULL, or to a file stored
// in the same directory as the DB contents if info_log is NULL.
// Default: NULL
WritableFile* info_log;
Logger* info_log;

// -------------------
// Parameters that affect performance
Expand Down
14 changes: 11 additions & 3 deletions port/atomic_pointer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,24 @@ namespace port {
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
#define LEVELDB_HAVE_MEMORY_BARRIER

// Gcc and Sun Studio on x86
#elif defined(ARCH_CPU_X86_FAMILY) && \
(defined(__GNUC__) || defined(__SUNPRO_CC))
// Gcc on x86
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
  // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
  // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
  //
  // NOTE: an empty asm statement with a "memory" clobber is a
  // compiler-level barrier only -- it stops the compiler from
  // reordering memory accesses across this point but emits no
  // hardware fence instruction.
  __asm__ __volatile__("" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER

// Sun Studio
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
inline void MemoryBarrier() {
  // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
  // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
  //
  // Sun Studio accepts gcc-style extended inline assembly but uses
  // the plain 'asm' and 'volatile' keywords rather than gcc's
  // '__asm__'/'__volatile__' spellings, hence this separate branch.
  asm volatile("" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER

// Mac OS
#elif defined(OS_MACOSX)
inline void MemoryBarrier() {
Expand Down
Loading

0 comments on commit 60bd801

Please sign in to comment.