Skip to content

Commit

Permalink
Merge pull request ceph#50105 from zhsgao/mds_export_state
Browse files Browse the repository at this point in the history
mds: add an asok command to dump export states

Reviewed-by: Venky Shankar <[email protected]>
  • Loading branch information
vshankar authored Oct 30, 2024
2 parents 94ce78a + 5506ed6 commit 7809b0e
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 21 deletions.
96 changes: 96 additions & 0 deletions qa/tasks/cephfs/test_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while, MaxWhileTries

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -628,3 +629,98 @@ def test_ephemeral_pin_shrink_mds(self):
log.info("{0} migrations have occured due to the cluster resizing".format(count))
# rebalancing from 3 -> 2 may cause half of rank 0/1 to move and all of rank 2
self.assertLessEqual((count/len(subtrees_old)), (1.0/3.0/2.0 + 1.0/3.0/2.0 + 1.0/3.0)*1.25) # aka .66 with 25% overbudget

class TestDumpExportStates(CephFSTestCase):
    """Exercise the MDS ``dump_export_states`` asok command.

    Each test starts a subtree export from rank 0 to rank 1, pauses the
    import on the target rank at a chosen kill point
    (``mds_kill_import_at``), then dumps and verifies the export state
    reported by the source rank.
    """
    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

    # Exporter-side state names as reported by the MDS, in the order the
    # export state machine passes through them.
    EXPORT_STATES = ['locking', 'discovering', 'freezing', 'prepping', 'warning', 'exporting']

    def setUp(self):
        super().setUp()

        self.fs.set_max_mds(self.MDSS_REQUIRED)
        self.status = self.fs.wait_for_daemons()

        self.mount_a.run_shell_payload('mkdir -p test/export')

    def tearDown(self):
        super().tearDown()

    def _wait_for_export_target(self, source, target, sleep=2, timeout=10):
        # Poll the source rank's mdsmap info until `target` appears in its
        # export_targets, i.e. the export has actually been initiated.
        try:
            with safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
                while proceed():
                    info = self.fs.getinfo().get_rank(self.fs.id, source)
                    log.info(f'waiting for rank {target} to be added to the export target')
                    if target in info['export_targets']:
                        return
        except MaxWhileTries as e:
            raise RuntimeError(f'rank {target} has not been added to export target after {timeout}s') from e

    def _dump_export_state(self, rank):
        # Exactly one export should be in flight; return its state dict.
        states = self.fs.rank_asok(['dump_export_states'], rank=rank, status=self.status)
        self.assertIs(type(states), list)
        self.assertEqual(len(states), 1)
        return states[0]

    def _test_base(self, path, source, target, state_index, kill):
        """Export `path` from `source` to `target`, killing the importer at
        kill point `kill`; verify the fields common to every export state
        and return the dumped state for state-specific checks.
        """
        self.fs.rank_asok(['config', 'set', 'mds_kill_import_at', str(kill)], rank=target, status=self.status)

        self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
        self._wait_for_export_target(source, target)

        # The importer was killed deliberately; discard its coredump so the
        # teardown health checks don't flag it.
        target_rank = self.fs.get_rank(rank=target, status=self.status)
        self.delete_mds_coredump(target_rank['name'])

        state = self._dump_export_state(source)

        self.assertIs(type(state['tid']), int)
        self.assertEqual(state['path'], path)
        self.assertEqual(state['state'], self.EXPORT_STATES[state_index])
        self.assertEqual(state['peer'], target)

        return state

    def _test_state_history(self, state):
        # The history must contain exactly the states already visited, up to
        # and including the current one.
        history = state['state_history']
        self.assertIs(type(history), dict)
        size = 0
        for name in self.EXPORT_STATES:
            self.assertIs(type(history[name]), dict)
            size += 1
            if name == state['state']:
                break
        self.assertEqual(len(history), size)

    def _test_freeze_tree(self, state, waiters):
        self.assertIs(type(state['freeze_tree_time']), float)
        self.assertEqual(state['unfreeze_tree_waiters'], waiters)

    def test_discovering(self):
        state = self._test_base('/test', 0, 1, 1, 1)

        self._test_state_history(state)
        self._test_freeze_tree(state, 0)

        self.assertEqual(state['last_cum_auth_pins'], 0)
        self.assertEqual(state['num_remote_waiters'], 0)

    def test_prepping(self):
        client_id = self.mount_a.get_global_id()

        state = self._test_base('/test', 0, 1, 3, 3)

        self._test_state_history(state)
        self._test_freeze_tree(state, 0)

        self.assertEqual(state['flushed_clients'], [client_id])
        self.assertIs(type(state['warning_ack_waiting']), list)

    def test_exporting(self):
        state = self._test_base('/test', 0, 1, 5, 5)

        self._test_state_history(state)
        self._test_freeze_tree(state, 0)

        self.assertIs(type(state['notify_ack_waiting']), list)
