Merge PR ceph#24635 into master
* refs/pull/24635/head:
	test: add scrub control command tests
	mds: scrub abort/pause/resume/status control commands
	test: switch to using "scrub start" tell interface to initiate scrub
	mds: introduce "scrub start" tell interface to initiate scrub
	mds: dump formatted scrub output on context completion
	mds: generate random scrub tag when empty
	mds: introduce C_ExecAndReply context completion class
	mds: use CInode::scrub_is_in_progress() wherever required

Reviewed-by: Patrick Donnelly <[email protected]>
Reviewed-by: Zheng Yan <[email protected]>
batrick committed Jan 3, 2019
2 parents 3dda250 + 869b13c commit ee7d253
Showing 17 changed files with 726 additions and 101 deletions.
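For orientation, here is a minimal sketch (not part of the commit) of how the new tell-based scrub interface is exercised by the QA tests below; the Filesystem fixture name `fs`, the rank, and the scrubbed path are assumptions for illustration:

    # Hedged sketch: drive the new "scrub" tell commands the way the QA tests do.
    # Assumes `fs` is a qa/tasks/cephfs Filesystem instance with an active rank 0.
    out_json = fs.rank_tell(["scrub", "start", "/", "recursive", "repair"])
    assert out_json is not None

    fs.rank_tell(["scrub", "pause"])    # control commands reply with {'return_code': 0} on success
    fs.rank_tell(["scrub", "resume"])
    fs.rank_tell(["scrub", "abort"])
    status = fs.rank_tell(["scrub", "status"])  # the tests inspect the 'status' text of this reply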
2 changes: 1 addition & 1 deletion doc/cephfs/disaster-recovery-experts.rst
@@ -259,4 +259,4 @@ run a forward scrub to repair them. Ensure you have an MDS running and issue:

::

ceph daemon mds.a scrub_path / recursive repair
ceph tell mds.a scrub start / recursive repair
4 changes: 4 additions & 0 deletions qa/tasks/cephfs/filesystem.py
@@ -921,6 +921,10 @@ def rank_asok(self, command, rank=0, status=None, timeout=None):
info = self.get_rank(rank=rank, status=status)
return self.json_asok(command, 'mds', info['name'], timeout=timeout)

def rank_tell(self, command, rank=0, status=None):
info = self.get_rank(rank=rank, status=status)
return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

def read_cache(self, path, depth=None):
cmd = ["dump", "tree", path]
if depth is not None:
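Unlike the pre-existing rank_asok() helper just above it, which talks to the daemon's admin socket, the new rank_tell() routes the command through "ceph tell mds.<name> ..." and JSON-decodes the reply. A hedged usage sketch (fixture names are assumptions):

    # Admin-socket path (existing helper) vs. tell path (added in this commit):
    fs.rank_asok(["flush", "journal"])
    reply = fs.rank_tell(["scrub", "start", "/subdir", "repair"])   # returns parsed JSON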
2 changes: 1 addition & 1 deletion qa/tasks/cephfs/test_damage.py
@@ -434,7 +434,7 @@ def test_damaged_dentry(self):
self.mount_a.umount_wait()

# Now repair the stats
scrub_json = self.fs.mds_asok(["scrub_path", "/subdir", "repair"])
scrub_json = self.fs.rank_tell(["scrub", "start", "/subdir", "repair"])
log.info(json.dumps(scrub_json, indent=2))

self.assertEqual(scrub_json["passed_validation"], False)
6 changes: 3 additions & 3 deletions qa/tasks/cephfs/test_forward_scrub.py
@@ -232,7 +232,7 @@ def test_inotable_sync(self):
self.mount_a.umount_wait()

with self.assert_cluster_log("inode table repaired", invert_match=True):
out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"])
out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"])
self.assertNotEqual(out_json, None)

self.mds_cluster.mds_stop()
@@ -255,7 +255,7 @@ def test_inotable_sync(self):
self.fs.wait_for_daemons()

with self.assert_cluster_log("inode table repaired"):
out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"])
out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"])
self.assertNotEqual(out_json, None)

self.mds_cluster.mds_stop()
@@ -286,7 +286,7 @@ def test_backtrace_repair(self):
"oh i'm sorry did i overwrite your xattr?")

with self.assert_cluster_log("bad backtrace on inode"):
out_json = self.fs.mds_asok(["scrub_path", "/", "repair", "recursive"])
out_json = self.fs.rank_tell(["scrub", "start", "/", "repair", "recursive"])
self.assertNotEqual(out_json, None)
self.fs.mds_asok(["flush", "journal"])
backtrace = self.fs.read_backtrace(file_ino)
2 changes: 1 addition & 1 deletion qa/tasks/cephfs/test_recovery_pool.py
@@ -186,7 +186,7 @@ def _rebuild_metadata(self, workload, other_pool=None, workers=1):
for rank in self.recovery_fs.get_ranks(status=status):
self.fs.mon_manager.raw_cluster_cmd('tell', "mds." + rank['name'],
'injectargs', '--debug-mds=20')
self.fs.rank_asok(['scrub_path', '/', 'recursive', 'repair'], rank=rank['rank'], status=status)
self.fs.rank_tell(['scrub', 'start', '/', 'recursive', 'repair'], rank=rank['rank'], status=status)
log.info(str(self.mds_cluster.status()))

# Mount a client
4 changes: 2 additions & 2 deletions qa/tasks/cephfs/test_scrub.py
@@ -103,7 +103,7 @@ def damage(self):
self._filesystem.wait_for_daemons()

def validate(self):
out_json = self._filesystem.mds_asok(["scrub_path", "/", "recursive", "repair"])
out_json = self._filesystem.rank_tell(["scrub", "start", "/", "recursive", "repair"])
self.assertNotEqual(out_json, None)
self.assertTrue(self._filesystem.are_daemons_healthy())
return self._errors
@@ -129,7 +129,7 @@ def _scrub(self, workload, workers=1):
# Apply any data damage the workload wants
workload.damage()

out_json = self.fs.mds_asok(["scrub_path", "/", "recursive", "repair"])
out_json = self.fs.rank_tell(["scrub", "start", "/", "recursive", "repair"])
self.assertNotEqual(out_json, None)

# See that the files are present and correct
137 changes: 122 additions & 15 deletions qa/tasks/cephfs/test_scrub_checks.py
@@ -11,6 +11,98 @@

log = logging.getLogger(__name__)

class TestScrubControls(CephFSTestCase):
"""
Test basic scrub control operations such as abort, pause and resume.
"""

MDSS_REQUIRED = 1
CLIENTS_REQUIRED = 1

def _abort_scrub(self, expected):
res = self.fs.rank_tell(["scrub", "abort"])
self.assertEqual(res['return_code'], expected)
def _pause_scrub(self, expected):
res = self.fs.rank_tell(["scrub", "pause"])
self.assertEqual(res['return_code'], expected)
def _resume_scrub(self, expected):
res = self.fs.rank_tell(["scrub", "resume"])
self.assertEqual(res['return_code'], expected)
def _get_scrub_status(self):
return self.fs.rank_tell(["scrub", "status"])

def test_scrub_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)

log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))

log.info("Cloning repo into place")
repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)

# abort and verify
self._abort_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])

def test_scrub_pause_and_resume(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)

log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))

log.info("Cloning repo into place")
repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)

# pause and verify
self._pause_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])

# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertFalse("PAUSED" in out_json['status'])

def test_scrub_pause_and_resume_with_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)

log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))

log.info("Cloning repo into place")
repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)

# pause and verify
self._pause_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])

# abort and verify
self._abort_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
self.assertTrue("0 inodes" in out_json['status'])

# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])

class TestScrubChecks(CephFSTestCase):
"""
@@ -50,7 +142,7 @@ def _checks(self, run_seq):
log.info("client_path: {0}".format(client_path))

log.info("Cloning repo into place")
repo_path = self.clone_repo(self.mount_a, client_path)
repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

log.info("Initiating mds_scrub_checks on mds.{id_}, " +
"test_path {path}, run_seq {seq}".format(
Expand All @@ -63,7 +155,7 @@ def _checks(self, run_seq):
nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
self.asok_command(mds_rank, "scrub_path {nep}".format(nep=nep),
self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
@@ -73,31 +165,31 @@ def _checks(self, run_seq):
log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
command = "flush_path {dirpath}".format(dirpath=dirpath)
self.asok_command(mds_rank, command, success_validator)
command = "scrub_path {dirpath}".format(dirpath=dirpath)
self.asok_command(mds_rank, command, success_validator)
command = "scrub start {dirpath}".format(dirpath=dirpath)
self.tell_command(mds_rank, command, success_validator)

filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
repo_path=test_repo_path)
if run_seq == 0:
log.info("First run: flushing {filepath}".format(filepath=filepath))
command = "flush_path {filepath}".format(filepath=filepath)
self.asok_command(mds_rank, command, success_validator)
command = "scrub_path {filepath}".format(filepath=filepath)
self.asok_command(mds_rank, command, success_validator)
command = "scrub start {filepath}".format(filepath=filepath)
self.tell_command(mds_rank, command, success_validator)

filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml". \
format(repo_path=test_repo_path)
command = "scrub_path {filepath}".format(filepath=filepath)
self.asok_command(mds_rank, command,
command = "scrub start {filepath}".format(filepath=filepath)
self.tell_command(mds_rank, command,
lambda j, r: self.json_validator(j, r, "performed_validation",
False))

if run_seq == 0:
log.info("First run: flushing base dir /")
command = "flush_path /"
self.asok_command(mds_rank, command, success_validator)
command = "scrub_path /"
self.asok_command(mds_rank, command, success_validator)
command = "scrub start /"
self.tell_command(mds_rank, command, success_validator)

new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
@@ -118,16 +210,16 @@ def _checks(self, run_seq):
# check that scrub fails on errors
ino = self.mount_a.path_to_ino(new_file)
rados_obj_name = "{ino:x}.00000000".format(ino=ino)
command = "scrub_path {file}".format(file=test_new_file)
command = "scrub start {file}".format(file=test_new_file)

# Missing parent xattr -> ENODATA
self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
self.asok_command(mds_rank, command,
self.tell_command(mds_rank, command,
lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))

# Missing object -> ENOENT
self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())
self.asok_command(mds_rank, command,
self.tell_command(mds_rank, command,
lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

command = "flush_path /"
@@ -162,7 +254,7 @@ def test_scrub_repair(self):
self.mount_a.run_shell(["sudo", "rmdir", test_dir])
self.assertEqual(ar.exception.exitstatus, 1)

self.asok_command(mds_rank, "scrub_path /{0} repair".format(test_dir),
self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for background repair
@@ -181,6 +273,20 @@ def json_validator(json_out, rc, element, expected_value):
jv=element_value, ev=expected_value)
return True, "Succeeded"

def tell_command(self, mds_rank, command, validator):
log.info("Running command '{command}'".format(command=command))

command_list = command.split()
jout = self.fs.rank_tell(command_list, mds_rank)

log.info("command '{command}' returned '{jout}'".format(
command=command, jout=jout))

success, errstring = validator(jout, 0)
if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
return jout

def asok_command(self, mds_rank, command, validator):
log.info("Running command '{command}'".format(command=command))

@@ -209,7 +315,8 @@ def asok_command(self, mds_rank, command, validator):

return jout

def clone_repo(self, client_mount, path):
@staticmethod
def clone_repo(client_mount, path):
repo = "ceph-qa-suite"
repo_path = os.path.join(path, repo)
client_mount.run_shell(["mkdir", "-p", path])
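The control tests above verify progress by polling "scrub status" and checking substrings such as "no active" and "PAUSED". A hedged sketch of a small polling helper built on the same check (helper name, timeout and interval are assumptions, not part of the commit):

    import time

    def wait_for_scrub_idle(fs, timeout=60, interval=2):
        # Poll the rank until it reports no active scrubs, or give up after `timeout` seconds.
        for _ in range(0, timeout, interval):
            status = fs.rank_tell(["scrub", "status"])
            if "no active" in status["status"]:
                return True
            time.sleep(interval)
        return False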
4 changes: 2 additions & 2 deletions qa/tasks/cephfs_upgrade_snap.py
@@ -24,13 +24,13 @@ def task(ctx, config):
mds_map = fs.get_mds_map()
assert(mds_map['max_mds'] == 1)

json = fs.rank_asok(["scrub_path", "/", "force", "recursive", "repair"])
json = fs.rank_tell(["scrub", "start", "/", "force", "recursive", "repair"])
if not json or json['return_code'] == 0:
log.info("scrub / completed")
else:
log.info("scrub / failed: {}".format(json))

json = fs.rank_asok(["scrub_path", "~mdsdir", "force", "recursive", "repair"])
json = fs.rank_tell(["scrub", "start", "~mdsdir", "force", "recursive", "repair"])
if not json or json['return_code'] == 0:
log.info("scrub ~mdsdir completed")
else:
36 changes: 26 additions & 10 deletions src/mds/CInode.cc
@@ -4110,7 +4110,8 @@ void CInode::validate_disk_state(CInode::validated_data *results,
/**
* Fetch backtrace and set tag if tag is non-empty
*/
void fetch_backtrace_and_tag(CInode *in, std::string_view tag,
void fetch_backtrace_and_tag(CInode *in,
std::string_view tag, bool is_internal,
Context *fin, int *bt_r, bufferlist *bt)
{
const int64_t pool = in->get_backtrace_pool();
@@ -4121,8 +4122,8 @@ void CInode::validate_disk_state(CInode::validated_data *results,
in->mdcache->mds->objecter->read(oid, object_locator_t(pool), fetch, CEPH_NOSNAP,
NULL, 0, fin);
using ceph::encode;
if (!tag.empty()) {
ObjectOperation scrub_tag;
if (!is_internal) {
ObjectOperation scrub_tag;
bufferlist tag_bl;
encode(tag, tag_bl);
scrub_tag.setxattr("scrub_tag", tag_bl);
@@ -4153,10 +4154,11 @@ void CInode::validate_disk_state(CInode::validated_data *results,
in->mdcache->mds->finisher);

std::string_view tag = in->scrub_infop->header->get_tag();
bool is_internal = in->scrub_infop->header->is_internal_tag();
// Rather than using the usual CInode::fetch_backtrace,
// use a special variant that optionally writes a tag in the same
// operation.
fetch_backtrace_and_tag(in, tag, conf, &results->backtrace.ondisk_read_retval, &bl);
fetch_backtrace_and_tag(in, tag, is_internal, conf, &results->backtrace.ondisk_read_retval, &bl);
return false;
}

@@ -4768,6 +4770,24 @@ void CInode::scrub_dirfrag_finished(frag_t dirfrag)
si.last_scrub_version = si.scrub_start_version;
}

void CInode::scrub_aborted(MDSInternalContextBase **c) {
dout(20) << __func__ << dendl;
ceph_assert(scrub_is_in_progress());

*c = nullptr;
std::swap(*c, scrub_infop->on_finish);

if (scrub_infop->scrub_parent) {
CDentry *dn = scrub_infop->scrub_parent;
scrub_infop->scrub_parent = NULL;
dn->dir->scrub_dentry_finished(dn);
dn->put(CDentry::PIN_SCRUBPARENT);
}

delete scrub_infop;
scrub_infop = nullptr;
}

void CInode::scrub_finished(MDSInternalContextBase **c) {
dout(20) << __func__ << dendl;
ceph_assert(scrub_is_in_progress());
@@ -4800,12 +4820,8 @@ void CInode::scrub_finished(MDSInternalContextBase **c) {
if (scrub_infop->header->get_origin() == this) {
// We are at the point that a tagging scrub was initiated
LogChannelRef clog = mdcache->mds->clog;
if (scrub_infop->header->get_tag().empty()) {
clog->info() << "scrub complete";
} else {
clog->info() << "scrub complete with tag '"
<< scrub_infop->header->get_tag() << "'";
}
clog->info() << "scrub complete with tag '"
<< scrub_infop->header->get_tag() << "'";
}
}

3 changes: 3 additions & 0 deletions src/mds/CInode.h
@@ -358,6 +358,9 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter<CIno
* be complete()ed.
*/
void scrub_finished(MDSInternalContextBase **c);

void scrub_aborted(MDSInternalContextBase **c);

/**
* Report to the CInode that alldirfrags it owns have been scrubbed.
*/
(remaining changed files not shown)
