Skip to content

Commit

Permalink
Merge PR ceph#44162 into master
Browse files Browse the repository at this point in the history
* refs/pull/44162/head:
	mgr: only queue notify events that modules ask for
	pybind/mgr: annotate which events modules consume
	pybind/mgr: introduce NotifyType enum
	mgr: stop issuing events that no modules consume

Reviewed-by: Sebastian Wagner <[email protected]>
  • Loading branch information
liewegas committed Dec 8, 2021
2 parents 907f38c + ee4e3ec commit 1c741b4
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 37 deletions.
8 changes: 7 additions & 1 deletion src/mgr/ActivePyModules.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,12 @@ void ActivePyModules::notify_all(const std::string &notify_type,

dout(10) << __func__ << ": notify_all " << notify_type << dendl;
for (auto& [name, module] : modules) {
if (!py_module_registry.should_notify(name, notify_type)) {
continue;
}
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
dout(15) << "queuing notify to " << name << dendl;
dout(15) << "queuing notify (" << notify_type << ") to " << name << dendl;
// workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
finisher.queue(new LambdaContext([module=module, notify_type, notify_id]
(int r){
Expand All @@ -570,6 +573,9 @@ void ActivePyModules::notify_all(const LogEntry &log_entry)

dout(10) << __func__ << ": notify_all (clog)" << dendl;
for (auto& [name, module] : modules) {
if (!py_module_registry.should_notify(name, "clog")) {
continue;
}
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
//
Expand Down
2 changes: 2 additions & 0 deletions src/mgr/DaemonServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,11 @@ bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
}

// if there are any schema updates, notify the python modules
/* no users currently
if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
py_modules.notify_all("perf_schema_update", ceph::to_string(key));
}
*/

if (m->get_connection()->peer_is_osd()) {
osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
Expand Down
4 changes: 2 additions & 2 deletions src/mgr/Mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ bool Mgr::ms_dispatch2(const ref_t<Message>& m)
break;
case MSG_SERVICE_MAP:
handle_service_map(ref_cast<MServiceMap>(m));
py_module_registry->notify_all("service_map", "");
//no users: py_module_registry->notify_all("service_map", "");
break;
case MSG_LOG:
handle_log(ref_cast<MLog>(m));
Expand Down Expand Up @@ -781,7 +781,7 @@ void Mgr::handle_mgr_digest(ref_t<MMgrDigest> m)
dout(10) << m->mon_status_json.length() << dendl;
dout(10) << m->health_json.length() << dendl;
cluster_state.load_digest(m.get());
py_module_registry->notify_all("mon_status", "");
//no users: py_module_registry->notify_all("mon_status", "");
py_module_registry->notify_all("health", "");

// Hack: use this as a tick/opportunity to prompt python-land that
Expand Down
35 changes: 35 additions & 0 deletions src/mgr/PyModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ int PyModule::load(PyThreadState *pMainThreadState)
return r;
}

load_notify_types();

// We've imported the module and found a MgrModule subclass, at this
// point the module is considered loaded. It might still not be
// runnable though, can_run populated later...
Expand Down Expand Up @@ -509,6 +511,39 @@ int PyModule::register_options(PyObject *cls)
return 0;
}

int PyModule::load_notify_types()
{
PyObject *ls = PyObject_GetAttrString(pClass, "NOTIFY_TYPES");
if (ls == nullptr) {
derr << "Module " << get_name() << " has missing NOTIFY_TYPES member" << dendl;
return -EINVAL;
}
if (!PyObject_TypeCheck(ls, &PyList_Type)) {
// Relatively easy mistake for human to make, e.g. defining COMMANDS
// as a {} instead of a []
derr << "Module " << get_name() << " has NOTIFY_TYPES that is not a list" << dendl;
return -EINVAL;
}

const size_t list_size = PyList_Size(ls);
for (size_t i = 0; i < list_size; ++i) {
PyObject *notify_type = PyList_GetItem(ls, i);
ceph_assert(notify_type != nullptr);

if (!PyObject_TypeCheck(notify_type, &PyUnicode_Type)) {
derr << "Module " << get_name() << " has non-string entry in NOTIFY_TYPES list"
<< dendl;
return -EINVAL;
}

notify_types.insert(PyUnicode_AsUTF8(notify_type));
}
Py_DECREF(ls);
dout(10) << "Module " << get_name() << " notify_types " << notify_types << dendl;

return 0;
}

int PyModule::load_commands()
{
PyObject *pRegCmd = PyObject_CallMethod(pClass,
Expand Down
7 changes: 7 additions & 0 deletions src/mgr/PyModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class PyModule
int load_options();
std::map<std::string, MgrMap::ModuleOption> options;

int load_notify_types();
std::set<std::string> notify_types;

public:
static std::string mgr_store_prefix;

Expand Down Expand Up @@ -154,6 +157,10 @@ class PyModule
bool is_loaded() const { std::lock_guard l(lock) ; return loaded; }
bool is_always_on() const { std::lock_guard l(lock) ; return always_on; }

bool should_notify(const std::string& notify_type) const {
return notify_types.count(notify_type);
}

const std::string &get_name() const {
std::lock_guard l(lock) ; return module_name;
}
Expand Down
5 changes: 5 additions & 0 deletions src/mgr/PyModuleRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ class PyModuleRegistry
}
}

bool should_notify(const std::string& name,
const std::string& notify_type) {
return modules.at(name)->should_notify(notify_type);
}

std::map<std::string, std::string> get_services() const
{
ceph_assert(active_modules);
Expand Down
11 changes: 6 additions & 5 deletions src/pybind/mgr/cephadm/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from cephadm.agent import CherryPyThread, CephadmAgentHelpers


from mgr_module import MgrModule, HandleCommandResult, Option
from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
import orchestrator
from orchestrator.module import to_format, Format

Expand Down Expand Up @@ -123,6 +123,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
_STORE_HOST_PREFIX = "host"

instance = None
NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
NATIVE_OPTIONS = [] # type: List[Any]
MODULE_OPTIONS = [
Option(
Expand Down Expand Up @@ -435,7 +436,7 @@ def __init__(self, *args: Any, **kwargs: Any):
self.max_osd_draining_count = 10
self.device_enhanced_scan = False

self.notify('mon_map', None)
self.notify(NotifyType.mon_map, None)
self.config_notify()

path = self.get_ceph_option('cephadm_path')
Expand Down Expand Up @@ -581,8 +582,8 @@ def config_notify(self) -> None:

self.event.set()

def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
if notify_type == "mon_map":
def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
if notify_type == NotifyType.mon_map:
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
self.last_monmap = str_to_datetime(monmap['modified'])
Expand All @@ -591,7 +592,7 @@ def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
if getattr(self, 'manage_etc_ceph_ceph_conf', False):
# getattr, due to notify() being called before config_notify()
self._kick_serve_loop()
if notify_type == "pg_summary":
if notify_type == NotifyType.pg_summary:
self._trigger_osd_removal()

def _trigger_osd_removal(self) -> None:
Expand Down
8 changes: 5 additions & 3 deletions src/pybind/mgr/dashboard/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing_extensions import Literal

from mgr_module import CLICommand, CLIWriteCommand, HandleCommandResult, \
MgrModule, MgrStandbyModule, Option, _get_localized_key
MgrModule, MgrStandbyModule, NotifyType, Option, _get_localized_key
from mgr_util import ServerConfigException, build_url, \
create_self_signed_cert, get_default_addr, verify_tls_files

Expand Down Expand Up @@ -277,6 +277,8 @@ class Module(MgrModule, CherryPyConfig):
for options in PLUGIN_MANAGER.hook.get_options() or []:
MODULE_OPTIONS.extend(options)

NOTIFY_TYPES = [NotifyType.clog]

__pool_stats = collections.defaultdict(lambda: collections.defaultdict(
lambda: collections.deque(maxlen=10))) # type: dict

Expand Down Expand Up @@ -478,8 +480,8 @@ def handle_command(self, inbuf, cmd):
return (-errno.EINVAL, '', 'Command not found \'{0}\''
.format(cmd['prefix']))

def notify(self, notify_type, notify_id):
NotificationQueue.new_notification(notify_type, notify_id)
def notify(self, notify_type: NotifyType, notify_id):
NotificationQueue.new_notification(str(notify_type), notify_id)

def get_updated_pool_stats(self):
df = self.get('df')
Expand Down
9 changes: 6 additions & 3 deletions src/pybind/mgr/insights/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import threading

from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
from mgr_module import MgrModule, CommandResult
from mgr_module import MgrModule, CommandResult, NotifyType
from . import health as health_util

# hours of crash history to report
Expand All @@ -20,6 +20,9 @@


class Module(MgrModule):

NOTIFY_TYPES = [NotifyType.health]

def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)

Expand Down Expand Up @@ -50,9 +53,9 @@ def get_store_prefix(self, prefix):
return { k: v for k, v in self._store.items() if k.startswith(prefix) }


def notify(self, ttype, ident):
def notify(self, ttype: NotifyType, ident):
"""Queue updates for processing"""
if ttype == "health":
if ttype == NotifyType.health:
self.log.info("Received health check update {} pending".format(
len(self._pending_health)))
health = json.loads(self.get("health")["json"])
Expand Down
7 changes: 4 additions & 3 deletions src/pybind/mgr/k8sevents/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from collections import OrderedDict

import rados
from mgr_module import MgrModule
from mgr_module import MgrModule, NotifyType
from mgr_util import verify_cacrt, ServerConfigException

try:
Expand Down Expand Up @@ -1051,6 +1051,7 @@ class Module(MgrModule):
'default': 7,
'desc': "Days to hold ceph event information within local cache"}
]
NOTIFY_TYPES = [NotifyType.clog]

def __init__(self, *args, **kwargs):
self.run = True
Expand Down Expand Up @@ -1138,7 +1139,7 @@ def process_clog(self, log_message):
else:
self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

def notify(self, notify_type, notify_id):
def notify(self, notify_type: NotifyType, notify_id):
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
Expand All @@ -1153,7 +1154,7 @@ def notify(self, notify_type, notify_id):
"""

# only interested in cluster log (clog) messages for now
if notify_type == 'clog':
if notify_type == NotifyType.clog:
self.log.debug("received a clog entry from mgr.notify")
if isinstance(notify_id, dict):
# create a log object to process
Expand Down
7 changes: 4 additions & 3 deletions src/pybind/mgr/localpool/module.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mgr_module import MgrModule, CommandResult, Option
from mgr_module import MgrModule, CommandResult, Option, NotifyType
import json
import threading
from typing import cast, Any
Expand Down Expand Up @@ -46,13 +46,14 @@ class Module(MgrModule):
desc='name prefix for any created local pool',
runtime=True),
]
NOTIFY_TYPES = [NotifyType.osd_map]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
self.serve_event = threading.Event()

def notify(self, notify_type: str, notify_id: str) -> None:
if notify_type == 'osd_map':
def notify(self, notify_type: NotifyType, notify_id: str) -> None:
if notify_type == NotifyType.osd_map:
self.handle_osd_map()

def handle_osd_map(self) -> None:
Expand Down
8 changes: 5 additions & 3 deletions src/pybind/mgr/mds_autoscaler/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
from typing import Any, Optional
from mgr_module import MgrModule
from mgr_module import MgrModule, NotifyType
from ceph.deployment.service_spec import ServiceSpec
import orchestrator
import copy
Expand All @@ -16,6 +16,8 @@ class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
"""
MDS autoscaler.
"""
NOTIFY_TYPES = [NotifyType.fs_map]

def __init__(self, *args: Any, **kwargs: Any) -> None:
MgrModule.__init__(self, *args, **kwargs)
self.set_mgr(self)
Expand Down Expand Up @@ -84,8 +86,8 @@ def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str) -> None:
self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
pass

def notify(self, notify_type: str, notify_id: str) -> None:
if notify_type != 'fs_map':
def notify(self, notify_type: NotifyType, notify_id: str) -> None:
if notify_type != NotifyType.fs_map:
return
fs_map = self.get('fs_map')
if not fs_map:
Expand Down
25 changes: 22 additions & 3 deletions src/pybind/mgr/mgr_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import subprocess
import threading
from collections import defaultdict
from enum import IntEnum
from enum import IntEnum, Enum
import rados
import re
import socket
Expand Down Expand Up @@ -84,6 +84,23 @@ def get_origin(tp: Any) -> Any:
NFS_POOL_NAME = '.nfs'


class NotifyType(str, Enum):
mon_map = 'mon_map'
pg_summary = 'pg_summary'
health = 'health'
clog = 'clog'
osd_map = 'osd_map'
fs_map = 'fs_map'
command = 'command'

# these are disabled because there are no users.
# see Mgr.cc:
# service_map = 'service_map'
# mon_status = 'mon_status'
# see DaemonServer.cc:
# perf_schema_update = 'perf_schema_update'


class CommandResult(object):
"""
Use with MgrModule.send_command
Expand Down Expand Up @@ -1169,10 +1186,12 @@ def get_context(self) -> object:
"""
return self._ceph_get_context()

def notify(self, notify_type: str, notify_id: str) -> None:
def notify(self, notify_type: NotifyType, notify_id: str) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
that new state is available. This method is *only* called for
notify_types that are listed in the NOTIFY_TYPES string list
member of the module class.
:param notify_type: string indicating what kind of notification,
such as osd_map, mon_map, fs_map, mon_status,
Expand Down
5 changes: 3 additions & 2 deletions src/pybind/mgr/mirroring/fs/snapshot_mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from mgr_util import RTimer, CephfsClient, open_filesystem,\
CephfsConnectionException
from mgr_module import NotifyType
from .blocklist import blocklist
from .notify import Notifier, InstanceWatcher
from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
Expand Down Expand Up @@ -288,9 +289,9 @@ def __init__(self, mgr):
self.refresh_pool_policy()
self.local_fs = CephfsClient(mgr)

def notify(self, notify_type):
def notify(self, notify_type: NotifyType):
log.debug(f'got notify type {notify_type}')
if notify_type == 'fs_map':
if notify_type == NotifyType.fs_map:
with self.lock:
self.fs_map = self.mgr.get('fs_map')
self.refresh_pool_policy_locked()
Expand Down
Loading

0 comments on commit 1c741b4

Please sign in to comment.