mgr/cephadm: move process_removal_queue into OSDRemovalQueue
`process_removal_queue` belongs to OSDRemovalQueue
instead of RemoveUtil
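
In effect, the queue becomes the sole owner of removal state: the orchestrator constructs it with a back-reference to itself, and the former `RemoveUtil` entry points (`process_removal_queue`, `save_to_store`, `load_from_store`) become queue methods. Below is a minimal sketch of the resulting ownership, using simplified stand-ins for the real cephadm classes; only the relationships visible in this diff are modeled, and the method bodies are illustrative.

```python
from typing import Set


class RemoveUtil:
    """Stateless helpers (mon commands, ok-to-stop checks); its role is unchanged."""

    def __init__(self, mgr: "Orchestrator") -> None:
        self.mgr = mgr

    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
        # placeholder: the real method asks the cluster which OSDs are ok-to-stop
        return osd_ids


class OSDRemovalQueue:
    """After this commit: owns the queue state, its processing, and its persistence."""

    def __init__(self, mgr: "Orchestrator") -> None:
        self.mgr = mgr                # back-reference, new in this commit
        self.osds: Set[int] = set()

    def process_removal_queue(self) -> None:
        # policy checks are still delegated to RemoveUtil ...
        ok_to_stop = self.mgr.rm_util.find_osd_stop_threshold(self.osds)
        # ... but the queue now mutates (and, in the real code, persists) its own state
        self.osds -= ok_to_stop


class Orchestrator:
    """Stands in for CephadmOrchestrator's __init__ wiring shown in module.py below."""

    def __init__(self) -> None:
        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDRemovalQueue(self)   # the queue now takes mgr
```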

Signed-off-by: Sebastian Wagner <[email protected]>
sebastian-philipp committed Jan 14, 2021
1 parent 086afa9 commit af52ba4
Showing 5 changed files with 98 additions and 99 deletions.
8 changes: 4 additions & 4 deletions src/pybind/mgr/cephadm/module.py
@@ -391,8 +391,8 @@ def __init__(self, *args: Any, **kwargs: Any):
         self.cache.load()

         self.rm_util = RemoveUtil(self)
-        self.to_remove_osds = OSDRemovalQueue()
-        self.rm_util.load_from_store()
+        self.to_remove_osds = OSDRemovalQueue(self)
+        self.to_remove_osds.load_from_store()

         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -527,7 +527,7 @@ def _trigger_osd_removal(self) -> None:
         self.log.debug(f"Found empty osd. Starting removal process")
         # if the osd that is now empty is also part of the removal queue
         # start the process
-        self.rm_util.process_removal_queue()
+        self._kick_serve_loop()

     def pause(self) -> None:
         if not self.paused:
@@ -1178,7 +1178,7 @@ def _add_host(self, spec):
                                 addr=spec.addr,
                                 error_ok=True, no_fsid=True)
         if code:
-            # err will contain stdout and stderr, so we filter on the message text to
+            # err will contain stdout and stderr, so we filter on the message text to
             # only show the errors
             errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
             raise OrchestratorError('New host %s (%s) failed check(s): %s' % (
2 changes: 1 addition & 1 deletion src/pybind/mgr/cephadm/serve.py
@@ -75,7 +75,7 @@ def serve(self) -> None:
             self._update_paused_health()

             if not self.mgr.paused:
-                self.mgr.rm_util.process_removal_queue()
+                self.mgr.to_remove_osds.process_removal_queue()

             self.mgr.migration.migrate()
             if self.mgr.migration.is_migration_ongoing():
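
Combined with the module.py hunk above, removal processing is now driven entirely from the serve loop: the notification handler merely wakes the loop, which calls into the queue whenever the module is not paused. A condensed sketch of that wiring, with hypothetical stubs standing in for `CephadmServe.serve()` and `_kick_serve_loop()`:

```python
import threading


class MiniServeLoop:
    """Toy model of the serve-loop wiring after this commit (not the real class)."""

    def __init__(self, mgr) -> None:
        self.mgr = mgr                     # duck-typed orchestrator with .paused etc.
        self._event = threading.Event()

    def kick(self) -> None:
        # analogous to CephadmOrchestrator._kick_serve_loop()
        self._event.set()

    def serve_once(self) -> None:
        self._event.wait(timeout=60)       # sleep until kicked (or until timeout)
        self._event.clear()
        if not self.mgr.paused:            # same gate as in serve.py above
            self.mgr.to_remove_osds.process_removal_queue()
```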
171 changes: 85 additions & 86 deletions src/pybind/mgr/cephadm/services/osd.py
@@ -306,76 +306,6 @@ class RemoveUtil(object):
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr: "CephadmOrchestrator" = mgr

-    def process_removal_queue(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-
-        # make sure that we don't run on OSDs that are not in the cluster anymore.
-        self.cleanup()
-
-        logger.debug(
-            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
-            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
-        # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
-        if ok_to_stop_osds:
-            # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
-        # Check all osds for their state and take action (remove, purge etc)
-        to_remove_osds = self.mgr.to_remove_osds.all_osds()
-        new_queue: Set[OSD] = set()
-        for osd in to_remove_osds:  # type: OSD
-            if not osd.force:
-                # skip criteria
-                if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                if not osd.safe_to_destroy():
-                    logger.info(
-                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-            # abort criteria
-            if not osd.down():
-                # also remove it from the remove_osd list and set a health_check warning?
-                raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-            if osd.replace:
-                if not osd.destroy():
-                    raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
-            else:
-                if not osd.purge():
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-
-        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
-        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
-        # osds that were added while this method was executed'
-        self.mgr.to_remove_osds.intersection_update(new_queue)
-        self.save_to_store()
-
-    def cleanup(self) -> None:
-        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
-        for osd in not_in_cluster_osds:
-            self.mgr.to_remove_osds.remove(osd)
-
     def get_osds_in_cluster(self) -> List[str]:
         osd_map = self.mgr.get_osdmap()
         return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
@@ -483,19 +413,6 @@ def _run_mon_cmd(self, cmd_args: dict) -> bool:
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True

-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
-        logger.debug(f"Saving {osd_queue} to store")
-        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
-    def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, ctx=self)
-                if osd_obj is not None:
-                    self.mgr.to_remove_osds.add(osd_obj)
-

 class NotFoundError(Exception):
     pass
@@ -654,13 +571,13 @@ def to_json(self) -> dict:
         return out

     @classmethod
-    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
         if not inp:
             return None
         for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
             if inp.get(date_field):
                 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
-        inp.update({'remove_util': ctx})
+        inp.update({'remove_util': rm_util})
         if 'nodename' in inp:
             hostname = inp.pop('nodename')
             inp['hostname'] = hostname
@@ -680,9 +597,91 @@ def __repr__(self) -> str:

 class OSDRemovalQueue(object):

-    def __init__(self) -> None:
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
         self.osds: Set[OSD] = set()

+    def process_removal_queue(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {self.all_osds()}")
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        # Check all osds for their state and take action (remove, purge etc)
+        new_queue: Set[OSD] = set()
+        for osd in self.all_osds():  # type: OSD
+            if not osd.force:
+                # skip criteria
+                if not osd.is_empty:
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                if not osd.safe_to_destroy():
+                    logger.info(
+                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+            # abort criteria
+            if not osd.down():
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(
+                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not osd.destroy():
+                    raise orchestrator.OrchestratorError(
+                        f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not osd.purge():
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            if not osd.exists:
+                continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
+            CephadmServe(self.mgr)._remove_daemon(osd.fullname, osd.hostname)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        self.intersection_update(new_queue)
+        self.save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        for osd in self.not_in_cluster():
+            self.remove(osd)
+
+    def save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.all_osds()]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+            for osd in json.loads(v):
+                logger.debug(f"Loading osd ->{osd} from store")
+                osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+                if osd_obj is not None:
+                    self.osds.add(osd_obj)
+
     def as_osd_ids(self) -> List[int]:
         return [osd.osd_id for osd in self.osds]
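
The on-disk format is unchanged by the move: `save_to_store` still serializes every queued OSD to JSON under the `osd_remove_queue` key, and `load_from_store` rebuilds `OSD` objects via `OSD.from_json(..., rm_util=...)`. A round-trip sketch follows; the two-field entry is an assumed minimal shape, while real entries carry more fields (for example the drain timestamps handled in `from_json` above):

```python
import json

# What save_to_store() conceptually writes under the 'osd_remove_queue' key:
stored = json.dumps([{"osd_id": 35, "draining": True}])

# What load_from_store() conceptually does with each stored value: parse the
# JSON list and hand every entry to OSD.from_json(entry, rm_util=mgr.rm_util).
for entry in json.loads(stored):
    print(entry["osd_id"], entry["draining"])   # -> 35 True
```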
4 changes: 2 additions & 2 deletions src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -515,8 +515,8 @@ def test_remove_osds(self, cephadm_module):
             process_started_at=datetime_now(),
             remove_util=cephadm_module.rm_util
         ))
-        cephadm_module.rm_util.process_removal_queue()
-        assert cephadm_module.to_remove_osds == OSDRemovalQueue()
+        cephadm_module.to_remove_osds.process_removal_queue()
+        assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)

         c = cephadm_module.remove_osds_status()
         out = wait(cephadm_module, c)
12 changes: 6 additions & 6 deletions src/pybind/mgr/cephadm/tests/test_osd_removal.py
@@ -90,9 +90,9 @@ def test_load(self, cephadm_module, rm_util):
             }
         ])
         cephadm_module.set_store('osd_remove_queue', data)
-        cephadm_module.rm_util.load_from_store()
+        cephadm_module.to_remove_osds.load_from_store()

-        expected = OSDRemovalQueue()
+        expected = OSDRemovalQueue(cephadm_module)
         expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
         assert cephadm_module.to_remove_osds == expected

@@ -218,30 +218,30 @@ def test_drain_status_human_done(self, osd_obj):
 class TestOSDRemovalQueue:

     def test_queue_size(self, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         assert q.queue_size() == 0
         q.add(osd_obj)
         assert q.queue_size() == 1

     @mock.patch("cephadm.services.osd.OSD.start")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_enqueue(self, exist, start, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.enqueue(osd_obj)
         osd_obj.start.assert_called_once()

     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm_raise(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         with pytest.raises(KeyError):
             q.rm(osd_obj)
         osd_obj.stop.assert_called_once()

     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.add(osd_obj)
         q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
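
Taken together, these tests pin down the small queue API that survives the move, plus the one new requirement that the constructor receive an orchestrator, which the unit tests satisfy with `mock.Mock()`. A usage sketch under that assumption; the behavior here is inferred from the tests above rather than from the implementation, and the import path simply follows the `mock.patch` targets:

```python
from unittest import mock

from cephadm.services.osd import OSDRemovalQueue  # path per the patch targets above

q = OSDRemovalQueue(mock.Mock())   # any orchestrator-like object will do in tests
assert q.queue_size() == 0

osd = mock.Mock()                  # stands in for a real OSD object
q.add(osd)                         # track the OSD; no side effects per test_queue_size
assert q.queue_size() == 1

q.rm(osd)                          # stops the OSD; raises KeyError if it was never added
osd.stop.assert_called_once()
```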
