Skip to content

Commit

Permalink
Slot migration cancel crash fix (dragonflydb#2934)
Browse files Browse the repository at this point in the history
fix(cluster): crash dragonflydb#2928
  • Loading branch information
BorysTheDev authored Apr 19, 2024
1 parent c42b3dc commit 7666aae
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 39 deletions.
23 changes: 13 additions & 10 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)

lock_guard gu(set_config_mu);

lock_guard config_update_lk(
config_update_mu_); // prevents a concurrent config update coming from an outgoing migration
// TODO we shouldn't provide cntx into StartSlotMigrations
if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) {
return cntx->SendError("Can't start the migration");
Expand Down Expand Up @@ -706,19 +708,17 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
}
}

IncomingSlotMigration* ClusterFamily::CreateIncomingMigration(std::string source_id,
SlotRanges slots,
uint32_t shards_num) {
std::shared_ptr<IncomingSlotMigration> ClusterFamily::CreateIncomingMigration(std::string source_id,
SlotRanges slots,
uint32_t shards_num) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
if (mj->GetSourceID() == source_id) {
return nullptr;
}
}
return incoming_migrations_jobs_
.emplace_back(make_shared<IncomingSlotMigration>(
std::move(source_id), &server_family_->service(), std::move(slots), shards_num))
.get();
return incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
std::move(source_id), &server_family_->service(), std::move(slots), shards_num));
}

std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
Expand All @@ -742,7 +742,7 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector<MigrationInfo>& m
OutgoingMigration& migration = *it->get();
LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(migration.GetSlots())
<< " to " << migration.GetHostIp() << ":" << migration.GetPort();
migration.Cancel();
migration.Finish();
outgoing_migration_jobs_.erase(it);
}

Expand Down Expand Up @@ -833,8 +833,10 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));

auto migration = GetIncomingMigration(source_id);
if (!migration)
if (!migration) {
// TODO process error when migration is canceled
return cntx->SendError(kIdNotFound);
}

DCHECK(cntx->sync_dispatch);
// we do this to be ignored by the dispatch tracker
Expand All @@ -847,7 +849,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
}

void ClusterFamily::UpdateConfig(const std::vector<SlotRange>& slots, bool enable) {
lock_guard gu(set_config_mu);
lock_guard gu(config_update_mu_);

auto new_config = tl_cluster_config->CloneWithChanges(slots, enable);

Expand All @@ -870,6 +872,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
[source_id](const auto& m) { return m.node_id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
// TODO process error if migration was canceled
return cntx->SendLong(OutgoingMigration::kInvalidAttempt);
}

Expand Down
7 changes: 5 additions & 2 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ class ClusterFamily {
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);

// Creates an IncomingSlotMigration entity that will execute the migration
IncomingSlotMigration* CreateIncomingMigration(std::string source_id, SlotRanges slots,
uint32_t shards_num);
std::shared_ptr<IncomingSlotMigration> CreateIncomingMigration(std::string source_id,
SlotRanges slots,
uint32_t shards_num);

std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id);

Expand All @@ -97,6 +98,8 @@ class ClusterFamily {
private:
ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;

mutable util::fb2::Mutex config_update_mu_;

std::string id_;

ServerFamily* server_family_ = nullptr;
Expand Down
25 changes: 8 additions & 17 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,12 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

void OutgoingMigration::Cancel() {
state_.store(MigrationState::C_CANCELLED);

auto start_cb = [this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
void OutgoingMigration::Finish() {
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard)
slot_migrations_[shard->shard_id()]->Cancel();
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(start_cb));
});
state_.store(MigrationState::C_FINISHED);
}

MigrationState OutgoingMigration::GetState() const {
Expand All @@ -108,8 +105,6 @@ void OutgoingMigration::SyncFb() {
}
};

state_.store(MigrationState::C_SYNC);

shard_set->pool()->AwaitFiberOnAll(std::move(start_cb));

for (auto& migration : slot_migrations_) {
Expand All @@ -121,13 +116,13 @@ void OutgoingMigration::SyncFb() {
// TODO implement blocking on migrated slots only

long attempt = 0;
while (state_.load() != MigrationState::C_CANCELLED && !FinishMigration(++attempt)) {
while (state_.load() != MigrationState::C_FINISHED && !FinalyzeMigration(++attempt)) {
// process commands that were on pause and try again
ThisFiber::SleepFor(500ms);
}
}

bool OutgoingMigration::FinishMigration(long attempt) {
bool OutgoingMigration::FinalyzeMigration(long attempt) {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
Expand Down Expand Up @@ -181,12 +176,8 @@ bool OutgoingMigration::FinishMigration(long attempt) {
}
} while (attempt_res != attempt);

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard)
slot_migrations_[shard->shard_id()]->Cancel();
});
Finish();

state_.store(MigrationState::C_FINISHED);
cf_->UpdateConfig(migration_info_.slot_ranges, false);
VLOG(1) << "Config is updated for " << cf_->MyID();
return true;
Expand Down
15 changes: 8 additions & 7 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ class OutgoingMigration : private ProtocolClient {
// start migration process, sends INIT command to the target node
std::error_code Start(ConnectionContext* cntx);

// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

void Cancel();
// mark migration as FINISHED and cancel migration if it's not finished yet
void Finish();

MigrationState GetState() const;

Expand All @@ -54,18 +52,21 @@ class OutgoingMigration : private ProtocolClient {
static constexpr long kInvalidAttempt = -1;

private:
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transferring for the corresponding shard
class SliceSlotMigration;

void SyncFb();
bool FinishMigration(long attempt);
bool FinalyzeMigration(long attempt);

private:
MigrationInfo migration_info_;
Context cntx_;
mutable util::fb2::Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);
mutable util::fb2::Mutex finish_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;
ServerFamily* server_family_;
ClusterFamily* cf_;

Expand Down
7 changes: 4 additions & 3 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,12 @@ RestoreStreamer::~RestoreStreamer() {
}

void RestoreStreamer::Cancel() {
if (snapshot_version_ != 0) {
auto sver = snapshot_version_;
snapshot_version_ = 0; // to prevent double cancel in another fiber
if (sver != 0) {
fiber_cancellation_.Cancel();
db_slice_->UnregisterOnChange(snapshot_version_);
db_slice_->UnregisterOnChange(sver);
JournalStreamer::Cancel();
snapshot_version_ = 0;
}
}

Expand Down

0 comments on commit 7666aae

Please sign in to comment.