Skip to content

Commit

Permalink
cephfs-top: top(1) like utility for Ceph Filesystem
Browse files Browse the repository at this point in the history
Signed-off-by: Venky Shankar <[email protected]>
  • Loading branch information
vshankar committed Jan 11, 2021
1 parent 7430943 commit 4867fc1
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/tools/cephfs/top/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
include(Distutils)
distutils_install_module(cephfs-top)

if(WITH_TESTS)
include(AddCephTest)
add_tox_test(cephfs-top)
endif()
313 changes: 313 additions & 0 deletions src/tools/cephfs/top/cephfs-top
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
#!/usr/bin/python3

import argparse
import sys
import curses
import errno
import json
import signal
import time

from collections import OrderedDict
from datetime import datetime
from enum import Enum, unique

import rados


class FSTopException(Exception):
def __init__(self, msg=''):
self.error_msg = msg

def get_error_msg(self):
return self.error_msg


@unique
class MetricType(Enum):
METRIC_TYPE_NONE = 0
METRIC_TYPE_PERCENTAGE = 1
METRIC_TYPE_LATENCY = 2


FS_TOP_PROG_STR = 'cephfs-top'

# version match b/w fstop and stats emitted by mgr/stats
FS_TOP_SUPPORTED_VER = 1

ITEMS_PAD_LEN = 1
ITEMS_PAD = " " * ITEMS_PAD_LEN

# metadata provided by mgr/stats
FS_TOP_MAIN_WINDOW_COL_CLIENT_ID = "CLIENT_ID"
FS_TOP_MAIN_WINDOW_COL_MNT_ROOT = "MOUNT_ROOT"
FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR = "MOUNT_POINT@HOST/ADDR"

MAIN_WINDOW_TOP_LINE_ITEMS_START = [ITEMS_PAD,
FS_TOP_MAIN_WINDOW_COL_CLIENT_ID,
FS_TOP_MAIN_WINDOW_COL_MNT_ROOT]
MAIN_WINDOW_TOP_LINE_ITEMS_END = [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR]

# adjust this map according to stats version and maintain order
# as emitted by mgr/stast
MAIN_WINDOW_TOP_LINE_METRICS = OrderedDict([
("CAP_HIT", MetricType.METRIC_TYPE_PERCENTAGE),
("READ_LATENCY", MetricType.METRIC_TYPE_LATENCY),
("WRITE_LATENCY", MetricType.METRIC_TYPE_LATENCY),
("METADATA_LATENCY", MetricType.METRIC_TYPE_LATENCY),
("DENTRY_LEASE", MetricType.METRIC_TYPE_PERCENTAGE),
])
MGR_STATS_COUNTERS = list(MAIN_WINDOW_TOP_LINE_METRICS.keys())

FS_TOP_VERSION_HEADER_FMT = '{prog_name} - {now}'
FS_TOP_CLIENT_HEADER_FMT = 'Client(s): {num_clients} - {num_mounts} FUSE, '\
'{num_kclients} kclient, {num_libs} libcephfs'

CLIENT_METADATA_KEY = "client_metadata"
CLIENT_METADATA_MOUNT_POINT_KEY = "mount_point"
CLIENT_METADATA_MOUNT_ROOT_KEY = "root"
CLIENT_METADATA_IP_KEY = "IP"
CLIENT_METADATA_HOSTNAME_KEY = "hostname"

GLOBAL_METRICS_KEY = "global_metrics"
GLOBAL_COUNTERS_KEY = "global_counters"


def calc_perc(c):
if c[0] == 0 and c[1] == 0:
return 0.0
return round((c[0] / (c[0] + c[1])) * 100, 2)


def calc_lat(c):
return round(c[0] + c[1] / 1000000000, 2)


def wrap(s, sl):
"""return a '+' suffixed wrapped string"""
if len(s) < sl:
return s
return f'{s[0:sl-1]}+'


class FSTop(object):
def __init__(self, args):
self.rados = None
self.stop = False
self.stdscr = None # curses instance
self.client_name = args.id
self.cluster_name = args.cluster
self.conffile = args.conffile

def handle_signal(self, signum, _):
self.stop = True

