Skip to content

Commit

Permalink
Fix race in options taking effect (facebook#11929)
Browse files Browse the repository at this point in the history
Summary:
In follow-up to facebook#11922, fix a race in functions like CreateColumnFamily and SetDBOptions where the DB reports one option setting but a different one is left in effect.

To fix, we can add an extra mutex around these rare operations. We don't want to hold the DB mutex during I/O or other slow things because of the many purposes it serves, but a mutex more limited to these cases should be fine.

I believe this would fix a write-write race in facebook#10079 but not the read-write race.

Intended follow-up to this:
* Should be able to remove write thread synchronization from DBImpl::WriteOptionsFile

Pull Request resolved: facebook#11929

Test Plan:
Added two mini-stress style regression tests that fail with >1% probability before this change:
DBOptionsTest::SetStatsDumpPeriodSecRace
ColumnFamilyTest::CreateAndDropPeriodicRace

I haven't reproduced such an inconsistency between in-memory options and on disk latest options, but this change at least improves safety and adds a test anyway:
DBOptionsTest::SetOptionsAndFileRace

Reviewed By: ajkr

Differential Revision: D50024506

Pulled By: pdillinger

fbshipit-source-id: 1e99a9ed4d96fdcf3ac5061ec6b3cee78aecdda4
  • Loading branch information
pdillinger authored and facebook-github-bot committed Oct 12, 2023
1 parent b2fe148 commit d010b02
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 22 deletions.
52 changes: 51 additions & 1 deletion db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2473,7 +2473,10 @@ void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
}
} // anonymous namespace