10 changes: 10 additions & 0 deletions src/mds/CDir.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,16 @@ class CDir : public MDSCacheObject, public Counter<CDir> {

void maybe_finish_freeze();

// Total number of WAIT_UNFREEZE waiters on this dir plus every dir
// reachable below it via _walk_tree().
size_t count_unfreeze_tree_waiters() {
  size_t total = count_unfreeze_dir_waiters();
  _walk_tree([&total](CDir *subdir) {
    total += subdir->count_unfreeze_dir_waiters();
    return true; // keep descending
  });
  return total;
}
inline size_t count_unfreeze_dir_waiters() const { return count_waiters(WAIT_UNFREEZE); }

std::pair<bool,bool> is_freezing_or_frozen_tree() const {
if (freeze_tree_state) {
if (freeze_tree_state->frozen)
Expand Down
2 changes: 2 additions & 0 deletions src/mds/MDSCacheObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ class MDSCacheObject {
}
bool is_waiter_for(waitmask_t mask);

inline size_t count_waiters(uint64_t mask) const { return waiting.count(mask); }

virtual void add_waiter(uint64_t mask, MDSContext *c) {
add_waiter(waitmask_t(mask), c);
}
Expand Down
4 changes: 4 additions & 0 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ void MDSDaemon::set_up_admin_socket()
asok_hook,
"show recent ops, sorted by op duration");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_export_states",
asok_hook,
"dump export states");
ceph_assert(r == 0);
r = admin_socket->register_command("scrub_path name=path,type=CephString "
"name=scrubops,type=CephChoices,"
"strings=force|recursive|repair,n=N,req=false "
Expand Down
3 changes: 3 additions & 0 deletions src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2769,6 +2769,9 @@ void MDSRankDispatcher::handle_asok_command(
if (!op_tracker.dump_historic_ops(f, true)) {
*css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
}
} else if (command == "dump_export_states") {
std::lock_guard l(mds_lock);
mdcache->migrator->dump_export_states(f);
} else if (command == "osdmap barrier") {
int64_t target_epoch = 0;
bool got_val = cmd_getval(cmdmap, "target_epoch", target_epoch);
Expand Down
113 changes: 93 additions & 20 deletions src/mds/Migrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
case EXPORT_LOCKING:
dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
num_locking_exports--;
it->second.state = EXPORT_CANCELLED;
it->second.set_state(EXPORT_CANCELLED);
dir->auth_unpin(this);
break;
case EXPORT_DISCOVERING:
dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
it->second.state = EXPORT_CANCELLED;
it->second.set_state(EXPORT_CANCELLED);
dir->unfreeze_tree(); // cancel the freeze
dir->auth_unpin(this);
if (notify_peer &&
Expand All @@ -286,7 +286,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)

case EXPORT_FREEZING:
dout(10) << "export state=freezing : canceling freeze" << dendl;
it->second.state = EXPORT_CANCELLED;
it->second.set_state(EXPORT_CANCELLED);
dir->unfreeze_tree(); // cancel the freeze
if (dir->is_subtree_root())
mdcache->try_subtree_merge(dir);
Expand All @@ -301,13 +301,13 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
// NOTE: state order reversal, warning comes after prepping
case EXPORT_WARNING:
dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
it->second.state = EXPORT_CANCELLING;
it->second.set_state(EXPORT_CANCELLING);
// fall-thru

case EXPORT_PREPPING:
if (state != EXPORT_WARNING) {
dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
it->second.state = EXPORT_CANCELLED;
it->second.set_state(EXPORT_CANCELLED);
}

{
Expand Down Expand Up @@ -340,7 +340,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)

case EXPORT_EXPORTING:
dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
it->second.state = EXPORT_CANCELLING;
it->second.set_state(EXPORT_CANCELLING);
export_reverse(dir, it->second);
break;

Expand Down Expand Up @@ -865,7 +865,7 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
ceph_assert(export_state.count(dir) == 0);
export_state_t& stat = export_state[dir];
num_locking_exports++;
stat.state = EXPORT_LOCKING;
stat.set_state(EXPORT_LOCKING);
stat.peer = dest;
stat.tid = mdr->reqid.tid;
stat.mut = mdr;
Expand Down Expand Up @@ -1140,7 +1140,7 @@ void Migrator::dispatch_export_dir(const MDRequestRef& mdr, int count)

if (results.size() == 1 && results.front().first == dir) {
num_locking_exports--;
it->second.state = EXPORT_DISCOVERING;
it->second.set_state(EXPORT_DISCOVERING);
// send ExportDirDiscover (ask target)
filepath path;
dir->inode->make_path(path);
Expand Down Expand Up @@ -1191,7 +1191,7 @@ void Migrator::dispatch_export_dir(const MDRequestRef& mdr, int count)
ceph_assert(export_state.count(sub) == 0);
auto& stat = export_state[sub];
num_locking_exports++;
stat.state = EXPORT_LOCKING;
stat.set_state(EXPORT_LOCKING);
stat.peer = dest;
stat.tid = _mdr->reqid.tid;
stat.mut = _mdr;
Expand Down Expand Up @@ -1244,7 +1244,7 @@ void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m

if (m->is_success()) {
// move to freezing the subtree
it->second.state = EXPORT_FREEZING;
it->second.set_state(EXPORT_FREEZING);
auto&& mdr = boost::static_pointer_cast<MDRequestImpl>(std::move(it->second.mut));
ceph_assert(!it->second.mut); // should have been moved out of

Expand Down Expand Up @@ -1427,18 +1427,18 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
}

// send.
it->second.state = EXPORT_PREPPING;
it->second.set_state(EXPORT_PREPPING);
mds->send_message_mds(prep, it->second.peer);
ceph_assert(g_conf()->mds_kill_export_at != 4);

// make sure any new instantiations of caps are flushed out
ceph_assert(it->second.warning_ack_waiting.empty());

set<client_t> export_client_set;
get_export_client_set(dir, export_client_set);
ceph_assert(it->second.export_client_set.empty());
get_export_client_set(dir, it->second.export_client_set);

MDSGatherBuilder gather(g_ceph_context);
mds->server->flush_client_sessions(export_client_set, gather);
mds->server->flush_client_sessions(it->second.export_client_set, gather);
if (gather.has_subs()) {
it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
Expand Down Expand Up @@ -1537,7 +1537,7 @@ void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)

}

it->second.state = EXPORT_WARNING;
it->second.set_state(EXPORT_WARNING);

ceph_assert(g_conf()->mds_kill_export_at != 6);
// nobody to warn?
Expand Down Expand Up @@ -1587,8 +1587,8 @@ void Migrator::export_go_synced(CDir *dir, uint64_t tid)
dout(7) << *dir << " to " << dest << dendl;

mdcache->show_subtrees();
it->second.state = EXPORT_EXPORTING;

it->second.set_state(EXPORT_EXPORTING);
ceph_assert(g_conf()->mds_kill_export_at != 7);

ceph_assert(dir->is_frozen_tree_root());
Expand Down Expand Up @@ -1933,7 +1933,7 @@ void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
auto bp = m->imported_caps.cbegin();
decode(it->second.peer_imported, bp);

it->second.state = EXPORT_LOGGINGFINISH;
it->second.set_state(EXPORT_LOGGINGFINISH);
ceph_assert(g_conf()->mds_kill_export_at != 9);
set<CDir*> bounds;
mdcache->get_subtree_bounds(dir, bounds);
Expand Down Expand Up @@ -1970,7 +1970,7 @@ void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>&
ceph_assert(stat.state == EXPORT_CANCELLING);

if (stat.notify_ack_waiting.empty()) {
stat.state = EXPORT_CANCELLED;
stat.set_state(EXPORT_CANCELLED);
return;
}

Expand Down Expand Up @@ -2095,7 +2095,7 @@ void Migrator::export_logged_finish(CDir *dir)
}

// wait for notifyacks
stat.state = EXPORT_NOTIFYING;
stat.set_state(EXPORT_NOTIFYING);
ceph_assert(g_conf()->mds_kill_export_at != 11);

// no notifies to wait for?
Expand Down Expand Up @@ -3217,6 +3217,79 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
}
}

