Skip to content

Commit

Permalink
Metrics: Safer metadata transfer between shared
Browse files Browse the repository at this point in the history
This patch change how metrics value and meta data is passed bewtween
shards.

Sometimes, a single shard send a metric report with data that is
gothered from all other shards (i.e. The prometheus protocol).

In those cases, the metrics layer need to pass information about the
metrics (metadata) between the shards.

1. when reporting metrics, the values_copy structure now contains two
values:
  * value - a vector of vectors of value (vector per metric family)
  * metadata - a shared_ptr of a vector of metric family data (that
    contains metric family info and a vector of
    metric metadata)

2. The metadata is created on demened
  * it uses a dirty bit to decide if the shared_ptr is valid or not

3. The metadata only contains information for enabled metrics, for
   efficiency, an additional vector is stored in the impl class that holds
   the function that fetch the values.

The collectd and prometheus implementation where modified to use the new
data structure.

In prometheus an additional optimization is now possible.
The data that is collected from all shards and does not merge into a single
datastructure, instead, the merge is done while creating the reply.

See scylladb/scylladb#2350

Signed-off-by: Amnon Heiman <[email protected]>
  • Loading branch information
amnonh committed Jun 4, 2017
1 parent 78211d4 commit 1a4a08c
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 92 deletions.
89 changes: 69 additions & 20 deletions core/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,44 +220,48 @@ bool metric_id::operator==(
// need to be available before the first users (reactor) will call it

shared_ptr<impl> get_local_impl() {
static thread_local auto the_impl = make_shared<impl>();
static thread_local auto the_impl = ::seastar::make_shared<impl>();
return the_impl;
}

void unregister_metric(const metric_id & id) {
shared_ptr<impl> map = get_local_impl();
auto i = map->get_value_map().find(id.full_name());
if (i != map->get_value_map().end()) {
void impl::remove_registration(const metric_id& id) {
auto i = get_value_map().find(id.full_name());
if (i != get_value_map().end()) {
auto j = i->second.find(id.labels());
if (j != i->second.end()) {
j->second = nullptr;
i->second.erase(j);
}
if (i->second.empty()) {
map->get_value_map().erase(i);
get_value_map().erase(i);
}
dirty();
}
}

void unregister_metric(const metric_id & id) {
get_local_impl()->remove_registration(id);
}

const value_map& get_value_map() {
return get_local_impl()->get_value_map();
}

values_copy get_values() {
values_copy res;

for (auto i : get_local_impl()->get_value_map()) {
std::vector<std::tuple<shared_ptr<registered_metric>, metric_value>> values;
for (auto&& v : i.second) {
if (v.second.get() && v.second->is_enabled()) {
values.emplace_back(v.second, (*(v.second))());
}
}
if (values.size() > 0) {
res[i.first] = std::move(values);
foreign_ptr<values_reference> get_values() {
shared_ptr<values_copy> res_ref = ::seastar::make_shared<values_copy>();
auto& res = *(res_ref.get());
auto& mv = res.values;
res.metadata = get_local_impl()->metadata();
auto & functions = get_local_impl()->functions();
mv.reserve(functions.size());
for (auto&& i : functions) {
value_vector values;
values.reserve(i.size());
for (auto&& v : i) {
values.emplace_back(v());
}
mv.emplace_back(std::move(values));
}
return std::move(res);
return res_ref;
}


Expand All @@ -269,6 +273,50 @@ instance_id_type shard() {
return sstring("0");
}

void impl::update_metrics_if_needed() {
if (_dirty) {
// Forcing the metadata to an empty initialization
// Will prevent using corrupted data if an exception is thrown
_metadata = ::seastar::make_shared<metric_metadata>();

auto mt_ref = ::seastar::make_shared<metric_metadata>();
auto &mt = *(mt_ref.get());
mt.reserve(_value_map.size());
_current_metrics.resize(_value_map.size());
size_t i = 0;
for (auto&& mf : _value_map) {
metric_metadata_vector metrics;
_current_metrics[i].clear();
for (auto&& m : mf.second) {
if (m.second && m.second->is_enabled()) {
metrics.emplace_back(m.second->info());
_current_metrics[i].emplace_back(m.second->get_function());
}
}
if (!metrics.empty()) {
// If nothing was added, no need to add the metric_family
// and no need to progress
mt.emplace_back(metric_family_metadata{mf.second.info(), std::move(metrics)});
i++;
}
}
// Maybe we didn't use all the original size
_current_metrics.resize(i);
_metadata = mt_ref;
_dirty = false;
}
}

shared_ptr<metric_metadata> impl::metadata() {
update_metrics_if_needed();
return _metadata;
}

std::vector<std::vector<metric_function>>& impl::functions() {
update_metrics_if_needed();
return _current_metrics;
}

void impl::add_registration(const metric_id& id, data_type type, metric_function f, const description& d, bool enabled) {
auto rm = ::seastar::make_shared<registered_metric>(id, f, enabled);
sstring name = id.full_name();
Expand All @@ -287,6 +335,7 @@ void impl::add_registration(const metric_id& id, data_type type, metric_function
_value_map[name].info().name = id.full_name();
_value_map[name][id.labels()] = rm;
}
dirty();
}

}
Expand Down
56 changes: 48 additions & 8 deletions core/metrics_api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,13 @@ public:
const metric_info& info() const {
return _info;
}
metric_function& get_function() {
return _f;
}
};



using register_ref = shared_ptr<registered_metric>;
using metric_instances = std::unordered_map<labels_type, register_ref>;
using metric_instances = std::map<labels_type, register_ref>;

class metric_family {
metric_instances _instances;
Expand Down Expand Up @@ -282,12 +283,36 @@ public:
return _instances.cend();
}

uint32_t size() const {
return _instances.size();
}

};

using value_map = std::unordered_map<sstring, metric_family>;
using value_holder = std::tuple<register_ref, metric_value>;
using value_vector = std::vector<value_holder>;
using values_copy = std::unordered_map<sstring, value_vector>;
using value_map = std::map<sstring, metric_family>;

using metric_metadata_vector = std::vector<metric_info>;

/*!
* \brief holds a metric family metadata
*
* The meta data of a metric family compose of the
* metadata of the family, and a vector of the metadata for
* each of the metric.
*/
struct metric_family_metadata {
metric_family_info mf;
metric_metadata_vector metrics;
};

using value_vector = std::vector<metric_value>;
using metric_metadata = std::vector<metric_family_metadata>;
using metric_values = std::vector<value_vector>;

struct values_copy {
shared_ptr<metric_metadata> metadata;
metric_values values;
};

struct config {
sstring hostname;
Expand All @@ -296,6 +321,9 @@ struct config {
class impl {
value_map _value_map;
config _config;
bool _dirty = true;
shared_ptr<metric_metadata> _metadata;
std::vector<std::vector<metric_function>> _current_metrics;
public:
value_map& get_value_map() {
return _value_map;
Expand All @@ -306,6 +334,7 @@ public:
}

void add_registration(const metric_id& id, data_type type, metric_function f, const description& d, bool enabled);
void remove_registration(const metric_id& id);
future<> stop() {
return make_ready_future<>();
}
Expand All @@ -315,11 +344,22 @@ public:
void set_config(const config& c) {
_config = c;
}

shared_ptr<metric_metadata> metadata();

std::vector<std::vector<metric_function>>& functions();

void update_metrics_if_needed();

void dirty() {
_dirty = true;
}
};

const value_map& get_value_map();
using values_reference = shared_ptr<values_copy>;

values_copy get_values();
foreign_ptr<values_reference> get_values();

shared_ptr<impl> get_local_impl();

Expand Down
Loading

0 comments on commit 1a4a08c

Please sign in to comment.