def init(self):
try:
if self.conffile:
r_rados = rados.Rados(rados_id=self.client_name, clustername=self.cluster_name,
conffile=self.conffile)
else:
r_rados = rados.Rados(rados_id=self.client_name, clustername=self.cluster_name)
r_rados.conf_read_file()
r_rados.connect()
self.rados = r_rados
except rados.Error as e:
if e.errno == errno.ENOENT:
raise FSTopException(f'cluster {self.cluster_name} does not exist')
else:
raise FSTopException(f'error connecting to cluster: {e}')
self.verify_perf_stats_support()
signal.signal(signal.SIGTERM, self.handle_signal)
signal.signal(signal.SIGINT, self.handle_signal)

def fini(self):
if self.rados:
self.rados.shutdown()
self.rados = None

def selftest(self):
stats_json = self.perf_stats_query()
if not stats_json['version'] == FS_TOP_SUPPORTED_VER:
raise FSTopException('perf stats version mismatch!')

def setup_curses(self):
self.stdscr = curses.initscr()

# coordinate constants for windowing -- (height, width, y, x)
# NOTE: requires initscr() call before accessing COLS, LINES.
HEADER_WINDOW_COORD = (2, curses.COLS - 1, 0, 0)
TOPLINE_WINDOW_COORD = (1, curses.COLS - 1, 3, 0)
MAIN_WINDOW_COORD = (curses.LINES - 4, curses.COLS - 1, 4, 0)

self.header = curses.newwin(*HEADER_WINDOW_COORD)
self.topl = curses.newwin(*TOPLINE_WINDOW_COORD)
self.mainw = curses.newwin(*MAIN_WINDOW_COORD)
curses.wrapper(self.display)

def verify_perf_stats_support(self):
mon_cmd = {'prefix': 'mgr module ls', 'format': 'json'}
try:
ret, buf, out = self.rados.mon_command(json.dumps(mon_cmd), b'')
except Exception as e:
raise FSTopException(f'error checking \'stats\' module: {e}')
if ret != 0:
raise FSTopException(f'error checking \'stats\' module: {out}')
if 'stats' not in json.loads(buf.decode('utf-8'))['enabled_modules']:
raise FSTopException('\'stats\' module not enabled. Use \'ceph mgr module '
'enable stats\' to enable')

def perf_stats_query(self):
mgr_cmd = {'prefix': 'fs perf stats', 'format': 'json'}
try:
ret, buf, out = self.rados.mgr_command(json.dumps(mgr_cmd), b'')
except Exception as e:
raise FSTopException(f'error in \'perf stats\' query: {e}')
if ret != 0:
raise FSTopException(f'error in \'perf stats\' query: {out}')
return json.loads(buf.decode('utf-8'))

def mtype(self, typ):
if typ == MetricType.METRIC_TYPE_PERCENTAGE:
return "(%)"
elif typ == MetricType.METRIC_TYPE_LATENCY:
return "(s)"
else:
return ''

def refresh_top_line_and_build_coord(self):
xp = 0
x_coord_map = {}

heading = []
for item in MAIN_WINDOW_TOP_LINE_ITEMS_START:
heading.append(item)
nlen = len(item) + len(ITEMS_PAD)
x_coord_map[item] = (xp, nlen)
xp += nlen

for item, typ in MAIN_WINDOW_TOP_LINE_METRICS.items():
it = f'{item}{self.mtype(typ)}'
heading.append(it)
nlen = len(it) + len(ITEMS_PAD)
x_coord_map[item] = (xp, nlen)
xp += nlen

for item in MAIN_WINDOW_TOP_LINE_ITEMS_END:
heading.append(item)
nlen = len(item) + len(ITEMS_PAD)
x_coord_map[item] = (xp, nlen)
xp += nlen
self.topl.addstr(0, 0, ITEMS_PAD.join(heading), curses.A_STANDOUT | curses.A_BOLD)
return x_coord_map

def refresh_client(self, client_id, metrics, counters, client_meta, x_coord_map, y_coord):
for item in MAIN_WINDOW_TOP_LINE_ITEMS_END:
coord = x_coord_map[item]
if item == FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR:
self.mainw.addstr(y_coord, coord[0],
f'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@'
f'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'
f'{client_meta[CLIENT_METADATA_IP_KEY]}')
for item in MAIN_WINDOW_TOP_LINE_ITEMS_START:
coord = x_coord_map[item]
hlen = coord[1] - len(ITEMS_PAD)
if item == FS_TOP_MAIN_WINDOW_COL_CLIENT_ID:
self.mainw.addstr(y_coord, coord[0],
wrap(client_id.split('.')[1], hlen))
elif item == FS_TOP_MAIN_WINDOW_COL_MNT_ROOT:
self.mainw.addstr(y_coord, coord[0],
wrap(client_meta[CLIENT_METADATA_MOUNT_ROOT_KEY], hlen))
cidx = 0
for item in counters:
coord = x_coord_map[item]
m = metrics[cidx]
typ = MAIN_WINDOW_TOP_LINE_METRICS[MGR_STATS_COUNTERS[cidx]]
if item.lower() in client_meta['valid_metrics']:
if typ == MetricType.METRIC_TYPE_PERCENTAGE:
self.mainw.addstr(y_coord, coord[0], f'{calc_perc(m)}')
elif typ == MetricType.METRIC_TYPE_LATENCY:
self.mainw.addstr(y_coord, coord[0], f'{calc_lat(m)}')
else:
self.mainw.addstr(y_coord, coord[0], "N/A")
cidx += 1

