Skip to content

Commit

Permalink
Charge blob cache usage against the global memory limit (facebook#10321)
Browse files Browse the repository at this point in the history
Summary:
To help service owners to manage their memory budget effectively, we have been working towards counting all major memory users inside RocksDB towards a single global memory limit (see e.g. https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#cost-memory-used-in-memtable-to-block-cache). The global limit is specified by the capacity of the block-based table's block cache, and is technically implemented by inserting dummy entries ("reservations") into the block cache. The goal of this task is to support charging the memory usage of the new blob cache against this global memory limit when the backing cache of the blob cache and the block cache are different.

This PR is a part of facebook#10156

Pull Request resolved: facebook#10321

Reviewed By: ltamasi

Differential Revision: D37913590

Pulled By: gangliao

fbshipit-source-id: eaacf23907f82dc7d18964a3f24d7039a2937a72
  • Loading branch information
gangliao authored and facebook-github-bot committed Jul 19, 2022
1 parent 18a61a1 commit 0b6bc10
Show file tree
Hide file tree
Showing 20 changed files with 661 additions and 74 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ set(SOURCES
cache/cache_entry_roles.cc
cache/cache_key.cc
cache/cache_reservation_manager.cc
cache/charged_cache.cc
cache/clock_cache.cc
cache/compressed_secondary_cache.cc
cache/fast_lru_cache.cc
Expand Down
8 changes: 5 additions & 3 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Rocksdb Change Log
## Unreleased
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.
* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
* Charge memory usage of blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for blob cache exceeds the avaible space left in the block cache at some point (i.e, causing a cache full under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`.

### Public API changes
* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.

Expand All @@ -14,9 +18,7 @@
* Added support for blob caching in order to cache frequently used blobs for BlobDB.
* User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching.
* Either sharing the backend cache with the block cache or using a completely separate cache is supported.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.
* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete).

### Public API changes
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/charged_cache.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/fast_lru_cache.cc",
Expand Down Expand Up @@ -348,6 +349,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/charged_cache.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/fast_lru_cache.cc",
Expand Down
2 changes: 2 additions & 0 deletions cache/cache_entry_roles.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
"FilterConstruction",
"BlockBasedTableReader",
"FileMetadata",
"BlobCache",
"Misc",
}};

Expand All @@ -38,6 +39,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
"filter-construction",
"block-based-table-reader",
"file-metadata",
"blob-cache",
"misc",
}};

Expand Down
1 change: 1 addition & 0 deletions cache/cache_reservation_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ template class CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>;
template class CacheReservationManagerImpl<CacheEntryRole::kMisc>;
template class CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>;
template class CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>;
template class CacheReservationManagerImpl<CacheEntryRole::kBlobCache>;
} // namespace ROCKSDB_NAMESPACE
117 changes: 117 additions & 0 deletions cache/charged_cache.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "cache/charged_cache.h"

#include "cache/cache_reservation_manager.h"

namespace ROCKSDB_NAMESPACE {

ChargedCache::ChargedCache(std::shared_ptr<Cache> cache,
std::shared_ptr<Cache> block_cache)
: cache_(cache),
cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kBlobCache>>(
block_cache))) {}

Status ChargedCache::Insert(const Slice& key, void* value, size_t charge,
DeleterFn deleter, Handle** handle,
Priority priority) {
Status s = cache_->Insert(key, value, charge, deleter, handle, priority);
if (s.ok()) {
// Insert may cause the cache entry eviction if the cache is full. So we
// directly call the reservation manager to update the total memory used
// in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
return s;
}

Status ChargedCache::Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t charge,
Handle** handle, Priority priority) {
Status s = cache_->Insert(key, value, helper, charge, handle, priority);
if (s.ok()) {
// Insert may cause the cache entry eviction if the cache is full. So we
// directly call the reservation manager to update the total memory used
// in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
return s;
}

Cache::Handle* ChargedCache::Lookup(const Slice& key, Statistics* stats) {
return cache_->Lookup(key, stats);
}

Cache::Handle* ChargedCache::Lookup(const Slice& key,
const CacheItemHelper* helper,
const CreateCallback& create_cb,
Priority priority, bool wait,
Statistics* stats) {
auto handle = cache_->Lookup(key, helper, create_cb, priority, wait, stats);
// Lookup may promote the KV pair from the secondary cache to the primary
// cache. So we directly call the reservation manager to update the total
// memory used in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
return handle;
}

bool ChargedCache::Release(Cache::Handle* handle, bool useful,
bool erase_if_last_ref) {
size_t memory_used_delta = cache_->GetUsage(handle);
bool erased = cache_->Release(handle, useful, erase_if_last_ref);
if (erased) {
assert(cache_res_mgr_);
cache_res_mgr_
->UpdateCacheReservation(memory_used_delta, /* increase */ false)
.PermitUncheckedError();
}
return erased;
}

bool ChargedCache::Release(Cache::Handle* handle, bool erase_if_last_ref) {
size_t memory_used_delta = cache_->GetUsage(handle);
bool erased = cache_->Release(handle, erase_if_last_ref);
if (erased) {
assert(cache_res_mgr_);
cache_res_mgr_
->UpdateCacheReservation(memory_used_delta, /* increase */ false)
.PermitUncheckedError();
}
return erased;
}

void ChargedCache::Erase(const Slice& key) {
cache_->Erase(key);
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}

void ChargedCache::EraseUnRefEntries() {
cache_->EraseUnRefEntries();
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}

void ChargedCache::SetCapacity(size_t capacity) {
cache_->SetCapacity(capacity);
// SetCapacity can result in evictions when the cache capacity is decreased,
// so we would want to update the cache reservation here as well.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}

} // namespace ROCKSDB_NAMESPACE
121 changes: 121 additions & 0 deletions cache/charged_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <string>

#include "port/port.h"
#include "rocksdb/cache.h"

namespace ROCKSDB_NAMESPACE {

class ConcurrentCacheReservationManager;

// A cache interface which wraps around another cache and takes care of
// reserving space in block cache towards a single global memory limit, and
// forwards all the calls to the underlying cache.
class ChargedCache : public Cache {
public:
ChargedCache(std::shared_ptr<Cache> cache,
std::shared_ptr<Cache> block_cache);
~ChargedCache() override = default;

Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter,
Handle** handle, Priority priority) override;
Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override;

Cache::Handle* Lookup(const Slice& key, Statistics* stats) override;
Cache::Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
const CreateCallback& create_cb, Priority priority,
bool wait, Statistics* stats = nullptr) override;

bool Release(Cache::Handle* handle, bool useful,
bool erase_if_last_ref = false) override;
bool Release(Cache::Handle* handle, bool erase_if_last_ref = false) override;

void Erase(const Slice& key) override;
void EraseUnRefEntries() override;

static const char* kClassName() { return "ChargedCache"; }
const char* Name() const override { return kClassName(); }

uint64_t NewId() override { return cache_->NewId(); }

void SetCapacity(size_t capacity) override;

void SetStrictCapacityLimit(bool strict_capacity_limit) override {
cache_->SetStrictCapacityLimit(strict_capacity_limit);
}

bool HasStrictCapacityLimit() const override {
return cache_->HasStrictCapacityLimit();
}

void* Value(Cache::Handle* handle) override { return cache_->Value(handle); }

bool IsReady(Cache::Handle* handle) override {
return cache_->IsReady(handle);
}

void Wait(Cache::Handle* handle) override { cache_->Wait(handle); }

void WaitAll(std::vector<Handle*>& handles) override {
cache_->WaitAll(handles);
}

bool Ref(Cache::Handle* handle) override { return cache_->Ref(handle); }

size_t GetCapacity() const override { return cache_->GetCapacity(); }

size_t GetUsage() const override { return cache_->GetUsage(); }

size_t GetUsage(Cache::Handle* handle) const override {
return cache_->GetUsage(handle);
}

size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }

size_t GetCharge(Cache::Handle* handle) const override {
return cache_->GetCharge(handle);
}

Cache::DeleterFn GetDeleter(Cache::Handle* handle) const override {
return cache_->GetDeleter(handle);
}

void ApplyToAllEntries(
const std::function<void(const Slice& key, void* value, size_t charge,
Cache::DeleterFn deleter)>& callback,
const Cache::ApplyToAllEntriesOptions& opts) override {
cache_->ApplyToAllEntries(callback, opts);
}

void ApplyToAllCacheEntries(void (*callback)(void* value, size_t charge),
bool thread_safe) override {
cache_->ApplyToAllCacheEntries(callback, thread_safe);
}

std::string GetPrintableOptions() const override {
return cache_->GetPrintableOptions();
}

void DisownData() override { return cache_->DisownData(); }

inline Cache* GetCache() const { return cache_.get(); }

inline ConcurrentCacheReservationManager* TEST_GetCacheReservationManager()
const {
return cache_res_mgr_.get();
}

private:
std::shared_ptr<Cache> cache_;
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
};

} // namespace ROCKSDB_NAMESPACE
4 changes: 2 additions & 2 deletions cache/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ class LRUCache
virtual void WaitAll(std::vector<Handle*>& handles) override;
std::string GetPrintableOptions() const override;

// Retrieves number of elements in LRU, for unit test purpose only.
// Retrieves number of elements in LRU, for unit test purpose only.
size_t TEST_GetLRUSize();
// Retrieves high pri pool ratio.
// Retrieves high pri pool ratio.
double GetHighPriPoolRatio();

private:
Expand Down
4 changes: 2 additions & 2 deletions cache/lru_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1480,10 +1480,10 @@ class LRUCacheWithStat : public LRUCache {
return LRUCache::Insert(key, value, charge, deleter, handle, priority);
}
Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
size_t chargge, Handle** handle = nullptr,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
insert_count_++;
return LRUCache::Insert(key, value, helper, chargge, handle, priority);
return LRUCache::Insert(key, value, helper, charge, handle, priority);
}
Handle* Lookup(const Slice& key, Statistics* stats) override {
lookup_count_++;
Expand Down
2 changes: 1 addition & 1 deletion cache/sharded_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ShardedCache : public Cache {
DeleterFn deleter, Handle** handle,
Priority priority) override;
virtual Status Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t chargge,
const CacheItemHelper* helper, size_t charge,
Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
Expand Down
15 changes: 14 additions & 1 deletion db/blob/blob_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <cassert>
#include <string>

#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
Expand All @@ -26,7 +28,18 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,
statistics_(immutable_options->statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache),
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {}
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
#ifndef ROCKSDB_LITE
auto bbto =
immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto &&
bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
.charged == CacheEntryRoleOptions::Decision::kEnabled) {
blob_cache_ = std::make_shared<ChargedCache>(immutable_options->blob_cache,
bbto->block_cache);
}
#endif // ROCKSDB_LITE
}

BlobSource::~BlobSource() = default;

Expand Down
2 changes: 2 additions & 0 deletions db/blob/blob_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class BlobSource {
blob_file_reader);
}

inline Cache* GetBlobCache() const { return blob_cache_.get(); }

bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset) const;

Expand Down
Loading

0 comments on commit 0b6bc10

Please sign in to comment.