Skip to content

Commit

Permalink
Pin top-level index on partitioned index/filter blocks (facebook#4037)
Browse files Browse the repository at this point in the history
Summary:
Top-level index in partitioned index/filter blocks are small and could be pinned in memory. So far we use that by cache_index_and_filter_blocks to false. This however make it difficult to keep account of the total memory usage. This patch introduces pin_top_level_index_and_filter which in combination with cache_index_and_filter_blocks=true keeps the top-level index in cache and yet pinned them to avoid cache misses and also cache lookup overhead.
Closes facebook#4037

Differential Revision: D8596218

Pulled By: maysamyabandeh

fbshipit-source-id: 3a5f7f9ca6b4b525b03ff6bd82354881ae974ad2
  • Loading branch information
Maysam Yabandeh authored and facebook-github-bot committed Jun 22, 2018
1 parent c726f7f commit 80ade9a
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 102 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### New Features
* Changes the format of index blocks by storing the key in their raw form rather than converting them to InternalKey. This saves 8 bytes per index key. The feature is backward compatbile but not forward compatible. It is disabled by default unless format_version 3 or above is used.
* Improve the performance of iterators doing long range scans by using readahead, when using direct IO.
* pin_top_level_index_and_filter (default true) in BlockBasedTableOptions can be used in combination with cache_index_and_filter_blocks to prefetch and pin the top-level index of partitioned index and filter blocks in cache. It has no impact when cache_index_and_filter_blocks is false.

### Bug Fixes
* fix deadlock with enable_pipelined_write=true and max_successive_merges > 0
Expand Down
5 changes: 5 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,11 @@ void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
options->rep.pin_l0_filter_and_index_blocks_in_cache = v;
}

void rocksdb_block_based_options_set_pin_top_level_index_and_filter(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.pin_top_level_index_and_filter = v;
}

