Skip to content

Commit

Permalink
SRE-15125: generalize daemon main() method
Browse files Browse the repository at this point in the history
Do daemon-specific things via callbacks.
  • Loading branch information
bugfood committed Dec 8, 2023
1 parent 22ca41b commit 3d35821
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 36 deletions.
43 changes: 42 additions & 1 deletion tds/scripts/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import sys

from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import CancelledError, LockTimeout
from kazoo.exceptions import CancelledError, ConnectionClosedError, LockTimeout
from kazoo.retry import KazooRetry

log = logging.getLogger('tds.scripts.daemon')
Expand All @@ -38,6 +38,9 @@ def __init__(self, app):
self._should_stop = False
self.has_lock = False
self.lock = None
# These optional callbacks may be overridden by a child class.
self.end_callback = None
self.loop_callback = None

def create_zoo(self, zoo_config):
"""
Expand Down Expand Up @@ -91,6 +94,12 @@ def state_listener(state):
self.zoo.start()
return self.zoo.Lock(self.zookeeper_path, hostname)

def configure(self):
    """
    Hook for daemon-specific configuration.

    The base implementation does nothing; child classes may override it.
    """
    return None

def lock_run(self, func, *args, **kwargs):
"""
Run the specified function, with the specified arguments, under a
Expand Down Expand Up @@ -133,6 +142,38 @@ def lock_run(self, func, *args, **kwargs):
elif self.should_stop():
log.debug("Aborted acquiring ZooKeeper lock due to shutdown")

def main(self):
    """
    Read configuration file and get relevant information; if zookeeper is
    in use, wait for a zookeeper lock. Once operating, run callbacks.

    Callbacks used:
        run_callback -- required; invoked through zk_run on every pass of
            the main loop. It is not set by this method, so it must be
            provided before main() is called.
        loop_callback -- optional; invoked after every pass of the loop.
        end_callback -- optional; invoked once after the loop has ended.

    Raises ConnectionClosedError if it is raised while running
    run_callback and the daemon is not shutting down.
    """
    self.configure()
    zookeeper_config = self.app.config.get('zookeeper', None)

    if zookeeper_config is not None:
        self.lock = self.create_zoo(zookeeper_config)
        self.zk_run = self.lock_run
    else:
        # No zookeeper section configured: call the function directly.
        self.zk_run = lambda f, *a, **k: f(*a, **k)

    while not self.should_stop():
        try:
            self.zk_run(self.run_callback)
        except ConnectionClosedError:
            # Errors raised by kazoo when it is having connection trouble
            # are ignored only when we're shutting down anyway; in any
            # other situation they are re-raised.
            if not self.should_stop():
                raise

        if self.loop_callback:
            self.loop_callback()

    if self.end_callback:
        self.end_callback()

    # This method only creates a ZooKeeper client when a 'zookeeper'
    # section is configured, so self.zoo may be missing or None here;
    # only close a client that actually exists.
    zoo = getattr(self, 'zoo', None)
    if zoo is not None:
        zoo.close()
    log.info("Stopped.")

def release_lock(self):
"""
Release any acquired locks and cancel any pending ones.
Expand Down
56 changes: 21 additions & 35 deletions tds/scripts/tds_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import sys

from datetime import datetime, timedelta
from kazoo.exceptions import ConnectionClosedError

import tagopsdb
import tds.apps
Expand All @@ -54,44 +53,20 @@ def __init__(self, app, **kwargs):
self.heartbeat_time = datetime.now()
self.zookeeper_path = '/tdsinstaller'

def main(self):
"""
Read configuration file and get relevant information, then try
to process queued deployments if single system or
zookeeper leader in multi-system configuration.
"""
self.exit_timeout = timedelta(
seconds=self.app.config.get('exit_timeout', 160)
)
self.deploy_exit_timeout = timedelta(
seconds=self.app.config.get('deploy_exit_timeout', 5)
)
zookeeper_config = self.app.config.get('zookeeper', None)

if zookeeper_config is not None:
self.lock = self.create_zoo(zookeeper_config)
self.zk_run = self.lock_run
else:
self.zk_run = lambda f, *a, **k: f(*a, **k)

while not self.should_stop():
try:
self.zk_run(self.handle_incoming_deployments)
except ConnectionClosedError:
if not self.should_stop():
# Ignore errors raised by kazoo when it is having
# connection trouble; we're shutting down anyway.
raise
# Set up callbacks.
def end_callback():
self.clean_up_processes(True)

def loop_callback():
self.clean_up_processes(wait=self.should_stop())
# When operating with a lock, wait this long before trying again.
# Any polling necessary to determine if action needs to be taken
# (database queries, etc.) will happen approximately this often.
time.sleep(0.1)

self.clean_up_processes(wait=True)
self.zoo.close()
log.info("Stopped.")
def run_callback():
self.handle_incoming_deployments()

self.end_callback = end_callback
self.loop_callback = loop_callback
self.run_callback = run_callback

def handle_incoming_deployments(self):
"""Look for files in 'incoming' directory and handle them."""
Expand Down Expand Up @@ -221,6 +196,17 @@ def clean_up_processes(self, wait=False):
# Old python does not provide timeouts for Popen methods.
time.sleep(1)

def configure(self):
    """
    Load the tds_installer timeouts from the application configuration.

    Sets self.exit_timeout from the 'exit_timeout' setting (default 160)
    and self.deploy_exit_timeout from the 'deploy_exit_timeout' setting
    (default 5). Both values are interpreted as seconds and stored as
    timedelta objects.
    """
    settings = self.app.config

    exit_seconds = settings.get('exit_timeout', 160)
    self.exit_timeout = timedelta(seconds=exit_seconds)

    deploy_exit_seconds = settings.get('deploy_exit_timeout', 5)
    self.deploy_exit_timeout = timedelta(seconds=deploy_exit_seconds)

def waitproc(self, dep_id, proc):
"""
Do a nonblocking waitpid() on the PID of the supplied process. Check
Expand Down

0 comments on commit 3d35821

Please sign in to comment.