-
Notifications
You must be signed in to change notification settings - Fork 0
/
manifestsubjectbackfillworker.py
181 lines (149 loc) · 5.95 KB
/
manifestsubjectbackfillworker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import json
import logging
import time
from peewee import fn
import features
from app import app
from data.database import Manifest
from image.shared.schemas import ManifestException, parse_manifest_from_bytes
from util.bytes import Bytes
from util.log import logfile_path
from util.migrate.allocator import yield_random_entries
from workers.gunicorn_worker import GunicornWorker
from workers.worker import Worker
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Interval handed to Worker.add_operation for both backfill operations; read from
# app config with a fallback of 60 (presumably seconds -- confirm against Worker).
WORKER_FREQUENCY = app.config.get("MANIFEST_SUBJECT_BACKFILL_WORKER_FREQUENCY", 60)
class ManifestSubjectBackfillWorker(Worker):
    """
    Worker which backfills the newly added subject fields onto Manifest.

    Two operations are registered, one per backfilled column (``subject`` and
    ``artifact_type``). Each operation parses the stored manifest bytes of rows
    whose matching ``*_backfilled`` flag is false or NULL, then writes the
    parsed value and sets the flag in a single guarded UPDATE.
    """

    def __init__(self):
        super().__init__()
        self.add_operation(self._backfill_manifest_subject, WORKER_FREQUENCY)
        self.add_operation(self._backfill_manifest_artifact_type, WORKER_FREQUENCY)

    def _backfill_manifest_subject(self):
        """
        Fill in ``Manifest.subject`` for rows that are not yet flagged as backfilled.

        A manifest that fails to parse is still flagged as backfilled, with a
        ``None`` subject, so it is not picked up again.

        Returns:
            False when no pending row exists; True after a pass over the
            candidate rows.
        """
        # peewee turns these operators into SQL, so ``== False`` is intentional
        # (``>> None`` is peewee's IS NULL form).
        pending = (Manifest.subject_backfilled == False) | (  # noqa: E712
            Manifest.subject_backfilled >> None
        )

        try:
            Manifest.select().where(pending).get()
        except Manifest.DoesNotExist:
            logger.debug("Manifest subject backfill worker has completed; skipping")
            return False

        # NOTE(review): positional args look like (query factory, id column,
        # batch size, max id, min id) -- confirm against yield_random_entries.
        iterator = yield_random_entries(
            lambda: Manifest.select().where(pending),
            Manifest.id,
            250,
            Manifest.select(fn.Max(Manifest.id)).scalar(),
            1,
        )

        for manifest_row, abt, _ in iterator:
            if manifest_row.subject_backfilled:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

            logger.debug("Setting manifest subject for manifest %s", manifest_row.id)
            manifest_bytes = Bytes.for_string_or_unicode(manifest_row.manifest_bytes)

            # Reset per row: without this, a parse failure would either raise
            # UnboundLocalError (first row) or silently write the previous
            # manifest's subject onto this one.
            subject = None
            try:
                parsed = parse_manifest_from_bytes(
                    manifest_bytes, manifest_row.media_type.name, validate=False
                )
                subject = parsed.subject
            except ManifestException as me:
                logger.warning(
                    "Got exception when trying to parse manifest %s: %s", manifest_row.id, me
                )

            # The flag is re-checked in the WHERE clause so that a row already
            # handled by a concurrent worker is not overwritten.
            updated = (
                Manifest.update(
                    subject=subject,
                    subject_backfilled=True,
                )
                .where(
                    Manifest.id == manifest_row.id,
                    pending,
                )
                .execute()
            )
            if updated != 1:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

        return True

    def _backfill_manifest_artifact_type(self):
        """
        Fill in ``Manifest.artifact_type`` for rows that are not yet flagged as backfilled.

        A manifest that fails to parse is still flagged as backfilled, with a
        ``None`` artifact type, so it is not picked up again.

        Returns:
            False when no pending row exists; True after a pass over the
            candidate rows.
        """
        # peewee turns these operators into SQL, so ``== False`` is intentional
        # (``>> None`` is peewee's IS NULL form).
        pending = (Manifest.artifact_type_backfilled == False) | (  # noqa: E712
            Manifest.artifact_type_backfilled >> None
        )

        try:
            Manifest.select().where(pending).get()
        except Manifest.DoesNotExist:
            logger.debug("Manifest artifact_type backfill worker has completed; skipping")
            return False

        # NOTE(review): positional args look like (query factory, id column,
        # batch size, max id, min id) -- confirm against yield_random_entries.
        iterator = yield_random_entries(
            lambda: Manifest.select().where(pending),
            Manifest.id,
            250,
            Manifest.select(fn.Max(Manifest.id)).scalar(),
            1,
        )

        for manifest_row, abt, _ in iterator:
            if manifest_row.artifact_type_backfilled:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

            logger.debug("Setting artifact_type for manifest %s", manifest_row.id)
            manifest_bytes = Bytes.for_string_or_unicode(manifest_row.manifest_bytes)

            # Reset per row: without this, a parse failure would either raise
            # UnboundLocalError (first row) or silently write the previous
            # manifest's artifact type onto this one.
            artifact_type = None
            try:
                parsed = parse_manifest_from_bytes(
                    manifest_bytes, manifest_row.media_type.name, validate=False
                )
                artifact_type = parsed.artifact_type
            except ManifestException as me:
                logger.warning(
                    "Got exception when trying to parse manifest %s: %s", manifest_row.id, me
                )

            # The flag is re-checked in the WHERE clause so that a row already
            # handled by a concurrent worker is not overwritten.
            updated = (
                Manifest.update(
                    artifact_type=artifact_type,
                    artifact_type_backfilled=True,
                )
                .where(
                    Manifest.id == manifest_row.id,
                    pending,
                )
                .execute()
            )
            if updated != 1:
                logger.debug("Another worker preempted this worker")
                abt.set()
                continue

        return True
def create_gunicorn_worker():
    """
    Gunicorn application-factory entry point for this worker.

    Wraps a fresh ManifestSubjectBackfillWorker in a GunicornWorker so the
    quay worker can run as a gunicorn worker thread, which is handy with
    gunicorn's hot reload in local dev. Using this factory enforces a 1:1
    quay worker to gunicorn worker ratio.
    """
    backfill_worker = ManifestSubjectBackfillWorker()
    feature_flag = features.MANIFEST_SUBJECT_BACKFILL
    return GunicornWorker(__name__, app, backfill_worker, feature_flag)
def main():
    """
    Script entry point: configure logging, then run the backfill worker.

    If the app is in account recovery mode, or the MANIFEST_SUBJECT_BACKFILL
    feature is disabled, the worker is never started and the process sleeps
    indefinitely instead of returning.
    """
    # ``import logging`` alone does not load the ``logging.config`` submodule;
    # import it explicitly so fileConfig does not depend on some other module
    # having imported it first.
    import logging.config

    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    if app.config.get("ACCOUNT_RECOVERY_MODE", False):
        logger.debug("Quay running in account recovery mode")
        # NOTE(review): presumably idles rather than exits so a process
        # supervisor does not keep restarting the worker -- confirm.
        while True:
            time.sleep(100000)

    if not features.MANIFEST_SUBJECT_BACKFILL:
        logger.debug("Manifest backfill worker not enabled; skipping")
        while True:
            time.sleep(100000)

    worker = ManifestSubjectBackfillWorker()
    worker.start()
# Run the worker only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()