Skip to content

Commit

Permalink
smp: prepare infrastructure for a timeout
Browse files Browse the repository at this point in the history
Add a timeout parameter to `smp_message_queue::submit()` and propagate it
down to `smp::message_queue::submit_item()`, where it is passed to
`get_units()`.
No API function is accepting a timeout yet.
  • Loading branch information
denesb committed Feb 6, 2020
1 parent 1e499d5 commit 366c255
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
6 changes: 4 additions & 2 deletions include/seastar/core/smp.hh
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ using smp_timeout_clock = lowres_clock;
using smp_service_group_semaphore = basic_semaphore<named_semaphore_exception_factory, smp_timeout_clock>;
using smp_service_group_semaphore_units = semaphore_units<named_semaphore_exception_factory, smp_timeout_clock>;

static constexpr smp_timeout_clock::time_point smp_no_timeout = smp_timeout_clock::time_point::max();

void init_default_smp_service_group(shard_id cpu);

smp_service_group_semaphore& get_smp_service_groups_semaphore(unsigned ssg_id, shard_id t);
Expand Down Expand Up @@ -225,7 +227,7 @@ public:
futurize_t<std::result_of_t<Func()>> submit(shard_id t, smp_service_group ssg, Func&& func) {
auto wi = std::make_unique<async_work_item<Func>>(*this, ssg, std::forward<Func>(func));
auto fut = wi->get_future();
submit_item(t, std::move(wi));
submit_item(t, smp_no_timeout, std::move(wi));
return fut;
}
void start(unsigned cpuid);
Expand All @@ -236,7 +238,7 @@ public:
void stop();
private:
void work();
void submit_item(shard_id t, std::unique_ptr<work_item> wi);
void submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<work_item> wi);
void respond(work_item* wi);
void move_pending();
void flush_request_batch();
Expand Down
4 changes: 2 additions & 2 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3028,12 +3028,12 @@ bool smp_message_queue::pure_poll_tx() const {
return !const_cast<lf_queue&>(_completed).empty();
}

void smp_message_queue::submit_item(shard_id t, std::unique_ptr<smp_message_queue::work_item> item) {
void smp_message_queue::submit_item(shard_id t, smp_timeout_clock::time_point timeout, std::unique_ptr<smp_message_queue::work_item> item) {
// matching signal() in process_completions()
auto ssg_id = internal::smp_service_group_id(item->ssg);
auto& sem = get_smp_service_groups_semaphore(ssg_id, t);
// Future indirectly forwarded to `item`.
(void)get_units(sem, 1).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
(void)get_units(sem, 1, timeout).then_wrapped([this, item = std::move(item)] (future<smp_service_group_semaphore_units> units_fut) mutable {
if (units_fut.failed()) {
item->fail_with(units_fut.get_exception());
++_compl;
Expand Down

0 comments on commit 366c255

Please sign in to comment.