-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
281 lines (234 loc) · 8.8 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
"""
Database management methods for RoboRegistry
@author: Lucas Bubner, 2023
"""
import math
from datetime import datetime
from time import time
from flask_login import current_user
from pytz import timezone
from requests.exceptions import HTTPError
import utils
from fb import db
def get_user_data(uid, auth=None) -> dict:
    """
    Fetch the stored record for a user from the ``users/<uid>`` node.

    @param uid: key of the user node to read
    @param auth: token passed to the database read; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    @return: the record as a plain dict, or an empty dict when the read yields
             a falsy value or raises KeyError
    """
    token = auth or getattr(current_user, "token", None)
    try:
        record = db.child("users").child(uid).get(token).val()
    except KeyError:
        return {}
    # A falsy result (nothing stored) is reported as an empty dict as well
    return dict(record) if record else {}
def mutate_user_data(info: dict, auth=None) -> None:
    """
    Apply ``info`` as an update to the current user's node under ``users``.

    The target node is keyed by ``utils.get_uid()``.
    @param info: fields handed to the database ``update`` call
    @param auth: token for the write; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    """
    token = auth or getattr(current_user, "token", None)
    own_node = db.child("users").child(utils.get_uid())
    own_node.update(info, token)
def get_uid_for(event_id, auth=None) -> str:
    """
    Look up the creator of an event.

    Reads ``events/<event_id>/creator`` and returns the value passed through
    ``str()``, so the result is always a string whatever the read produced.
    @param event_id: key of the event node
    @param auth: token for the read; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    @return: string form of the stored creator value
    """
    token = auth or getattr(current_user, "token", None)
    creator_node = db.child("events").child(event_id).child("creator")
    creator = creator_node.get(token).val()
    return str(creator)
def add_event(uid, event, auth=None):
    """
    Store an event in the database.

    Calls ``set`` on the ``events/<uid>`` node with ``event`` as the payload.
    @param uid: key the event is stored under (used as the child of ``events``)
    @param event: event payload to write
    @param auth: token for the write; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    """
    token = auth or getattr(current_user, "token", None)
    event_node = db.child("events").child(uid)
    event_node.set(event, token)
def add_entry(event_id, public_data, private_data, override, auth=None):
    """
    Updates an event in the database to reflect a new registration.

    The public part is written under ``events/<event_id>/registered`` and the
    private part under ``registered_data/<event_id>``, both under the same key.
    ``public_data`` is extended in place with an ``entity`` label and an
    initial ``checkin_data`` record before it is written.

    @param event_id: key of the event being registered for
    @param public_data: dict of publicly stored fields (mutated in place)
    @param private_data: dict of privately stored fields; must contain
                         ``contactName`` and ``repName``
    @param override: when truthy, the "accepting registrations" setting is
                     ignored and the entry is pushed under a generated key
                     instead of the current user's uid
    @param auth: token for the database calls; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    @return: None; returns early without writing when the event cannot be
             fetched or is not accepting registrations
    """
    auth = auth or getattr(current_user, "token", None)

    event = get_event(event_id, auth)
    # get_event gives back {} for an event that is missing or hidden from this user;
    # indexing into it would raise KeyError, and there is nothing to register against
    if not event:
        return

    # Refuse if the event is not accepting registrations
    if not override and not event.get("settings", {}).get("regis"):
        return

    public_data.update({
        # Only show the first name of the contact
        "entity": f"{private_data['contactName'].split(' ')[0]} | {private_data['repName'].upper()}",
        "checkin_data": {
            "checked_in": False,
            "time": 0
        }
    })

    if not override:
        uid = utils.get_uid()
        db.child("events").child(event_id).child("registered").child(uid).set(public_data, auth)
        db.child("registered_data").child(event_id).child(uid).set(private_data, auth)
    else:
        # Push instead of setting to allow for multiple registrations
        public_data_push = db.child("events").child(event_id).child("registered").push(public_data, auth)
        # Reuse the key reported by the push so both halves share the same identifier
        db.child("registered_data").child(event_id).child(public_data_push["name"]).set(private_data, auth)
def check_in(event_id, uid=None, auth=None):
    """
    Checks a user into an event.
    No arguments will check in the current user.

    Writes ``{"checked_in": True, "time": <unix seconds>}`` to
    ``events/<event_id>/registered/<uid>/checkin_data``.

    @param event_id: key of the event
    @param uid: registration key to check in; when falsy, ``utils.get_uid()``
                is used
    @param auth: token for the database calls; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    @return: None; returns early without writing when the event cannot be
             fetched or does not allow check-ins
    """
    auth = auth or getattr(current_user, "token", None)

    # Refuse if check-ins are not allowed. get_event returns {} for a missing or
    # hidden event, so look the setting up defensively instead of indexing.
    event = get_event(event_id, auth)
    if not event.get("settings", {}).get("checkin"):
        return

    uid = uid or utils.get_uid()
    db.child("events").child(event_id).child("registered").child(uid).child("checkin_data").set({
        "checked_in": True,
        "time": math.floor(time())
    }, auth)