TEST_P(ColumnFamilyTest, CreateAndDropRace) {
// This test attempts to set up a race condition in a way that is no longer
// possible, causing the test to hang. If DBImpl::options_mutex_ is removed
// in the future, this test might become relevant again.
TEST_P(ColumnFamilyTest, DISABLED_CreateAndDropRace) {
const int kCfCount = 5;
std::vector<ColumnFamilyOptions> cf_opts;
std::vector<Comparator*> comparators;
Expand Down Expand Up @@ -2535,6 +2538,53 @@ TEST_P(ColumnFamilyTest, CreateAndDropRace) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Mini-stress regression test: concurrently creates and drops column families
// and then verifies that the periodic task setup matches the set of column
// families left in the DB.
TEST_P(ColumnFamilyTest, CreateAndDropPeriodicRace) {
  // This is a mini-stress test looking for inconsistency between the set of
  // CFs in the DB, particularly whether any use preserve_internal_time_seconds,
  // and whether that is accurately reflected in the periodic task setup.
  constexpr size_t kNumThreads = 12;
  std::vector<std::thread> threads;
  threads.reserve(kNumThreads);
  // Randomly pick which side gets a non-zero preserve_internal_time_seconds:
  // the surviving "last" CF, or the short-lived CFs made by the workers.
  bool last_cf_on = Random::GetTLSInstance()->OneIn(2);

  // Yield at the sync point located just before the periodic task is
  // registered or unregistered, giving other threads a chance to interleave.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType",
      [&](void* /*arg*/) { std::this_thread::yield(); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_EQ(column_family_options_.preserve_internal_time_seconds, 0U);
  ColumnFamilyOptions other_opts = column_family_options_;
  ColumnFamilyOptions last_opts = column_family_options_;
  (last_cf_on ? last_opts : other_opts).preserve_internal_time_seconds =
      1000000;
  Open();

  for (size_t i = 0; i < kNumThreads; i++) {
    threads.emplace_back([this, &other_opts, i]() {
      ColumnFamilyHandle* cfh = nullptr;
      ASSERT_OK(db_->CreateColumnFamily(other_opts, std::to_string(i), &cfh));
      ASSERT_OK(db_->DropColumnFamily(cfh));
      ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
    });
  }

  // Create the surviving CF while the workers are still running, but defer
  // the assertion on its status until every worker has been joined: a fatal
  // assertion returns from this function, and destroying a std::thread that
  // is still joinable calls std::terminate.
  ColumnFamilyHandle* last_cfh = nullptr;
  Status last_create_status =
      db_->CreateColumnFamily(last_opts, "last", &last_cfh);

  for (auto& t : threads) {
    t.join();
  }
  ASSERT_OK(last_create_status);

  // Only "last" remains of the CFs created here, so the seqno-to-time
  // recording task is expected to exist exactly when "last" enabled it.
  bool task_enabled = dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask(
      PeriodicTaskType::kRecordSeqnoTime);
  ASSERT_EQ(last_cf_on, task_enabled);

  ASSERT_OK(db_->DropColumnFamily(last_cfh));
  ASSERT_OK(db_->DestroyColumnFamilyHandle(last_cfh));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 800000u;
db_options_.delayed_write_rate = kBaseRate;
Expand Down
23 changes: 18 additions & 5 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,10 @@ Status DBImpl::StartPeriodicTaskScheduler() {
}

Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
if (!from_db_open) {
options_mutex_.AssertHeld();
}

uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
bool mapping_was_empty = false;
Expand All @@ -840,11 +844,6 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
}
mapping_was_empty = seqno_to_time_mapping_.Empty();
}
// FIXME: because we released the db mutex, there's a race here where
// if e.g. I create or drop two column families in parallel, I might end up
// with the periodic task scheduler in the wrong state. We don't want to
// just keep holding the mutex, however, because of global timer and mutex
// in PeriodicTaskScheduler.

uint64_t seqno_time_cadence = 0;
if (min_preserve_seconds != std::numeric_limits<uint64_t>::max()) {
Expand All @@ -855,6 +854,9 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
}

TEST_SYNC_POINT_CALLBACK(
"DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType", nullptr);

Status s;
if (seqno_time_cadence == 0) {
s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime);
Expand Down Expand Up @@ -918,6 +920,7 @@ Status DBImpl::CancelPeriodicTaskScheduler() {

// estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
stats_history_mutex_.AssertHeld();
size_t size_total =
sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
if (stats_history_.size() == 0) return size_total;
Expand Down Expand Up @@ -1208,6 +1211,7 @@ Status DBImpl::SetOptions(
return Status::InvalidArgument("empty input");
}

InstrumentedMutexLock ol(&options_mutex_);
MutableCFOptions new_options;
Status s;
Status persist_options_status;
Expand Down Expand Up @@ -1266,6 +1270,7 @@ Status DBImpl::SetDBOptions(
return Status::InvalidArgument("empty input");
}

InstrumentedMutexLock ol(&options_mutex_);
MutableDBOptions new_options;
Status s;
Status persist_options_status = Status::OK();
Expand Down Expand Up @@ -3362,6 +3367,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
InstrumentedMutexLock ol(&options_mutex_);
Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
if (s.ok()) {
s.UpdateIfOk(WrapUpCreateColumnFamilies({&cf_options}));
Expand All @@ -3374,6 +3380,7 @@ Status DBImpl::CreateColumnFamilies(
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
InstrumentedMutexLock ol(&options_mutex_);
handles->clear();
size_t num_cf = column_family_names.size();
Status s;
Expand All @@ -3397,6 +3404,7 @@ Status DBImpl::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
InstrumentedMutexLock ol(&options_mutex_);
handles->clear();
size_t num_cf = column_families.size();
Status s;
Expand All @@ -3423,6 +3431,7 @@ Status DBImpl::CreateColumnFamilies(
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
options_mutex_.AssertHeld();
// TODO: plumb Env::IOActivity
const ReadOptions read_options;
Status s;
Expand Down Expand Up @@ -3514,6 +3523,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,

Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
assert(column_family != nullptr);
InstrumentedMutexLock ol(&options_mutex_);
Status s = DropColumnFamilyImpl(column_family);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
Expand All @@ -3524,6 +3534,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {

Status DBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
InstrumentedMutexLock ol(&options_mutex_);
Status s;
bool success_once = false;
for (auto* handle : column_families) {
Expand Down Expand Up @@ -5164,6 +5175,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,

Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
bool need_enter_write_thread) {
options_mutex_.AssertHeld();

WriteThread::Writer w;
if (need_mutex_lock) {
mutex_.Lock();
Expand Down
14 changes: 12 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2379,9 +2379,19 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;

// In addition to mutex_, log_write_mutex_ protected writes to stats_history_
// Guards changes to DB and CF options to ensure consistency between
// * In-memory options objects
// * Settings in effect
// * Options file contents
// while allowing the DB mutex to be released during slow operations like
// persisting options file or modifying global periodic task timer.
// Always acquired *before* DB mutex when this one is applicable.
InstrumentedMutex options_mutex_;

// Guards reads and writes to in-memory stats_history_.
InstrumentedMutex stats_history_mutex_;
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and

// In addition to mutex_, log_write_mutex_ protects writes to logs_ and
// logfile_number_. With two_write_queues it also protects alive_log_files_,
// and log_empty_. Refer to the definition of each variable below for more
// details.
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const {


size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
return EstimateInMemoryStatsHistorySize();
}
} // namespace ROCKSDB_NAMESPACE
Expand Down
9 changes: 5 additions & 4 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ Status DBImpl::PersistentStatsProcessFormatVersion() {
if (s.ok()) {
ColumnFamilyOptions cfo;
OptimizeForPersistentStats(&cfo);
s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName,
&handle);
}
if (s.ok()) {
persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
Expand Down Expand Up @@ -924,7 +925,7 @@ Status DBImpl::InitPersistStatsColumnFamily() {
ColumnFamilyHandle* handle = nullptr;
ColumnFamilyOptions cfo;
OptimizeForPersistentStats(&cfo);
s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
s = CreateColumnFamilyImpl(cfo, kPersistentStatsColumnFamilyName, &handle);
persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
mutex_.Lock();
}
Expand Down Expand Up @@ -1988,6 +1989,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,

impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
RecoveryContext recovery_ctx;
impl->options_mutex_.Lock();
impl->mutex_.Lock();

// Handles create_if_missing, error_if_exists
Expand Down Expand Up @@ -2124,7 +2126,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
// The WriteOptionsFile() will release and lock the mutex internally.
persist_options_status = impl->WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);

*dbptr = impl;
impl->opened_successfully_ = true;
impl->DeleteObsoleteFiles();
Expand Down Expand Up @@ -2245,10 +2246,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
s = impl->StartPeriodicTaskScheduler();
}

if (s.ok()) {
s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true);
}
impl->options_mutex_.Unlock();
if (!s.ok()) {
for (auto* h : *handles) {
delete h;
Expand Down
50 changes: 50 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "rocksdb/convenience.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/utilities/options_util.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
Expand Down Expand Up @@ -741,6 +742,55 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
Close();
}

// Mini-stress test: after a burst of concurrent SetDBOptions() calls that
// toggle stats_dump_period_sec, the option value the DB reports must agree
// with whether the stats-dump periodic task is actually scheduled.
TEST_F(DBOptionsTest, SetStatsDumpPeriodSecRace) {
  constexpr int kNumSetters = 12;
  std::vector<std::thread> setters;
  for (int idx = 0; idx < kNumSetters; idx++) {
    // Odd-numbered setters request a period of 100; even-numbered ones
    // request 0.
    const char* period = (idx % 2 != 0) ? "100" : "0";
    setters.emplace_back([this, period]() {
      ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", period}}));
    });
  }

  for (std::thread& setter : setters) {
    setter.join();
  }

  // Whichever setter ran last, the reported option and the scheduler state
  // must tell the same story.
  const bool option_enabled =
      dbfull()->GetDBOptions().stats_dump_period_sec > 0;
  const bool task_scheduled =
      dbfull()->TEST_GetPeriodicTaskScheduler().TEST_HasTask(
          PeriodicTaskType::kDumpStats);

  ASSERT_EQ(option_enabled, task_scheduled);
}

// Mini-stress test: after a burst of concurrent SetOptions() calls that each
// set a different ttl, the ttl reported by the DB must equal the ttl stored in
// the latest persisted options file.
TEST_F(DBOptionsTest, SetOptionsAndFileRace) {
  constexpr int kNumSetters = 12;
  std::vector<std::thread> setters;
  for (int idx = 0; idx < kNumSetters; idx++) {
    setters.emplace_back([this, idx]() {
      const std::string ttl_value = std::to_string(idx * 100);
      ASSERT_OK(dbfull()->SetOptions({{"ttl", ttl_value}}));
    });
  }

  for (std::thread& setter : setters) {
    setter.join();
  }

  // Value the DB reports in memory once all setters have finished.
  const auto ttl_in_memory = dbfull()->GetOptions().ttl;

  // Load the most recent options file back from disk and compare.
  ConfigOptions config;
  config.env = env_;
  DBOptions loaded_db_options;
  std::vector<ColumnFamilyDescriptor> loaded_cf_descs;
  ASSERT_OK(LoadLatestOptions(config, dbname_, &loaded_db_options,
                              &loaded_cf_descs, nullptr));
  ASSERT_EQ(loaded_cf_descs.size(), 1);
  ASSERT_EQ(ttl_in_memory, loaded_cf_descs[0].options.ttl);
}

TEST_F(DBOptionsTest, SetOptionsStatsPersistPeriodSec) {
Options options;
options.create_if_missing = true;
Expand Down
2 changes: 1 addition & 1 deletion monitoring/instrumented_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class InstrumentedMutex {

void Unlock() { mutex_.Unlock(); }

void AssertHeld() { mutex_.AssertHeld(); }
void AssertHeld() const { mutex_.AssertHeld(); }

private:
void LockInternal();
Expand Down
2 changes: 1 addition & 1 deletion port/port_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Mutex {
// Optionally crash if this thread does not hold this mutex.
// The implementation must be fast, especially if NDEBUG is
// defined. The implementation is allowed to skip all checks.
void AssertHeld();
void AssertHeld() const;
};

class CondVar {
Expand Down
2 changes: 1 addition & 1 deletion port/port_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ bool Mutex::TryLock() {
return ret;
}

void Mutex::AssertHeld() {
void Mutex::AssertHeld() const {
#ifndef NDEBUG
assert(locked_);
#endif
Expand Down
8 changes: 4 additions & 4 deletions port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class Mutex {

bool TryLock();

// this will assert if the mutex is not locked
// it does NOT verify that mutex is held by a calling thread
void AssertHeld();
// This will fail assertion if the mutex is not locked.
// It does NOT verify that mutex is held by a calling thread.
void AssertHeld() const;

// Also implement std Lockable
inline void lock() { Lock(); }
Expand Down Expand Up @@ -139,7 +139,7 @@ class RWMutex {
void WriteLock();
void ReadUnlock();
void WriteUnlock();
void AssertHeld() {}
void AssertHeld() const {}

private:
pthread_rwlock_t mu_; // the underlying platform mutex
Expand Down
4 changes: 2 additions & 2 deletions port/win/port_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Mutex {

// This will fail assertion if the mutex is not locked.
// It does NOT verify that mutex is held by a calling thread.
void AssertHeld() {
void AssertHeld() const {
#ifndef NDEBUG
assert(locked_);
#endif
Expand Down Expand Up @@ -159,7 +159,7 @@ class RWMutex {
void WriteUnlock() { ReleaseSRWLockExclusive(&srwLock_); }

// Empty as in POSIX
void AssertHeld() {}
void AssertHeld() const {}

private:
SRWLOCK srwLock_;
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/bug_fixes/options_race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Fixed a rare race bug involving a concurrent combination of Create/DropColumnFamily and/or Set(DB)Options that could lead to inconsistency between (a) the DB's reported options state, (b) the DB options in effect, and (c) the latest persisted OPTIONS file.
2 changes: 1 addition & 1 deletion util/distributed_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DMutex : public folly::DistributedMutex {
explicit DMutex(bool IGNORED_adaptive = false) { (void)IGNORED_adaptive; }

// currently no-op
void AssertHeld() {}
void AssertHeld() const {}
};
using DMutexLock = std::lock_guard<folly::DistributedMutex>;

Expand Down

0 comments on commit d010b02

Please sign in to comment.