Skip to content

Commit

Permalink
kv/KeyValueDB: Move PriCache implementation to ShardedCache.
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Nelson <[email protected]>
  • Loading branch information
Mark Nelson committed Jan 11, 2019
1 parent e920693 commit 832be4f
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 178 deletions.
59 changes: 5 additions & 54 deletions src/kv/KeyValueDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using std::vector;
*
* Kyoto Cabinet or LevelDB should implement this
*/
class KeyValueDB : public PriorityCache::PriCache {
class KeyValueDB {
public:
/*
* See RocksDB's definition of a column family(CF) and how to use it.
Expand Down Expand Up @@ -270,9 +270,6 @@ class KeyValueDB : public PriorityCache::PriCache {
typedef std::shared_ptr< WholeSpaceIteratorImpl > WholeSpaceIterator;

private:
int64_t cache_bytes[PriorityCache::Priority::LAST+1] = { 0 };
double cache_ratio = 0;

// This class filters a WholeSpaceIterator by a prefix.
class PrefixIteratorImpl : public IteratorImpl {
const std::string prefix;
Expand Down Expand Up @@ -347,56 +344,6 @@ class KeyValueDB : public PriorityCache::PriCache {
return -EOPNOTSUPP;
}

// PriCache

virtual int64_t request_cache_bytes(PriorityCache::Priority pri, uint64_t chunk_bytes) const {
return -EOPNOTSUPP;
}

virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
return cache_bytes[pri];
}

virtual int64_t get_cache_bytes() const {
int64_t total = 0;

for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
total += get_cache_bytes(pri);
}
return total;
}

virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
cache_bytes[pri] = bytes;
}

virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
cache_bytes[pri] += bytes;
}

virtual int64_t commit_cache_size(uint64_t total_cache) {
return -EOPNOTSUPP;
}

virtual int64_t get_committed_size() const {
return -EOPNOTSUPP;
}

virtual double get_cache_ratio() const {
return cache_ratio;
}

virtual void set_cache_ratio(double ratio) {
cache_ratio = ratio;
}

virtual string get_cache_name() const {
return "Unknown KeyValueDB Cache";
}

// End PriCache

virtual int set_cache_high_pri_pool_ratio(double ratio) {
return -EOPNOTSUPP;
}
Expand All @@ -405,6 +352,10 @@ class KeyValueDB : public PriorityCache::PriCache {
return -EOPNOTSUPP;
}

virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const {
return nullptr;
}

virtual ~KeyValueDB() {}

/// estimate space utilization for a prefix (in bytes)
Expand Down
68 changes: 1 addition & 67 deletions src/kv/RocksDBStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
#include "rocksdb/merge_operator.h"
#include "kv/rocksdb_cache/BinnedLRUCache.h"

using std::string;
#include "common/perf_counters.h"
Expand Down Expand Up @@ -392,6 +391,7 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options&

if (g_conf()->rocksdb_cache_type == "binned_lru") {
bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
cct,
block_cache_size,
g_conf()->rocksdb_cache_shard_bits);
} else if (g_conf()->rocksdb_cache_type == "lru") {
Expand Down Expand Up @@ -1268,72 +1268,6 @@ void RocksDBStore::compact_range(const string& start, const string& end)
db->CompactRange(options, &cstart, &cend);
}

int64_t RocksDBStore::request_cache_bytes(PriorityCache::Priority pri, uint64_t chunk_bytes) const
{
auto cache = bbt_opts.block_cache;

int64_t assigned = get_cache_bytes(pri);
int64_t usage = 0;
int64_t request = 0;
switch (pri) {
// PRI0 is for rocksdb's high priority items (indexes/filters)
case PriorityCache::Priority::PRI0:
{
usage += cache->GetPinnedUsage();
if (g_conf()->rocksdb_cache_type == "binned_lru") {
auto binned_cache =
std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
usage += binned_cache->GetHighPriPoolUsage();
}
break;
}
// All other cache items are currently shoved into the LAST priority.
case PriorityCache::Priority::LAST:
{
usage = get_cache_usage() - cache->GetPinnedUsage();
if (g_conf()->rocksdb_cache_type == "binned_lru") {
auto binned_cache =
std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(cache);
usage -= binned_cache->GetHighPriPoolUsage();
}
break;
}
default:
break;
}
request = (request > assigned) ? request - assigned : 0;
dout(10) << __func__ << " Priority: " << static_cast<uint32_t>(pri)
<< " Usage: " << usage << " Request: " << request << dendl;
return request;
}

int64_t RocksDBStore::get_cache_usage() const
{
return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
}

int64_t RocksDBStore::commit_cache_size(uint64_t total_bytes)
{
size_t old_bytes = bbt_opts.block_cache->GetCapacity();
int64_t new_bytes = PriorityCache::get_chunk(
get_cache_bytes(), total_bytes);
dout(10) << __func__ << " old: " << old_bytes
<< " new: " << new_bytes << dendl;
bbt_opts.block_cache->SetCapacity((size_t) new_bytes);

// Set the high priority pool ratio is this is the binned LRU cache.
if (g_conf()->rocksdb_cache_type == "binned_lru") {
auto binned_cache =
std::static_pointer_cast<rocksdb_cache::BinnedLRUCache>(bbt_opts.block_cache);
int64_t high_pri_bytes = PriorityCache::get_chunk(
binned_cache->GetHighPriPoolUsage()+1, total_bytes);
double ratio = (double) high_pri_bytes / new_bytes;
dout(10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl;
binned_cache->SetHighPriPoolRatio(ratio);
}
return new_bytes;
}

RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
delete dbiter;
Expand Down
23 changes: 10 additions & 13 deletions src/kv/RocksDBStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "kv/rocksdb_cache/BinnedLRUCache.h"
#include <errno.h>
#include "common/errno.h"
#include "common/dout.h"
Expand Down Expand Up @@ -120,7 +121,6 @@ class RocksDBStore : public KeyValueDB {
bool disableWAL;
bool enable_rmrange;
void compact() override;
int64_t high_pri_watermark;

void compact_async() override {
compact_range_async(string(), string());
Expand Down Expand Up @@ -159,8 +159,7 @@ class RocksDBStore : public KeyValueDB {
compact_thread(this),
compact_on_mount(false),
disableWAL(false),
enable_rmrange(cct->_conf->rocksdb_enable_rmrange),
high_pri_watermark(0)
enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
{}

~RocksDBStore() override;
Expand Down Expand Up @@ -478,17 +477,9 @@ class RocksDBStore : public KeyValueDB {
return total_size;
}

virtual int64_t request_cache_bytes(
PriorityCache::Priority pri, uint64_t cache_bytes) const override;
virtual int64_t commit_cache_size(uint64_t total_cache) override;
virtual int64_t get_committed_size() const override {
return bbt_opts.block_cache->GetCapacity();
virtual int64_t get_cache_usage() const override {
return static_cast<int64_t>(bbt_opts.block_cache->GetUsage());
}
virtual std::string get_cache_name() const override {
return "RocksDB Block Cache";
}
virtual int64_t get_cache_usage() const override;


int set_cache_size(uint64_t s) override {
cache_size = s;
Expand All @@ -499,6 +490,12 @@ class RocksDBStore : public KeyValueDB {
int set_cache_capacity(int64_t capacity);
int64_t get_cache_capacity();

virtual std::shared_ptr<PriorityCache::PriCache> get_priority_cache()
const override {
return dynamic_pointer_cast<PriorityCache::PriCache>(
bbt_opts.block_cache);
}

WholeSpaceIterator get_wholespace_iterator() override;
};

Expand Down
73 changes: 64 additions & 9 deletions src/kv/rocksdb_cache/BinnedLRUCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

#include "BinnedLRUCache.h"

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>

#define dout_context cct
#define dout_subsys ceph_subsys_rocksdb
#undef dout_prefix
#define dout_prefix *_dout << "rocksdb: "

namespace rocksdb_cache {

BinnedLRUHandleTable::BinnedLRUHandleTable() : list_(nullptr), length_(0), elems_(0) {
Expand Down Expand Up @@ -459,9 +463,12 @@ std::string BinnedLRUCacheShard::GetPrintableOptions() const {
return std::string(buffer);
}

BinnedLRUCache::BinnedLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
BinnedLRUCache::BinnedLRUCache(CephContext *c,
size_t capacity,
int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit), cct(c) {
num_shards_ = 1 << num_shard_bits;
// TODO: Switch over to use mempool
int rc = posix_memalign((void**) &shards_,
Expand Down Expand Up @@ -542,9 +549,57 @@ size_t BinnedLRUCache::GetHighPriPoolUsage() const {
return usage;
}

std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio) {
// PriCache

int64_t BinnedLRUCache::request_cache_bytes(PriorityCache::Priority pri, uint64_t total_cache) const
{
int64_t assigned = get_cache_bytes(pri);
int64_t request = 0;

switch (pri) {
// PRI0 is for rocksdb's high priority items (indexes/filters)
case PriorityCache::Priority::PRI0:
{
request = GetHighPriPoolUsage();
break;
}
// All other cache items are currently shoved into the LAST priority.
case PriorityCache::Priority::LAST:
{
request = GetUsage();
request -= GetHighPriPoolUsage();
break;
}
default:
break;
}
request = (request > assigned) ? request - assigned : 0;
ldout(cct, 10) << __func__ << " Priority: " << static_cast<uint32_t>(pri)
<< " Request: " << request << dendl;
return request;
}

int64_t BinnedLRUCache::commit_cache_size(uint64_t total_bytes)
{
size_t old_bytes = GetCapacity();
int64_t new_bytes = PriorityCache::get_chunk(
get_cache_bytes(), total_bytes);
ldout(cct, 10) << __func__ << " old: " << old_bytes
<< " new: " << new_bytes << dendl;
SetCapacity((size_t) new_bytes);
double ratio =
(double) get_cache_bytes(PriorityCache::Priority::PRI0) / new_bytes;
ldout(cct, 10) << __func__ << " High Pri Pool Ratio set to " << ratio << dendl;
SetHighPriPoolRatio(ratio);
return new_bytes;
}

std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(
CephContext *c,
size_t capacity,
int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
Expand All @@ -555,8 +610,8 @@ std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(size_t capacity, int num_shard
if (num_shard_bits < 0) {
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<BinnedLRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio);
return std::make_shared<BinnedLRUCache>(
c, capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio);
}

} // namespace rocksdb_cache
20 changes: 17 additions & 3 deletions src/kv/rocksdb_cache/BinnedLRUCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
#include <mutex>

#include "ShardedCache.h"

#include "common/autovector.h"
#include "common/dout.h"
#include "include/ceph_assert.h"
#include "common/ceph_context.h"

namespace rocksdb_cache {

Expand Down Expand Up @@ -47,6 +48,7 @@ namespace rocksdb_cache {
// RUCache::Release (to move into state 2) or BinnedLRUCacheShard::Erase (for state 3)

std::shared_ptr<rocksdb::Cache> NewBinnedLRUCache(
CephContext *c,
size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false,
Expand Down Expand Up @@ -291,8 +293,8 @@ class alignas(CACHE_LINE_SIZE) BinnedLRUCacheShard : public CacheShard {

class BinnedLRUCache : public ShardedCache {
public:
BinnedLRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio);
BinnedLRUCache(CephContext *c, size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio);
virtual ~BinnedLRUCache();
virtual const char* Name() const override { return "BinnedLRUCache"; }
virtual CacheShard* GetShard(int shard) override;
Expand All @@ -311,7 +313,19 @@ class BinnedLRUCache : public ShardedCache {
// Retrieves high pri pool usage
size_t GetHighPriPoolUsage() const;

// PriorityCache
virtual int64_t request_cache_bytes(
PriorityCache::Priority pri, uint64_t total_cache) const;
virtual int64_t commit_cache_size(uint64_t total_cache);
virtual int64_t get_committed_size() const {
return GetCapacity();
}
virtual std::string get_cache_name() const {
return "RocksDB Binned LRU Cache";
}

private:
CephContext *cct;
BinnedLRUCacheShard* shards_;
int num_shards_ = 0;
};
Expand Down
Loading

0 comments on commit 832be4f

Please sign in to comment.