Skip to content

Commit

Permalink
rpc: Keep dummy frame in the outgoing queue until negotiated
Browse files Browse the repository at this point in the history
Front entry in the queue is uncancellable. That's not perfect, but is
something we have to live with. However, there's one corner case when
this can be fixed -- during the connect+negotiate phase cancelling or
timeouting the packet can work. In fact it should work, because it used
to work before c437920 ("Replace RPC outgoing queue with continuation
chain", so it's actually the regression.

To make it happen client queues a dummy message in front of the outgoing
queue queue, so that any other subsequent sending would happen to be at
least the 2nd one which can be withdrawn from the queue. Once negotiated
the dummy entry goes away and does't interfere with the packets flow,
neither the front entry stays cancellable.

fixes: scylladb#1446

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Feb 13, 2023
1 parent aeb69ff commit df920c4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/seastar/rpc/rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ class client : public rpc::connection, public weakly_referencable<client> {
}
};
};

void enqueue_zero_frame();
public:
template<typename Reply, typename Func>
struct reply_handler final : reply_handler_base {
Expand Down
26 changes: 25 additions & 1 deletion src/rpc/rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ namespace rpc {
_outgoing_queue.push_back(d);
_outgoing_queue_size++;
auto deleter = [this, it = _outgoing_queue.iterator_to(d)] {
// Front entry is most likely (unless _negotiated is unresolved) sitting
// Front entry is most likely (unless _negotiated is unresolved, check enqueue_zero_frame()) sitting
// inside send_entry() continuations and thus it cannot be cancelled.
if (it != _outgoing_queue.begin()) {
withdraw(it);
Expand Down Expand Up @@ -713,6 +713,29 @@ namespace rpc {
}
}

// This is the enlightened copy of the connection::send() method. Its intention is to
// keep a dummy entry in front of the queue while connect+negotiate is happenning so
// that all subsequent entries could abort on timeout or explicit cancellation.
void client::enqueue_zero_frame() {
if (_error) {
return;
}

auto p = std::make_unique<outgoing_entry>(snd_buf(0));
auto& d = *p;
_outgoing_queue.push_back(d);

// Make it in the background. Even if the client is stopped it will pick
// up all the entries hanging around
(void)std::exchange(_outgoing_queue_ready, d.done.get_future()).then_wrapped([p = std::move(p)] (auto f) mutable {
if (f.failed()) {
f.ignore_ready_future();
} else {
p->done.set_value();
}
});
}

client::client(const logger& l, void* s, client_options ops, socket socket, const socket_address& addr, const socket_address& local)
: rpc::connection(l, s), _socket(std::move(socket)), _server_addr(addr), _local_addr(local), _options(ops) {
_socket.set_reuseaddr(ops.reuseaddr);
Expand Down Expand Up @@ -811,6 +834,7 @@ namespace rpc {
_stopped.set_value();
});
});
enqueue_zero_frame();
}

client::client(const logger& l, void* s, const socket_address& addr, const socket_address& local)
Expand Down

0 comments on commit df920c4

Please sign in to comment.