diff --git a/doc/cephfs/disaster-recovery-experts.rst b/doc/cephfs/disaster-recovery-experts.rst index 8db85a88611f0..1c585a664f356 100644 --- a/doc/cephfs/disaster-recovery-experts.rst +++ b/doc/cephfs/disaster-recovery-experts.rst @@ -259,4 +259,4 @@ run a forward scrub to repair them. Ensure you have an MDS running and issue: :: - ceph daemon mds.a scrub_path / recursive repair + ceph tell mds.a scrub start / recursive repair diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py index 8764753c21c41..cac566a252fdc 100644 --- a/qa/tasks/cephfs/filesystem.py +++ b/qa/tasks/cephfs/filesystem.py @@ -921,6 +921,10 @@ def rank_asok(self, command, rank=0, status=None, timeout=None): info = self.get_rank(rank=rank, status=status) return self.json_asok(command, 'mds', info['name'], timeout=timeout) + def rank_tell(self, command, rank=0, status=None): + info = self.get_rank(rank=rank, status=status) + return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command)) + def read_cache(self, path, depth=None): cmd = ["dump", "tree", path] if depth is not None: diff --git a/qa/tasks/cephfs/test_damage.py b/qa/tasks/cephfs/test_damage.py index 459077b042823..43e94a2acf288 100644 --- a/qa/tasks/cephfs/test_damage.py +++ b/qa/tasks/cephfs/test_damage.py @@ -434,7 +434,7 @@ def test_damaged_dentry(self): self.mount_a.umount_wait() # Now repair the stats - scrub_json = self.fs.mds_asok(["scrub_path", "/subdir", "repair"]) + scrub_json = self.fs.rank_tell(["scrub", "start", "/subdir", "repair"]) log.info(json.dumps(scrub_json, indent=2)) self.assertEqual(scrub_json["passed_validation"], False) diff --git a/qa/tasks/cephfs/test_forward_scrub.py b/qa/tasks/cephfs/test_forward_scrub.py index e165780f31f18..b0f85e3213f62 100644 --- a/qa/tasks/cephfs/test_forward_scrub.py +++ b/qa/tasks/cephfs/test_forward_scrub.py @@ -232,7 +232,7 @@ def test_inotable_sync(self): self.mount_a.umount_wait() with self.assert_cluster_log("inode table repaired", invert_match=True): - out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"]) + out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"]) self.assertNotEqual(out_json, None) self.mds_cluster.mds_stop() @@ -255,7 +255,7 @@ def test_inotable_sync(self): self.fs.wait_for_daemons() with self.assert_cluster_log("inode table repaired"): - out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"]) + out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"]) self.assertNotEqual(out_json, None) self.mds_cluster.mds_stop() @@ -286,7 +286,7 @@ def test_backtrace_repair(self): "oh i'm sorry did i overwrite your xattr?") with self.assert_cluster_log("bad backtrace on inode"): - out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"]) + out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"]) self.assertNotEqual(out_json, None) self.fs.mds_asok(["flush", "journal"]) backtrace = self.fs.read_backtrace(file_ino) diff --git a/qa/tasks/cephfs/test_recovery_pool.py b/qa/tasks/cephfs/test_recovery_pool.py index 97049b9c0a337..1684d170c8e31 100644 --- a/qa/tasks/cephfs/test_recovery_pool.py +++ b/qa/tasks/cephfs/test_recovery_pool.py @@ -186,7 +186,7 @@ def _rebuild_metadata(self, workload, other_pool=None, workers=1): for rank in self.recovery_fs.get_ranks(status=status): self.fs.mon_manager.raw_cluster_cmd('tell', "mds." 
+ rank['name'], 'injectargs', '--debug-mds=20') - self.fs.rank_asok(['scrub_path', '/', 'recursive', 'repair'], rank=rank['rank'], status=status) + self.fs.rank_tell(['scrub', 'start', '/', 'recursive', 'repair'], rank=rank['rank'], status=status) log.info(str(self.mds_cluster.status())) # Mount a client diff --git a/qa/tasks/cephfs/test_scrub.py b/qa/tasks/cephfs/test_scrub.py index 9469dfce6e493..d96f5691ba21e 100644 --- a/qa/tasks/cephfs/test_scrub.py +++ b/qa/tasks/cephfs/test_scrub.py @@ -103,7 +103,7 @@ def damage(self): self._filesystem.wait_for_daemons() def validate(self): - out_json = self._filesystem.mds_asok(["scrub_path", "/", "recursive", "repair"]) + out_json = self._filesystem.rank_tell(["scrub", "start", "/", "recursive", "repair"]) self.assertNotEqual(out_json, None) self.assertTrue(self._filesystem.are_daemons_healthy()) return self._errors @@ -129,7 +129,7 @@ def _scrub(self, workload, workers=1): # Apply any data damage the workload wants workload.damage() - out_json = self.fs.mds_asok(["scrub_path", "/", "recursive", "repair"]) + out_json = self.fs.rank_tell(["scrub", "start", "/", "recursive", "repair"]) self.assertNotEqual(out_json, None) # See that the files are present and correct diff --git a/qa/tasks/cephfs/test_scrub_checks.py b/qa/tasks/cephfs/test_scrub_checks.py index a2de5271972b8..4fae23deee732 100644 --- a/qa/tasks/cephfs/test_scrub_checks.py +++ b/qa/tasks/cephfs/test_scrub_checks.py @@ -11,6 +11,98 @@ log = logging.getLogger(__name__) +class TestScrubControls(CephFSTestCase): + """ + Test basic scrub control operations such as abort, pause and resume. + """ + + MDSS_REQUIRED = 1 + CLIENTS_REQUIRED = 1 + + def _abort_scrub(self, expected): + res = self.fs.rank_tell(["scrub", "abort"]) + self.assertEqual(res['return_code'], expected) + def _pause_scrub(self, expected): + res = self.fs.rank_tell(["scrub", "pause"]) + self.assertEqual(res['return_code'], expected) + def _resume_scrub(self, expected): + res = self.fs.rank_tell(["scrub", "resume"]) + self.assertEqual(res['return_code'], expected) + def _get_scrub_status(self): + return self.fs.rank_tell(["scrub", "status"]) + + def test_scrub_abort(self): + test_dir = "scrub_control_test_path" + abs_test_path = "/{0}".format(test_dir) + + log.info("mountpoint: {0}".format(self.mount_a.mountpoint)) + client_path = os.path.join(self.mount_a.mountpoint, test_dir) + log.info("client_path: {0}".format(client_path)) + + log.info("Cloning repo into place") + repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path) + + out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"]) + self.assertNotEqual(out_json, None) + + # abort and verify + self._abort_scrub(0) + out_json = self._get_scrub_status() + self.assertTrue("no active" in out_json['status']) + + def test_scrub_pause_and_resume(self): + test_dir = "scrub_control_test_path" + abs_test_path = "/{0}".format(test_dir) + + log.info("mountpoint: {0}".format(self.mount_a.mountpoint)) + client_path = os.path.join(self.mount_a.mountpoint, test_dir) + log.info("client_path: {0}".format(client_path)) + + log.info("Cloning repo into place") + repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path) + + out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"]) + self.assertNotEqual(out_json, None) + + # pause and verify + self._pause_scrub(0) + out_json = self._get_scrub_status() + self.assertTrue("PAUSED" in out_json['status']) + + # resume and verify + self._resume_scrub(0) + out_json = self._get_scrub_status() + 
self.assertFalse("PAUSED" in out_json['status']) + + def test_scrub_pause_and_resume_with_abort(self): + test_dir = "scrub_control_test_path" + abs_test_path = "/{0}".format(test_dir) + + log.info("mountpoint: {0}".format(self.mount_a.mountpoint)) + client_path = os.path.join(self.mount_a.mountpoint, test_dir) + log.info("client_path: {0}".format(client_path)) + + log.info("Cloning repo into place") + repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path) + + out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"]) + self.assertNotEqual(out_json, None) + + # pause and verify + self._pause_scrub(0) + out_json = self._get_scrub_status() + self.assertTrue("PAUSED" in out_json['status']) + + # abort and verify + self._abort_scrub(0) + out_json = self._get_scrub_status() + self.assertTrue("PAUSED" in out_json['status']) + self.assertTrue("0 inodes" in out_json['status']) + + # resume and verify + self._resume_scrub(0) + out_json = self._get_scrub_status() + self.assertTrue("no active" in out_json['status']) class TestScrubChecks(CephFSTestCase): """ @@ -50,7 +142,7 @@ def _checks(self, run_seq): log.info("client_path: {0}".format(client_path)) log.info("Cloning repo into place") - repo_path = self.clone_repo(self.mount_a, client_path) + repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path) log.info("Initiating mds_scrub_checks on mds.{id_}, " + "test_path {path}, run_seq {seq}".format( @@ -63,7 +155,7 @@ def _checks(self, run_seq): nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path) self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep), lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT)) - self.asok_command(mds_rank, "scrub_path {nep}".format(nep=nep), + self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep), lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT)) test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path) @@ -73,8 +165,8 @@ def _checks(self, run_seq): log.info("First run: flushing {dirpath}".format(dirpath=dirpath)) command = "flush_path {dirpath}".format(dirpath=dirpath) self.asok_command(mds_rank, command, success_validator) - command = "scrub_path {dirpath}".format(dirpath=dirpath) - self.asok_command(mds_rank, command, success_validator) + command = "scrub start {dirpath}".format(dirpath=dirpath) + self.tell_command(mds_rank, command, success_validator) filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format( repo_path=test_repo_path) @@ -82,13 +174,13 @@ def _checks(self, run_seq): log.info("First run: flushing {filepath}".format(filepath=filepath)) command = "flush_path {filepath}".format(filepath=filepath) self.asok_command(mds_rank, command, success_validator) - command = "scrub_path {filepath}".format(filepath=filepath) - self.asok_command(mds_rank, command, success_validator) + command = "scrub start {filepath}".format(filepath=filepath) + self.tell_command(mds_rank, command, success_validator) filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml". 
\ format(repo_path=test_repo_path) - command = "scrub_path {filepath}".format(filepath=filepath) - self.asok_command(mds_rank, command, + command = "scrub start {filepath}".format(filepath=filepath) + self.tell_command(mds_rank, command, lambda j, r: self.json_validator(j, r, "performed_validation", False)) @@ -96,8 +188,8 @@ def _checks(self, run_seq): log.info("First run: flushing base dir /") command = "flush_path /" self.asok_command(mds_rank, command, success_validator) - command = "scrub_path /" - self.asok_command(mds_rank, command, success_validator) + command = "scrub start /" + self.tell_command(mds_rank, command, success_validator) new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq) test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path, @@ -118,16 +210,16 @@ def _checks(self, run_seq): # check that scrub fails on errors ino = self.mount_a.path_to_ino(new_file) rados_obj_name = "{ino:x}.00000000".format(ino=ino) - command = "scrub_path {file}".format(file=test_new_file) + command = "scrub start {file}".format(file=test_new_file) # Missing parent xattr -> ENODATA self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name()) - self.asok_command(mds_rank, command, + self.tell_command(mds_rank, command, lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA)) # Missing object -> ENOENT self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name()) - self.asok_command(mds_rank, command, + self.tell_command(mds_rank, command, lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT)) command = "flush_path /" @@ -162,7 +254,7 @@ def test_scrub_repair(self): self.mount_a.run_shell(["sudo", "rmdir", test_dir]) self.assertEqual(ar.exception.exitstatus, 1) - self.asok_command(mds_rank, "scrub_path /{0} repair".format(test_dir), + self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir), lambda j, r: self.json_validator(j, r, "return_code", 0)) # wait a few second for background repair @@ -181,6 +273,20 @@ def json_validator(json_out, rc, element, expected_value): jv=element_value, ev=expected_value) return True, "Succeeded" + def tell_command(self, mds_rank, command, validator): + log.info("Running command '{command}'".format(command=command)) + + command_list = command.split() + jout = self.fs.rank_tell(command_list, mds_rank) + + log.info("command '{command}' returned '{jout}'".format( + command=command, jout=jout)) + + success, errstring = validator(jout, 0) + if not success: + raise AsokCommandFailedError(command, rout, jout, errstring) + return jout + def asok_command(self, mds_rank, command, validator): log.info("Running command '{command}'".format(command=command)) @@ -209,7 +315,8 @@ def asok_command(self, mds_rank, command, validator): return jout - def clone_repo(self, client_mount, path): + @staticmethod + def clone_repo(client_mount, path): repo = "ceph-qa-suite" repo_path = os.path.join(path, repo) client_mount.run_shell(["mkdir", "-p", path]) diff --git a/qa/tasks/cephfs_upgrade_snap.py b/qa/tasks/cephfs_upgrade_snap.py index a11b1d7ee7523..1708d43cfe74a 100644 --- a/qa/tasks/cephfs_upgrade_snap.py +++ b/qa/tasks/cephfs_upgrade_snap.py @@ -24,13 +24,13 @@ def task(ctx, config): mds_map = fs.get_mds_map() assert(mds_map['max_mds'] == 1) - json = fs.rank_asok(["scrub_path", "/", "force", "recursive", "repair"]) + json = fs.rank_tell(["scrub", "start", "/", "force", "recursive", "repair"]) if not json or json['return_code'] == 0: log.info("scrub / completed") else: 
log.info("scrub / failed: {}".format(json)) - json = fs.rank_asok(["scrub_path", "~mdsdir", "force", "recursive", "repair"]) + json = fs.rank_tell(["scrub", "start", "~mdsdir", "force", "recursive", "repair"]) if not json or json['return_code'] == 0: log.info("scrub ~mdsdir completed") else: diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 801b20a1e7f1b..28fc20a72c017 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -4110,7 +4110,8 @@ void CInode::validate_disk_state(CInode::validated_data *results, /** * Fetch backtrace and set tag if tag is non-empty */ - void fetch_backtrace_and_tag(CInode *in, std::string_view tag, + void fetch_backtrace_and_tag(CInode *in, + std::string_view tag, bool is_internal, Context *fin, int *bt_r, bufferlist *bt) { const int64_t pool = in->get_backtrace_pool(); @@ -4121,8 +4122,8 @@ void CInode::validate_disk_state(CInode::validated_data *results, in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP, NULL, 0, fin); using ceph::encode; - if (!tag.empty()) { - ObjectOperation scrub_tag; + if (!is_internal) { + ObjectOperation scrub_tag; bufferlist tag_bl; encode(tag, tag_bl); scrub_tag.setxattr("scrub_tag", tag_bl); @@ -4153,10 +4154,11 @@ void CInode::validate_disk_state(CInode::validated_data *results, in->mdcache->mds->finisher); std::string_view tag = in->scrub_infop->header->get_tag(); + bool is_internal = in->scrub_infop->header->is_internal_tag(); // Rather than using the usual CInode::fetch_backtrace, // use a special variant that optionally writes a tag in the same // operation. - fetch_backtrace_and_tag(in, tag, conf, &results->backtrace.ondisk_read_retval, &bl); + fetch_backtrace_and_tag(in, tag, is_internal, conf, &results->backtrace.ondisk_read_retval, &bl); return false; } @@ -4768,6 +4770,24 @@ void CInode::scrub_dirfrag_finished(frag_t dirfrag) si.last_scrub_version = si.scrub_start_version; } +void CInode::scrub_aborted(MDSInternalContextBase **c) { + dout(20) << __func__ << dendl; + ceph_assert(scrub_is_in_progress()); + + *c = nullptr; + std::swap(*c, scrub_infop->on_finish); + + if (scrub_infop->scrub_parent) { + CDentry *dn = scrub_infop->scrub_parent; + scrub_infop->scrub_parent = NULL; + dn->dir->scrub_dentry_finished(dn); + dn->put(CDentry::PIN_SCRUBPARENT); + } + + delete scrub_infop; + scrub_infop = nullptr; +} + void CInode::scrub_finished(MDSInternalContextBase **c) { dout(20) << __func__ << dendl; ceph_assert(scrub_is_in_progress()); @@ -4800,12 +4820,8 @@ void CInode::scrub_finished(MDSInternalContextBase **c) { if (scrub_infop->header->get_origin() == this) { // We are at the point that a tagging scrub was initiated LogChannelRef clog = mdcache->mds->clog; - if (scrub_infop->header->get_tag().empty()) { - clog->info() << "scrub complete"; - } else { - clog->info() << "scrub complete with tag '" - << scrub_infop->header->get_tag() << "'"; - } + clog->info() << "scrub complete with tag '" + << scrub_infop->header->get_tag() << "'"; } } diff --git a/src/mds/CInode.h b/src/mds/CInode.h index e4309c89fbcc3..bbdf01c0a08db 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -358,6 +358,9 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counterget_recursive()) { + formatter->open_object_section("results"); + formatter->dump_int("return_code", 0); + formatter->dump_string("scrub_tag", tag); + formatter->dump_string("mode", "asynchronous"); + formatter->close_section(); // results + } + } else { // we failed the lookup or something; dump ourselves 
formatter->open_object_section("results"); formatter->dump_int("return_code", r); formatter->close_section(); // results + r = 0; // already dumped in formatter } if (on_finish) on_finish->complete(r); @@ -12350,19 +12362,21 @@ void MDCache::enqueue_scrub( mdr->set_filepath(path); } - C_MDS_EnqueueScrub *cs = new C_MDS_EnqueueScrub(f, fin); + bool is_internal = false; + std::string tag_str(tag); + if (tag_str.empty()) { + uuid_d uuid_gen; + uuid_gen.generate_random(); + tag_str = uuid_gen.to_string(); + is_internal = true; + } + + C_MDS_EnqueueScrub *cs = new C_MDS_EnqueueScrub(tag_str, f, fin); cs->header = std::make_shared( - tag, force, recursive, repair, f); + tag_str, is_internal, force, recursive, repair, f); mdr->internal_op_finish = cs; enqueue_scrub_work(mdr); - - // since recursive scrub is asynchronous, dump minimal output - // to not upset cli tools. - if (recursive) { - f->open_object_section("results"); - f->close_section(); // results - } } void MDCache::enqueue_scrub_work(MDRequestRef& mdr) @@ -12383,7 +12397,7 @@ void MDCache::enqueue_scrub_work(MDRequestRef& mdr) ScrubHeaderRef header = cs->header; // Cannot scrub same dentry twice at same time - if (in->scrub_infop && in->scrub_infop->scrub_in_progress) { + if (in->scrub_is_in_progress()) { mds->server->respond_to_request(mdr, -EBUSY); return; } else { diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 6fbe913043e95..a7b5a4b64b4c9 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -682,6 +682,12 @@ const std::vector& MDSDaemon::get_commands() "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", "show heap usage info (available only if compiled with tcmalloc)"), MDSCommand("cache drop name=timeout,type=CephInt,range=0,req=false", "trim cache and optionally request client to release all caps and flush the journal"), + MDSCommand("scrub start name=path,type=CephString name=scrubops,type=CephChoices,strings=force|recursive|repair,n=N,req=false name=tag,type=CephString,req=false", + "scrub an inode and output results"), + MDSCommand("scrub abort", "Abort in progress scrub operation(s)"), + MDSCommand("scrub pause", "Pause in progress scrub operation(s)"), + MDSCommand("scrub resume", "Resume paused scrub operation(s)"), + MDSCommand("scrub status", "Status of scrub operation"), }; return commands; }; diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index a7bb5a50cfd6b..abdf9e3909910 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -2453,7 +2453,10 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command, vector scrubop_vec; cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec); cmd_getval(g_ceph_context, cmdmap, "path", path); - command_scrub_path(f, path, scrubop_vec); + + C_SaferCond cond; + command_scrub_start(f, path, "", scrubop_vec, &cond); + cond.wait(); } else if (command == "tag path") { string path; cmd_getval(g_ceph_context, cmdmap, "path", path); @@ -2578,6 +2581,91 @@ class C_MDS_Send_Command_Reply : public MDSInternalContext { } }; +class C_ExecAndReply : public C_MDS_Send_Command_Reply { +public: + C_ExecAndReply(MDSRank *mds, const MCommand::const_ref &m) + : C_MDS_Send_Command_Reply(mds, m), f(true) { + } + + void finish(int r) override { + std::stringstream ds; + std::stringstream ss; + if (r != 0) { + f.flush(ss); + } else { + f.flush(ds); + } + + send(r, ss.str(), ds); + } + + virtual void exec() = 0; + +protected: + JSONFormatter f; +}; + +class C_CacheDropExecAndReply : public C_ExecAndReply { +public: + 
C_CacheDropExecAndReply(MDSRank *mds, const MCommand::const_ref &m, + uint64_t timeout) + : C_ExecAndReply(mds, m), timeout(timeout) { + } + + void exec() override { + mds->command_cache_drop(timeout, &f, this); + } + +private: + uint64_t timeout; +}; + +class C_ScrubExecAndReply : public C_ExecAndReply { +public: + C_ScrubExecAndReply(MDSRank *mds, const MCommand::const_ref &m, + const std::string &path, const std::string &tag, + const std::vector &scrubop) + : C_ExecAndReply(mds, m), path(path), tag(tag), scrubop(scrubop) { + } + + void exec() override { + mds->command_scrub_start(&f, path, tag, scrubop, this); + } + +private: + std::string path; + std::string tag; + std::vector scrubop; +}; + +class C_ScrubControlExecAndReply : public C_ExecAndReply { +public: + C_ScrubControlExecAndReply(MDSRank *mds, const MCommand::const_ref &m, + const std::string &command) + : C_ExecAndReply(mds, m), command(command) { + } + + void exec() override { + if (command == "abort") { + mds->command_scrub_abort(&f, this); + } else if (command == "pause") { + mds->command_scrub_pause(&f, this); + } else { + ceph_abort(); + } + } + + void finish(int r) override { + f.open_object_section("result"); + f.dump_int("return_code", r); + f.close_section(); + C_ExecAndReply::finish(r); + } + +private: + std::string command; +}; + /** * This function drops the mds_lock, so don't do anything with * MDSRank after calling it (we could have gone into shutdown): just @@ -2663,25 +2751,24 @@ void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) f->close_section(); //sessions } -void MDSRank::command_scrub_path(Formatter *f, std::string_view path, vector& scrubop_vec) +void MDSRank::command_scrub_start(Formatter *f, + std::string_view path, std::string_view tag, + const vector& scrubop_vec, Context *on_finish) { bool force = false; bool recursive = false; bool repair = false; - for (vector::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) { - if (*i == "force") + for (auto &op : scrubop_vec) { + if (op == "force") force = true; - else if (*i == "recursive") + else if (op == "recursive") recursive = true; - else if (*i == "repair") + else if (op == "repair") repair = true; } - C_SaferCond scond; - { - std::lock_guard l(mds_lock); - mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond); - } - scond.wait(); + + std::lock_guard l(mds_lock); + mdcache->enqueue_scrub(path, tag, force, recursive, repair, f, on_finish); // scrub_dentry() finishers will dump the data for us; we're done! 
} @@ -2696,6 +2783,28 @@ void MDSRank::command_tag_path(Formatter *f, scond.wait(); } +void MDSRank::command_scrub_abort(Formatter *f, Context *on_finish) { + std::lock_guard l(mds_lock); + scrubstack->scrub_abort(on_finish); +} + +void MDSRank::command_scrub_pause(Formatter *f, Context *on_finish) { + std::lock_guard l(mds_lock); + scrubstack->scrub_pause(on_finish); +} + +void MDSRank::command_scrub_resume(Formatter *f) { + int r = scrubstack->scrub_resume(); + + f->open_object_section("result"); + f->dump_int("return_code", r); + f->close_section(); +} + +void MDSRank::command_scrub_status(Formatter *f) { + scrubstack->scrub_status(f); +} + void MDSRank::command_flush_path(Formatter *f, std::string_view path) { C_SaferCond scond; @@ -3309,6 +3418,12 @@ void MDSRank::bcast_mds_map() last_client_mdsmap_bcast = mdsmap->get_epoch(); } +Context *MDSRank::create_async_exec_context(C_ExecAndReply *ctx) { + return new C_OnFinisher(new FunctionContext([ctx](int _) { + ctx->exec(); + }), finisher); +} + MDSRankDispatcher::MDSRankDispatcher( mds_rank_t whoami_, Mutex &mds_lock_, @@ -3391,40 +3506,47 @@ bool MDSRankDispatcher::handle_command( timeout = 0; } - JSONFormatter *f = new JSONFormatter(true); - C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m); - Context *on_finish = new FunctionContext([this, f, reply](int r) { - cache_drop_send_reply(f, reply, r); - delete f; - delete reply; - }); - *need_reply = false; - *run_later = new C_OnFinisher( - new FunctionContext([this, timeout, f, on_finish](int _) { - command_cache_drop((uint64_t)timeout, f, on_finish); - }), finisher); + *run_later = create_async_exec_context(new C_CacheDropExecAndReply + (this, m, (uint64_t)timeout)); + return true; + } else if (prefix == "scrub start") { + string path; + string tag; + vector scrubop_vec; + cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec); + cmd_getval(g_ceph_context, cmdmap, "path", path); + cmd_getval(g_ceph_context, cmdmap, "tag", tag); + *need_reply = false; + *run_later = create_async_exec_context(new C_ScrubExecAndReply + (this, m, path, tag, scrubop_vec)); + return true; + } else if (prefix == "scrub abort") { + *need_reply = false; + *run_later = create_async_exec_context(new C_ScrubControlExecAndReply + (this, m, "abort")); + return true; + } else if (prefix == "scrub pause") { + *need_reply = false; + *run_later = create_async_exec_context(new C_ScrubControlExecAndReply + (this, m, "pause")); + return true; + } else if (prefix == "scrub resume") { + JSONFormatter f(true); + command_scrub_resume(&f); + f.flush(*ds); + return true; + } else if (prefix == "scrub status") { + JSONFormatter f(true); + command_scrub_status(&f); + f.flush(*ds); return true; } else { return false; } } -void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) { - dout(20) << __func__ << ": r=" << r << dendl; - - std::stringstream ds; - std::stringstream ss; - if (r != 0) { - f->flush(ss); - } else { - f->flush(ds); - } - - reply->send(r, ss.str(), ds); -} - void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) { dout(20) << __func__ << dendl; diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 93dbdd9a4d021..b2f2179ae0fd1 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -119,6 +119,7 @@ class MonClient; class Finisher; class ScrubStack; class C_MDS_Send_Command_Reply; +class C_ExecAndReply; /** * The public part of this class's interface is what's exposed to all @@ -138,6 +139,10 @@ class MDSRank { friend class 
C_Flush_Journal; friend class C_Drop_Cache; + friend class C_CacheDropExecAndReply; + friend class C_ScrubExecAndReply; + friend class C_ScrubControlExecAndReply; + mds_rank_t get_nodeid() const { return whoami; } int64_t get_metadata_pool(); @@ -456,9 +461,17 @@ class MDSRank { protected: void dump_clientreplay_status(Formatter *f) const; - void command_scrub_path(Formatter *f, std::string_view path, vector& scrubop_vec); + void command_scrub_start(Formatter *f, + std::string_view path, std::string_view tag, + const vector& scrubop_vec, Context *on_finish); void command_tag_path(Formatter *f, std::string_view path, std::string_view tag); + // scrub control commands + void command_scrub_abort(Formatter *f, Context *on_finish); + void command_scrub_pause(Formatter *f, Context *on_finish); + void command_scrub_resume(Formatter *f); + void command_scrub_status(Formatter *f); + void command_flush_path(Formatter *f, std::string_view path); void command_flush_journal(Formatter *f); void command_get_subtrees(Formatter *f); @@ -481,8 +494,6 @@ class MDSRank { void command_openfiles_ls(Formatter *f); void command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f); void command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss); - - void cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r); void command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish); protected: @@ -559,6 +570,9 @@ class MDSRank { void set_mdsmap_multimds_snaps_allowed(); private: mono_time starttime = mono_clock::zero(); + +protected: + Context *create_async_exec_context(C_ExecAndReply *ctx); }; /* This expects to be given a reference which it is responsible for. diff --git a/src/mds/ScrubHeader.h b/src/mds/ScrubHeader.h index 02acc84fb5207..f49598d85e06c 100644 --- a/src/mds/ScrubHeader.h +++ b/src/mds/ScrubHeader.h @@ -26,10 +26,10 @@ class CInode; */ class ScrubHeader { public: - ScrubHeader(std::string_view tag_, bool force_, bool recursive_, - bool repair_, Formatter *f_) - : tag(tag_), force(force_), recursive(recursive_), repair(repair_), - formatter(f_), origin(nullptr) + ScrubHeader(std::string_view tag_, bool is_tag_internal_, bool force_, + bool recursive_, bool repair_, Formatter *f_) + : tag(tag_), is_tag_internal(is_tag_internal_), force(force_), + recursive(recursive_), repair(repair_), formatter(f_), origin(nullptr) { ceph_assert(formatter != nullptr); } @@ -41,6 +41,7 @@ class ScrubHeader { bool get_recursive() const { return recursive; } bool get_repair() const { return repair; } bool get_force() const { return force; } + bool is_internal_tag() const { return is_tag_internal; } CInode *get_origin() const { return origin; } std::string_view get_tag() const { return tag; } Formatter &get_formatter() const { return *formatter; } @@ -50,6 +51,7 @@ class ScrubHeader { protected: const std::string tag; + bool is_tag_internal; const bool force; const bool recursive; const bool repair; diff --git a/src/mds/ScrubStack.cc b/src/mds/ScrubStack.cc index 2b864e5e53607..0d72dba810080 100644 --- a/src/mds/ScrubStack.cc +++ b/src/mds/ScrubStack.cc @@ -28,6 +28,27 @@ static ostream& _prefix(std::ostream *_dout, MDSRank *mds) { return *_dout << "mds." 
<< mds->get_nodeid() << ".scrubstack "; } +std::ostream &operator<<(std::ostream &os, const ScrubStack::State &state) { + switch(state) { + case ScrubStack::STATE_RUNNING: + os << "RUNNING"; + break; + case ScrubStack::STATE_IDLE: + os << "IDLE"; + break; + case ScrubStack::STATE_PAUSING: + os << "PAUSING"; + break; + case ScrubStack::STATE_PAUSED: + os << "PAUSED"; + break; + default: + ceph_abort(); + } + + return os; +} + void ScrubStack::push_inode(CInode *in) { dout(20) << "pushing " << *in << " on top of ScrubStack" << dendl; @@ -75,18 +96,59 @@ void ScrubStack::_enqueue_inode(CInode *in, CDentry *parent, void ScrubStack::enqueue_inode(CInode *in, ScrubHeaderRef& header, MDSInternalContextBase *on_finish, bool top) { + // abort in progress + if (clear_inode_stack) { + on_finish->complete(-EAGAIN); + return; + } + _enqueue_inode(in, NULL, header, on_finish, top); kick_off_scrubs(); } void ScrubStack::kick_off_scrubs() { + ceph_assert(mdcache->mds->mds_lock.is_locked()); + dout(20) << __func__ << ": state=" << state << dendl; + + if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) { + if (scrubs_in_progress == 0) { + dout(10) << __func__ << ": in progress scrub operations finished, " + << stack_size << " in the stack" << dendl; + + State final_state = state; + if (clear_inode_stack) { + abort_pending_scrubs(); + final_state = STATE_IDLE; + } + if (state == STATE_PAUSING) { + final_state = STATE_PAUSED; + } + + set_state(final_state); + complete_control_contexts(0); + } + + return; + } + dout(20) << __func__ << " entering with " << scrubs_in_progress << " in " "progress and " << stack_size << " in the stack" << dendl; bool can_continue = true; elist::iterator i = inode_stack.begin(); while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress && - can_continue && !i.end()) { + can_continue) { + if (i.end()) { + if (scrubs_in_progress == 0) { + set_state(STATE_IDLE); + } + + return; + } + + assert(state == STATE_RUNNING || state == STATE_IDLE); + set_state(STATE_RUNNING); + CInode *curi = *i; ++i; // we have our reference, push iterator forward @@ -418,13 +480,16 @@ void ScrubStack::_validate_inode_done(CInode *in, int r, MDSInternalContextBase *c = NULL; in->scrub_finished(&c); - if (!header->get_recursive() && in == header->get_origin()) { - if (r >= 0) { // we got into the scrubbing dump it - result.dump(&(header->get_formatter())); - } else { // we failed the lookup or something; dump ourselves - header->get_formatter().open_object_section("results"); - header->get_formatter().dump_int("return_code", r); - header->get_formatter().close_section(); // results + if (in == header->get_origin()) { + scrub_origins.erase(in); + if (!header->get_recursive()) { + if (r >= 0) { // we got into the scrubbing dump it + result.dump(&(header->get_formatter())); + } else { // we failed the lookup or something; dump ourselves + header->get_formatter().open_object_section("results"); + header->get_formatter().dump_int("return_code", r); + header->get_formatter().close_section(); // results + } } } if (c) { @@ -434,3 +499,191 @@ void ScrubStack::_validate_inode_done(CInode *in, int r, ScrubStack::C_KickOffScrubs::C_KickOffScrubs(MDCache *mdcache, ScrubStack *s) : MDSInternalContext(mdcache->mds), stack(s) { } + +void ScrubStack::complete_control_contexts(int r) { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + + for (auto &ctx : control_ctxs) { + ctx->complete(r); + } + control_ctxs.clear(); +} + +void ScrubStack::set_state(State next_state) { + if (state != 
next_state) { + dout(20) << __func__ << ", from state=" << state << ", to state=" + << next_state << dendl; + state = next_state; + } +} + +bool ScrubStack::scrub_in_transition_state() { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + dout(20) << __func__ << ": state=" << state << dendl; + + // STATE_RUNNING is considered as a transition state so as to + // "delay" the scrub control operation. + if (state == STATE_RUNNING || state == STATE_PAUSING) { + return true; + } + + return false; +} + +void ScrubStack::scrub_status(Formatter *f) { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + + f->open_object_section("result"); + + std::stringstream ss; + bool have_more = false; + + if (state == STATE_IDLE) { + ss << "no active scrubs running"; + } else if (state == STATE_RUNNING) { + if (clear_inode_stack) { + ss << "ABORTING"; + } else { + ss << "scrub active"; + } + ss << " (" << stack_size << " inodes in the stack)"; + } else { + if (state == STATE_PAUSING || state == STATE_PAUSED) { + have_more = true; + ss << state; + } + if (clear_inode_stack) { + if (have_more) { + ss << "+"; + } + ss << "ABORTING"; + } + + ss << " (" << stack_size << " inodes in the stack)"; + } + f->dump_string("status", ss.str()); + + f->open_object_section("scrubs"); + for (auto &inode : scrub_origins) { + have_more = false; + ScrubHeaderRefConst header = inode->get_scrub_header(); + + std::string tag(header->get_tag()); + f->open_object_section(tag.c_str()); // scrub id + + std::string path; + inode->make_path_string(path, true); + f->dump_string("path", path.empty() ? "/" : path.c_str()); + + std::stringstream optss; + if (header->get_recursive()) { + optss << "recursive"; + have_more = true; + } + if (header->get_repair()) { + if (have_more) { + optss << ","; + } + optss << "repair"; + have_more = true; + } + if (header->get_force()) { + if (have_more) { + optss << ","; + } + optss << "force"; + } + + f->dump_string("options", optss.str()); + f->close_section(); // scrub id + } + f->close_section(); // scrubs + f->close_section(); // result +} + +void ScrubStack::abort_pending_scrubs() { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(clear_inode_stack); + + for (auto inode = inode_stack.begin(); !inode.end(); ++inode) { + CInode *in = *inode; + if (in == in->scrub_info()->header->get_origin()) { + scrub_origins.erase(in); + } + + MDSInternalContextBase *ctx = nullptr; + in->scrub_aborted(&ctx); + if (ctx != nullptr) { + ctx->complete(-ECANCELED); + } + } + + stack_size = 0; + inode_stack.clear(); + clear_inode_stack = false; +} + +void ScrubStack::scrub_abort(Context *on_finish) { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(on_finish != nullptr); + + dout(10) << __func__ << ": aborting with " << scrubs_in_progress + << " scrubs in progress and " << stack_size << " in the" + << " stack" << dendl; + + clear_inode_stack = true; + if (scrub_in_transition_state()) { + control_ctxs.push_back(on_finish); + return; + } + + abort_pending_scrubs(); + if (state != STATE_PAUSED) { + set_state(STATE_IDLE); + } + on_finish->complete(0); +} + +void ScrubStack::scrub_pause(Context *on_finish) { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + ceph_assert(on_finish != nullptr); + + dout(10) << __func__ << ": pausing with " << scrubs_in_progress + << " scrubs in progress and " << stack_size << " in the" + << " stack" << dendl; + + // abort is in progress + if (clear_inode_stack) { + on_finish->complete(-EINVAL); + return; + } + + bool done = 
scrub_in_transition_state(); + if (done) { + set_state(STATE_PAUSING); + control_ctxs.push_back(on_finish); + return; + } + + set_state(STATE_PAUSED); + on_finish->complete(0); +} + +bool ScrubStack::scrub_resume() { + ceph_assert(mdcache->mds->mds_lock.is_locked_by_me()); + dout(20) << __func__ << ": state=" << state << dendl; + + int r = 0; + + if (clear_inode_stack) { + r = -EINVAL; + } else if (state == STATE_PAUSING) { + set_state(STATE_RUNNING); + complete_control_contexts(-ECANCELED); + } else if (state == STATE_PAUSED) { + set_state(STATE_RUNNING); + kick_off_scrubs(); + } + + return r; +} diff --git a/src/mds/ScrubStack.h b/src/mds/ScrubStack.h index 86e0b28e7d467..e01cebfa484e8 100644 --- a/src/mds/ScrubStack.h +++ b/src/mds/ScrubStack.h @@ -44,6 +44,10 @@ class ScrubStack { C_KickOffScrubs(MDCache *mdcache, ScrubStack *s); void finish(int r) override { } void complete(int r) override { + if (r == -ECANCELED) { + return; + } + stack->scrubs_in_progress--; stack->kick_off_scrubs(); // don't delete self @@ -76,6 +80,7 @@ class ScrubStack { void enqueue_inode_top(CInode *in, ScrubHeaderRef& header, MDSInternalContextBase *on_finish) { enqueue_inode(in, header, on_finish, true); + scrub_origins.emplace(in); } /** Like enqueue_inode_top, but we wait for all pending scrubs before * starting this one. @@ -83,9 +88,66 @@ class ScrubStack { void enqueue_inode_bottom(CInode *in, ScrubHeaderRef& header, MDSInternalContextBase *on_finish) { enqueue_inode(in, header, on_finish, false); + scrub_origins.emplace(in); } + /** + * Abort an ongoing scrub operation. The abort operation could be + * delayed if there are in-progress scrub operations on going. The + * caller should provide a context which is completed after all + * in-progress scrub operations are completed and pending inodes + * are removed from the scrub stack (with the context callbacks for + * inodes completed with -ECANCELED). + * @param on_finish Context callback to invoke after abort + */ + void scrub_abort(Context *on_finish); + + /** + * Pause scrub operations. Similar to abort, pause is delayed if + * there are in-progress scrub operations on going. The caller + * should provide a context which is completed after all in-progress + * scrub operations are completed. Subsequent scrub operations are + * queued until scrub is resumed. + * @param on_finish Context callback to invoke after pause + */ + void scrub_pause(Context *on_finish); + + /** + * Resume a paused scrub. Unlike abort or pause, this is instantaneous. + * Pending pause operations are cancelled (context callbacks are + * invoked with -ECANCELED). + * @returns 0 (success) if resumed, -EINVAL if an abort is in-progress. + */ + bool scrub_resume(); + + /** + * Get the current scrub status as human readable string. Some basic + * information is returned such as number of inodes pending abort/pause. + */ + void scrub_status(Formatter *f); + private: + // scrub abort is _not_ a state, rather it's an operation that's + // performed after in-progress scrubs are finished. + enum State { + STATE_RUNNING = 0, + STATE_IDLE, + STATE_PAUSING, + STATE_PAUSED, + }; + friend std::ostream &operator<<(std::ostream &os, const State &state); + + State state = STATE_IDLE; + bool clear_inode_stack = false; + + // list of pending context completions for asynchronous scrub + // control operations. + std::list control_ctxs; + + // list of inodes for which scrub operations are running -- used + // to diplay out in `scrub status`. 
+ std::set scrub_origins; + /** * Put the inode at either the top or bottom of the stack, with * the given scrub params, and then try and kick off more scrubbing. @@ -185,6 +247,28 @@ class ScrubStack { */ bool get_next_cdir(CInode *in, CDir **new_dir); + /** + * Set scrub state + * @param next_state State to move the scrub to. + */ + void set_state(State next_state); + + /** + * Is scrub in one of transition states (running, pausing) + */ + bool scrub_in_transition_state(); + + /** + * complete queued up contexts + * @param r return value to complete contexts. + */ + void complete_control_contexts(int r); + + /** + * Abort pending scrubs for inodes waiting in the inode stack. + * Completion context is complete with -ECANCELED. + */ + void abort_pending_scrubs(); }; #endif /* SCRUBSTACK_H_ */
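
For reference, a minimal usage sketch of the tell-based scrub interface introduced by this change, written against the qa framework it modifies. It assumes a CephFSTestCase-style test with a `self.fs` Filesystem object providing the `rank_tell()` helper added in qa/tasks/cephfs/filesystem.py; the JSON fields checked (`return_code`, `scrub_tag`, `mode`, `status`) follow the output produced by `C_MDS_EnqueueScrub::finish()`, `C_ScrubControlExecAndReply::finish()` and `ScrubStack::scrub_status()` in this patch. This is an illustrative sketch, not part of the change itself.

    # Hypothetical test-body sketch: drive the new "scrub start/pause/resume/
    # abort/status" tell commands through Filesystem.rank_tell().

    # Kick off a recursive, repairing scrub at the root of the file system.
    out_json = self.fs.rank_tell(["scrub", "start", "/", "recursive", "repair"])
    self.assertEqual(out_json["return_code"], 0)        # scrub enqueued
    self.assertEqual(out_json["mode"], "asynchronous")  # recursive scrubs return immediately

    # Pause, inspect status, then resume.
    self.assertEqual(self.fs.rank_tell(["scrub", "pause"])["return_code"], 0)
    status = self.fs.rank_tell(["scrub", "status"])
    self.assertTrue("PAUSED" in status["status"])

    self.assertEqual(self.fs.rank_tell(["scrub", "resume"])["return_code"], 0)

    # Abort whatever is still queued; once in-progress work drains,
    # status reports that nothing is running.
    self.assertEqual(self.fs.rank_tell(["scrub", "abort"])["return_code"], 0)
    status = self.fs.rank_tell(["scrub", "status"])
    self.assertTrue("no active" in status["status"])

The same commands are available from the CLI as `ceph tell mds.<name> scrub start <path> [force|recursive|repair] [<tag>]`, `scrub abort`, `scrub pause`, `scrub resume`, and `scrub status`, per the MDSCommand entries added in MDSDaemon.cc.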