diff --git a/src/mgr/ActivePyModules.cc b/src/mgr/ActivePyModules.cc index 04fbdc8104080..f95257a172199 100644 --- a/src/mgr/ActivePyModules.cc +++ b/src/mgr/ActivePyModules.cc @@ -553,9 +553,12 @@ void ActivePyModules::notify_all(const std::string ¬ify_type, dout(10) << __func__ << ": notify_all " << notify_type << dendl; for (auto& [name, module] : modules) { + if (!py_module_registry.should_notify(name, notify_type)) { + continue; + } // Send all python calls down a Finisher to avoid blocking // C++ code, and avoid any potential lock cycles. - dout(15) << "queuing notify to " << name << dendl; + dout(15) << "queuing notify (" << notify_type << ") to " << name << dendl; // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984 finisher.queue(new LambdaContext([module=module, notify_type, notify_id] (int r){ @@ -570,6 +573,9 @@ void ActivePyModules::notify_all(const LogEntry &log_entry) dout(10) << __func__ << ": notify_all (clog)" << dendl; for (auto& [name, module] : modules) { + if (!py_module_registry.should_notify(name, "clog")) { + continue; + } // Send all python calls down a Finisher to avoid blocking // C++ code, and avoid any potential lock cycles. // diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 7884a081aa1fd..f5c0376c7ca63 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -678,9 +678,11 @@ bool DaemonServer::handle_report(const ref_t& m) } // if there are any schema updates, notify the python modules + /* no users currently if (!m->declare_types.empty() || !m->undeclare_types.empty()) { py_modules.notify_all("perf_schema_update", ceph::to_string(key)); } + */ if (m->get_connection()->peer_is_osd()) { osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports); diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 5bbb8892cfe93..d7aebd8fceca8 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -633,7 +633,7 @@ bool Mgr::ms_dispatch2(const ref_t& m) break; case MSG_SERVICE_MAP: handle_service_map(ref_cast(m)); - py_module_registry->notify_all("service_map", ""); + //no users: py_module_registry->notify_all("service_map", ""); break; case MSG_LOG: handle_log(ref_cast(m)); @@ -781,7 +781,7 @@ void Mgr::handle_mgr_digest(ref_t m) dout(10) << m->mon_status_json.length() << dendl; dout(10) << m->health_json.length() << dendl; cluster_state.load_digest(m.get()); - py_module_registry->notify_all("mon_status", ""); + //no users: py_module_registry->notify_all("mon_status", ""); py_module_registry->notify_all("health", ""); // Hack: use this as a tick/opportunity to prompt python-land that diff --git a/src/mgr/PyModule.cc b/src/mgr/PyModule.cc index 87100c50421ef..084cf3ffc1eac 100644 --- a/src/mgr/PyModule.cc +++ b/src/mgr/PyModule.cc @@ -398,6 +398,8 @@ int PyModule::load(PyThreadState *pMainThreadState) return r; } + load_notify_types(); + // We've imported the module and found a MgrModule subclass, at this // point the module is considered loaded. It might still not be // runnable though, can_run populated later... @@ -509,6 +511,39 @@ int PyModule::register_options(PyObject *cls) return 0; } +int PyModule::load_notify_types() +{ + PyObject *ls = PyObject_GetAttrString(pClass, "NOTIFY_TYPES"); + if (ls == nullptr) { + derr << "Module " << get_name() << " has missing NOTIFY_TYPES member" << dendl; + return -EINVAL; + } + if (!PyObject_TypeCheck(ls, &PyList_Type)) { + // Relatively easy mistake for human to make, e.g. defining COMMANDS + // as a {} instead of a [] + derr << "Module " << get_name() << " has NOTIFY_TYPES that is not a list" << dendl; + return -EINVAL; + } + + const size_t list_size = PyList_Size(ls); + for (size_t i = 0; i < list_size; ++i) { + PyObject *notify_type = PyList_GetItem(ls, i); + ceph_assert(notify_type != nullptr); + + if (!PyObject_TypeCheck(notify_type, &PyUnicode_Type)) { + derr << "Module " << get_name() << " has non-string entry in NOTIFY_TYPES list" + << dendl; + return -EINVAL; + } + + notify_types.insert(PyUnicode_AsUTF8(notify_type)); + } + Py_DECREF(ls); + dout(10) << "Module " << get_name() << " notify_types " << notify_types << dendl; + + return 0; +} + int PyModule::load_commands() { PyObject *pRegCmd = PyObject_CallMethod(pClass, diff --git a/src/mgr/PyModule.h b/src/mgr/PyModule.h index fbb56080efa4a..8d88ff94c6271 100644 --- a/src/mgr/PyModule.h +++ b/src/mgr/PyModule.h @@ -87,6 +87,9 @@ class PyModule int load_options(); std::map options; + int load_notify_types(); + std::set notify_types; + public: static std::string mgr_store_prefix; @@ -154,6 +157,10 @@ class PyModule bool is_loaded() const { std::lock_guard l(lock) ; return loaded; } bool is_always_on() const { std::lock_guard l(lock) ; return always_on; } + bool should_notify(const std::string& notify_type) const { + return notify_types.count(notify_type); + } + const std::string &get_name() const { std::lock_guard l(lock) ; return module_name; } diff --git a/src/mgr/PyModuleRegistry.h b/src/mgr/PyModuleRegistry.h index 19360b3fa2ded..4cd98e8827e48 100644 --- a/src/mgr/PyModuleRegistry.h +++ b/src/mgr/PyModuleRegistry.h @@ -191,6 +191,11 @@ class PyModuleRegistry } } + bool should_notify(const std::string& name, + const std::string& notify_type) { + return modules.at(name)->should_notify(notify_type); + } + std::map get_services() const { ceph_assert(active_modules); diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index c968d956e09d7..5a11d8884a422 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -31,7 +31,7 @@ from cephadm.agent import CherryPyThread, CephadmAgentHelpers -from mgr_module import MgrModule, HandleCommandResult, Option +from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType import orchestrator from orchestrator.module import to_format, Format @@ -123,6 +123,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, _STORE_HOST_PREFIX = "host" instance = None + NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary] NATIVE_OPTIONS = [] # type: List[Any] MODULE_OPTIONS = [ Option( @@ -435,7 +436,7 @@ def __init__(self, *args: Any, **kwargs: Any): self.max_osd_draining_count = 10 self.device_enhanced_scan = False - self.notify('mon_map', None) + self.notify(NotifyType.mon_map, None) self.config_notify() path = self.get_ceph_option('cephadm_path') @@ -581,8 +582,8 @@ def config_notify(self) -> None: self.event.set() - def notify(self, notify_type: str, notify_id: Optional[str]) -> None: - if notify_type == "mon_map": + def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None: + if notify_type == NotifyType.mon_map: # get monmap mtime so we can refresh configs when mons change monmap = self.get('mon_map') self.last_monmap = str_to_datetime(monmap['modified']) @@ -591,7 +592,7 @@ def notify(self, notify_type: str, notify_id: Optional[str]) -> None: if getattr(self, 'manage_etc_ceph_ceph_conf', False): # getattr, due to notify() being called before config_notify() self._kick_serve_loop() - if notify_type == "pg_summary": + if notify_type == NotifyType.pg_summary: self._trigger_osd_removal() def _trigger_osd_removal(self) -> None: diff --git a/src/pybind/mgr/dashboard/module.py b/src/pybind/mgr/dashboard/module.py index 9b9e028d6947c..17e94b25b5337 100644 --- a/src/pybind/mgr/dashboard/module.py +++ b/src/pybind/mgr/dashboard/module.py @@ -20,7 +20,7 @@ from typing_extensions import Literal from mgr_module import CLICommand, CLIWriteCommand, HandleCommandResult, \ - MgrModule, MgrStandbyModule, Option, _get_localized_key + MgrModule, MgrStandbyModule, NotifyType, Option, _get_localized_key from mgr_util import ServerConfigException, build_url, \ create_self_signed_cert, get_default_addr, verify_tls_files @@ -277,6 +277,8 @@ class Module(MgrModule, CherryPyConfig): for options in PLUGIN_MANAGER.hook.get_options() or []: MODULE_OPTIONS.extend(options) + NOTIFY_TYPES = [NotifyType.clog] + __pool_stats = collections.defaultdict(lambda: collections.defaultdict( lambda: collections.deque(maxlen=10))) # type: dict @@ -478,8 +480,8 @@ def handle_command(self, inbuf, cmd): return (-errno.EINVAL, '', 'Command not found \'{0}\'' .format(cmd['prefix'])) - def notify(self, notify_type, notify_id): - NotificationQueue.new_notification(notify_type, notify_id) + def notify(self, notify_type: NotifyType, notify_id): + NotificationQueue.new_notification(str(notify_type), notify_id) def get_updated_pool_stats(self): df = self.get('df') diff --git a/src/pybind/mgr/insights/module.py b/src/pybind/mgr/insights/module.py index 4c7fd98cdaed1..5e891069e8f34 100644 --- a/src/pybind/mgr/insights/module.py +++ b/src/pybind/mgr/insights/module.py @@ -4,7 +4,7 @@ import threading from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult -from mgr_module import MgrModule, CommandResult +from mgr_module import MgrModule, CommandResult, NotifyType from . import health as health_util # hours of crash history to report @@ -20,6 +20,9 @@ class Module(MgrModule): + + NOTIFY_TYPES = [NotifyType.health] + def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) @@ -50,9 +53,9 @@ def get_store_prefix(self, prefix): return { k: v for k, v in self._store.items() if k.startswith(prefix) } - def notify(self, ttype, ident): + def notify(self, ttype: NotifyType, ident): """Queue updates for processing""" - if ttype == "health": + if ttype == NotifyType.health: self.log.info("Received health check update {} pending".format( len(self._pending_health))) health = json.loads(self.get("health")["json"]) diff --git a/src/pybind/mgr/k8sevents/module.py b/src/pybind/mgr/k8sevents/module.py index 1e12d1f27a25c..b3402920974e5 100644 --- a/src/pybind/mgr/k8sevents/module.py +++ b/src/pybind/mgr/k8sevents/module.py @@ -40,7 +40,7 @@ from collections import OrderedDict import rados -from mgr_module import MgrModule +from mgr_module import MgrModule, NotifyType from mgr_util import verify_cacrt, ServerConfigException try: @@ -1051,6 +1051,7 @@ class Module(MgrModule): 'default': 7, 'desc': "Days to hold ceph event information within local cache"} ] + NOTIFY_TYPES = [NotifyType.clog] def __init__(self, *args, **kwargs): self.run = True @@ -1138,7 +1139,7 @@ def process_clog(self, log_message): else: self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) - def notify(self, notify_type, notify_id): + def notify(self, notify_type: NotifyType, notify_id): """ Called by the ceph-mgr service to notify the Python plugin that new state is available. @@ -1153,7 +1154,7 @@ def notify(self, notify_type, notify_id): """ # only interested in cluster log (clog) messages for now - if notify_type == 'clog': + if notify_type == NotifyType.clog: self.log.debug("received a clog entry from mgr.notify") if isinstance(notify_id, dict): # create a log object to process diff --git a/src/pybind/mgr/localpool/module.py b/src/pybind/mgr/localpool/module.py index ce7cb1af2a6ea..0706dff65194a 100644 --- a/src/pybind/mgr/localpool/module.py +++ b/src/pybind/mgr/localpool/module.py @@ -1,4 +1,4 @@ -from mgr_module import MgrModule, CommandResult, Option +from mgr_module import MgrModule, CommandResult, Option, NotifyType import json import threading from typing import cast, Any @@ -46,13 +46,14 @@ class Module(MgrModule): desc='name prefix for any created local pool', runtime=True), ] + NOTIFY_TYPES = [NotifyType.osd_map] def __init__(self, *args: Any, **kwargs: Any) -> None: super(Module, self).__init__(*args, **kwargs) self.serve_event = threading.Event() - def notify(self, notify_type: str, notify_id: str) -> None: - if notify_type == 'osd_map': + def notify(self, notify_type: NotifyType, notify_id: str) -> None: + if notify_type == NotifyType.osd_map: self.handle_osd_map() def handle_osd_map(self) -> None: diff --git a/src/pybind/mgr/mds_autoscaler/module.py b/src/pybind/mgr/mds_autoscaler/module.py index 006d8547c9556..4d9165e478ccf 100644 --- a/src/pybind/mgr/mds_autoscaler/module.py +++ b/src/pybind/mgr/mds_autoscaler/module.py @@ -4,7 +4,7 @@ import logging from typing import Any, Optional -from mgr_module import MgrModule +from mgr_module import MgrModule, NotifyType from ceph.deployment.service_spec import ServiceSpec import orchestrator import copy @@ -16,6 +16,8 @@ class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule): """ MDS autoscaler. """ + NOTIFY_TYPES = [NotifyType.fs_map] + def __init__(self, *args: Any, **kwargs: Any) -> None: MgrModule.__init__(self, *args, **kwargs) self.set_mgr(self) @@ -84,8 +86,8 @@ def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str) -> None: self.log.exception(f"fs {fs_name}: exception while updating service: {e}") pass - def notify(self, notify_type: str, notify_id: str) -> None: - if notify_type != 'fs_map': + def notify(self, notify_type: NotifyType, notify_id: str) -> None: + if notify_type != NotifyType.fs_map: return fs_map = self.get('fs_map') if not fs_map: diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 0f6a19263d222..7f45ad0b75f83 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -17,7 +17,7 @@ import subprocess import threading from collections import defaultdict -from enum import IntEnum +from enum import IntEnum, Enum import rados import re import socket @@ -84,6 +84,23 @@ def get_origin(tp: Any) -> Any: NFS_POOL_NAME = '.nfs' +class NotifyType(str, Enum): + mon_map = 'mon_map' + pg_summary = 'pg_summary' + health = 'health' + clog = 'clog' + osd_map = 'osd_map' + fs_map = 'fs_map' + command = 'command' + + # these are disabled because there are no users. + # see Mgr.cc: + # service_map = 'service_map' + # mon_status = 'mon_status' + # see DaemonServer.cc: + # perf_schema_update = 'perf_schema_update' + + class CommandResult(object): """ Use with MgrModule.send_command @@ -1169,10 +1186,12 @@ def get_context(self) -> object: """ return self._ceph_get_context() - def notify(self, notify_type: str, notify_id: str) -> None: + def notify(self, notify_type: NotifyType, notify_id: str) -> None: """ Called by the ceph-mgr service to notify the Python plugin - that new state is available. + that new state is available. This method is *only* called for + notify_types that are listed in the NOTIFY_TYPES string list + member of the module class. :param notify_type: string indicating what kind of notification, such as osd_map, mon_map, fs_map, mon_status, diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py index ad9e550435d80..6fa8d0c4c5338 100644 --- a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py +++ b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py @@ -15,6 +15,7 @@ from mgr_util import RTimer, CephfsClient, open_filesystem,\ CephfsConnectionException +from mgr_module import NotifyType from .blocklist import blocklist from .notify import Notifier, InstanceWatcher from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \ @@ -288,9 +289,9 @@ def __init__(self, mgr): self.refresh_pool_policy() self.local_fs = CephfsClient(mgr) - def notify(self, notify_type): + def notify(self, notify_type: NotifyType): log.debug(f'got notify type {notify_type}') - if notify_type == 'fs_map': + if notify_type == NotifyType.fs_map: with self.lock: self.fs_map = self.mgr.get('fs_map') self.refresh_pool_policy_locked() diff --git a/src/pybind/mgr/mirroring/module.py b/src/pybind/mgr/mirroring/module.py index b9223111ae902..4b4354ab2b9c4 100644 --- a/src/pybind/mgr/mirroring/module.py +++ b/src/pybind/mgr/mirroring/module.py @@ -1,17 +1,18 @@ from typing import List, Optional -from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option +from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option, NotifyType from .fs.snapshot_mirror import FSSnapshotMirror class Module(MgrModule): MODULE_OPTIONS: List[Option] = [] + NOTIFY_TYPES = [NotifyType.fs_map] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fs_snapshot_mirror = FSSnapshotMirror(self) - def notify(self, notify_type, notify_id): + def notify(self, notify_type: NotifyType, notify_id): self.fs_snapshot_mirror.notify(notify_type) @CLIWriteCommand('fs snapshot mirror enable') diff --git a/src/pybind/mgr/restful/module.py b/src/pybind/mgr/restful/module.py index b76464e76fa8e..cb8391ecd08da 100644 --- a/src/pybind/mgr/restful/module.py +++ b/src/pybind/mgr/restful/module.py @@ -23,7 +23,7 @@ from werkzeug.serving import make_server, make_ssl_devcert from .hooks import ErrorHook -from mgr_module import MgrModule, CommandResult +from mgr_module import MgrModule, CommandResult, NotifyType from mgr_util import build_url @@ -227,6 +227,8 @@ class Module(MgrModule): }, ] + NOTIFY_TYPES = [NotifyType.command] + def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) context.instance = self @@ -360,15 +362,15 @@ def restart(self): self.log.error(str(traceback.format_exc())) - def notify(self, notify_type, tag): + def notify(self, notify_type: NotifyType, tag: str): try: self._notify(notify_type, tag) except: self.log.error(str(traceback.format_exc())) - def _notify(self, notify_type, tag): - if notify_type != "command": + def _notify(self, notify_type: NotifyType, tag): + if notify_type != NotifyType.command: self.log.debug("Unhandled notification type '%s'", notify_type) return # we can safely skip all the sequential commands diff --git a/src/pybind/mgr/stats/module.py b/src/pybind/mgr/stats/module.py index a4bc44630d637..d942c19e87bba 100644 --- a/src/pybind/mgr/stats/module.py +++ b/src/pybind/mgr/stats/module.py @@ -5,7 +5,7 @@ import json from typing import List, Dict -from mgr_module import MgrModule, Option +from mgr_module import MgrModule, Option, NotifyType from .fs.perf_stats import FSPerfStats @@ -21,13 +21,14 @@ class Module(MgrModule): }, ] MODULE_OPTIONS: List[Option] = [] + NOTIFY_TYPES = [NotifyType.command] def __init__(self, *args, **kwargs): super(Module, self).__init__(*args, **kwargs) self.fs_perf_stats = FSPerfStats(self) - def notify(self, notify_type, notify_id): - if notify_type == "command": + def notify(self, notify_type: NotifyType, notify_id): + if notify_type == NotifyType.command: self.fs_perf_stats.notify(notify_id) def handle_command(self, inbuf, cmd):