Skip to content

Commit

Permalink
Dump Blob DB options to info log
Browse files Browse the repository at this point in the history
Summary:
* Dump blob db options to info log
* Remove BlobDBOptionsImpl to disallow dynamic cast *BlobDBOptions into *BlobDBOptionsImpl. Move options there to be constants or into BlobDBOptions. The dynamic cast is broken after facebook#2645
* Change some of the default options
* Remove blob_db_options.min_blob_size, which is unimplemented. Will implement it soon.
Closes facebook#2671

Differential Revision: D5529912

Pulled By: yiwu-arbug

fbshipit-source-id: dcd58ca981db5bcc7f123b65a0d6f6ae0dc703c7
  • Loading branch information
Yi Wu authored and facebook-github-bot committed Aug 1, 2017
1 parent 3218edc commit 1900771
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 248 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ set(SOURCES
utilities/backupable/backupable_db.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_options_impl.cc
utilities/blob_db/blob_dump_tool.cc
utilities/blob_db/blob_file.cc
utilities/blob_db/blob_log_reader.cc
Expand Down
1 change: 0 additions & 1 deletion TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ cpp_library(
"utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_options_impl.cc",
"utilities/blob_db/blob_file.cc",
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
Expand Down
1 change: 0 additions & 1 deletion src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ LIB_SOURCES = \
utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_options_impl.cc \
utilities/blob_db/blob_file.cc \
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
Expand Down
5 changes: 1 addition & 4 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3311,10 +3311,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} else if (FLAGS_use_blob_db) {
blob_db::BlobDBOptions blob_db_options;
blob_db::BlobDB* ptr;
s = CreateLoggerFromOptions(db_name, options, &options.info_log);
if (s.ok()) {
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
}
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
if (s.ok()) {
db->db = ptr;
}
Expand Down
85 changes: 57 additions & 28 deletions utilities/blob_db/blob_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
//
#ifndef ROCKSDB_LITE

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "utilities/blob_db/blob_db.h"

#include <inttypes.h>

#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -94,23 +101,31 @@ Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
return s;
}

Status BlobDB::Open(const DBOptions& db_options,
Status BlobDB::Open(const DBOptions& db_options_input,
const BlobDBOptions& bdb_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, BlobDB** blob_db,
bool no_base_db) {
*blob_db = nullptr;
Status s;

DBOptions db_options(db_options_input);
if (db_options.info_log == nullptr) {
s = CreateLoggerFromOptions(dbname, db_options, &db_options.info_log);
if (!s.ok()) {
return s;
}
}

DBOptions my_db_options(db_options);
FlushBeginListener_t fblistener =
std::make_shared<BlobDBFlushBeginListener>();
CompactionListener_t ce_listener =
std::make_shared<EvictAllVersionsCompactionListener>();
ReconcileWalFilter_t rw_filter = std::make_shared<BlobReconcileWalFilter>();

my_db_options.listeners.emplace_back(fblistener);
my_db_options.listeners.emplace_back(ce_listener);
my_db_options.wal_filter = rw_filter.get();
db_options.listeners.emplace_back(fblistener);
db_options.listeners.emplace_back(ce_listener);
db_options.wal_filter = rw_filter.get();

{
MutexLock l(&listener_mutex);
Expand All @@ -120,19 +135,25 @@ Status BlobDB::Open(const DBOptions& db_options,
}

// we need to open blob db first so that recovery can happen
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, my_db_options);
BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options);
fblistener->SetImplPtr(bdb);
ce_listener->SetImplPtr(bdb);
rw_filter->SetImplPtr(bdb);

Status s = bdb->OpenPhase1();
if (!s.ok()) return s;
s = bdb->OpenPhase1();
if (!s.ok()) {
return s;
}

if (no_base_db) return s;
if (no_base_db) {
return s;
}

DB* db = nullptr;
s = DB::Open(my_db_options, dbname, column_families, handles, &db);
if (!s.ok()) return s;
s = DB::Open(db_options, dbname, column_families, handles, &db);
if (!s.ok()) {
return s;
}

// set the implementation pointer
s = bdb->LinkToBaseDB(db);
Expand All @@ -141,28 +162,36 @@ Status BlobDB::Open(const DBOptions& db_options,
bdb = nullptr;
}
*blob_db = bdb;
bdb_options.Dump(db_options.info_log.get());
return s;
}

BlobDB::BlobDB(DB* db) : StackableDB(db) {}

////////////////////////////////////////////////////////////////////////////////
//
//
// std::function<int(double)> fnCaller =
// std::bind(&A::fn, &anInstance, std::placeholders::_1);
////////////////////////////////////////////////////////////////////////////////
BlobDBOptions::BlobDBOptions()
: blob_dir("blob_dir"),
path_relative(true),
is_fifo(false),
blob_dir_size(1000ULL * 1024ULL * 1024ULL * 1024ULL),
ttl_range_secs(3600),
min_blob_size(512),
bytes_per_sync(0),
blob_file_size(256 * 1024 * 1024),
num_concurrent_simple_blobs(4),
compression(kNoCompression) {}
void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s",
blob_dir.c_str());
ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d",
path_relative);
ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d",
is_fifo);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64,
blob_dir_size);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32,
ttl_range_secs);
ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64,
blob_file_size);
ROCKS_LOG_HEADER(log, "blob_db_options.num_concurrent_simple_blobs: %" PRIu32,
num_concurrent_simple_blobs);
ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p",
ttl_extractor.get());
ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d",
static_cast<int>(compression));
ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d",
disable_background_tasks);
}

} // namespace blob_db
} // namespace rocksdb
Expand Down
32 changes: 13 additions & 19 deletions utilities/blob_db/blob_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,45 +31,41 @@ class TTLExtractor;
struct BlobDBOptions {
// name of the directory under main db, where blobs will be stored.
// default is "blob_dir"
std::string blob_dir;
std::string blob_dir = "blob_dir";

// whether the blob_dir path is relative or absolute.
bool path_relative;
bool path_relative = true;

// is the eviction strategy fifo based
bool is_fifo;
bool is_fifo = false;

// maximum size of the blob dir. Once this gets used, up
// evict the blob file which is oldest (is_fifo )
// 0 means no limits
uint64_t blob_dir_size;
uint64_t blob_dir_size = 0;

// a new bucket is opened, for ttl_range. So if ttl_range is 600seconds
// (10 minutes), and the first bucket starts at 1471542000
// then the blob buckets will be
// first bucket is 1471542000 - 1471542600
// second bucket is 1471542600 - 1471543200
// and so on
uint32_t ttl_range_secs;

// at what size will the blobs be stored in separate log rather than
// inline
uint64_t min_blob_size;
uint32_t ttl_range_secs = 3600;

// at what bytes will the blob files be synced to blob log.
uint64_t bytes_per_sync;
uint64_t bytes_per_sync = 0;

// the target size of each blob file. File will become immutable
// after it exceeds that size
uint64_t blob_file_size;
uint64_t blob_file_size = 256 * 1024 * 1024;

// how many files to use for simple blobs at one time
uint32_t num_concurrent_simple_blobs;
uint32_t num_concurrent_simple_blobs = 1;

// Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
// applications can set a TTLExtractor which can extract TTL from key-value
// pairs.
std::shared_ptr<TTLExtractor> ttl_extractor;
std::shared_ptr<TTLExtractor> ttl_extractor = nullptr;

// eviction callback.
// this function will be called for every blob that is getting
Expand All @@ -78,14 +74,12 @@ struct BlobDBOptions {
gc_evict_cb_fn;

// what compression to use for Blob's
CompressionType compression;

// default constructor
BlobDBOptions();
CompressionType compression = kNoCompression;

BlobDBOptions(const BlobDBOptions& in) = default;
// Disable all background job.
bool disable_background_tasks = false;

virtual ~BlobDBOptions() = default;
void Dump(Logger* log) const;
};

class BlobDB : public StackableDB {
Expand Down
50 changes: 19 additions & 31 deletions utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
total_blob_space_(0),
open_p1_done_(false),
debug_level_(0) {
const BlobDBOptionsImpl* options_impl =
static_cast_with_check<const BlobDBOptionsImpl, const BlobDBOptions>(
&blob_db_options);
if (options_impl) {
bdb_options_ = *options_impl;
}
blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
Expand Down Expand Up @@ -263,12 +257,6 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
total_periods_write_(0),
total_periods_ampl_(0),
total_blob_space_(0) {
assert(db_impl_ != nullptr);
const BlobDBOptionsImpl* options_impl =
static_cast_with_check<const BlobDBOptionsImpl, const BlobDBOptions>(
&blob_db_options);
bdb_options_ = *options_impl;

if (!bdb_options_.blob_dir.empty())
blob_dir_ = (bdb_options_.path_relative)
? db_->GetName() + "/" + bdb_options_.blob_dir
Expand Down Expand Up @@ -304,27 +292,27 @@ Status BlobDBImpl::OpenPhase1() {
void BlobDBImpl::StartBackgroundTasks() {
// store a call to a member function and object
tqueue_.add(
bdb_options_.reclaim_of_period_millisecs,
kReclaimOpenFilesPeriodMillisecs,
std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
tqueue_.add(bdb_options_.gc_check_period_millisecs,
tqueue_.add(kGCCheckPeriodMillisecs,
std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1));
tqueue_.add(
bdb_options_.deletion_check_period_millisecs,
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1));
tqueue_.add(
bdb_options_.deletion_check_period_millisecs,
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
tqueue_.add(
bdb_options_.delete_obsf_period_millisecs,
kDeleteObsoletedFilesPeriodMillisecs,
std::bind(&BlobDBImpl::DeleteObsFiles, this, std::placeholders::_1));
tqueue_.add(bdb_options_.sanity_check_period_millisecs,
tqueue_.add(kSanityCheckPeriodMillisecs,
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
tqueue_.add(bdb_options_.wa_stats_period_millisecs,
tqueue_.add(kWriteAmplificationStatsPeriodMillisecs,
std::bind(&BlobDBImpl::WaStats, this, std::placeholders::_1));
tqueue_.add(bdb_options_.fsync_files_period_millisecs,
tqueue_.add(kFSyncFilesPeriodMillisecs,
std::bind(&BlobDBImpl::FsyncFiles, this, std::placeholders::_1));
tqueue_.add(
bdb_options_.check_seqf_period_millisecs,
kCheckSeqFilesPeriodMillisecs,
std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1));
}

Expand Down Expand Up @@ -1606,8 +1594,9 @@ std::pair<bool, int64_t> BlobDBImpl::FsyncFiles(bool aborted) {
std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
if (aborted) return std::make_pair(false, -1);

if (open_file_count_.load() < bdb_options_.open_files_trigger)
if (open_file_count_.load() < kOpenFilesTrigger) {
return std::make_pair(true, -1);
}

// in the future, we should sort by last_access_
// instead of closing every file
Expand All @@ -1628,7 +1617,7 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {

WriteLock wl(&mutex_);

if (all_periods_write_.size() < bdb_options_.wa_num_stats_periods) {
if (all_periods_write_.size() < kWriteAmplificationStatsPeriods) {
total_periods_write_ -= (*all_periods_write_.begin());
total_periods_ampl_ = (*all_periods_ampl_.begin());

Expand Down Expand Up @@ -1868,15 +1857,14 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
return true;
}

if (bdb_options_.ttl_range_secs <
bdb_options_.partial_expiration_gc_range_secs) {
if (bdb_options_.ttl_range_secs < kPartialExpirationGCRangeSecs) {
*reason = "has ttl but partial expiration not turned on";
return false;
}

ReadLock lockbfile_r(&bfile->mutex_);
bool ret = ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) >
bdb_options_.partial_expiration_pct);
kPartialExpirationPercentage);
if (ret) {
*reason = "deleted blobs beyond threshold";
} else {
Expand All @@ -1895,13 +1883,14 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, std::time_t tt,
ReadLock lockbfile_r(&bfile->mutex_);

if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) >
bdb_options_.partial_expiration_pct) {
kPartialExpirationPercentage) {
*reason = "deleted simple blobs beyond threshold";
return true;
}

// if we haven't reached limits of disk space, don't DELETE
if (total_blob_space_.load() < bdb_options_.blob_dir_size) {
if (bdb_options_.blob_dir_size == 0 ||
total_blob_space_.load() < bdb_options_.blob_dir_size) {
*reason = "disk space not exceeded";
return false;
}
Expand Down Expand Up @@ -2057,7 +2046,7 @@ void BlobDBImpl::FilterSubsetOfFiles(
uint64_t last_id, size_t files_to_collect) {
// 100.0 / 15.0 = 7
uint64_t next_epoch_increment = static_cast<uint64_t>(
std::ceil(100 / static_cast<double>(bdb_options_.gc_file_pct)));
std::ceil(100 / static_cast<double>(kGCFilePercentage)));
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
std::time_t tt = std::chrono::system_clock::to_time_t(now);

Expand Down Expand Up @@ -2114,8 +2103,7 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {

// 15% of files are collected each call to space out the IO and CPU
// consumption.
size_t files_to_collect =
(bdb_options_.gc_file_pct * blob_files.size()) / 100;
size_t files_to_collect = (kGCFilePercentage * blob_files.size()) / 100;

std::vector<std::shared_ptr<BlobFile>> to_process;
FilterSubsetOfFiles(blob_files, &to_process, current_epoch_, last_id,
Expand Down
Loading

0 comments on commit 1900771

Please sign in to comment.