Skip to content

Commit

Permalink
Update VersionSet last seqno after LogAndApply (facebook#10051)
Browse files Browse the repository at this point in the history
Summary:
This PR fixes the issue of unstable snapshot during external SST file ingestion. Credit ajkr for the following walk through:  consider these relevant steps for of IngestExternalFile():

(1) increase seqno while holding mutex -- https://github.com/facebook/rocksdb/blob/677d2b4a8f8fd19d0c39a9ee8f648742e610688d/db/db_impl/db_impl.cc#L4768
(2) LogAndApply() -- https://github.com/facebook/rocksdb/blob/677d2b4a8f8fd19d0c39a9ee8f648742e610688d/db/db_impl/db_impl.cc#L4797-L4798
  (a) write to MANIFEST with mutex released https://github.com/facebook/rocksdb/blob/a96a4a2f7ba7633ab2cc51defd1e923e20d239a6/db/version_set.cc#L4407
  (b) apply to in-memory state with mutex held

A snapshot taken during (2a) will be unstable. In particular, queries against that snapshot will not include data from the ingested file before (2b), and will include data from the ingested file after (2b).

Pull Request resolved: facebook#10051

Test Plan:
Added a new unit test: `ExternalSSTFileBasicTest.WriteAfterReopenStableSnapshotWhileLoggingToManifest`.
```
make external_sst_file_basic_test
./external_sst_file_basic_test
```

Reviewed By: ajkr

Differential Revision: D36654033

Pulled By: cbi42

fbshipit-source-id: bf720cca313e0cf211585960f3aff04853a31b96
  • Loading branch information
cbi42 authored and facebook-github-bot committed May 25, 2022
1 parent b71466e commit b0e1906
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 13 deletions.
34 changes: 21 additions & 13 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4809,19 +4809,6 @@ Status DBImpl::IngestExternalFiles(
}
}
if (status.ok()) {
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
for (size_t i = 1; i != num_cfs; ++i) {
consumed_seqno_count =
std::max(consumed_seqno_count,
ingestion_jobs[i].ConsumedSequenceNumbersCount());
}
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
Expand Down Expand Up @@ -4851,6 +4838,27 @@ Status DBImpl::IngestExternalFiles(
status =
versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
edit_lists, &mutex_, directories_.GetDbDir());
// It is safe to update VersionSet last seqno here after LogAndApply since
// LogAndApply persists last sequence number from VersionEdits,
// which are from file's largest seqno and not from VersionSet.
//
// It is necessary to update last seqno here since LogAndApply releases
// mutex when persisting MANIFEST file, and the snapshots taken during
// that period will not be stable if VersionSet last seqno is updated
// before LogAndApply.
int consumed_seqno_count =
ingestion_jobs[0].ConsumedSequenceNumbersCount();
for (size_t i = 1; i != num_cfs; ++i) {
consumed_seqno_count =
std::max(consumed_seqno_count,
ingestion_jobs[i].ConsumedSequenceNumbersCount());
}
if (consumed_seqno_count > 0) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
versions_->SetLastSequence(last_seqno + consumed_seqno_count);
}
}

if (status.ok()) {
Expand Down
57 changes: 57 additions & 0 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,63 @@ TEST_F(ExternalSSTFileBasicTest, VerifySstUniqueId) {
ASSERT_EQ(skipped, 1);
}

TEST_F(ExternalSSTFileBasicTest, StableSnapshotWhileLoggingToManifest) {
const std::string kPutVal = "put_val";
const std::string kIngestedVal = "ingested_val";

ASSERT_OK(Put("k", kPutVal, WriteOptions()));
ASSERT_OK(Flush());

std::string external_file = sst_files_dir_ + "/file_to_ingest.sst";
{
SstFileWriter sst_file_writer{EnvOptions(), CurrentOptions()};
ASSERT_OK(sst_file_writer.Open(external_file));
ASSERT_OK(sst_file_writer.Put("k", kIngestedVal));
ASSERT_OK(sst_file_writer.Finish());
}

const Snapshot* snapshot = nullptr;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void* /* arg */) {
// Prevent memory leak: this callback may be called multiple times
// and previous snapshot need to be freed
db_->ReleaseSnapshot(snapshot);
snapshot = db_->GetSnapshot();
ReadOptions read_opts;
read_opts.snapshot = snapshot;
std::string value;
ASSERT_OK(db_->Get(read_opts, "k", &value));
ASSERT_EQ(kPutVal, value);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(db_->IngestExternalFile(db_->DefaultColumnFamily(), {external_file},
IngestExternalFileOptions()));
auto ingested_file_seqno = db_->GetLatestSequenceNumber();
ASSERT_NE(nullptr, snapshot);
// snapshot is taken before SST ingestion is done
ASSERT_EQ(ingested_file_seqno, snapshot->GetSequenceNumber() + 1);

ReadOptions read_opts;
read_opts.snapshot = snapshot;
std::string value;
ASSERT_OK(db_->Get(read_opts, "k", &value));
ASSERT_EQ(kPutVal, value);

db_->ReleaseSnapshot(snapshot);

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// After reopen, sequence number should be up current such that
// ingested value is read
Reopen(CurrentOptions());
ASSERT_OK(db_->Get(ReadOptions(), "k", &value));
ASSERT_EQ(kIngestedVal, value);

// New write should get higher seqno compared to ingested file
ASSERT_OK(Put("k", kPutVal, WriteOptions()));
ASSERT_EQ(db_->GetLatestSequenceNumber(), ingested_file_seqno + 1);
}

INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),
Expand Down

0 comments on commit b0e1906

Please sign in to comment.