Skip to content

Commit

Permalink
reactor: privatize thread_pool
Browse files Browse the repository at this point in the history
Move thread_pool to reactor.cc. Unfortunately, this means making it a unique_ptr,
because the reactor class is public.
  • Loading branch information
avikivity committed Jan 15, 2019
1 parent 0b3fd0d commit f1ec463
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 71 deletions.
43 changes: 1 addition & 42 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
#include "internal/pollable_fd.hh"
#include "internal/syscall_work_queue.hh"

#ifdef HAVE_OSV
#include <osv/sched.hh>
Expand Down Expand Up @@ -249,46 +248,6 @@ private:
friend class smp;
};

class thread_pool {
reactor* _reactor;
uint64_t _aio_threaded_fallbacks = 0;
#ifndef HAVE_OSV
syscall_work_queue inter_thread_wq;
posix_thread _worker_thread;
std::atomic<bool> _stopped = { false };
std::atomic<bool> _main_thread_idle = { false };
public:
explicit thread_pool(reactor* r, sstring thread_name);
~thread_pool();
template <typename T, typename Func>
future<T> submit(Func func) {
++_aio_threaded_fallbacks;
return inter_thread_wq.submit<T>(std::move(func));
}
uint64_t operation_count() const { return _aio_threaded_fallbacks; }

unsigned complete() { return inter_thread_wq.complete(); }
// Before we enter interrupt mode, we must make sure that the syscall thread will properly
// generate signals to wake us up. This means we need to make sure that all modifications to
// the pending and completed fields in the inter_thread_wq are visible to all threads.
//
// Simple release-acquire won't do because we also need to serialize all writes that happens
// before the syscall thread loads this value, so we'll need full seq_cst.
void enter_interrupt_mode() { _main_thread_idle.store(true, std::memory_order_seq_cst); }
// When we exit interrupt mode, however, we can safely used relaxed order. If any reordering
// takes place, we'll get an extra signal and complete will be called one extra time, which is
// harmless.
void exit_interrupt_mode() { _main_thread_idle.store(false, std::memory_order_relaxed); }

#else
public:
template <typename T, typename Func>
future<T> submit(Func func) { std::cout << "thread_pool not yet implemented on osv\n"; abort(); }
#endif
private:
void work(sstring thread_name);
};

class reactor_backend_selector;

enum class open_flags {
Expand Down Expand Up @@ -672,7 +631,7 @@ private:
};

signals _signals;
thread_pool _thread_pool;
std::unique_ptr<thread_pool> _thread_pool;
friend class thread_pool;
friend class internal::cpu_stall_detector;

Expand Down
97 changes: 69 additions & 28 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/util/log.hh>
#include "core/file-impl.hh"
#include "syscall_work_queue.hh"
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
Expand Down Expand Up @@ -246,6 +247,46 @@ void lowres_clock_impl::update() {
counters::_system_now.store(system_count, std::memory_order_relaxed);
}

class thread_pool {
reactor* _reactor;
uint64_t _aio_threaded_fallbacks = 0;
#ifndef HAVE_OSV
syscall_work_queue inter_thread_wq;
posix_thread _worker_thread;
std::atomic<bool> _stopped = { false };
std::atomic<bool> _main_thread_idle = { false };
public:
explicit thread_pool(reactor* r, sstring thread_name);
~thread_pool();
template <typename T, typename Func>
future<T> submit(Func func) {
++_aio_threaded_fallbacks;
return inter_thread_wq.submit<T>(std::move(func));
}
uint64_t operation_count() const { return _aio_threaded_fallbacks; }

unsigned complete() { return inter_thread_wq.complete(); }
// Before we enter interrupt mode, we must make sure that the syscall thread will properly
// generate signals to wake us up. This means we need to make sure that all modifications to
// the pending and completed fields in the inter_thread_wq are visible to all threads.
//
// Simple release-acquire won't do because we also need to serialize all writes that happens
// before the syscall thread loads this value, so we'll need full seq_cst.
void enter_interrupt_mode() { _main_thread_idle.store(true, std::memory_order_seq_cst); }
// When we exit interrupt mode, however, we can safely used relaxed order. If any reordering
// takes place, we'll get an extra signal and complete will be called one extra time, which is
// harmless.
void exit_interrupt_mode() { _main_thread_idle.store(false, std::memory_order_relaxed); }

#else
public:
template <typename T, typename Func>
future<T> submit(Func func) { std::cout << "thread_pool not yet implemented on osv\n"; abort(); }
#endif
private:
void work(sstring thread_name);
};

