Skip to content

Commit

Permalink
fix(server): Do not block admin-port commands (dragonflydb#2842)
Browse files Browse the repository at this point in the history
There are a few use cases which cause a temporary block of connections:
* `CLIENT PAUSE` command
* replica takeover
* cluster config / migration

Before this PR, these commands interfered with replication / migration
connections, which could cause delays and even deadlocks.

We do not want such internal connections to ever be blocked, and it's ok
to assume they won't issue regular "data" commands. As such, this PR
disables blocking any commands issued via an admin-port, and once merged
we'll recommend issuing replication and cluster migration via the admin
port.

Fixes dragonflydb#2703
  • Loading branch information
chakaz authored Apr 7, 2024
1 parent 2cf7f21 commit 10ebe93
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);

// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */,
true /* ignore blocked */};
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
false /* ignore paused */, true /* ignore blocked */};

auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
Expand Down
4 changes: 2 additions & 2 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ void OutgoingMigration::SyncFb() {

bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetListeners(), nullptr, ClientPause::WRITE, is_pause_in_progress);
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {

// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()};
facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners(), cntx->conn()};
shard_set->pool()->Await([&](unsigned index, auto* pb) {
sf_->CancelBlockingOnThread();
tracker.TrackOnThread();
Expand Down
3 changes: 2 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
<< args << " in dbid=" << dfly_cntx->conn_state.db_index;
}

if (!dispatching_in_multi) { // Don't interrupt running multi commands
// Don't interrupt running multi commands or admin connections.
if (!dispatching_in_multi && (!cntx->conn() || !cntx->conn()->IsPrivileged())) {
bool is_write = cid->IsWriteOnly();
is_write |= cid->name() == "PUBLISH" || cid->name() == "EVAL" || cid->name() == "EVALSHA";
is_write |= cid->name() == "EXEC" && dfly_cntx->conn_state.exec_info.is_write;
Expand Down
22 changes: 16 additions & 6 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,7 @@ void ClientList(CmdArgList args, absl::Span<facade::Listener*> listeners, Connec
return rb->SendVerbatimString(result);
}

void ClientPauseCmd(CmdArgList args, absl::Span<facade::Listener*> listeners,
ConnectionContext* cntx) {
void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, ConnectionContext* cntx) {
CmdArgParser parser(args);

auto timeout = parser.Next<uint64_t>();
Expand Down Expand Up @@ -604,15 +603,15 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo

} // namespace

std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, facade::Connection* conn,
ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
// Track connections and set pause state to be able to wait untill all running transactions read
// the new pause state. Exlude already paused commands from the busy count. Exlude tracking
// blocked connections because: a) If the connection is blocked it is puased. b) We read pause
// state after waking from blocking so if the trasaction was waken by another running
// command that did not pause on the new state yet we will pause after waking up.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */,
DispatchTracker tracker{std::move(listeners), conn, true /* ignore paused commands */,
true /*ignore blocking*/};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
Expand Down Expand Up @@ -1262,6 +1261,17 @@ std::optional<ReplicaOffsetInfo> ServerFamily::GetReplicaOffsetInfo() {
return nullopt;
}

vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
std::vector<facade::Listener*> listeners;
listeners.reserve(listeners.size());
for (facade::Listener* listener : listeners_) {
if (!listener->IsPrivilegedInterface()) {
listeners.push_back(listener);
}
}
return listeners;
}

bool ServerFamily::HasReplica() const {
unique_lock lk(replicaof_mu_);
return replica_ != nullptr;
Expand Down Expand Up @@ -1565,7 +1575,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "LIST") {
return ClientList(sub_args, absl::MakeSpan(listeners_), cntx);
} else if (sub_cmd == "PAUSE") {
return ClientPauseCmd(sub_args, absl::MakeSpan(listeners_), cntx);
return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), cntx);
} else if (sub_cmd == "TRACKING") {
return ClientTracking(sub_args, cntx);
} else if (sub_cmd == "KILL") {
Expand Down
4 changes: 3 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class ServerFamily {
return listeners_;
}

std::vector<facade::Listener*> GetNonPriviligedListeners() const;

bool HasReplica() const;
std::optional<Replica::Info> GetReplicaInfo() const;

Expand Down Expand Up @@ -325,7 +327,7 @@ class ServerFamily {
};

// Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress
std::optional<util::fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
std::optional<util::fb2::Fiber> Pause(std::vector<facade::Listener*> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress);

Expand Down

0 comments on commit 10ebe93

Please sign in to comment.