Skip to content

Commit

Permalink
execution_stage: add support for scheduling_group
Browse files Browse the repository at this point in the history
Allow associating an execution stage with a scheduling group. Tasks
run under the stage will be accounted for under the scheduling group.
  • Loading branch information
avikivity committed Oct 1, 2017
1 parent c87e1bd commit 95ea249
Showing 1 changed file with 62 additions and 10 deletions.
72 changes: 62 additions & 10 deletions core/execution_stage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "function_traits.hh"
#include "sstring.hh"
#include "metrics.hh"
#include "scheduling.hh"
#include "util/reference_wrapper.hh"
#include "util/gcc6-concepts.hh"
#include "../util/defer.hh"
Expand Down Expand Up @@ -120,13 +121,14 @@ public:
protected:
bool _empty = true;
bool _flush_scheduled = false;
scheduling_group _sg;
stats _stats;
sstring _name;
metrics::metric_group _metric_group;
protected:
virtual void do_flush() noexcept = 0;
public:
explicit execution_stage(const sstring& name);
explicit execution_stage(const sstring& name, scheduling_group sg = {});
virtual ~execution_stage();

execution_stage(const execution_stage&) = delete;
Expand Down Expand Up @@ -157,7 +159,7 @@ public:
return false;
}
_stats.tasks_scheduled++;
schedule(make_task([this] {
schedule(make_task(_sg, [this] {
do_flush();
_flush_scheduled = false;
}));
Expand Down Expand Up @@ -293,12 +295,15 @@ private:
_empty = _queue.empty();
}
public:
explicit concrete_execution_stage(const sstring& name, Function f)
: execution_stage(name)
explicit concrete_execution_stage(const sstring& name, scheduling_group sg, Function f)
: execution_stage(name, sg)
, _function(std::move(f))
{
_queue.reserve(flush_threshold);
}
explicit concrete_execution_stage(const sstring& name, Function f)
: concrete_execution_stage(name, scheduling_group(), std::move(f)) {
}

/// Enqueues a call to the stage's function
///
Expand Down Expand Up @@ -362,15 +367,52 @@ public:
/// ```
///
/// \param name unique name of the execution stage
/// \param sg scheduling group to run under
/// \param fn function to be executed by the stage
/// \return concrete_execution_stage
///
template<typename Function>
auto make_execution_stage(const sstring& name, Function&& fn) {
auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
using traits = function_traits<Function>;
return concrete_execution_stage<std::decay_t<Function>, typename traits::return_type,
typename traits::args_as_tuple>(name, std::forward<Function>(fn));
}

/// Creates a new execution stage (variant taking \ref scheduling_group)
///
/// Wraps given function object in a concrete_execution_stage. All arguments
/// of the function object are required to have move constructors that do not
/// throw. Function object may return a future or an immediate object or void.
///
/// Moving execution stages is discouraged and illegal after first function
/// call is enqueued.
///
/// Usage example:
/// ```
/// double do_something(int);
/// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
///
/// future<double> func1(int val) {
/// return stage1(val);
/// }
///
/// future<double> do_some_io(int);
/// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
///
/// future<double> func2(int val) {
/// return stage2(val);
/// }
/// ```
///
/// \param name unique name of the execution stage (variant not taking \ref scheduling_group)
/// \param fn function to be executed by the stage
/// \return concrete_execution_stage
///
template<typename Function>
auto make_execution_stage(const sstring& name, Function&& fn) {
return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
}

/// Creates a new execution stage from a member function
///
/// Wraps a pointer to member function in a concrete_execution_stage. When
Expand All @@ -394,21 +436,31 @@ auto make_execution_stage(const sstring& name, Function&& fn) {
/// \param name unique name of the execution stage
/// \param fn member function to be executed by the stage
/// \return concrete_execution_stage
template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<Object*, Args...>>(name, sg, std::mem_fn(fn));
}

template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<const Object*, Args...>>(name, sg, std::mem_fn(fn));
}

template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<Object*, Args...>>(name, std::mem_fn(fn));
return make_execution_stage(name, scheduling_group(), fn);
}

template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<const Object*, Args...>>(name, std::mem_fn(fn));
return make_execution_stage(name, scheduling_group(), fn);
}

/// @}

inline execution_stage::execution_stage(const sstring& name)
: _name(name)

inline execution_stage::execution_stage(const sstring& name, scheduling_group sg)
: _sg(sg)
, _name(name)
{
internal::execution_stage_manager::get().register_execution_stage(*this);
auto undo = defer([&] { internal::execution_stage_manager::get().unregister_execution_stage(*this); });
Expand Down

0 comments on commit 95ea249

Please sign in to comment.