Skip to content

Commit

Permalink
Merge pull request ceph#27381 from markhpc/wip-prioritycache-manager
Browse files Browse the repository at this point in the history
common/PriorityCache: Implement a Cache Manager

Reviewed-by: Patrick Donnelly <[email protected]>
Reviewed-by: Neha Ojha <[email protected]>
  • Loading branch information
neha-ojha authored Apr 16, 2019
2 parents ffdf7c1 + 75ed3b0 commit 884d0de
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 197 deletions.
4 changes: 3 additions & 1 deletion src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ add_library(common_buffer_obj OBJECT
add_library(common_texttable_obj OBJECT
TextTable.cc)

add_library(common_prioritycache_obj OBJECT
PriorityCache.cc)

set(common_srcs
AsyncOpTracker.cc
BackTrace.cc
Expand All @@ -20,7 +23,6 @@ set(common_srcs
Mutex.cc
OutputDataSocket.cc
PluginRegistry.cc
PriorityCache.cc
Readahead.cc
SloppyCRCMap.cc
SubProcess.cc
Expand Down
330 changes: 327 additions & 3 deletions src/common/PriorityCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@
*/

#include "PriorityCache.h"
#include "common/dout.h"
#include "perfglue/heap_profiler.h"
#define dout_context cct
#define dout_subsys ceph_subsys_prioritycache
#undef dout_prefix
#define dout_prefix *_dout << "prioritycache "

namespace PriorityCache {
int64_t get_chunk(uint64_t usage, uint64_t total_bytes) {
namespace PriorityCache
{
int64_t get_chunk(uint64_t usage, uint64_t total_bytes)
{
uint64_t chunk = total_bytes;

// Find the nearest power of 2
Expand Down Expand Up @@ -50,6 +58,322 @@ namespace PriorityCache {
return val;
}

PriCache::~PriCache() {
// Construct a cache manager that tunes aggregate cache memory within
// [min, max] bytes while steering the process's mapped memory toward
// `target` bytes.  Registers the process-wide malloc/heap perf counters
// and runs an initial tuning pass so tuned_mem starts from real data.
Manager::Manager(CephContext *c,
                 uint64_t min,
                 uint64_t max,
                 uint64_t target) :
  cct(c),
  caches{},
  min_mem(min),
  max_mem(max),
  target_mem(target),
  tuned_mem(min)  // start conservatively; tune_memory() adjusts upward
{
  PerfCountersBuilder b(cct, "prioritycache",
                        MallocStats::M_FIRST, MallocStats::M_LAST);

  b.add_u64(MallocStats::M_TARGET_BYTES, "target_bytes",
            "target process memory usage in bytes", "t",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  b.add_u64(MallocStats::M_MAPPED_BYTES, "mapped_bytes",
            "total bytes mapped by the process", "m",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  // Description wording fixed: was "has yet to reclaimed".
  b.add_u64(MallocStats::M_UNMAPPED_BYTES, "unmapped_bytes",
            "unmapped bytes that the kernel has yet to reclaim", "u",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  b.add_u64(MallocStats::M_HEAP_BYTES, "heap_bytes",
            "aggregate bytes in use by the heap", "h",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  b.add_u64(MallocStats::M_CACHE_BYTES, "cache_bytes",
            "current memory available for caches.", "c",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  logger = b.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  tune_memory();
}

// Tear down in dependency order: clear() first deregisters and frees every
// per-cache logger, then the manager-wide logger is removed from the
// collection before being deleted.
Manager::~Manager()
{
clear();
cct->get_perfcounters_collection()->remove(logger);
delete logger;
}

// Re-estimate how much memory the caches may collectively use, based on how
// the process's actual mapped memory compares to the configured target.
// Updates tuned_mem and the malloc-stats perf counters.
void Manager::tune_memory()
{
  size_t heap_size = 0;
  size_t unmapped = 0;
  uint64_t mapped = 0;

  // Hand freed pages back to the OS first so the mapped figure is honest.
  ceph_heap_release_free_memory();
  ceph_heap_get_numeric_property("generic.heap_size", &heap_size);
  ceph_heap_get_numeric_property("tcmalloc.pageheap_unmapped_bytes", &unmapped);
  mapped = heap_size - unmapped;

  // Clamp the previous tuning into [min_mem, max_mem] before adjusting.
  uint64_t mem = tuned_mem;
  if (mem > max_mem) {
    mem = max_mem;
  }
  if (mem < min_mem) {
    mem = min_mem;
  }

  // Approach the min/max slowly, but bounce away quickly.
  if (mapped < target_mem) {
    // Under target: grow toward max_mem in proportion to the headroom.
    double headroom = 1 - ((double) mapped / target_mem);
    mem += headroom * (max_mem - mem);
  } else {
    // Over target: shrink toward min_mem in proportion to the overshoot.
    double overshoot = 1 - ((double) target_mem / mapped);
    mem -= overshoot * (mem - min_mem);
  }

  ldout(cct, 5) << __func__
                << " target: " << target_mem
                << " mapped: " << mapped
                << " unmapped: " << unmapped
                << " heap: " << heap_size
                << " old mem: " << tuned_mem
                << " new mem: " << mem << dendl;

  tuned_mem = mem;

  logger->set(MallocStats::M_TARGET_BYTES, target_mem);
  logger->set(MallocStats::M_MAPPED_BYTES, mapped);
  logger->set(MallocStats::M_UNMAPPED_BYTES, unmapped);
  logger->set(MallocStats::M_HEAP_BYTES, heap_size);
  logger->set(MallocStats::M_CACHE_BYTES, mem);
}

// Register a cache under `name` and create its per-cache perf counters
// (one per priority level plus reserved/committed totals).  The name must
// not already be registered.  Replaces twelve copy-pasted add_u64 calls
// with a table-driven loop; the counter names, descriptions, nicknames,
// indices and ordering are unchanged.
void Manager::insert(const std::string& name, std::shared_ptr<PriCache> c)
{
  ceph_assert(!caches.count(name));
  ceph_assert(!indexes.count(name));

  caches.emplace(name, c);

  // TODO: If we ever assign more than
  // PERF_COUNTER_MAX_BOUND - PERF_COUNTER_LOWER_BOUND perf counters for
  // priority caching we could run out of slots. Recycle them some day?
  // Also note that start and end are *exclusive*.
  int start = cur_index++;
  int end = cur_index + Extra::E_LAST + 1;

  ceph_assert(end < PERF_COUNTER_MAX_BOUND);
  indexes.emplace(name, std::vector<int>(Extra::E_LAST + 1));

  PerfCountersBuilder b(cct, "prioritycache:" + name, start, end);

  // One counter per priority level.  String literals (not generated
  // strings) are used so pointer lifetime is never a concern.  The loop
  // relies on Priority::PRI0..PRI11 being contiguous from 0 — the same
  // assumption the indexes[] mapping below already makes.
  static const char* const pri_names[] = {
    "pri0_bytes", "pri1_bytes", "pri2_bytes", "pri3_bytes",
    "pri4_bytes", "pri5_bytes", "pri6_bytes", "pri7_bytes",
    "pri8_bytes", "pri9_bytes", "pri10_bytes", "pri11_bytes"
  };
  static const char* const pri_descs[] = {
    "bytes allocated to pri0", "bytes allocated to pri1",
    "bytes allocated to pri2", "bytes allocated to pri3",
    "bytes allocated to pri4", "bytes allocated to pri5",
    "bytes allocated to pri6", "bytes allocated to pri7",
    "bytes allocated to pri8", "bytes allocated to pri9",
    "bytes allocated to pri10", "bytes allocated to pri11"
  };
  static const char* const pri_nicks[] = {
    "p0", "p1", "p2", "p3", "p4", "p5",
    "p6", "p7", "p8", "p9", "p10", "p11"
  };
  for (int i = Priority::PRI0; i <= Priority::PRI11; i++) {
    b.add_u64(cur_index + i, pri_names[i], pri_descs[i], pri_nicks[i],
              PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
  }

  b.add_u64(cur_index + Extra::E_RESERVED, "reserved_bytes",
            "bytes reserved for future growth.", "r",
            PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));

  // Description punctuation fixed: was "total bytes committed,".
  b.add_u64(cur_index + Extra::E_COMMITTED, "committed_bytes",
            "total bytes committed.", "c",
            PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));

  // Remember which perf-counter slot each priority/extra maps to.
  for (int i = 0; i < Extra::E_LAST + 1; i++) {
    indexes[name][i] = cur_index + i;
  }

  auto l = b.create_perf_counters();
  loggers.emplace(name, l);
  cct->get_perfcounters_collection()->add(l);

  cur_index = end;
}

// Unregister the cache stored under `name`: deregister and free its perf
// counters (if any were created), then drop its index and cache entries.
// A name that was never registered is a no-op.
void Manager::erase(const std::string& name)
{
  if (auto found = loggers.find(name); found != loggers.end()) {
    cct->get_perfcounters_collection()->remove(found->second);
    delete found->second;
    loggers.erase(found);
  }
  indexes.erase(name);
  caches.erase(name);
}

// Unregister every cache at once: deregister and free each per-cache
// logger, then empty all three bookkeeping maps.
void Manager::clear()
{
  for (const auto& entry : loggers) {
    cct->get_perfcounters_collection()->remove(entry.second);
    delete entry.second;
  }
  loggers.clear();
  indexes.clear();
  caches.clear();
}

// Divide the currently tuned memory budget among all registered caches,
// one priority level at a time, then commit the results and publish the
// per-cache perf counters.
void Manager::balance()
{
  int64_t mem_avail = tuned_mem;
  // Each cache is going to get a little extra from get_chunk, so shrink the
  // available memory here to compensate.
  mem_avail -= get_chunk(1, tuned_mem) * caches.size();
  ceph_assert(mem_avail >= 0);

  // Hand out memory one priority level at a time.
  for (int i = 0; i < Priority::LAST + 1; i++) {
    ldout(cct, 10) << __func__ << " assigning cache bytes for PRI: " << i << dendl;

    auto pri = static_cast<Priority>(i);
    balance_priority(&mem_avail, pri);

    // Update the per-priority perf counters
    for (const auto& [name, cache] : caches) {
      loggers.find(name)->second->set(indexes[name][pri],
                                      cache->get_cache_bytes(pri));
    }
  }
  // assert if we assigned more memory than is available.
  ceph_assert(mem_avail >= 0);

  for (const auto& [name, cache] : caches) {
    // Commit the new cache size
    int64_t committed = cache->commit_cache_size(tuned_mem);

    // Update the perf counters
    int64_t alloc = cache->get_cache_bytes();

    auto l = loggers.find(name)->second;
    l->set(indexes[name][Extra::E_RESERVED], committed - alloc);
    l->set(indexes[name][Extra::E_COMMITTED], committed);
  }
}

// Distribute *mem_avail across all registered caches for a single priority
// level, weighted by each cache's ratio.  Decrements *mem_avail by whatever
// is handed out.  Caches that get everything they asked for drop out of the
// rounds; the remaining memory is re-split among the still-hungry caches.
void Manager::balance_priority(int64_t *mem_avail, Priority pri)
{
// Work on a copy so satisfied caches can be erased from the round-robin
// without touching the authoritative map.
std::unordered_map<std::string, std::shared_ptr<PriCache>> tmp_caches = caches;
double cur_ratios = 0;   // sum of ratios of caches still wanting memory
double new_ratios = 0;   // rebuilt each round from the still-hungry caches
uint64_t round = 0;      // iteration counter, used for logging only

// First, zero this priority's bytes, sum the initial ratios.
for (auto it = caches.begin(); it != caches.end(); it++) {
it->second->set_cache_bytes(pri, 0);
cur_ratios += it->second->get_cache_ratio();
}

// Loop until all caches are satisfied or we run out of memory (stop once
// we can no longer guarantee every remaining cache a full byte).
while (!tmp_caches.empty() && *mem_avail > static_cast<int64_t>(tmp_caches.size())) {
uint64_t total_assigned = 0;
for (auto it = tmp_caches.begin(); it != tmp_caches.end();) {
int64_t cache_wants = it->second->request_cache_bytes(pri, tuned_mem);
// Usually the ratio should be set to the fraction of the current caches'
// assigned ratio compared to the total ratio of all caches that still
// want memory. There is a special case where the only caches left are
// all assigned 0% ratios but still want memory. In that case, give
// them an equal shot at the remaining memory for this priority.
double ratio = 1.0 / tmp_caches.size();
if (cur_ratios > 0) {
ratio = it->second->get_cache_ratio() / cur_ratios;
}
int64_t fair_share = static_cast<int64_t>(*mem_avail * ratio);

ldout(cct, 10) << __func__ << " " << it->first
<< " pri: " << (int) pri
<< " round: " << round
<< " wanted: " << cache_wants
<< " ratio: " << it->second->get_cache_ratio()
<< " cur_ratios: " << cur_ratios
<< " fair_share: " << fair_share
<< " mem_avail: " << *mem_avail
<< dendl;

if (cache_wants > fair_share) {
// If we want too much, take what we can get but stick around for more
it->second->add_cache_bytes(pri, fair_share);
total_assigned += fair_share;
new_ratios += it->second->get_cache_ratio();
++it;
} else {
// Otherwise assign only what we want
if (cache_wants > 0) {
it->second->add_cache_bytes(pri, cache_wants);
total_assigned += cache_wants;
}
// Either the cache didn't want anything or got what it wanted, so
// remove it from the tmp list.
it = tmp_caches.erase(it);
}
}
// Reset the ratios
*mem_avail -= total_assigned;
cur_ratios = new_ratios;
new_ratios = 0;
++round;
}

// If this is the last priority, divide up any remaining memory based
// solely on the ratios.
if (pri == Priority::LAST) {
uint64_t total_assigned = 0;
for (auto it = caches.begin(); it != caches.end(); it++) {
double ratio = it->second->get_cache_ratio();
int64_t fair_share = static_cast<int64_t>(*mem_avail * ratio);
// NOTE(review): set_cache_bytes (not add_cache_bytes) overwrites whatever
// the while loop above assigned at Priority::LAST during this call —
// confirm this clobbering is intentional.
it->second->set_cache_bytes(Priority::LAST, fair_share);
total_assigned += fair_share;
}
*mem_avail -= total_assigned;
return;
}
}

// Out-of-line anchor for the interface's virtual destructor.
PriCache::~PriCache() = default;
}
Loading

0 comments on commit 884d0de

Please sign in to comment.