def anon_check_in(event_id, affil, name):
    """
    Checks in an anonymous user.

    Pushes a ``{"rep", "name", "time"}`` record to
    ``registered_data/<event_id>/anon_data``.

    @param event_id: key of the event
    @param affil: value stored under ``rep``
    @param name: value stored under ``name``
    @return: None; returns early without writing when the event cannot be
             fetched or does not allow check-ins
    """
    # Decline if check-ins are not allowed. get_event returns {} for a missing or
    # hidden event, so look the setting up defensively instead of indexing.
    event = get_event(event_id)
    if not event.get("settings", {}).get("checkin"):
        return

    data = {
        "rep": affil,
        "name": name,
        "time": math.floor(time())
    }
    # Authentication is not required as this tree is public
    db.child("registered_data").child(event_id).child("anon_data").push(data)
def dyn_check_in(event_id, entity):
    """
    Checks in a user from an entity.

    Resolves ``entity`` to a registration key with ``get_uid_for_entity`` and
    checks that registration in.

    @param event_id: key of the event
    @param entity: entity label to look up
    @return: None; nothing is written when the entity cannot be resolved
    """
    uid = get_uid_for_entity(event_id, entity)
    # get_uid_for_entity signals "not found" with an empty string. check_in treats
    # a falsy uid as "use the current user", so bail out rather than checking in
    # whoever happens to be logged in.
    if not uid:
        return
    check_in(event_id, uid)
def get_event(event_id, auth=None):
    """
    Fetch a single event from the ``events`` tree.

    The returned dict is the stored event with an extra ``uid`` key holding
    ``event_id``.
    @param event_id: key of the event node
    @param auth: token for the read; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    @return: the event dict, or an empty dict when the read raises HTTPError,
             the stored value cannot be turned into a dict, or the event is
             marked not visible and the current user is not its creator
    """
    token = auth or getattr(current_user, "token", None)
    try:
        snapshot = db.child("events").child(event_id).get(token).val()
        event = dict(snapshot)
        event["uid"] = event_id
        # Hidden events are only handed out to their creator
        hidden = event["settings"]["visible"] is False
        if hidden and event["creator"] != utils.get_uid():
            return {}
    except (HTTPError, TypeError):
        # Event does not exist
        return {}
    else:
        return event
def unregister(event_id, auth=None) -> bool:
    """
    Unregister from an event.

    Removes the current user's entries from ``events/<event_id>/registered``
    and ``registered_data/<event_id>``.

    @param event_id: key of the event
    @param auth: token for the database calls; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    @return: True when the entries were removed; False when the event cannot
             be fetched, has already ended, or is not accepting registrations
    """
    auth = auth or getattr(current_user, "token", None)

    event = get_event(event_id, auth)
    # get_event returns {} for a missing or hidden event; nothing to unregister from
    if not event:
        return False

    tz = timezone(event["timezone"])
    ends_at = tz.localize(datetime.strptime(event["date"] + event["end_time"], "%Y-%m-%d%H:%M"))
    # NOTE(review): the cutoff is the event's END time. The previous code also built
    # the start time but discarded it, and its comment mentioned "started/ended" -
    # confirm whether the cutoff was meant to be the start time instead.
    # Disallow unregistration if the event has ended or if it is not accepting registrations
    if datetime.now(tz) > ends_at or not event.get("settings", {}).get("regis"):
        return False

    uid = utils.get_uid()
    db.child("events").child(event_id).child("registered").child(uid).remove(auth)
    db.child("registered_data").child(event_id).child(uid).remove(auth)
    return True
def verify_unique(event_id, repname, auth=None) -> bool:
    """
    Verify a team name is not already registered for an event.

    Compares ``repname`` case-insensitively (via ``upper()``) against the team
    part of every ``entity`` label under ``events/<event_id>/registered``.

    @param event_id: key of the event
    @param repname: team name to test
    @param auth: token for the read; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    @return: True when no registration carries that team name, False otherwise
    """
    auth = auth or getattr(current_user, "token", None)
    all_registrations = db.child("events").child(event_id).child("registered").get(auth).val()
    try:
        all_registrations = dict(all_registrations)
    except TypeError:
        # No registrations, has to be unique
        return True

    wanted = repname.upper()
    for registration in all_registrations.values():
        # entity is '{CONTACT} | {TEAM}'. Cut at the FIRST separator only, so a team
        # name that itself contains ' | ' is compared in full rather than truncated.
        _, separator, team = registration["entity"].partition(" | ")
        # Labels without a separator carry no team name and cannot collide
        if separator and team.upper() == wanted:
            return False
    return True
def get_event_data(event_id, auth=None) -> dict:
    """
    Read everything stored under ``registered_data/<event_id>``.

    Intended for the event owner only. HTTPError is not handled here, so a
    rejected read propagates to the caller.
    @param event_id: key of the event
    @param auth: token for the read; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    @return: the stored data as a dict, or an empty dict when the read result
             cannot be converted to one (TypeError)
    """
    token = auth or getattr(current_user, "token", None)
    try:
        snapshot = db.child("registered_data").child(event_id).get(token).val()
        return dict(snapshot)
    except TypeError:
        # Nothing usable stored for this event
        return {}
