-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscrub.py
117 lines (91 loc) · 2.59 KB
/
scrub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Scrub osds
"""
import contextlib
import gevent
import logging
import random
import time
import ceph_manager
from teuthology import misc as teuthology
log = logging.getLogger(__name__)
@contextlib.contextmanager
def task(ctx, config):
"""
Run scrub periodically. Randomly chooses an OSD to scrub.
The config should be as follows:
scrub:
frequency: <seconds between scrubs>
deep: <bool for deepness>
example:
tasks:
- ceph:
- scrub:
frequency: 30
deep: 0
"""
if config is None:
config = {}
assert isinstance(config, dict), \
'scrub task only accepts a dict for configuration'
log.info('Beginning scrub...')
first_mon = teuthology.get_first_mon(ctx, config)
(mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
manager = ceph_manager.CephManager(
mon,
ctx=ctx,
logger=log.getChild('ceph_manager'),
)
num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
while len(manager.get_osd_status()['up']) < num_osds:
time.sleep(10)
scrub_proc = Scrubber(
manager,
config,
)
try:
yield
finally:
log.info('joining scrub')
scrub_proc.do_join()
class Scrubber:
"""
Scrubbing is actually performed during initialization
"""
def __init__(self, manager, config):
"""
Spawn scrubbing thread upon completion.
"""
self.ceph_manager = manager
self.ceph_manager.wait_for_clean()
osd_status = self.ceph_manager.get_osd_status()
self.osds = osd_status['up']
self.config = config
if self.config is None:
self.config = dict()
else:
def tmp(x):
"""Local display"""
print x
self.log = tmp
self.stopping = False
log.info("spawning thread")
self.thread = gevent.spawn(self.do_scrub)
def do_join(self):
"""Scrubbing thread finished"""
self.stopping = True
self.thread.get()
def do_scrub(self):
"""Perform the scrub operation"""
frequency = self.config.get("frequency", 30)
deep = self.config.get("deep", 0)
log.info("stopping %s" % self.stopping)
while not self.stopping:
osd = str(random.choice(self.osds))
if deep:
cmd = 'deep-scrub'
else:
cmd = 'scrub'
log.info('%sbing %s' % (cmd, osd))
self.ceph_manager.raw_cluster_cmd('osd', cmd, osd)
time.sleep(frequency)