def refresh_clients(self, x_coord_map, stats_json):
counters = [m.upper() for m in stats_json[GLOBAL_COUNTERS_KEY]]
y_coord = 0
for client_id, metrics in stats_json[GLOBAL_METRICS_KEY].items():
self.refresh_client(client_id,
metrics,
counters,
stats_json[CLIENT_METADATA_KEY][client_id],
x_coord_map,
y_coord)
y_coord += 1

def refresh_main_window(self, x_coord_map, stats_json):
self.refresh_clients(x_coord_map, stats_json)

def refresh_header(self, stats_json):
if not stats_json['version'] == FS_TOP_SUPPORTED_VER:
self.header.addstr(0, 0, 'perf stats version mismatch!')
return False
client_metadata = stats_json[CLIENT_METADATA_KEY]
num_clients = len(client_metadata)
num_mounts = len([client for client, metadata in client_metadata.items() if not
metadata[CLIENT_METADATA_MOUNT_POINT_KEY] == 'N/A'])
num_kclients = len([client for client, metadata in client_metadata.items() if
"kernel_version" in metadata])
num_libs = num_clients - (num_mounts + num_kclients)
now = datetime.now().ctime()
self.header.addstr(0, 0,
FS_TOP_VERSION_HEADER_FMT.format(prog_name=FS_TOP_PROG_STR, now=now),
curses.A_STANDOUT | curses.A_BOLD)
self.header.addstr(1, 0, FS_TOP_CLIENT_HEADER_FMT.format(num_clients=num_clients,
num_mounts=num_mounts,
num_kclients=num_kclients,
num_libs=num_libs))
return True

def display(self, _):
x_coord_map = self.refresh_top_line_and_build_coord()
self.topl.refresh()
while not self.stop:
stats_json = self.perf_stats_query()
self.header.clear()
self.mainw.clear()
if self.refresh_header(stats_json):
self.refresh_main_window(x_coord_map, stats_json)
self.header.refresh()
self.mainw.refresh()
time.sleep(1)


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Ceph Filesystem top utility')
parser.add_argument('--cluster', nargs='?', const='ceph', default='ceph',
help='Ceph cluster to connect (defualt: ceph)')
parser.add_argument('--id', nargs='?', const='fstop', default='fstop',
help='Ceph user to use to connection (default: fstop)')
parser.add_argument('--conffile', nargs='?', default=None,
help='Path to cluster configuration file')
parser.add_argument('--selftest', dest='selftest', action='store_true',
help='run in selftest mode')
args = parser.parse_args()
err = False
ft = FSTop(args)
try:
ft.init()
if args.selftest:
ft.selftest()
sys.stdout.write("selftest ok\n")
else:
ft.setup_curses()
except FSTopException as fst:
err = True
sys.stderr.write(f'{fst.get_error_msg()}\n')
except Exception as e:
err = True
sys.stderr.write(f'exception: {e}\n')
finally:
ft.fini()
sys.exit(0 if not err else -1)
25 changes: 25 additions & 0 deletions src/tools/cephfs/top/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-

from setuptools import setup

__version__ = '0.0.1'

setup(
name='cephfs-top',
version=__version__,
description='top(1) like utility for Ceph Filesystem',
keywords='cephfs, top',
scripts=['cephfs-top'],
install_requires=[
'rados',
],
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Console',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3'
],
license='LGPLv2+',
)
7 changes: 7 additions & 0 deletions src/tools/cephfs/top/tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[tox]
envlist = py3
skipsdist = true

[testenv:py3]
deps = flake8
commands = flake8 --ignore=W503 --max-line-length=100 cephfs-top

0 comments on commit 4867fc1

Please sign in to comment.