Skip to content

Commit

Permalink
reactor: virtualize high resolution timer processing
Browse files Browse the repository at this point in the history
The epoll backend uses signals for high resolution timers, so that a timer
expiration will set need_preempt(). But since we'll use a different method for
preemption, we need to move away from signals with the aio backend.

To prepare for that, move high resolution timer processing to reactor_backend.
  • Loading branch information
avikivity committed Dec 16, 2018
1 parent 8fb7985 commit 5aea50a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
8 changes: 6 additions & 2 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ public:
virtual void handle_signal(int signo) = 0;
virtual void start_tick() = 0;
virtual void stop_tick() = 0;
virtual void arm_highres_timer(const ::itimerspec& ts) = 0;
};

// reactor backend using file-descriptor & epoll, suitable for running on
Expand All @@ -471,6 +472,8 @@ public:
class reactor_backend_epoll : public reactor_backend {
reactor* _r;
std::thread _task_quota_timer_thread;
timer_t _steady_clock_timer = {};
bool _timer_enabled = false;
private:
file_desc _epollfd;
future<> get_epoll_future(pollable_fd_state& fd,
Expand All @@ -480,7 +483,7 @@ private:
static void signal_received(int signo, siginfo_t* siginfo, void* ignore);
public:
explicit reactor_backend_epoll(reactor* r);
virtual ~reactor_backend_epoll() override { }
virtual ~reactor_backend_epoll() override;
virtual bool wait_and_process(int timeout, const sigset_t* active_sigmask) override;
virtual future<> readable(pollable_fd_state& fd) override;
virtual future<> writeable(pollable_fd_state& fd) override;
Expand All @@ -489,6 +492,7 @@ public:
virtual void handle_signal(int signo) override;
virtual void start_tick() override;
virtual void stop_tick() override;
virtual void arm_highres_timer(const ::itimerspec& ts) override;
};

#ifdef HAVE_OSV
Expand Down Expand Up @@ -760,7 +764,6 @@ private:
bool _handle_sigint = true;
promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
int _return = 0;
timer_t _steady_clock_timer = {};
file_desc _task_quota_timer;
promise<> _start_promise;
semaphore _cpu_started;
Expand Down Expand Up @@ -913,6 +916,7 @@ private:
uint64_t min_vruntime() const;
void request_preemption();
void reset_preemption_monitor();
void service_highres_timer();
public:
static boost::program_options::options_description get_options_description(std::chrono::duration<double> default_task_quota);
explicit reactor(unsigned id);
Expand Down
61 changes: 37 additions & 24 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,30 @@ wrap_syscall(int result, const Extra& extra) {
return syscall_result_extra<Extra>{result, errno, extra};
}

inline int alarm_signal() {
// We don't want to use SIGALRM, because the boost unit test library
// also plays with it.
return SIGRTMIN;
}

reactor_backend_epoll::reactor_backend_epoll(reactor* r)
: _r(r), _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) {
::epoll_event event;
event.events = EPOLLIN;
event.data.ptr = nullptr;
auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r->_notify_eventfd.get(), &event);
throw_system_error_on(ret == -1);

struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev._sigev_un._tid = syscall(SYS_gettid);
sev.sigev_signo = alarm_signal();
ret = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
assert(ret >= 0);
}

reactor_backend_epoll::~reactor_backend_epoll() {
timer_delete(_steady_clock_timer);
}

void reactor_backend_epoll::start_tick() {
Expand All @@ -320,6 +337,17 @@ void reactor_backend_epoll::stop_tick() {
_task_quota_timer_thread.join();
}

void reactor_backend_epoll::arm_highres_timer(const ::itimerspec& its) {
auto ret = timer_settime(_steady_clock_timer, TIMER_ABSTIME, &its, NULL);
throw_system_error_on(ret == -1);
if (!_timer_enabled) {
_timer_enabled = true;
_r->_signals.handle_signal(alarm_signal(), [r = _r] {
r->service_highres_timer();
});
}
}

reactor::signals::signals() : _pending_signals(0) {
}

Expand Down Expand Up @@ -471,12 +499,6 @@ static void print_with_backtrace(const char* cause) noexcept {
print_with_backtrace(buf);
}

inline int alarm_signal() {
// We don't want to use SIGALRM, because the boost unit test library
// also plays with it.
return SIGRTMIN;
}

// Installs signal handler stack for current thread.
// The stack remains installed as long as the returned object is kept alive.
// When it goes out of scope the previous handler is restored.
Expand Down Expand Up @@ -599,12 +621,6 @@ reactor::reactor(unsigned id)
sigaddset(&mask, alarm_signal());
r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
assert(r == 0);
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev._sigev_un._tid = syscall(SYS_gettid);
sev.sigev_signo = alarm_signal();
r = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
assert(r >= 0);
sigemptyset(&mask);
sigaddset(&mask, cpu_stall_detector::signal_number());
r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
Expand All @@ -625,7 +641,6 @@ reactor::~reactor() {
assert(r == 0);

_backend.stop_tick();
timer_delete(_steady_clock_timer);
auto eraser = [](auto& list) {
while (!list.empty()) {
auto& timer = *list.begin();
Expand Down Expand Up @@ -2572,8 +2587,7 @@ void reactor::enable_timer(steady_clock_type::time_point when)
itimerspec its;
its.it_interval = {};
its.it_value = to_timespec(when);
auto ret = timer_settime(_steady_clock_timer, TIMER_ABSTIME, &its, NULL);
throw_system_error_on(ret == -1);
_backend.arm_highres_timer(its);
#else
using ns = std::chrono::nanoseconds;
WITH_LOCK(_timer_mutex) {
Expand Down Expand Up @@ -3257,6 +3271,14 @@ reactor::activate(task_queue& tq) {
_activating_task_queues.push_back(&tq);
}

void reactor::service_highres_timer() {
complete_timers(_timers, _expired_timers, [this] {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
});
}

int reactor::run() {
auto signal_stack = install_signal_handler_stack();

Expand Down Expand Up @@ -3307,15 +3329,6 @@ int reactor::run() {
});

poller syscall_poller(std::make_unique<syscall_pollfn>(*this));
#ifndef HAVE_OSV
_signals.handle_signal(alarm_signal(), [this] {
complete_timers(_timers, _expired_timers, [this] {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
});
});
#endif

poller drain_cross_cpu_freelist(std::make_unique<drain_cross_cpu_freelist_pollfn>());

Expand Down

0 comments on commit 5aea50a

Please sign in to comment.