/**
 * Dump the state of every in-progress subtree export on this rank.
 *
 * Backs the "dump_export_states" admin-socket command; the dispatcher
 * (MDSRankDispatcher::handle_asok_command) takes mds_lock before calling
 * us.  Emits one object per export_state entry: tid, the exported
 * dirfrag's path/frag, the current state name, a per-state history with
 * start time and time spent, the peer rank, and state-specific details.
 */
void Migrator::dump_export_states(Formatter *f)
{
  f->open_array_section("states");
  for (const auto& [dir, state] : export_state) {
    f->open_object_section("state");

    f->dump_unsigned("tid", state.tid);

    // Identify the subtree being exported: path + dirfrag.
    dir->dump(f, CDir::DUMP_PATH | CDir::DUMP_DIRFRAG);

    f->dump_string("state", get_export_statename(state.state));

    // One sub-object per state already visited, keyed by state name.
    f->open_object_section("state_history");
    for (const auto& [s, _1] : state.state_history) { // value unused here
      f->open_object_section(get_export_statename(s));
      f->dump_stream("start_at") << state.get_start_time(s);
      f->dump_float("time_spent", state.get_time_spent(s));
      f->close_section();
    }
    f->close_section();

    f->dump_int("peer", state.peer);

    // State-specific details.
    switch (state.state) {
    case EXPORT_DISCOVERING:
    case EXPORT_FREEZING:
      f->dump_stream("last_cum_auth_pins_change") << state.last_cum_auth_pins_change;
      f->dump_int("last_cum_auth_pins", state.last_cum_auth_pins);
      f->dump_int("num_remote_waiters", state.num_remote_waiters);

      break;

    case EXPORT_PREPPING:
    case EXPORT_WARNING:
      // Clients whose sessions were flushed when the tree froze
      // (export_client_set is populated in export_frozen()).
      f->open_array_section("flushed_clients");
      for (const auto &client : state.export_client_set)
        f->dump_int("client", client.v);
      f->close_section();

      f->open_array_section("warning_ack_waiting");
      for (const auto &rank : state.warning_ack_waiting)
        f->dump_int("rank", rank);
      f->close_section();

      if (state.state == EXPORT_PREPPING)
        break;
      // fall-thru: EXPORT_WARNING also reports notify_ack_waiting

    case EXPORT_EXPORTING:
    case EXPORT_LOGGINGFINISH:
    case EXPORT_NOTIFYING:
      f->open_array_section("notify_ack_waiting");
      for (const auto &rank : state.notify_ack_waiting)
        f->dump_int("rank", rank);
      f->close_section();

      break;

    default:
      break;
    }

    // Freeze-related info for any state at or beyond DISCOVERING.
    // NOTE(review): relies on the EXPORT_* enum values being ordered so
    // that everything >= EXPORT_DISCOVERING has begun freezing the tree
    // -- confirm this still holds if new states are inserted.
    if (state.state >= EXPORT_DISCOVERING) {
      f->dump_unsigned("approx_size", state.approx_size);
      f->dump_unsigned("unfreeze_tree_waiters", dir->count_unfreeze_tree_waiters());
      f->dump_float("freeze_tree_time", state.get_freeze_tree_time());
    }

    f->close_section();
  }
  f->close_section();
}

void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
mds_rank_t oldauth, LogSegment *ls,
map<CInode*, map<client_t,Capability::Export> >& peer_exports,
Expand Down
Loading

0 comments on commit 7809b0e

Please sign in to comment.