Skip to content

Commit

Permalink
mds/quiesce-db: optimize peer updates
Browse files Browse the repository at this point in the history
Prevent sending of the same version to the same peer more than once a second

Signed-off-by: Leonid Usov <[email protected]>
  • Loading branch information
leonid-s-usov committed May 13, 2024
1 parent 379ef71 commit eebf597
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 21 deletions.
71 changes: 51 additions & 20 deletions src/mds/QuiesceDbManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void QuiesceDbManager::leader_record_ack(QuiesceInterface::PeerId from, QuiesceM
dout(15) << "my epoch is behind, ignoring this until my membership is updated" << dendl;
} else {
dout(5) << "will send the peer a full DB" << dendl;
info.diff_map.clear();
info.clear();
}
} else {
dout(20) << "ack " << diff_map << " from peer " << from << dendl;
Expand Down Expand Up @@ -827,48 +827,79 @@ int QuiesceDbManager::leader_update_set(Db::Sets::value_type& set_it, const Quie

QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
{
std::map<QuiesceInterface::PeerId, std::deque<std::reference_wrapper<Db::Sets::value_type>>> peer_updates;

QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
QuiesceSetVersion max_set_version = db.set_version;

struct PeerUpdate {
QuiesceInterface::PeerId peer;
PeerInfo& info;
std::deque<std::reference_wrapper<Db::Sets::value_type>> set_refs;
PeerUpdate(QuiesceInterface::PeerId peer, PeerInfo& info)
: peer(peer)
, info(info)
{}

QuiesceSetVersion known_set_version() const
{
return info.diff_map.db_version.set_version;
}
};

// populate peer_updates with peers except me
std::vector<PeerUpdate> peer_updates;
for (auto& [peer, info]: peers) {
// no need to replicate to myself
if (peer != membership.me) {
peer_updates.emplace_back(peer, info);
}
}

for(auto & set_it: db.sets) {
auto & [set_id, set] = set_it;
auto next_set_event_at_age = leader_upkeep_set(set_it);

max_set_version = std::max(max_set_version, set.version);
next_event_at_age = std::min(next_event_at_age, next_set_event_at_age);

for(auto const & [peer, info]: peers) {
for(auto & peer_update: peer_updates) {
// update remote peers if their version is lower than this set's
// don't update myself
if (peer == membership.me) {
continue;
}
if (info.diff_map.db_version.set_version < set.version) {
peer_updates[peer].emplace_back(set_it);
if (peer_update.known_set_version() < set.version) {
peer_update.set_refs.emplace_back(set_it);
}
}
}

db.set_version = max_set_version;

// update the peers
for (auto &[peer, sets]: peer_updates) {
QuiesceDbListing update;
update.db_age = db.get_age();
update.db_version = db.version();
std::ranges::copy(sets, std::inserter(update.sets, update.sets.end()));
const auto now = QuiesceClock::now();
static const QuiesceTimeInterval PEER_REPEATED_UPDATE_INTERVAL = std::chrono::seconds(1);
for (auto const & peer_update: peer_updates) {
if (peer_update.info.last_sent_version == db.version()) {
if (now < (peer_update.info.last_activity + PEER_REPEATED_UPDATE_INTERVAL)) {
// don't spam the peer with the same version
continue;
}
dout(5) << "repeated update of the peer " << peer_update.peer << " with version " << db.version() << dendl;
}

QuiesceDbListing listing;
listing.db_age = db.get_age();
listing.db_version = db.version();
std::ranges::copy(peer_update.set_refs, std::inserter(listing.sets, listing.sets.end()));

dout(20) << "updating peer " << peer << " with " << sets.size()
dout(20) << "updating peer " << peer_update.peer << " with " << peer_update.set_refs.size()
<< " sets modified in db version range ("
<< peers[peer].diff_map.db_version << ".." << db.set_version << "]" << dendl;
<< peer_update.known_set_version() << ".." << db.set_version << "]" << dendl;

auto rc = membership.send_listing_to(peer, std::move(update));
auto rc = membership.send_listing_to(peer_update.peer, std::move(listing));
if (rc != 0) {
dout(1) << "ERROR (" << rc << ") trying to replicate db version "
<< db.set_version << " with " << sets.size()
<< " sets to the peer " << peer << dendl;
<< db.set_version << " with " << peer_update.set_refs.size()
<< " sets to the peer " << peer_update.peer << dendl;
} else {
peer_update.info.last_activity = now;
peer_update.info.last_sent_version = db.version();
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/mds/QuiesceDbManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,20 @@ class QuiesceDbManager {
struct PeerInfo {
QuiesceMap diff_map;
QuiesceTimePoint last_activity;
QuiesceDbVersion last_sent_version;
PeerInfo(QuiesceMap&& diff_map, QuiesceTimePoint last_activity)
: diff_map(diff_map)
, last_activity(last_activity)
{
}
PeerInfo() { }
PeerInfo() {
last_activity = QuiesceTimePoint::min();
}
void clear() {
diff_map.clear();
last_activity = QuiesceTimePoint::min();
last_sent_version = {};
}
};
std::unordered_map<QuiesceInterface::PeerId, PeerInfo> peers;

Expand Down

0 comments on commit eebf597

Please sign in to comment.