Skip to content

Commit

Permalink
reactor: convert task quota signal to a timerfd+thread
Browse files Browse the repository at this point in the history
In Linux, signals are protected by a global lock, and thus don't scale with
increasing core counts.  We already observed this on large machines, and
it will grow worse with our plans for sub-task-quota timing with the
CPU scheduler.

To prevent a lock meltdown, convert the signal to a timerfd + a thread
to wait for it and update g_need_preempt.  We pay the cost of a context
switch, but that is not very different from signal delivery (perhaps even
cheaper), and scales with increasing core counts.

In the future we may want to have timer threads service g_need_preempt
for other reactors on the same processor socket, if it proves necessary.
  • Loading branch information
avikivity committed Jan 1, 2017
1 parent d3e2ea4 commit f03ebd3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
40 changes: 20 additions & 20 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,6 @@ inline int alarm_signal() {
return SIGRTMIN;
}

// Real-time signal used to enforce the task quota: the slot immediately
// above the steady-clock alarm signal (which uses SIGRTMIN).
inline int task_quota_signal() {
    return 1 + SIGRTMIN;
}

// Installs signal handler stack for current thread.
// The stack remains installed as long as the returned object is kept alive.
// When it goes out of scope the previous handler is restored.
Expand Down Expand Up @@ -266,10 +262,12 @@ reactor::reactor()
[&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
, _engine_thread(sched::thread::current())
#endif
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _cpu_started(0)
, _io_context(0)
, _io_context_available(max_aio)
, _reuseport(posix_reuseport_detect()) {
, _reuseport(posix_reuseport_detect())
, _task_quota_timer_thread(&reactor::task_quota_timer_thread_fn, this) {

seastar::thread_impl::init();
auto r = ::io_setup(max_aio, &_io_context);
Expand All @@ -288,11 +286,7 @@ reactor::reactor()
sev.sigev_signo = alarm_signal();
r = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
assert(r >= 0);
sev.sigev_signo = task_quota_signal();
r = timer_create(CLOCK_MONOTONIC, &sev, &_task_quota_timer);
assert(r >= 0);
sigemptyset(&mask);
sigaddset(&mask, task_quota_signal());
r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
assert(r == 0);
#endif
Expand All @@ -304,7 +298,8 @@ reactor::reactor()
}

reactor::~reactor() {
timer_delete(_task_quota_timer);
_dying.store(true, std::memory_order_relaxed);
_task_quota_timer_thread.join();
timer_delete(_steady_clock_timer);
auto eraser = [](auto& list) {
while (!list.empty()) {
Expand All @@ -317,6 +312,18 @@ reactor::~reactor() {
eraser(_expired_manual_timers);
}

void
reactor::task_quota_timer_thread_fn() {
    // Body of the helper thread that replaces the old task-quota signal:
    // it blocks on the task-quota timerfd and, on each expiration, tells
    // the engine thread to preempt the running task.
    for (;;) {
        if (_dying.load(std::memory_order_relaxed)) {
            break;
        }
        uint64_t expirations = 0;
        // Blocking read: returns an 8-byte expiration count when the
        // timerfd fires.  The count itself is not used.
        // NOTE(review): if the timerfd is disarmed when ~reactor() sets
        // _dying (run() zeroes it while idle), this read can block
        // indefinitely and the destructor's join() would hang — confirm
        // shutdown re-arms or otherwise wakes the timer.
        _task_quota_timer.read(&expirations, sizeof(expirations));
        _local_need_preempt = true;
        // We're in a different thread, but guaranteed to be on the same core,
        // so even a signal fence is overdoing it
        std::atomic_signal_fence(std::memory_order_seq_cst);
    }
}

void
reactor::clear_task_quota(int) {
g_need_preempt = true;
Expand Down Expand Up @@ -2335,16 +2342,9 @@ int reactor::run() {
load_timer.arm_periodic(1s);

itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota);
auto r = timer_settime(_task_quota_timer, 0, &its, nullptr);
assert(r == 0);
_task_quota_timer.timerfd_settime(0, its);
auto& task_quote_itimerspec = its;

struct sigaction sa_task_quota = {};
sa_task_quota.sa_handler = &reactor::clear_task_quota;
sa_task_quota.sa_flags = SA_RESTART;
r = sigaction(task_quota_signal(), &sa_task_quota, nullptr);
assert(r == 0);

bool idle = false;

std::function<bool()> check_for_work = [this] () {
Expand Down Expand Up @@ -2398,11 +2398,11 @@ int reactor::run() {
if (idle_end - idle_start > _max_poll_time) {
// Turn off the task quota timer to avoid spurious wakeups
struct itimerspec zero_itimerspec = {};
timer_settime(_task_quota_timer, 0, &zero_itimerspec, nullptr);
_task_quota_timer.timerfd_settime(0, zero_itimerspec);
sleep();
// We may have slept for a while, so freshen idle_end
idle_end = steady_clock_type::now();
timer_settime(_task_quota_timer, 0, &task_quote_itimerspec, nullptr);
_task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
}
} else {
// We previously ran pure_check_for_work(), might not actually have performed
Expand Down
6 changes: 5 additions & 1 deletion core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ private:
promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
int _return = 0;
timer_t _steady_clock_timer = {};
timer_t _task_quota_timer = {};
file_desc _task_quota_timer;
promise<> _start_promise;
semaphore _cpu_started;
uint64_t _tasks_processed = 0;
Expand Down Expand Up @@ -730,6 +730,9 @@ private:
std::atomic<bool> _sleeping alignas(64);
pthread_t _thread_id alignas(64) = pthread_self();
bool _strict_o_direct = true;
bool& _local_need_preempt{g_need_preempt}; // for access from the _task_quota_timer_thread
std::thread _task_quota_timer_thread;
std::atomic<bool> _dying{false};
private:
static std::chrono::nanoseconds calculate_poll_time();
static void clear_task_quota(int);
Expand Down Expand Up @@ -782,6 +785,7 @@ private:

void run_tasks(circular_buffer<std::unique_ptr<task>>& tasks);
bool posix_reuseport_detect();
void task_quota_timer_thread_fn();
public:
static boost::program_options::options_description get_options_description();
reactor();
Expand Down

0 comments on commit f03ebd3

Please sign in to comment.