-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathsave.py
145 lines (113 loc) · 4.72 KB
/
save.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import contextlib
import datetime
from cjwkernel.types import FetchResult
from cjworkbench.sync import database_sync_to_async
from cjwstate import clientside, commands, rabbitmq, storedobjects
from cjwstate.models import Step, Workflow
from cjwstate.models.commands import SetStepDataVersion
@contextlib.contextmanager
def _locked_step(workflow_id: int, step: Step):
"""Refresh step from database and yield with workflow lock.
Raise Workflow.DoesNotExist or Step.DoesNotExist in the event of a
race. (Even soft-deleted Step or Tab raises Step.DoesNotExist,
to simulate hard deletion -- because sooner or later soft-delete won't be
a thing any more.)
"""
# raise Workflow.DoesNotExist
with Workflow.lookup_and_cooperative_lock(id=workflow_id):
# raise Step.DoesNotExist
step.refresh_from_db()
if step.is_deleted or step.tab.is_deleted:
raise Step.DoesNotExist("soft-deleted")
yield
async def _notify_websockets(workflow_id: int, step: Step) -> None:
"""Send delta to client, syncing all `step` fields fetcher can edit."""
update = clientside.Update(
steps={
step.id: clientside.StepUpdate(
is_busy=step.is_busy, last_fetched_at=step.last_update_check
)
}
)
await rabbitmq.send_update_to_workflow_clients(workflow_id, update)
@database_sync_to_async
def _do_create_result(
workflow_id: int, step: Step, result: FetchResult, now: datetime.datetime
) -> None:
"""Do database manipulations for create_result().
Modify `step` in-place.
Do *not* do the logic in SetStepDataVersion. We're creating a new
version, not doing something undoable.
Raise Step.DoesNotExist or Workflow.DoesNotExist in case of a race.
"""
with _locked_step(workflow_id, step):
storedobjects.create_stored_object(
workflow_id, step.id, result.path, stored_at=now
)
storedobjects.delete_old_files_to_enforce_storage_limits(step=step)
# Assume caller sends new list to clients via SetStepDataVersion
step.fetch_errors = result.errors
step.is_busy = False
step.last_update_check = now
step.save(update_fields=["fetch_errors", "is_busy", "last_update_check"])
async def create_result(
workflow_id: int, step: Step, result: FetchResult, now: datetime.datetime
) -> None:
"""Store fetched table as storedobject.
Set `fetch_errors` to `result.errors`. Set `is_busy` to `False`. Set
`last_update_check`.
Create (and run) a SetStepDataVersion. This will kick off an execute
cycle, which will render each module and email the owner if data has
changed and notifications are enabled.
Notify the user over Websockets.
No-op if `workflow` or `step` has been deleted.
"""
try:
await _do_create_result(workflow_id, step, result, now)
except (Step.DoesNotExist, Workflow.DoesNotExist):
return # there's nothing more to do
# SetStepDataVersion will change `step.last_relevant_delta_id`.
# This must happen before we notify with our own `is_busy=False`, to avoid
# this erroneous ordering of Websockets messages:
#
# A. is_busy=True -- we acknowledge the user's fetch request
# C. is_busy=False -- we're done fetching
# B. new last_relevant_delta_id -- a render is pending
#
# If C comes before B, the client will flicker to a "not-busy" state.
await commands.do(
SetStepDataVersion,
workflow_id=workflow_id,
step=step,
new_version=now,
)
# XXX odd design: SetStepDataVersion happens to update "versions"
# on the client. So _notify_websockets() doesn't need to send the new
# "versions".
await _notify_websockets(workflow_id, step)
@database_sync_to_async
def _do_mark_result_unchanged(
workflow_id: int, step: Step, now: datetime.datetime
) -> None:
"""Do database manipulations for mark_result_unchanged().
Modify `step` in-place.
Raise Step.DoesNotExist or Workflow.DoesNotExist in case of a race.
"""
with _locked_step(workflow_id, step):
step.is_busy = False
step.last_update_check = now
step.save(update_fields=["is_busy", "last_update_check"])
async def mark_result_unchanged(
workflow_id: int, step: Step, now: datetime.datetime
) -> None:
"""Leave storedobjects and `step.fetch_errors` unchanged.
Set step.is_busy to False.
Set step.last_update_check.
Notify the user over Websockets.
No-op if `workflow` or `step` has been deleted.
"""
try:
await _do_mark_result_unchanged(workflow_id, step, now)
except (Step.DoesNotExist, Workflow.DoesNotExist):
return # there's nothing more to do
await _notify_websockets(workflow_id, step)