def get_uid_for_entity(event_id, entity) -> str:
    """
    Find the registration key that owns an entity label.

    @param event_id: key of the event
    @param entity: label to search for; has the structure of
                   '{CONTACTNAME} | {REPNAME}'
    @return: the key under the event's ``registered`` mapping whose ``entity``
             equals ``entity``, or an empty string when the event cannot be
             fetched, has no registrations, or no label matches
    """
    event = get_event(event_id)
    if not event:
        return ""

    # An event without any registrations has no "registered" key at all,
    # so fall back to an empty mapping instead of indexing.
    registered = event.get("registered") or {}
    for uid, data in registered.items():
        if data["entity"] == entity:
            return uid
    return ""
def get_my_events(auth=None) -> tuple[dict, dict]:
    """
    Gets a user's events from the database.

    Walks every node under ``events``. Events whose ``creator`` equals the
    current uid are reported as owned; other events are reported as registered
    when the current uid is a key of their ``registered`` mapping and their
    ``visible`` setting is exactly ``True``.

    @param auth: token for the read; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    @return: (registered_events, owned_events); two empty dicts when the read
             raises HTTPError or a TypeError occurs while processing
    """
    auth = auth or getattr(current_user, "token", None)
    registered_events = {}
    owned_events = {}
    try:
        events = dict(db.child("events").get(auth).val())
        # The uid does not change while iterating; resolve it once
        my_uid = utils.get_uid()
        for event_id, event_data in events.items():
            if event_data["creator"] == my_uid:
                owned_events[event_id] = event_data
                continue

            registered = event_data.get("registered") or {}
            # Tolerate an event without a settings node instead of calling .get on None
            settings = event_data.get("settings") or {}
            if my_uid in registered and settings.get("visible") is True:
                registered_events[event_id] = event_data
    except (HTTPError, TypeError):
        # Events do not exist
        return {}, {}
    return registered_events, owned_events
def delete_event(event_id, auth=None):
    """
    Remove an event and its registration data from the database.

    Nothing is removed unless the value stored at
    ``events/<event_id>/creator`` equals ``utils.get_uid()``.
    @param event_id: key of the event
    @param auth: token for the database calls; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    """
    token = auth or getattr(current_user, "token", None)
    creator = db.child("events").child(event_id).child("creator").get(token).val()
    if creator == utils.get_uid():
        # MUST remove registered_data before events, otherwise Firebase cannot determine an owner
        db.child("registered_data").child(event_id).remove(token)
        db.child("events").child(event_id).remove(token)
def update_event(event_id, updates: dict, settings: dict, auth=None):
    """
    Write changed fields and settings of an event to the database.

    ``settings`` is stamped in place with a ``last_modified`` unix timestamp
    (this happens even when the ownership check below fails). Each entry of
    ``updates`` is set at ``events/<event_id>/<key>`` and each entry of
    ``settings`` at ``events/<event_id>/settings/<key>``.
    @param event_id: key of the event
    @param updates: top-level event fields to overwrite
    @param settings: settings fields to overwrite (mutated in place)
    @param auth: token for the database calls; when falsy, the ``token``
                 attribute of ``current_user`` is used (``None`` if it has none)
    """
    token = auth or getattr(current_user, "token", None)
    settings["last_modified"] = math.floor(time())

    # Only the creator of the event may change it
    owner = db.child("events").child(event_id).child("creator").get(token).val()
    if owner != utils.get_uid():
        return

    # One write per changed node, top-level fields first and settings after
    for field, new_value in updates.items():
        db.child("events").child(event_id).child(field).set(new_value, token)
    for option, new_value in settings.items():
        db.child("events").child(event_id).child("settings").child(option).set(new_value, token)
def delete_all_user_events():
    """
    Delete every event owned by the current user.

    Takes the owned-events half of ``get_my_events()`` and calls
    ``delete_event`` for each key in it.
    """
    _, owned = get_my_events()
    for owned_id in owned:
        delete_event(owned_id)
def delete_user_data(auth=None):
    """
    Remove the current user's node from the ``users`` tree.

    The node is keyed by ``utils.get_uid()``.
    @param auth: token for the removal; when falsy, the ``token`` attribute of
                 ``current_user`` is used (``None`` if it has none)
    """
    token = auth or getattr(current_user, "token", None)
    own_node = db.child("users").child(utils.get_uid())
    own_node.remove(token)
# Placeholder profile values; the name suggests they stand in for a visitor
# who is not logged in - confirm against the callers.
# NOTE(review): the email value looks like an obfuscation placeholder rather
# than a real address - confirm the intended value.
logged_out_data = dict(
    first_name="Guest",
    last_name="User",
    email="[email protected]",
    promotion=False,
    role="guest",
)