Skip to content

Commit

Permalink
io_queues: register individual classes with collectd
Browse files Browse the repository at this point in the history
It would be useful to know what is going on inside each priority class
if we are to properly analyze the characteristics of a living system.

This patch changes the registration mechanism slightly so that we can
include collectd counters for each of the registered classes.

Signed-off-by: Glauber Costa <[email protected]>
Message-Id: <[email protected]>
  • Loading branch information
Glauber Costa authored and avikivity committed Jan 24, 2016
1 parent 8e52055 commit 97f418a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 16 deletions.
8 changes: 7 additions & 1 deletion apps/fair_queue_tester/fair_queue_tester.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ static std::default_random_engine random_generator(random_seed);

class context {
struct class_data {
static int idgen();

uint32_t _shares;
io_priority_class _iop;
unsigned _final = 0;
Expand All @@ -48,7 +50,7 @@ class context {

class_data(uint32_t shares)
: _shares(shares)
, _iop(engine().register_one_priority_class(shares))
, _iop(engine().register_one_priority_class(sprint("test-class-%d", idgen()), shares))
{}
};
std::vector<class_data> _cl;
Expand Down Expand Up @@ -123,6 +125,10 @@ class context {
}
};

int context::class_data::idgen() {
static thread_local int id = 0;
return id++;
}

int main(int ac, char** av) {
namespace bpo = boost::program_options;
Expand Down
67 changes: 56 additions & 11 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ reactor::flush_pending_aio() {

const io_priority_class& default_priority_class() {
static thread_local auto shard_default_class = [] {
return engine().register_one_priority_class(1);
return engine().register_one_priority_class("default", 1);
}();
return shard_default_class;
}
Expand Down Expand Up @@ -608,46 +608,91 @@ io_queue::~io_queue() {
// And that will happen only when there are no more fibers to run. If we ever change
// that, then this has to change.
for (auto&& pclasses: _priority_classes) {
_fq.unregister_priority_class(pclasses.second);
_fq.unregister_priority_class(pclasses.second->ptr);
}
}

std::array<std::atomic<uint32_t>, io_queue::_max_classes> io_queue::_registered_shares;
// We could very well just add the name to the io_priority_class. However, because that
// structure is passed along all the time - and sometimes we can't help but copy it, better keep
// it lean. The name won't really be used for anything other than monitoring.
std::array<sstring, io_queue::_max_classes> io_queue::_registered_names;

void io_queue::fill_shares_array() {
for (unsigned i = 0; i < _max_classes; ++i) {
_registered_shares[i].store(0);
}
}

io_priority_class io_queue::register_one_priority_class(uint32_t shares) {
io_priority_class io_queue::register_one_priority_class(sstring name, uint32_t shares) {
uint32_t unused = 0;
for (unsigned i = 0; i < _max_classes; ++i) {
auto s = _registered_shares[i].compare_exchange_strong(unused, shares, std::memory_order_acq_rel);
if (s) {
io_priority_class p;
_registered_names[i] = name;
p.val = i;
return std::move(p);
};
}
throw std::runtime_error("No more room for new I/O priority classes");
}

io_queue::priority_class_data::priority_class_data(sstring name, priority_class_ptr ptr)
: ptr(ptr)
, bytes(0)
, ops(0)
, collectd_reg(scollectd::registrations({
scollectd::add_polled_metric(scollectd::type_instance_id("io_queue"
, scollectd::per_cpu_plugin_instance
, "derive", name)
, scollectd::make_typed(scollectd::data_type::DERIVE, bytes)
),
scollectd::add_polled_metric(scollectd::type_instance_id("io_queue"
, scollectd::per_cpu_plugin_instance
, "total_operations", name)
, scollectd::make_typed(scollectd::data_type::DERIVE, ops)
)
}))
{
}

io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_class& pc, shard_id owner) {
auto it_pclass = _priority_classes.find(pc);
if (it_pclass == _priority_classes.end()) {
auto shares = _registered_shares.at(pc).load(std::memory_order_acquire);
auto name = _registered_names.at(pc);
// A note on naming:
//
// We could just add the owner as the instance id and have something like:
// io_queue-<class_owner>-<counter>-<class_name>
//
// However, when there are more than one shard per I/O queue, it is very useful
// to know which shards are being served by the same queue. Therefore, a better name
// scheme is:
//
// io_queue-<queue_owner>-<counter>-<class_name>-<class_owner>
//
// This conveys all the information we need and allows one to easily group all classes from
// the same I/O queue (by filtering by instance ID)
auto ret = _priority_classes.emplace(pc, make_lw_shared<priority_class_data>(sprint("%s-%d", name, owner), _fq.register_priority_class(shares)));
it_pclass = ret.first;
}
return *(it_pclass->second);
}

template <typename Func>
future<io_event>
io_queue::queue_request(shard_id coordinator, const io_priority_class& pc, size_t len, Func prepare_io) {
return smp::submit_to(coordinator, [&pc, len, prepare_io = std::move(prepare_io)] {
return smp::submit_to(coordinator, [&pc, len, prepare_io = std::move(prepare_io), owner = engine().cpu_id()] {
auto& queue = *(engine()._io_queue);
unsigned weight = 1 + len/(16 << 10);
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto it_pclass = queue._priority_classes.find(pc);
if (it_pclass == queue._priority_classes.end()) {
auto shares = _registered_shares.at(pc).load(std::memory_order_acquire);
auto ret = queue._priority_classes.emplace(pc, queue._fq.register_priority_class(shares));
it_pclass = ret.first;
}
return queue._fq.queue((*it_pclass).second, weight, [prepare_io = std::move(prepare_io)] {
auto& pclass = queue.find_or_create_class(pc, owner);
pclass.bytes += len;
pclass.ops++;
return queue._fq.queue(pclass.ptr, weight, [prepare_io = std::move(prepare_io)] {
return engine().submit_io(std::move(prepare_io));
});
});
Expand Down
19 changes: 15 additions & 4 deletions core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,24 @@ private:
size_t _capacity;
std::vector<shard_id> _io_topology;

std::unordered_map<unsigned, priority_class_ptr> _priority_classes;
struct priority_class_data {
priority_class_ptr ptr;
size_t bytes;
uint64_t ops;
std::vector<scollectd::registration> collectd_reg;
priority_class_data(sstring name, priority_class_ptr ptr);
};

std::unordered_map<unsigned, lw_shared_ptr<priority_class_data>> _priority_classes;
fair_queue _fq;

static constexpr unsigned _max_classes = 1024;
static std::array<std::atomic<uint32_t>, _max_classes> _registered_shares;
static std::array<sstring, _max_classes> _registered_names;

static io_priority_class register_one_priority_class(sstring name, uint32_t shares);

static io_priority_class register_one_priority_class(uint32_t shares);
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
public:
static void fill_shares_array();

Expand Down Expand Up @@ -738,8 +749,8 @@ public:
return *_io_queue;
}

io_priority_class register_one_priority_class(uint32_t shares) {
return io_queue::register_one_priority_class(shares);
io_priority_class register_one_priority_class(sstring name, uint32_t shares) {
return io_queue::register_one_priority_class(std::move(name), shares);
}

void configure(boost::program_options::variables_map config);
Expand Down

0 comments on commit 97f418a

Please sign in to comment.