From 81301186933fde4bfeffa306152137dd6592aa58 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Fri, 25 Mar 2016 11:52:48 +0100 Subject: [PATCH] core: Fix use-after-free of scollectd::impl If the destructor for the thread_local reactor holder runs after the destructor for the thread_local scollectd::impl, then the scollectd registrations contained in the native_network_stack's ipv4, when unregistering themselves, will access an invalid scollectd::impl. This patch fixes the problem by defining scollectd::impl before reactor_holder, ensuring they will be destroyed in reverse order. Signed-off-by: Duarte Nunes Message-Id: <1458903168-7838-1-git-send-email-duarte@scylladb.com> --- core/reactor.cc | 26 +- core/scollectd-impl.hh | 78 +++++ core/scollectd.cc | 650 ++++++++++++++++++++--------------------- 3 files changed, 403 insertions(+), 351 deletions(-) create mode 100644 core/scollectd-impl.hh diff --git a/core/reactor.cc b/core/reactor.cc index 5515523b93a..250e9123c6c 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -32,7 +32,7 @@ #include "net/posix-stack.hh" #include "resource.hh" #include "print.hh" -#include "scollectd.hh" +#include "scollectd-impl.hh" #include "util/conversions.hh" #include "core/future-util.hh" #include "thread.hh" @@ -2296,6 +2296,21 @@ smp::get_options_description() return opts; } +thread_local scollectd::impl scollectd_impl; + +scollectd::impl & scollectd::get_impl() { + return scollectd_impl; +} + +struct reactor_deleter { + void operator()(reactor* p) { + p->~reactor(); + free(p); + } +}; + +thread_local std::unique_ptr reactor_holder; + std::vector smp::_threads; std::experimental::optional smp::_all_event_loops_done; std::vector smp::_reactors; @@ -2347,15 +2362,6 @@ void smp::arrive_at_event_loop_end() { } void smp::allocate_reactor() { - struct reactor_deleter { - void operator()(reactor* p) { - p->~reactor(); - free(p); - } - }; - static thread_local std::unique_ptr - reactor_holder; - assert(!reactor_holder); // we cannot just write "local_engin = new reactor" since reactor's constructor diff --git a/core/scollectd-impl.hh b/core/scollectd-impl.hh new file mode 100644 index 00000000000..e3ea38d1032 --- /dev/null +++ b/core/scollectd-impl.hh @@ -0,0 +1,78 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2016 ScyllaDB + */ + +#pragma once + +#include "core/scollectd.hh" +#include "core/reactor.hh" + +namespace scollectd { + +using namespace std::chrono_literals; +using duration = std::chrono::milliseconds; + +static const ipv4_addr default_addr("239.192.74.66:25826"); +static const std::chrono::milliseconds default_period(1s); + +class impl { + net::udp_channel _chan; + timer<> _timer; + + sstring _host = "localhost"; + ipv4_addr _addr = default_addr; + std::chrono::milliseconds _period = default_period; + uint64_t _num_packets = 0; + uint64_t _millis = 0; + uint64_t _bytes = 0; + double _avg = 0; + +public: + typedef std::map > value_list_map; + typedef value_list_map::value_type value_list_pair; + + void add_polled(const type_instance_id & id, + const shared_ptr & values); + void remove_polled(const type_instance_id & id); + // explicitly send a type_instance value list (outside polling) + future<> send_metric(const type_instance_id & id, + const value_list & values); + future<> send_notification(const type_instance_id & id, + const sstring & msg); + // initiates actual value polling -> send to target "loop" + void start(const sstring & host, const ipv4_addr & addr, const std::chrono::milliseconds period); + void stop(); + +private: + void arm(); + void run(); + +public: + shared_ptr get_values(const type_instance_id & id); + std::vector get_instance_ids(); + +private: + value_list_map _values; + registrations _regs; +}; + +impl & get_impl(); + +}; diff --git a/core/scollectd.cc b/core/scollectd.cc index b736c09ade5..f879a4965df 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -28,10 +28,8 @@ #include #include -#include "scollectd.hh" +#include "scollectd-impl.hh" #include "core/future-util.hh" -#include "core/reactor.hh" -#include "net/api.hh" #include "scollectd_api.hh" bool scollectd::type_instance_id::operator<( @@ -53,377 +51,347 @@ bool scollectd::type_instance_id::operator==( namespace scollectd { -using namespace std::chrono_literals; -using duration = std::chrono::milliseconds; - - -static const ipv4_addr default_addr("239.192.74.66:25826"); -static const duration default_period(1s); const plugin_instance_id per_cpu_plugin_instance("#cpu"); -future<> send_metric(const type_instance_id &, const value_list &); +static const size_t payload_size = 1024; + +enum class part_type : uint16_t { + Host = 0x0000, // The name of the host to associate with subsequent data values + Time = 0x0001, // Time Numeric The timestamp to associate with subsequent data values, unix time format (seconds since epoch) + TimeHr = 0x0008, // Time (high resolution) Numeric The timestamp to associate with subsequent data values. Time is defined in 2–30 seconds since epoch. New in Version 5.0. + Plugin = 0x0002, // Plugin String The plugin name to associate with subsequent data values, e.g. "cpu" + PluginInst = 0x0003, // Plugin instance String The plugin instance name to associate with subsequent data values, e.g. "1" + Type = 0x0004, // Type String The type name to associate with subsequent data values, e.g. "cpu" + TypeInst = 0x0005, // Type instance String The type instance name to associate with subsequent data values, e.g. "idle" + Values = 0x0006, // Values other Data values, see above + Interval = 0x0007, // Interval Numeric Interval used to set the "step" when creating new RRDs unless rrdtool plugin forces StepSize. Also used to detect values that have timed out. + IntervalHr = 0x0009, // Interval (high resolution) Numeric The interval in which subsequent data values are collected. The interval is given in 2–30 seconds. New in Version 5.0. + Message = 0x0100, // Message (notifications) String + Severity = 0x0101, // Severity Numeric + Signature = 0x0200, // Signature (HMAC-SHA-256) other (todo) + Encryption = 0x0210, // Encryption (AES-256/OFB +}; + +// "Time is defined in 2^–30 seconds since epoch. New in Version 5.0." +typedef std::chrono::duration> collectd_hres_duration; -class impl { - net::udp_channel _chan; - timer<> _timer; +// yet another writer type, this one to construct collectd network +// protocol data. +struct cpwriter { + typedef std::array buffer_type; + typedef buffer_type::iterator mark_type; + typedef buffer_type::const_iterator const_mark_type; - sstring _host = "localhost"; - ipv4_addr _addr = default_addr; - duration _period = default_period; - uint64_t _num_packets = 0; - uint64_t _millis = 0; - uint64_t _bytes = 0; - double _avg = 0; + buffer_type _buf; + mark_type _pos; + bool _overflow = false; -public: - typedef std::map > value_list_map; - typedef value_list_map::value_type value_list_pair; + std::unordered_map _cache; - void add_polled(const type_instance_id & id, - const shared_ptr & values) { - _values[id] = values; + cpwriter() + : _pos(_buf.begin()) { + } + mark_type mark() const { + return _pos; + } + bool overflow() const { + return _overflow; + } + void reset(mark_type m) { + _pos = m; + _overflow = false; + } + size_t size() const { + return std::distance(_buf.begin(), const_mark_type(_pos)); + } + bool empty() const { + return _pos == _buf.begin(); + } + void clear() { + reset(_buf.begin()); + _cache.clear(); + _overflow = false; + } + const char * data() const { + return &_buf.at(0); } - void remove_polled(const type_instance_id & id) { - auto i = _values.find(id); - if (i != _values.end()) { - i->second = nullptr; + char * data() { + return &_buf.at(0); + } + cpwriter& check(size_t sz) { + size_t av = std::distance(_pos, _buf.end()); + _overflow |= av < sz; + return *this; + } + explicit operator bool() const { + return !_overflow; + } + bool operator!() const { + return !operator bool(); + } + template + cpwriter & write(_Iter s, _Iter e) { + if (check(std::distance(s, e))) { + _pos = std::copy(s, e, _pos); } + return *this; } - // explicitly send a type_instance value list (outside polling) - future<> send_metric(const type_instance_id & id, - const value_list & values) { - if (values.empty()) { - return make_ready_future(); + template + typename std::enable_if::value, cpwriter &>::type write( + const T & t) { + T tmp = net::hton(t); + auto * p = reinterpret_cast(&tmp); + auto * e = p + sizeof(T); + write(p, e); + return *this; + } + cpwriter & write(const sstring & s) { + write(s.begin(), s.end() + 1); // include \0 + return *this; + } + cpwriter & put(part_type type, const sstring & s) { + write(uint16_t(type)); + write(uint16_t(4 + s.size() + 1)); // include \0 + write(s); // include \0 + return *this; + } + cpwriter & put_cached(part_type type, const sstring & s) { + auto & cached = _cache[uint16_t(type)]; + if (cached != s) { + put(type, s); + cached = s; } - cpwriter out; - out.put(_host, duration(), id, values); - return _chan.send(_addr, net::packet(out.data(), out.size())); + return *this; } - future<> send_notification(const type_instance_id & id, - const sstring & msg) { - cpwriter out; - out.put(_host, id); - out.put(part_type::Message, msg); - return _chan.send(_addr, net::packet(out.data(), out.size())); + template + typename std::enable_if::value, cpwriter &>::type put( + part_type type, T t) { + write(uint16_t(type)); + write(uint16_t(4 + sizeof(t))); + write(t); + return *this; } - // initiates actual value polling -> send to target "loop" - void start(const sstring & host, const ipv4_addr & addr, const duration period) { - _period = period; - _addr = addr; - _host = host; - _chan = engine().net().make_udp_channel(); - _timer.set_callback(std::bind(&impl::run, this)); - - // dogfood ourselves - _regs = { - // total_bytes value:DERIVE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "total_bytes", "sent"), - make_typed(data_type::DERIVE, _bytes)), - // total_requests value:DERIVE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "total_requests"), - make_typed(data_type::DERIVE, _num_packets) - ), - // latency value:GAUGE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "latency"), _avg), - // total_time_in_ms value:DERIVE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "total_time_in_ms"), - make_typed(data_type::DERIVE, _millis) - ), - // total_values value:DERIVE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "total_values"), - make_typed(data_type::DERIVE, std::bind(&value_list_map::size, &_values)) - ), - // records value:GAUGE:0:U - add_polled_metric( - type_instance_id("scollectd", per_cpu_plugin_instance, - "records"), - make_typed(data_type::GAUGE, std::bind(&value_list_map::size, &_values)) - ), - }; - - send_notification( - type_instance_id("scollectd", per_cpu_plugin_instance, - "network"), "daemon started"); - arm(); + cpwriter & put(part_type type, const value_list & v) { + auto s = v.size(); + auto sz = 6 + s + s * sizeof(uint64_t); + if (check(sz)) { + write(uint16_t(type)); + write(uint16_t(sz)); + write(uint16_t(s)); + v.types(reinterpret_cast(&(*_pos))); + _pos += s; + v.values(reinterpret_cast *>(&(*_pos))); + _pos += s * sizeof(uint64_t); + } + return *this; + } + cpwriter & put(const sstring & host, const type_instance_id & id) { + const auto ts = std::chrono::system_clock::now().time_since_epoch(); + const auto lrts = + std::chrono::duration_cast(ts).count(); + + put_cached(part_type::Host, host); + put(part_type::Time, uint64_t(lrts)); + // Seems hi-res timestamp does not work very well with + // at the very least my default collectd in fedora (or I did it wrong?) + // Use lo-res ts for now, it is probably quite sufficient. + put_cached(part_type::Plugin, id.plugin()); + // Optional + put_cached(part_type::PluginInst, + id.plugin_instance() == per_cpu_plugin_instance ? + to_sstring(engine().cpu_id()) : id.plugin_instance()); + put_cached(part_type::Type, id.type()); + // Optional + put_cached(part_type::TypeInst, id.type_instance()); + return *this; } - void stop() { - _timer.cancel(); - _regs.clear(); + cpwriter & put(const sstring & host, + const duration & period, + const type_instance_id & id, const value_list & v) { + const auto ps = std::chrono::duration_cast( + period).count(); + put(host, id); + put(part_type::Values, v); + if (ps != 0) { + put(part_type::IntervalHr, ps); + } + return *this; } +}; -private: - static const size_t payload_size = 1024; +void impl::add_polled(const type_instance_id & id, + const shared_ptr & values) { + _values[id] = values; +} - void arm() { - if (_period != duration()) { - _timer.arm(_period); - } +void impl::remove_polled(const type_instance_id & id) { + auto i = _values.find(id); + if (i != _values.end()) { + i->second = nullptr; + } +} + +// explicitly send a type_instance value list (outside polling) +future<> impl::send_metric(const type_instance_id & id, + const value_list & values) { + if (values.empty()) { + return make_ready_future(); } + cpwriter out; + out.put(_host, duration(), id, values); + return _chan.send(_addr, net::packet(out.data(), out.size())); +} + +future<> impl::send_notification(const type_instance_id & id, + const sstring & msg) { + cpwriter out; + out.put(_host, id); + out.put(part_type::Message, msg); + return _chan.send(_addr, net::packet(out.data(), out.size())); +} - enum class part_type - : uint16_t { - Host = 0x0000, // The name of the host to associate with subsequent data values - Time = 0x0001, // Time Numeric The timestamp to associate with subsequent data values, unix time format (seconds since epoch) - TimeHr = 0x0008, // Time (high resolution) Numeric The timestamp to associate with subsequent data values. Time is defined in 2–30 seconds since epoch. New in Version 5.0. - Plugin = 0x0002, // Plugin String The plugin name to associate with subsequent data values, e.g. "cpu" - PluginInst = 0x0003, // Plugin instance String The plugin instance name to associate with subsequent data values, e.g. "1" - Type = 0x0004, // Type String The type name to associate with subsequent data values, e.g. "cpu" - TypeInst = 0x0005, // Type instance String The type instance name to associate with subsequent data values, e.g. "idle" - Values = 0x0006, // Values other Data values, see above - Interval = 0x0007, // Interval Numeric Interval used to set the "step" when creating new RRDs unless rrdtool plugin forces StepSize. Also used to detect values that have timed out. - IntervalHr = 0x0009, // Interval (high resolution) Numeric The interval in which subsequent data values are collected. The interval is given in 2–30 seconds. New in Version 5.0. - Message = 0x0100, // Message (notifications) String - Severity = 0x0101, // Severity Numeric - Signature = 0x0200, // Signature (HMAC-SHA-256) other (todo) - Encryption = 0x0210, // Encryption (AES-256/OFB +// initiates actual value polling -> send to target "loop" +void impl::start(const sstring & host, const ipv4_addr & addr, const duration period) { + _period = period; + _addr = addr; + _host = host; + _chan = engine().net().make_udp_channel(); + _timer.set_callback(std::bind(&impl::run, this)); + + // dogfood ourselves + _regs = { + // total_bytes value:DERIVE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "total_bytes", "sent"), + make_typed(data_type::DERIVE, _bytes)), + // total_requests value:DERIVE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "total_requests"), + make_typed(data_type::DERIVE, _num_packets) + ), + // latency value:GAUGE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "latency"), _avg), + // total_time_in_ms value:DERIVE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "total_time_in_ms"), + make_typed(data_type::DERIVE, _millis) + ), + // total_values value:DERIVE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "total_values"), + make_typed(data_type::DERIVE, std::bind(&value_list_map::size, &_values)) + ), + // records value:GAUGE:0:U + add_polled_metric( + type_instance_id("scollectd", per_cpu_plugin_instance, + "records"), + make_typed(data_type::GAUGE, std::bind(&value_list_map::size, &_values)) + ), }; - // "Time is defined in 2^–30 seconds since epoch. New in Version 5.0." - typedef std::chrono::duration> collectd_hres_duration; + send_notification( + type_instance_id("scollectd", per_cpu_plugin_instance, + "network"), "daemon started"); + arm(); +} - // yet another writer type, this one to construct collectd network - // protocol data. - struct cpwriter { - typedef std::array buffer_type; - typedef buffer_type::iterator mark_type; - typedef buffer_type::const_iterator const_mark_type; +void impl::stop() { + _timer.cancel(); + _regs.clear(); +} - buffer_type _buf; - mark_type _pos; - bool _overflow = false; - std::unordered_map _cache; +void impl::arm() { + if (_period != duration()) { + _timer.arm(_period); + } +} - cpwriter() - : _pos(_buf.begin()) { - } - mark_type mark() const { - return _pos; - } - bool overflow() const { - return _overflow; - } - void reset(mark_type m) { - _pos = m; - _overflow = false; - } - size_t size() const { - return std::distance(_buf.begin(), const_mark_type(_pos)); - } - bool empty() const { - return _pos == _buf.begin(); - } - void clear() { - reset(_buf.begin()); - _cache.clear(); - _overflow = false; - } - const char * data() const { - return &_buf.at(0); - } - char * data() { - return &_buf.at(0); - } - cpwriter& check(size_t sz) { - size_t av = std::distance(_pos, _buf.end()); - _overflow |= av < sz; - return *this; - } - explicit operator bool() const { - return !_overflow; - } - bool operator!() const { - return !operator bool(); - } - template - cpwriter & write(_Iter s, _Iter e) { - if (check(std::distance(s, e))) { - _pos = std::copy(s, e, _pos); - } - return *this; - } - template - typename std::enable_if::value, cpwriter &>::type write( - const T & t) { - T tmp = net::hton(t); - auto * p = reinterpret_cast(&tmp); - auto * e = p + sizeof(T); - write(p, e); - return *this; - } - cpwriter & write(const sstring & s) { - write(s.begin(), s.end() + 1); // include \0 - return *this; - } - cpwriter & put(part_type type, const sstring & s) { - write(uint16_t(type)); - write(uint16_t(4 + s.size() + 1)); // include \0 - write(s); // include \0 - return *this; - } - cpwriter & put_cached(part_type type, const sstring & s) { - auto & cached = _cache[uint16_t(type)]; - if (cached != s) { - put(type, s); - cached = s; +void impl::run() { + typedef value_list_map::iterator iterator; + typedef std::tuple context; + + auto ctxt = make_lw_shared(); + + // note we're doing this unsynced since we assume + // all registrations to this instance will be done on the + // same cpu, and without interuptions (no wait-states) + std::get(*ctxt) = _values.begin(); + + auto stop_when = [this, ctxt]() { + auto done = std::get(*ctxt) == _values.end(); + return done; + }; + // append as many values as we can fit into a packet (1024 bytes) + auto send_packet = [this, ctxt]() { + auto start = steady_clock_type::now(); + auto & i = std::get(*ctxt); + auto & out = std::get(*ctxt); + + out.clear(); + + while (i != _values.end()) { + // nullptr value list means removed value. so remove. + if (!i->second) { + i = _values.erase(i); + continue; } - return *this; - } - template - typename std::enable_if::value, cpwriter &>::type put( - part_type type, T t) { - write(uint16_t(type)); - write(uint16_t(4 + sizeof(t))); - write(t); - return *this; - } - cpwriter & put(part_type type, const value_list & v) { - auto s = v.size(); - auto sz = 6 + s + s * sizeof(uint64_t); - if (check(sz)) { - write(uint16_t(type)); - write(uint16_t(sz)); - write(uint16_t(s)); - v.types(reinterpret_cast(&(*_pos))); - _pos += s; - v.values(reinterpret_cast *>(&(*_pos))); - _pos += s * sizeof(uint64_t); + auto m = out.mark(); + out.put(_host, _period, i->first, *i->second); + if (!out) { + out.reset(m); + break; } - return *this; + ++i; } - cpwriter & put(const sstring & host, const type_instance_id & id) { - const auto ts = std::chrono::system_clock::now().time_since_epoch(); - const auto lrts = - std::chrono::duration_cast(ts).count(); - - put_cached(part_type::Host, host); - put(part_type::Time, uint64_t(lrts)); - // Seems hi-res timestamp does not work very well with - // at the very least my default collectd in fedora (or I did it wrong?) - // Use lo-res ts for now, it is probably quite sufficient. - put_cached(part_type::Plugin, id.plugin()); - // Optional - put_cached(part_type::PluginInst, - id.plugin_instance() == per_cpu_plugin_instance ? - to_sstring(engine().cpu_id()) : id.plugin_instance()); - put_cached(part_type::Type, id.type()); - // Optional - put_cached(part_type::TypeInst, id.type_instance()); - return *this; - } - cpwriter & put(const sstring & host, - const duration & period, - const type_instance_id & id, const value_list & v) { - const auto ps = std::chrono::duration_cast( - period).count(); - put(host, id); - put(part_type::Values, v); - if (ps != 0) { - put(part_type::IntervalHr, ps); - } - return *this; + if (out.empty()) { + return make_ready_future(); } + return _chan.send(_addr, net::packet(out.data(), out.size())).then([start, ctxt, this]() { + auto & out = std::get(*ctxt); + auto now = steady_clock_type::now(); + // dogfood stats + ++_num_packets; + _millis += std::chrono::duration_cast(now - start).count(); + _bytes += out.size(); + _avg = double(_millis) / _num_packets; + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (std::exception & ex) { + std::cout << "send failed: " << ex.what() << std::endl; + } catch (...) { + std::cout << "send failed: - unknown exception" << std::endl; + } + }); }; + do_until(stop_when, send_packet).then([this]() { + arm(); + }); +} - void run() { - typedef value_list_map::iterator iterator; - typedef std::tuple context; - - auto ctxt = make_lw_shared(); - - // note we're doing this unsynced since we assume - // all registrations to this instance will be done on the - // same cpu, and without interuptions (no wait-states) - std::get(*ctxt) = _values.begin(); - - auto stop_when = [this, ctxt]() { - auto done = std::get(*ctxt) == _values.end(); - return done; - }; - // append as many values as we can fit into a packet (1024 bytes) - auto send_packet = [this, ctxt]() { - auto start = steady_clock_type::now(); - auto & i = std::get(*ctxt); - auto & out = std::get(*ctxt); - - out.clear(); - - while (i != _values.end()) { - // nullptr value list means removed value. so remove. - if (!i->second) { - i = _values.erase(i); - continue; - } - auto m = out.mark(); - out.put(_host, _period, i->first, *i->second); - if (!out) { - out.reset(m); - break; - } - ++i; - } - if (out.empty()) { - return make_ready_future(); - } - return _chan.send(_addr, net::packet(out.data(), out.size())).then([start, ctxt, this]() { - auto & out = std::get(*ctxt); - auto now = steady_clock_type::now(); - // dogfood stats - ++_num_packets; - _millis += std::chrono::duration_cast(now - start).count(); - _bytes += out.size(); - _avg = double(_millis) / _num_packets; - }).then_wrapped([] (auto&& f) { - try { - f.get(); - } catch (std::exception & ex) { - std::cout << "send failed: " << ex.what() << std::endl; - } catch (...) { - std::cout << "send failed: - unknown exception" << std::endl; - } - }); - }; - do_until(stop_when, send_packet).then([this]() { - arm(); - }); - } -public: - shared_ptr get_values(const type_instance_id & id) { - return _values[id]; - } +shared_ptr impl::get_values(const type_instance_id & id) { + return _values[id]; +} - std::vector get_instance_ids() { - std::vector res; - for (auto i: _values) { - // Need to check for empty value_list, since unreg is two-stage. - // Not an issue for most uses, but unit testing etc that would like - // fully deterministic operation here would like us to only return - // actually active ids - if (i.second) { - res.emplace_back(i.first); - } +std::vector impl::get_instance_ids() { + std::vector res; + for (auto i: _values) { + // Need to check for empty value_list, since unreg is two-stage. + // Not an issue for most uses, but unit testing etc that would like + // fully deterministic operation here would like us to only return + // actually active ids + if (i.second) { + res.emplace_back(i.first); } - return res; } - -private: - value_list_map _values; - registrations _regs; -}; - -impl & get_impl() { - static thread_local impl per_cpu_instance; - return per_cpu_instance; + return res; } void add_polled(const type_instance_id & id,