Skip to content

Commit

Permalink
core: reactor: Fix use-after-free during reactor shutdown
Browse files Browse the repository at this point in the history
Currenlty only reactor at shard 0 waits for other reactors to finish
processing before exiting. It may happen that non-main reactor exits
while there are still some tasks attempted to be submitted to it. This
result in use-after-free on remote->_sleeping in
smp_message_queue::lf_queue::maybe_wakeup()

Fix by ensuring that reactor threads exit when all reactors stopped
processing their tasks.
  • Loading branch information
tgrabiec committed Feb 16, 2016
1 parent 0f759f0 commit c20d260
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,7 @@ int reactor::run() {
while (!_at_destroy_tasks.empty()) {
run_tasks(_at_destroy_tasks);
}
smp::arrive_at_event_loop_end();
if (_id == 0) {
smp::join_all();
}
Expand Down Expand Up @@ -2291,6 +2292,7 @@ smp::get_options_description()
}

std::vector<smp::thread_adaptor> smp::_threads;
std::experimental::optional<boost::barrier> smp::_all_event_loops_done;
std::vector<reactor*> smp::_reactors;
smp_message_queue** smp::_qs;
std::thread::id smp::_tmain;
Expand Down Expand Up @@ -2333,6 +2335,12 @@ void smp::pin(unsigned cpu_id) {
}
#endif

void smp::arrive_at_event_loop_end() {
if (_all_event_loops_done) {
_all_event_loops_done->wait();
}
}

void smp::allocate_reactor() {
struct reactor_deleter {
void operator()(reactor* p) {
Expand Down Expand Up @@ -2481,6 +2489,8 @@ void smp::configure(boost::program_options::variables_map configuration)
engine()._io_coordinator = all_io_queues[queue_idx]->coordinator();
};

_all_event_loops_done.emplace(smp::count);

unsigned i;
for (i = 1; i < smp::count; i++) {
auto allocation = allocations[i];
Expand Down
3 changes: 3 additions & 0 deletions core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/program_options.hpp>
#include <boost/thread/barrier.hpp>
#include <set>
#include "util/eclipse.hh"
#include "future.hh"
Expand Down Expand Up @@ -931,6 +932,7 @@ class smp {
using thread_adaptor = posix_thread;
#endif
static std::vector<thread_adaptor> _threads;
static std::experimental::optional<boost::barrier> _all_event_loops_done;
static std::vector<reactor*> _reactors;
static smp_message_queue** _qs;
static std::thread::id _tmain;
Expand All @@ -943,6 +945,7 @@ public:
static boost::program_options::options_description get_options_description();
static void configure(boost::program_options::variables_map vm);
static void cleanup();
static void arrive_at_event_loop_end();
static void join_all();
static bool main_thread() { return std::this_thread::get_id() == _tmain; }

Expand Down

0 comments on commit c20d260

Please sign in to comment.