mgr/cephadm: lock multithreaded access to OSDRemovalQueue
Since the set can also be changed from the CLI thread

Fixes: https://tracker.ceph.com/issues/47700

Signed-off-by: Sebastian Wagner <[email protected]>
sebastian-philipp committed Jan 14, 2021
1 parent 1f39532 commit db7c6a5
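The race being fixed is the usual unsynchronized-set hazard: the cephadm serve loop iterates over OSDRemovalQueue.osds while the CLI thread (e.g. a "ceph orch osd rm" handler) mutates the same set, which can raise "RuntimeError: Set changed size during iteration" or silently lose updates. The sketch below illustrates the discipline the commit adopts: snapshot the set under a lock, do the slow per-item work with the lock released, then reconcile under the lock again. It is a minimal stand-in, not Ceph code; RemovalQueue, _try_remove and the integer items are hypothetical.

import threading
from typing import Set

class RemovalQueue:
    """Illustrative stand-in for OSDRemovalQueue, not the Ceph code."""

    def __init__(self) -> None:
        self.items: Set[int] = set()
        # guards self.items; never held across slow or network calls
        self.lock = threading.Lock()

    def enqueue(self, item: int) -> None:
        # called from the CLI thread
        with self.lock:
            self.items.add(item)

    def process(self) -> None:
        # called from the serve loop
        with self.lock:
            snapshot = list(self.items)   # stable copy for iteration
        # slow work runs with the lock released, mirroring the diff's
        # rule that self.lock is not held across mon commands
        new_queue = {i for i in snapshot if not self._try_remove(i)}
        with self.lock:
            # drop everything that was successfully removed
            self.items.intersection_update(new_queue)

    def _try_remove(self, item: int) -> bool:
        return True   # stand-in for the real, slow removal work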
Showing 1 changed file with 53 additions and 32 deletions.
src/pybind/mgr/cephadm/services/osd.py (85 changes: 53 additions & 32 deletions)
@@ -1,5 +1,6 @@
 import json
 import logging
+from threading import Lock
 from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING

 from ceph.deployment import translate
@@ -602,28 +603,36 @@ def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.osds: Set[OSD] = set()
         self.rm_util = RemoveUtil(mgr)

+        # locks multithreaded access to self.osds. Please avoid locking
+        # network calls, like mon commands.
+        self.lock = Lock()
+
     def process_removal_queue(self) -> None:
         """
         Performs actions in the _serve() loop to remove an OSD
         when criteria is met.
+        we can't hold self.lock, as we're calling _remove_daemon in the loop
         """

         # make sure that we don't run on OSDs that are not in the cluster anymore.
         self.cleanup()

-        logger.debug(
-            f"{self.queue_size()} OSDs are scheduled "
-            f"for removal: {self.all_osds()}")
-
         # find osds that are ok-to-stop and not yet draining
         ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
         if ok_to_stop_osds:
             # start draining those
             _ = [osd.start_draining() for osd in ok_to_stop_osds]

+        all_osds = self.all_osds()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {all_osds}")
+
         # Check all osds for their state and take action (remove, purge etc)
         new_queue: Set[OSD] = set()
-        for osd in self.all_osds():  # type: OSD
+        for osd in all_osds:  # type: OSD
             if not osd.force:
                 # skip criteria
                 if not osd.is_empty:
@@ -662,66 +671,78 @@ def process_removal_queue(self) -> None:
         # self could change while this is processing (osds get added from the CLI)
         # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
         # osds that were added while this method was executed'
-        self.osds.intersection_update(new_queue)
-        self.save_to_store()
+        with self.lock:
+            self.osds.intersection_update(new_queue)
+            self._save_to_store()

     def cleanup(self) -> None:
         # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        for osd in self.not_in_cluster():
-            self.osds.remove(osd)
+        with self.lock:
+            for osd in self._not_in_cluster():
+                self.osds.remove(osd)

-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.all_osds()]
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
         logger.debug(f"Saving {osd_queue} to store")
         self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

     def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
-                if osd_obj is not None:
-                    self.osds.add(osd_obj)
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)

     def as_osd_ids(self) -> List[int]:
-        return [osd.osd_id for osd in self.osds]
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]

     def queue_size(self) -> int:
-        return len(self.osds)
+        with self.lock:
+            return len(self.osds)

     def draining_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_draining]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]

     def idling_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

     def empty_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]

     def all_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds]
+        with self.lock:
+            return [osd for osd in self.osds]

-    def not_in_cluster(self) -> List["OSD"]:
+    def _not_in_cluster(self) -> List["OSD"]:
         return [osd for osd in self.osds if not osd.exists]

     def enqueue(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
-        self.osds.add(osd)
+        with self.lock:
+            self.osds.add(osd)
         osd.start()

     def rm(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
         osd.stop()
-        try:
-            logger.debug(f'Removing {osd} from the queue.')
-            self.osds.remove(osd)
-        except KeyError:
-            logger.debug(f"Could not find {osd} in queue.")
-            raise KeyError
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError

     def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OSDRemovalQueue):
             return False
-        return self.osds == other.osds
+        with self.lock:
+            return self.osds == other.osds
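
A detail worth noting in the diff above: save_to_store() and not_in_cluster() become the underscore-prefixed _save_to_store() and _not_in_cluster() and deliberately take no lock, because their only callers (process_removal_queue() and cleanup()) already hold it. Python's threading.Lock is not reentrant, so a locking helper invoked from inside a "with self.lock:" block would deadlock; that is also why process_removal_queue() snapshots the set via all_osds() up front instead of holding the lock across the whole loop and its mon commands. A small runnable illustration (generic code, not from this commit):

import threading

# A plain Lock is not reentrant: the thread that holds it blocks forever
# if it tries to acquire it again. A timeout makes the failure observable
# instead of hanging.
lock = threading.Lock()
with lock:
    print(lock.acquire(timeout=0.1))   # False: same thread cannot re-acquire

# An RLock would permit re-acquisition, but the commit instead splits the
# API into locking public methods and lock-free _private helpers.
rlock = threading.RLock()
with rlock:
    print(rlock.acquire(timeout=0.1))  # True
    rlock.release()                    # balance the extra acquire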
