smp: make the smp service group semaphore a named semaphore
In the next patch we will add a timeout to said semaphore, which means
that error messages generated by it will appear in the application logs.
To help make these error messages more useful, use the named exception
factory and generate names that contain the smp service group, the
initiator (client) and receiver (server) shards.
Another forward-looking change contained in this patch is defining and
using an smp-specific timeout clock.
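
For illustration only (not part of this commit): a minimal sketch of what the new aliases and the generated semaphore names amount to, assuming Seastar's semaphore, lowres_clock and print headers. The helper make_name() and demo() below are hypothetical, and the timed wait anticipates the timeout added in the next patch.

#include <seastar/core/semaphore.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/future.hh>
#include <seastar/core/print.hh>
#include <chrono>

using namespace seastar;

// The aliases introduced by this commit in smp.hh.
using smp_timeout_clock = lowres_clock;
using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;

// Hypothetical helper mirroring the naming scheme used in smp.cc:
// "smp_service_group#<id> <client shard>-><server shard> semaphore".
static named_semaphore_exception_factory make_name(unsigned ssg_id, unsigned client, unsigned server) {
    return named_semaphore_exception_factory{format("smp_service_group#{} {}->{} semaphore", ssg_id, client, server)};
}

// Hypothetical demo: a semaphore named after service group 0, client shard 1,
// server shard 2. If the timed wait expires, the timeout exception carries the
// name above, so the resulting log line identifies the service group and
// shard pair involved.
future<> demo() {
    static thread_local smp_service_group_semaphore sem{1, make_name(0, 1, 2)};
    return sem.wait(smp_timeout_clock::now() + std::chrono::seconds(1), 1);
}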
denesb committed Feb 6, 2020
1 parent cbdf479 commit 1e499d5
Showing 3 changed files with 23 additions and 12 deletions.
8 changes: 6 additions & 2 deletions include/seastar/core/smp.hh
@@ -121,9 +121,13 @@ smp_service_group default_smp_service_group() {
    return smp_service_group(0);
}

-void init_default_smp_service_group();
+using smp_timeout_clock = lowres_clock;
+using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
+using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;

-semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t);
+void init_default_smp_service_group(shard_id cpu);
+
+smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t);

class smp_message_queue {
    static constexpr size_t queue_length = 128;
6 changes: 3 additions & 3 deletions src/core/reactor.cc
@@ -3033,7 +3033,7 @@ void smp_message_queue::submit_item(shard_id t, std::unique_ptr<smp_message_queu
    auto ssg_id = internal::smp_service_group_id(item->ssg);
    auto& sem = get_smp_service_groups_semaphore(ssg_id, t);
    // Future indirectly forwarded to `item`.
-    (void)get_units(sem, 1).then_wrapped([this, item = std::move(item)] (future<semaphore_units<>> units_fut) mutable {
+    (void)get_units(sem, 1).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
        if (units_fut.failed()) {
            item->fail_with(units_fut.get_exception());
            ++_compl;
@@ -3878,7 +3878,7 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
        }
        auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
        throw_pthread_error(r);
-        init_default_smp_service_group();
+        init_default_smp_service_group(i);
        allocate_reactor(i, backend_selector, reactor_cfg);
        _reactors[i] = &engine();
        for (auto& dev_id : disk_config.device_ids()) {
@@ -3900,7 +3900,7 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
        });
    }

-    init_default_smp_service_group();
+    init_default_smp_service_group(0);
    try {
        allocate_reactor(0, backend_selector, reactor_cfg);
    } catch (const std::exception& e) {
21 changes: 14 additions & 7 deletions src/core/smp.cc
@@ -23,6 +23,7 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
+#include <seastar/core/print.hh>
#include <boost/range/algorithm/find_if.hpp>
#include <vector>

@@ -33,27 +34,33 @@ void smp_message_queue::work_item::process() {
}

struct smp_service_group_impl {
-    std::vector<semaphore> clients; // one client per server shard
+    std::vector<smp_service_group_semaphore> clients; // one client per server shard
};

-static semaphore smp_service_group_management_sem{1};
+static smp_service_group_semaphore smp_service_group_management_sem{1, named_semaphore_exception_factory{"smp_service_group_management_sem"}};
static thread_local std::vector<smp_service_group_impl> smp_service_groups;

+static named_semaphore_exception_factory make_service_group_semaphore_exception_factory(unsigned id, shard_id client_cpu, shard_id this_cpu) {
+    return named_semaphore_exception_factory{format("smp_service_group#{} {}->{} semaphore", id, client_cpu, this_cpu)};
+}
+
future<smp_service_group> create_smp_service_group(smp_service_group_config ssgc) {
    ssgc.max_nonlocal_requests = std::max(ssgc.max_nonlocal_requests, smp::count - 1);
    return smp::submit_to(0, [ssgc] {
        return with_semaphore(smp_service_group_management_sem, 1, [ssgc] {
            auto it = boost::range::find_if(smp_service_groups, [&] (smp_service_group_impl& ssgi) { return ssgi.clients.empty(); });
            size_t id = it - smp_service_groups.begin();
-            return smp::invoke_on_all([ssgc, id] {
+            return parallel_for_each(smp::all_cpus(), [ssgc, id] (unsigned cpu) {
+                return smp::submit_to(cpu, [ssgc, id, cpu] {
                if (id >= smp_service_groups.size()) {
                    smp_service_groups.resize(id + 1); // may throw
                }
                smp_service_groups[id].clients.reserve(smp::count); // may throw
                auto per_client = smp::count > 1 ? ssgc.max_nonlocal_requests / (smp::count - 1) : 0u;
                for (unsigned i = 0; i != smp::count; ++i) {
-                    smp_service_groups[id].clients.emplace_back(per_client);
+                    smp_service_groups[id].clients.emplace_back(per_client, make_service_group_semaphore_exception_factory(id, i, cpu));
                }
+                });
            }).handle_exception([id] (std::exception_ptr e) {
                // rollback
                return smp::invoke_on_all([id] {
@@ -81,16 +88,16 @@ future<> destroy_smp_service_group(smp_service_group ssg) {
    });
}

-void init_default_smp_service_group() {
+void init_default_smp_service_group(shard_id cpu) {
    smp_service_groups.emplace_back();
    auto& ssg0 = smp_service_groups.back();
    ssg0.clients.reserve(smp::count);
    for (unsigned i = 0; i != smp::count; ++i) {
-        ssg0.clients.emplace_back(semaphore::max_counter());
+        ssg0.clients.emplace_back(smp_service_group_semaphore::max_counter(), make_service_group_semaphore_exception_factory(0, i, cpu));
    }
}

-semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) {
+smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t) {
    return smp_service_groups[ssg_id].clients[t];
}
