-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunkcleanupworker.py
77 lines (58 loc) · 2.32 KB
/
chunkcleanupworker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import logging
import time
from app import app, chunk_cleanup_queue, storage
from util.log import logfile_path
from workers.gunicorn_worker import GunicornWorker
from workers.queueworker import JobException, QueueWorker
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value handed to ChunkCleanupWorker as its `poll_period_seconds` argument.
POLL_PERIOD_SECONDS = 10
class ChunkCleanupWorker(QueueWorker):
    """
    Worker which cleans up chunks enqueued by the storage engine(s).

    This is typically used to cleanup empty chunks which are no longer needed.
    """

    def process_queue_item(self, job_details):
        """
        Remove the chunk described by a single queue item from storage.

        Args:
            job_details: Mapping with a "location" entry (the storage location
                to operate on) and a "path" entry (the chunk's path there).

        Raises:
            JobException: If the storage engine raises IOError while removing
                the chunk. The original IOError is attached as the cause.
        """
        logger.debug("Got chunk cleanup queue item: %s", job_details)
        storage_location = job_details["location"]
        storage_path = job_details["path"]

        # Nothing left to do when the chunk is already gone.
        if not storage.exists([storage_location], storage_path):
            logger.debug("Chunk already deleted")
            return

        try:
            storage.remove([storage_location], storage_path)
        except IOError as ioe:
            # Record which chunk failed (with traceback) before signalling the
            # failure, and keep the underlying error as the explicit cause.
            logger.exception(
                "Could not remove chunk %s from location %s", storage_path, storage_location
            )
            raise JobException() from ioe
def create_gunicorn_worker():
    """
    Build the gunicorn-hosted variant of the chunk cleanup worker.

    Follows the gunicorn application factory pattern so that a quay worker can
    run as a gunicorn worker thread, which is useful with gunicorn's hot reload
    in local dev. Using this factory enforces a 1:1 quay worker to gunicorn
    worker ratio.

    The last argument given to GunicornWorker is True only when "SwiftStorage"
    is the first element of at least one DISTRIBUTED_STORAGE_CONFIG entry.
    NOTE(review): presumably GunicornWorker uses that flag to decide whether
    the wrapped worker runs at all - confirm against GunicornWorker.
    """
    storage_configs = app.config.get("DISTRIBUTED_STORAGE_CONFIG", {})
    # Each config entry is indexed at 0 to obtain its engine name.
    engine_names = {entry[0] for entry in storage_configs.values()}
    swift_enabled = "SwiftStorage" in engine_names

    cleanup_worker = ChunkCleanupWorker(
        chunk_cleanup_queue, poll_period_seconds=POLL_PERIOD_SECONDS
    )
    return GunicornWorker(__name__, app, cleanup_worker, swift_enabled)
if __name__ == "__main__":
    # Script entry point: configure logging, then either idle forever (account
    # recovery mode, or no Swift storage configured) or run the cleanup worker.

    # `import logging` alone does not guarantee the `logging.config` submodule
    # is loaded; import it explicitly rather than relying on another module
    # having done so.
    import logging.config

    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    if app.config.get("ACCOUNT_RECOVERY_MODE", False):
        logger.debug("Quay running in account recovery mode")
        # Idle instead of exiting.
        # NOTE(review): presumably so a process supervisor does not keep
        # restarting this worker - confirm.
        while True:
            time.sleep(100000)

    # Engine name is the first element of each storage config entry.
    engines = {
        config[0] for config in app.config.get("DISTRIBUTED_STORAGE_CONFIG", {}).values()
    }
    if "SwiftStorage" not in engines:
        logger.debug("Swift storage not detected; sleeping")
        # The worker is only started when Swift storage is configured.
        while True:
            time.sleep(10000)

    logger.debug("Starting chunk cleanup worker")
    worker = ChunkCleanupWorker(chunk_cleanup_queue, poll_period_seconds=POLL_PERIOD_SECONDS)
    worker.start()