void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t *opt,
rocksdb_block_based_table_options_t* table_options) {
Expand Down
1 change: 1 addition & 0 deletions examples/rocksdb_option_file_example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
block_restart_interval=16
cache_index_and_filter_blocks=false
pin_l0_filter_and_index_blocks_in_cache=false
pin_top_level_index_and_filter=false
index_type=kBinarySearch
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
3 changes: 3 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,9 @@ rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_pin_top_level_index_and_filter(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
rocksdb_options_t* opt, rocksdb_block_based_table_options_t* table_options);

Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ struct BlockBasedTableOptions {
// evicted from cache when the table reader is freed.
bool pin_l0_filter_and_index_blocks_in_cache = false;

// If cache_index_and_filter_blocks is true and the below is true, then
// the top-level index of partitioned filter and index blocks are stored in
// the cache, but a reference is held in the "table reader" object so the
// blocks are pinned and only evicted from cache when the table reader is
// freed. This is not limited to l0 in LSM tree.
bool pin_top_level_index_and_filter = true;

// The index type that will be used for this table.
enum IndexType : char {
// A space efficient index block that is optimized for
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"cache_index_and_filter_blocks=1;"
"cache_index_and_filter_blocks_with_high_priority=true;"
"pin_l0_filter_and_index_blocks_in_cache=1;"
"pin_top_level_index_and_filter=1;"
"index_type=kHashSearch;"
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
Expand Down
3 changes: 3 additions & 0 deletions table/block_based_table_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
" pin_l0_filter_and_index_blocks_in_cache: %d\n",
table_options_.pin_l0_filter_and_index_blocks_in_cache);
ret.append(buffer);
snprintf(buffer, kBufferSize, " pin_top_level_index_and_filter: %d\n",
table_options_.pin_top_level_index_and_filter);
ret.append(buffer);
snprintf(buffer, kBufferSize, " index_type: %d\n",
table_options_.index_type);
ret.append(buffer);
Expand Down
4 changes: 4 additions & 0 deletions table/block_based_table_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
{"block_align",
{offsetof(struct BlockBasedTableOptions, block_align),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
{"pin_top_level_index_and_filter",
{offsetof(struct BlockBasedTableOptions,
pin_top_level_index_and_filter),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}};
#endif // !ROCKSDB_LITE
} // namespace rocksdb
96 changes: 63 additions & 33 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -898,17 +898,39 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->ioptions.info_log);
}

const bool pin =
// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
BlockBasedTableOptions::IndexType index_type = new_table->UpdateIndexType();
// prefetch the first level of index
const bool prefetch_index =
prefetch_all ||
(table_options.pin_top_level_index_and_filter &&
index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
// prefetch the first level of filter
const bool prefetch_filter =
prefetch_all || (table_options.pin_top_level_index_and_filter &&
rep->filter_type == Rep::FilterType::kPartitionedFilter);
// Partition fitlers cannot be enabled without partition indexes
assert(!prefetch_index || prefetch_filter);
// pin both index and filters, down to all partitions
const bool pin_all =
rep->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0;
// pin the first level of index
const bool pin_index =
pin_all || (table_options.pin_top_level_index_and_filter &&
index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
// pin the first level of filter
const bool pin_filter =
pin_all || (table_options.pin_top_level_index_and_filter &&
rep->filter_type == Rep::FilterType::kPartitionedFilter);
// pre-fetching of blocks is turned on
// Will use block cache for index/filter blocks access
// Always prefetch index and filter for level 0
if (table_options.cache_index_and_filter_blocks) {
if (prefetch_index_and_filter_in_cache || level == 0) {
assert(table_options.block_cache != nullptr);
assert(table_options.block_cache != nullptr);
if (prefetch_index) {
// Hack: Call NewIndexIterator() to implicitly add index to the
// block_cache

CachableEntry<IndexReader> index_entry;
bool prefix_extractor_changed = false;
// check prefix_extractor match only if hash based index is used
Expand All @@ -924,27 +946,29 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// On success it should give us ownership of the `CachableEntry` by
// populating `index_entry`.
assert(index_entry.value != nullptr);
index_entry.value->CacheDependencies(pin);
if (pin) {
if (prefetch_all) {
index_entry.value->CacheDependencies(pin_all);
}
if (pin_index) {
rep->index_entry = std::move(index_entry);
} else {
index_entry.Release(table_options.block_cache.get());
}

// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter(prefix_extractor);
if (filter_entry.value != nullptr) {
filter_entry.value->CacheDependencies(pin, prefix_extractor);
}
// if pin_l0_filter_and_index_blocks_in_cache is true, and this is
// a level0 file, then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache while this reader is alive
if (pin) {
rep->filter_entry = filter_entry;
} else {
filter_entry.Release(table_options.block_cache.get());
}
}
}
if (s.ok() && prefetch_filter) {
// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter(prefix_extractor);
if (filter_entry.value != nullptr && prefetch_all) {
filter_entry.value->CacheDependencies(pin_all, prefix_extractor);
}
// if pin_filter is true then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache while this reader is alive
if (pin_filter) {
rep->filter_entry = filter_entry;
} else {
filter_entry.Release(table_options.block_cache.get());
}
}
} else {
Expand All @@ -960,7 +984,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// are hence follow the configuration for pin and prefetch regardless of
// the value of cache_index_and_filter_blocks
if (prefetch_index_and_filter_in_cache || level == 0) {
rep->index_reader->CacheDependencies(pin);
rep->index_reader->CacheDependencies(pin_all);
}

// Set filter block
Expand All @@ -973,7 +997,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// Refer to the comment above about paritioned indexes always being
// cached
if (filter && (prefetch_index_and_filter_in_cache || level == 0)) {
filter->CacheDependencies(pin, prefix_extractor);
filter->CacheDependencies(pin_all, prefix_extractor);
}
}
} else {
Expand Down Expand Up @@ -2419,18 +2443,11 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
return in_cache;
}

// REQUIRES: The following fields of rep_ should have already been populated:
// 1. file
// 2. index_handle,
// 3. options
// 4. internal_comparator
// 5. index_type
Status BlockBasedTable::CreateIndexReader(
FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader,
InternalIterator* preloaded_meta_index_iter, int level) {
BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() {
// Some old version of block-based tables don't have index type present in
// table properties. If that's the case we can safely use the kBinarySearch.
auto index_type_on_file = BlockBasedTableOptions::kBinarySearch;
BlockBasedTableOptions::IndexType index_type_on_file =
BlockBasedTableOptions::kBinarySearch;
if (rep_->table_properties) {
auto& props = rep_->table_properties->user_collected_properties;
auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
Expand All @@ -2441,6 +2458,19 @@ Status BlockBasedTable::CreateIndexReader(
rep_->index_type = index_type_on_file;
}
}
return index_type_on_file;
}

// REQUIRES: The following fields of rep_ should have already been populated:
// 1. file
// 2. index_handle,
// 3. options
// 4. internal_comparator
// 5. index_type
Status BlockBasedTable::CreateIndexReader(
FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader,
InternalIterator* preloaded_meta_index_iter, int level) {
auto index_type_on_file = UpdateIndexType();

auto file = rep_->file.get();
const InternalKeyComparator* icomparator = &rep_->internal_comparator;
Expand Down
14 changes: 9 additions & 5 deletions table/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ class BlockBasedTable : public TableReader {

void ReadMeta(const Footer& footer);

// Figure the index type, update it in rep_, and also return it.
BlockBasedTableOptions::IndexType UpdateIndexType();

// Create a index reader based on the index type stored in the table.
// Optionally, user can pass a preloaded meta_index_iter for the index that
// need to access extra meta blocks for index construction. This parameter
Expand Down Expand Up @@ -478,11 +481,12 @@ struct BlockBasedTable::Rep {
// block to extract prefix without knowing if a key is internal or not.
unique_ptr<SliceTransform> internal_prefix_transform;

// only used in level 0 files:
// when pin_l0_filter_and_index_blocks_in_cache is true, we do use the
// LRU cache, but we always keep the filter & idndex block's handle checked
// out here (=we don't call Release()), plus the parsed out objects
// the LRU cache will never push flush them out, hence they're pinned
// only used in level 0 files when pin_l0_filter_and_index_blocks_in_cache is
// true or in all levels when pin_top_level_index_and_filter is set in
// combination with partitioned index/filters: then we do use the LRU cache,
// but we always keep the filter & index block's handle checked out here (=we
// don't call Release()), plus the parsed out objects the LRU cache will never
// push flush them out, hence they're pinned
CachableEntry<FilterBlockReader> filter_entry;
CachableEntry<IndexReader> index_entry;
// range deletion meta-block is pinned through reader's lifetime when LRU
Expand Down
132 changes: 68 additions & 64 deletions table/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2330,70 +2330,74 @@ TEST_P(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
}
for (bool index_and_filter_in_cache : {true, false}) {
for (bool pin_l0 : {true, false}) {
if (pin_l0 && !index_and_filter_in_cache) {
continue;
}
// Create a table
Options opt;
unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
opt.compression = kNoCompression;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
table_options.block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_options.pin_l0_filter_and_index_blocks_in_cache = pin_l0;
table_options.partition_filters = partition_filter;
table_options.cache_index_and_filter_blocks =
index_and_filter_in_cache;
// big enough so we don't ever lose cached values.
table_options.block_cache = std::shared_ptr<rocksdb::Cache>(
new MockCache(16 * 1024 * 1024, 4, false, 0.0));
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));

bool convert_to_internal_key = false;
TableConstructor c(BytewiseComparator(), convert_to_internal_key,
level);
std::string user_key = "k01";
std::string key =
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
c.Add(key, "hello");
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
const MutableCFOptions moptions(opt);
c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys,
&kvmap);

// Doing a read to make index/filter loaded into the cache
auto table_reader =
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
PinnableSlice value;
GetContext get_context(opt.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value,
nullptr, nullptr, nullptr, nullptr);
InternalKey ikey(user_key, 0, kTypeValue);
auto s = table_reader->Get(ReadOptions(), key, &get_context,
moptions.prefix_extractor.get());
ASSERT_EQ(get_context.State(), GetContext::kFound);
ASSERT_STREQ(value.data(), "hello");

// Close the table
c.ResetTableReader();

auto usage = table_options.block_cache->GetUsage();
auto pinned_usage = table_options.block_cache->GetPinnedUsage();
// The only usage must be for marked data blocks
ASSERT_EQ(usage, MockCache::marked_size_);
// There must be some pinned data since PinnableSlice has not
// released them yet
ASSERT_GT(pinned_usage, 0);
// Release pinnable slice reousrces
value.Reset();
pinned_usage = table_options.block_cache->GetPinnedUsage();
ASSERT_EQ(pinned_usage, 0);
for (bool pin_top_level : {true, false}) {
if (pin_l0 && !index_and_filter_in_cache) {
continue;
}
// Create a table
Options opt;
unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
opt.compression = kNoCompression;
BlockBasedTableOptions table_options =
GetBlockBasedTableOptions();
table_options.block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
table_options.pin_l0_filter_and_index_blocks_in_cache = pin_l0;
table_options.pin_top_level_index_and_filter = pin_top_level;
table_options.partition_filters = partition_filter;
table_options.cache_index_and_filter_blocks =
index_and_filter_in_cache;
// big enough so we don't ever lose cached values.
table_options.block_cache = std::shared_ptr<rocksdb::Cache>(
new MockCache(16 * 1024 * 1024, 4, false, 0.0));
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));

bool convert_to_internal_key = false;
TableConstructor c(BytewiseComparator(), convert_to_internal_key,
level);
std::string user_key = "k01";
std::string key =
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
c.Add(key, "hello");
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
const MutableCFOptions moptions(opt);
c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys,
&kvmap);

// Doing a read to make index/filter loaded into the cache
auto table_reader =
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
PinnableSlice value;
GetContext get_context(opt.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value,
nullptr, nullptr, nullptr, nullptr);
InternalKey ikey(user_key, 0, kTypeValue);
auto s = table_reader->Get(ReadOptions(), key, &get_context,
moptions.prefix_extractor.get());
ASSERT_EQ(get_context.State(), GetContext::kFound);
ASSERT_STREQ(value.data(), "hello");

// Close the table
c.ResetTableReader();

auto usage = table_options.block_cache->GetUsage();
auto pinned_usage = table_options.block_cache->GetPinnedUsage();
// The only usage must be for marked data blocks
ASSERT_EQ(usage, MockCache::marked_size_);
// There must be some pinned data since PinnableSlice has not
// released them yet
ASSERT_GT(pinned_usage, 0);
// Release pinnable slice reousrces
value.Reset();
pinned_usage = table_options.block_cache->GetPinnedUsage();
ASSERT_EQ(pinned_usage, 0);
}
}
}
}
Expand Down
Loading

0 comments on commit 80ade9a

Please sign in to comment.