template <typename T>
struct syscall_result {
T result;
Expand Down Expand Up @@ -1000,7 +1041,7 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs)
, _cpu_stall_detector(std::make_unique<cpu_stall_detector>(this))
, _io_context(0)
, _reuseport(posix_reuseport_detect())
, _thread_pool(this, seastar::format("syscall-{}", id)) {
, _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
_task_queues.push_back(std::make_unique<task_queue>(0, "main", 1000));
_task_queues.push_back(std::make_unique<task_queue>(1, "atexit", 1000));
_at_destroy_tasks = _task_queues.back().get();
Expand Down Expand Up @@ -1672,7 +1713,7 @@ reactor::flush_pending_aio() {
}
if (!_pending_aio_retry.empty()) {
auto retries = std::exchange(_pending_aio_retry, {});
_thread_pool.submit<syscall_result<int>>([this, retries] () mutable {
_thread_pool->submit<syscall_result<int>>([this, retries] () mutable {
auto r = io_submit(_io_context, retries.size(), retries.data());
return wrap_syscall<int>(r);
}).then([this, retries] (syscall_result<int> result) {
Expand Down Expand Up @@ -2519,7 +2560,7 @@ file::file(int fd, file_open_options options)
future<file>
reactor::open_file_dma(sstring name, open_flags flags, file_open_options options) {
static constexpr mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; // 0644
return _thread_pool.submit<syscall_result<int>>([name, flags, options, strict_o_direct = _strict_o_direct] {
return _thread_pool->submit<syscall_result<int>>([name, flags, options, strict_o_direct = _strict_o_direct] {
// We want O_DIRECT, except in two cases:
// - tmpfs (which doesn't support it, but works fine anyway)
// - strict_o_direct == false (where we forgive it being not supported)
Expand Down Expand Up @@ -2563,7 +2604,7 @@ reactor::open_file_dma(sstring name, open_flags flags, file_open_options options

future<>
reactor::remove_file(sstring pathname) {
return engine()._thread_pool.submit<syscall_result<int>>([pathname] {
return engine()._thread_pool->submit<syscall_result<int>>([pathname] {
return wrap_syscall<int>(::remove(pathname.c_str()));
}).then([pathname] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("remove failed", pathname);
Expand All @@ -2573,7 +2614,7 @@ reactor::remove_file(sstring pathname) {

future<>
reactor::rename_file(sstring old_pathname, sstring new_pathname) {
return engine()._thread_pool.submit<syscall_result<int>>([old_pathname, new_pathname] {
return engine()._thread_pool->submit<syscall_result<int>>([old_pathname, new_pathname] {
return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
}).then([old_pathname, new_pathname] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
Expand All @@ -2583,7 +2624,7 @@ reactor::rename_file(sstring old_pathname, sstring new_pathname) {

future<>
reactor::link_file(sstring oldpath, sstring newpath) {
return engine()._thread_pool.submit<syscall_result<int>>([oldpath, newpath] {
return engine()._thread_pool->submit<syscall_result<int>>([oldpath, newpath] {
return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
}).then([oldpath, newpath] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
Expand Down Expand Up @@ -2613,7 +2654,7 @@ directory_entry_type stat_to_entry_type(__mode_t type) {

future<compat::optional<directory_entry_type>>
reactor::file_type(sstring name) {
return _thread_pool.submit<syscall_result_extra<struct stat>>([name] {
return _thread_pool->submit<syscall_result_extra<struct stat>>([name] {
struct stat st;
auto ret = stat(name.c_str(), &st);
return wrap_syscall(ret, st);
Expand All @@ -2632,7 +2673,7 @@ reactor::file_type(sstring name) {

future<uint64_t>
reactor::file_size(sstring pathname) {
return _thread_pool.submit<syscall_result_extra<struct stat>>([pathname] {
return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname] {
struct stat st;
auto ret = stat(pathname.c_str(), &st);
return wrap_syscall(ret, st);
Expand All @@ -2644,7 +2685,7 @@ reactor::file_size(sstring pathname) {

future<bool>
reactor::file_exists(sstring pathname) {
return _thread_pool.submit<syscall_result_extra<struct stat>>([pathname] {
return _thread_pool->submit<syscall_result_extra<struct stat>>([pathname] {
struct stat st;
auto ret = stat(pathname.c_str(), &st);
return wrap_syscall(ret, st);
Expand All @@ -2659,7 +2700,7 @@ reactor::file_exists(sstring pathname) {

future<fs_type>
reactor::file_system_at(sstring pathname) {
return _thread_pool.submit<syscall_result_extra<struct statfs>>([pathname] {
return _thread_pool->submit<syscall_result_extra<struct statfs>>([pathname] {
struct statfs st;
auto ret = statfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
Expand All @@ -2685,7 +2726,7 @@ reactor::file_system_at(sstring pathname) {

future<struct statvfs>
reactor::statvfs(sstring pathname) {
return _thread_pool.submit<syscall_result_extra<struct statvfs>>([pathname] {
return _thread_pool->submit<syscall_result_extra<struct statvfs>>([pathname] {
struct statvfs st;
auto ret = ::statvfs(pathname.c_str(), &st);
return wrap_syscall(ret, st);
Expand All @@ -2698,7 +2739,7 @@ reactor::statvfs(sstring pathname) {

future<file>
reactor::open_directory(sstring name) {
return _thread_pool.submit<syscall_result<int>>([name] {
return _thread_pool->submit<syscall_result<int>>([name] {
return wrap_syscall<int>(::open(name.c_str(), O_DIRECTORY | O_CLOEXEC | O_RDONLY));
}).then([name] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("open failed", name);
Expand All @@ -2708,7 +2749,7 @@ reactor::open_directory(sstring name) {

future<>
reactor::make_directory(sstring name) {
return _thread_pool.submit<syscall_result<int>>([name] {
return _thread_pool->submit<syscall_result<int>>([name] {
return wrap_syscall<int>(::mkdir(name.c_str(), S_IRWXU));
}).then([name] (syscall_result<int> sr) {
sr.throw_fs_exception_if_error("mkdir failed", name);
Expand All @@ -2717,7 +2758,7 @@ reactor::make_directory(sstring name) {

future<>
reactor::touch_directory(sstring name) {
return engine()._thread_pool.submit<syscall_result<int>>([name] {
return engine()._thread_pool->submit<syscall_result<int>>([name] {
return wrap_syscall<int>(::mkdir(name.c_str(), S_IRWXU));
}).then([name] (syscall_result<int> sr) {
if (sr.error != EEXIST) {
Expand Down Expand Up @@ -2808,7 +2849,7 @@ posix_file_impl::flush(void) {
if (engine()._bypass_fsync) {
return make_ready_future<>();
}
return engine()._thread_pool.submit<syscall_result<int>>([this] {
return engine()._thread_pool->submit<syscall_result<int>>([this] {
return wrap_syscall<int>(::fdatasync(_fd));
}).then([] (syscall_result<int> sr) {
sr.throw_if_error();
Expand All @@ -2818,7 +2859,7 @@ posix_file_impl::flush(void) {

future<struct stat>
posix_file_impl::stat(void) {
return engine()._thread_pool.submit<syscall_result_extra<struct stat>>([this] {
return engine()._thread_pool->submit<syscall_result_extra<struct stat>>([this] {
struct stat st;
auto ret = ::fstat(_fd, &st);
return wrap_syscall(ret, st);
Expand All @@ -2830,7 +2871,7 @@ posix_file_impl::stat(void) {

future<>
posix_file_impl::truncate(uint64_t length) {
return engine()._thread_pool.submit<syscall_result<int>>([this, length] {
return engine()._thread_pool->submit<syscall_result<int>>([this, length] {
return wrap_syscall<int>(::ftruncate(_fd, length));
}).then([] (syscall_result<int> sr) {
sr.throw_if_error();
Expand All @@ -2849,7 +2890,7 @@ blockdev_file_impl::truncate(uint64_t length) {

future<>
posix_file_impl::discard(uint64_t offset, uint64_t length) {
return engine()._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
offset, length));
}).then([] (syscall_result<int> sr) {
Expand All @@ -2866,7 +2907,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) {
if (!supported) {
return make_ready_future<>();
}
return engine()._thread_pool.submit<syscall_result<int>>([this, position, length] () mutable {
return engine()._thread_pool->submit<syscall_result<int>>([this, position, length] () mutable {
auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length);
if (ret == -1 && errno == EOPNOTSUPP) {
ret = 0;
Expand All @@ -2884,7 +2925,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) {

future<>
blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
return engine()._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
uint64_t range[2] { offset, length };
return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
}).then([] (syscall_result<int> sr) {
Expand Down Expand Up @@ -2924,7 +2965,7 @@ posix_file_impl::close() noexcept {
_refcount = nullptr;
auto closed = [fd] () noexcept {
try {
return engine()._thread_pool.submit<syscall_result<int>>([fd] {
return engine()._thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::close(fd));
});
} catch (...) {
Expand All @@ -2939,7 +2980,7 @@ posix_file_impl::close() noexcept {

future<uint64_t>
blockdev_file_impl::size(void) {
return engine()._thread_pool.submit<syscall_result_extra<size_t>>([this] {
return engine()._thread_pool->submit<syscall_result_extra<size_t>>([this] {
uint64_t size;
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
return wrap_syscall(ret, size);
Expand Down Expand Up @@ -2981,7 +3022,7 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
auto eofcond = [w] { return w->eof; };
return do_until(eofcond, [w, this] {
if (w->current == w->total) {
return engine()._thread_pool.submit<syscall_result<long>>([w , this] () {
return engine()._thread_pool->submit<syscall_result<long>>([w , this] () {
auto ret = ::syscall(__NR_getdents64, _fd, reinterpret_cast<linux_dirent64*>(w->buffer), buffer_size);
return wrap_syscall(ret);
}).then([w] (syscall_result<long> ret) {
Expand Down Expand Up @@ -3191,7 +3232,7 @@ void reactor::register_metrics() {
// total_operations value:DERIVE:0:U
sm::make_derive("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
// total_operations value:DERIVE:0:U
sm::make_derive("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, &_thread_pool),
sm::make_derive("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
sm::description("Total number of io-threaded-fallbacks operations")),

});
Expand Down Expand Up @@ -3561,22 +3602,22 @@ class reactor::syscall_pollfn final : public reactor::pollfn {
public:
syscall_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return _r._thread_pool.complete();
return _r._thread_pool->complete();
}
virtual bool pure_poll() override final {
return poll(); // actually performs work, but triggers no user continuations, so okay
}
virtual bool try_enter_interrupt_mode() override {
_r._thread_pool.enter_interrupt_mode();
_r._thread_pool->enter_interrupt_mode();
if (poll()) {
// raced
_r._thread_pool.exit_interrupt_mode();
_r._thread_pool->exit_interrupt_mode();
return false;
}
return true;
}
virtual void exit_interrupt_mode() override final {
_r._thread_pool.exit_interrupt_mode();
_r._thread_pool->exit_interrupt_mode();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#pragma once

#include "pollable_fd.hh"
#include <seastar/core/internal/pollable_fd.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/util/std-compat.hh>
Expand Down

0 comments on commit f1ec463

Please sign in to comment.