Skip to content

Commit

Permalink
Blob storage pr
Browse files Browse the repository at this point in the history
Summary:
The final pull request for Blob Storage.
Closes facebook#2269

Differential Revision: D5033189

Pulled By: yiwu-arbug

fbshipit-source-id: 6356b683ccd58cbf38a1dc55e2ea400feecd5d06
  • Loading branch information
anirbanr-fb authored and facebook-github-bot committed May 10, 2017
1 parent 492fc49 commit d85ff49
Show file tree
Hide file tree
Showing 29 changed files with 5,687 additions and 215 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ set(SOURCES
util/xxhash.cc
utilities/backupable/backupable_db.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc
Expand Down Expand Up @@ -658,6 +662,7 @@ set(TESTS
util/heap_test.cc
util/rate_limiter_test.cc
util/slice_transform_test.cc
util/timer_queue_test.cc
util/thread_list_test.cc
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ TESTS = \
ttl_test \
date_tiered_test \
backupable_db_test \
blob_db_test \
document_db_test \
json_document_test \
sim_cache_test \
Expand All @@ -424,6 +423,7 @@ TESTS = \
options_settable_test \
options_util_test \
event_logger_test \
timer_queue_test \
cuckoo_table_builder_test \
cuckoo_table_reader_test \
cuckoo_table_db_test \
Expand Down Expand Up @@ -1307,6 +1307,9 @@ db_bench_tool_test: tools/db_bench_tool_test.o $(BENCHTOOLOBJECTS) $(TESTHARNESS
event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

timer_queue_test: util/timer_queue_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

sst_dump_test: tools/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
6 changes: 6 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ cpp_library(
"util/xxhash.cc",
"utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_options_impl.cc",
"utilities/blob_db/blob_file.cc",
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/blob_log_format.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
Expand Down
4 changes: 3 additions & 1 deletion db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ CompactionIterator::CompactionIterator(
latest_snapshot_ = snapshots_->back();
}
if (compaction_filter_ != nullptr) {
if (compaction_filter_->IgnoreSnapshots()) ignore_snapshots_ = true;
if (compaction_filter_->IgnoreSnapshots()) {
ignore_snapshots_ = true;
}
} else {
ignore_snapshots_ = false;
}
Expand Down
22 changes: 22 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,14 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
delete casted_s;
}

bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
InstrumentedMutexLock l(&mutex_);
if (snapshots_.empty()) {
return false;
}
return (snapshots_.newest()->GetSequenceNumber() > sn);
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) {
Expand Down Expand Up @@ -1821,6 +1829,20 @@ ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
return cf_memtables->GetColumnFamilyHandle();
}

// REQUIRED: mutex is NOT held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandleUnlocked(
uint32_t column_family_id) {
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();

InstrumentedMutexLock l(&mutex_);

if (!cf_memtables->Seek(column_family_id)) {
return nullptr;
}

return cf_memtables->GetColumnFamilyHandle();
}

void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
const Range& range,
uint64_t* const count,
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class DBImpl : public DB {

virtual SequenceNumber GetLatestSequenceNumber() const override;

bool HasActiveSnapshotLaterThanSN(SequenceNumber sn);

#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
Expand Down Expand Up @@ -465,6 +467,9 @@ class DBImpl : public DB {
// mutex is released.
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);

// Same as above, should called without mutex held and not on write thread.
ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);

// Returns the number of currently running flushes.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_flushes() {
Expand Down
21 changes: 17 additions & 4 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,14 @@ class PosixEnv : public Env {
return s;
}

virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
virtual Status OpenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options,
bool reopen = false) {
result->reset();
Status s;
int fd = -1;
int flags = O_CREAT | O_TRUNC;
int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
// Note: we should avoid O_APPEND here due to ta the following bug:
Expand Down Expand Up @@ -333,6 +334,18 @@ class PosixEnv : public Env {
return s;
}

virtual Status NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return OpenWritableFile(fname, result, options, false);
}

virtual Status ReopenWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options) override {
return OpenWritableFile(fname, result, options, true);
}

virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
Expand Down
2 changes: 0 additions & 2 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,6 @@ class SequentialFile {
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }

virtual void Rewind() {}

// Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.
Expand Down
7 changes: 7 additions & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ LIB_SOURCES = \
util/xxhash.cc \
utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_options_impl.cc \
utilities/blob_db/blob_file.cc \
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/blob_log_format.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
Expand Down Expand Up @@ -308,6 +314,7 @@ MAIN_SOURCES = \
util/log_write_bench.cc \
util/rate_limiter_test.cc \
util/slice_transform_test.cc \
util/timer_queue_test.cc \
util/thread_list_test.cc \
util/thread_local_test.cc \
utilities/backupable/backupable_db_test.cc \
Expand Down
32 changes: 26 additions & 6 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ DEFINE_bool(optimistic_transaction_db, false,
"Open a OptimisticTransactionDB instance. "
"Required for randomtransaction benchmark.");

DEFINE_bool(use_blob_db, false,
"Open a BlobDB instance. "
"Required for largevalue benchmark.");

DEFINE_bool(transaction_db, false,
"Open a TransactionDB instance. "
"Required for randomtransaction benchmark.");
Expand Down Expand Up @@ -630,8 +634,6 @@ DEFINE_bool(report_bg_io_stats, false,
DEFINE_bool(use_stderr_info_logger, false,
"Write info logs to stderr instead of to LOG file. ");

DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. ");

static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);

Expand Down Expand Up @@ -1128,6 +1130,15 @@ class RandomGenerator {
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
}

Slice GenerateWithTTL(unsigned int len) {
assert(len <= data_.size());
if (pos_ + len > data_.size()) {
pos_ = 0;
}
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
}
};

static void AppendWithSpace(std::string* str, Slice msg) {
Expand Down Expand Up @@ -3227,9 +3238,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (s.ok()) {
db->db = ptr;
}
#endif // ROCKSDB_LITE
} else if (FLAGS_use_blob_db) {
s = NewBlobDB(options, db_name, &db->db);
blob_db::BlobDBOptions blob_db_options;
blob_db::BlobDB* ptr;
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
if (s.ok()) {
db->db = ptr;
}
#endif // ROCKSDB_LITE
} else {
s = DB::Open(options, db_name, &db->db);
}
Expand Down Expand Up @@ -3406,8 +3422,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
if (FLAGS_use_blob_db) {
s = db_with_cfh->db->Put(write_options_, key,
gen.Generate(value_size_));
Slice val = gen.Generate(value_size_);
int ttl = rand() % 86400;
blob_db::BlobDB* blobdb =
static_cast<blob_db::BlobDB*>(db_with_cfh->db);
s = blobdb->PutWithTTL(write_options_, key, val, ttl);

} else if (FLAGS_num_column_families <= 1) {
batch.Put(key, gen.Generate(value_size_));
} else {
Expand Down
2 changes: 2 additions & 0 deletions util/file_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class SequentialFileReader {

Status Skip(uint64_t n);

void Rewind();

SequentialFile* file() { return file_.get(); }

bool use_direct_io() const { return file_->use_direct_io(); }
Expand Down
Loading

0 comments on commit d85ff49

Please sign in to comment.