forked from a1ex4/ownfoil
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
210 lines (172 loc) · 7.17 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.exc import NoResultFound
from flask_login import UserMixin
import json, os
import logging
# Retrieve main logger
logger = logging.getLogger('main')
db = SQLAlchemy()
def to_dict(db_results):
return {c.name: getattr(db_results, c.name) for c in db_results.__table__.columns}
class Files(db.Model):
id = db.Column(db.Integer, primary_key=True)
filepath = db.Column(db.String)
library = db.Column(db.String)
folder = db.Column(db.String)
filename = db.Column(db.String)
title_id = db.Column(db.String)
app_id = db.Column(db.String)
type = db.Column(db.String)
version = db.Column(db.String)
extension = db.Column(db.String)
size = db.Column(db.Integer)
identification = db.Column(db.String)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.String(100), unique=True)
password = db.Column(db.String(100))
admin_access = db.Column(db.Boolean)
shop_access = db.Column(db.Boolean)
backup_access = db.Column(db.Boolean)
@property
def is_admin(self):
return self.admin_access
def has_shop_access(self):
return self.shop_access
def has_backup_access(self):
return self.backup_access
def has_admin_access(self):
return self.admin_access
def has_access(self, access):
if access == 'admin':
return self.has_admin_access()
elif access == 'shop':
return self.has_shop_access()
elif access == 'backup':
return self.has_backup_access()
def add_to_titles_db(library, file_info):
filepath = file_info["filepath"]
filedir = file_info["filedir"].replace(library, '')
if exists := db.session.query(
db.session.query(Files).filter_by(filepath=filepath).exists()
).scalar():
existing_entry = db.session.query(Files).filter_by(filepath=filepath).all()
existing_entry_data = to_dict(existing_entry[0])
current_identification = existing_entry_data["identification"]
new_identification = file_info["identification"]
if new_identification == current_identification:
return
else:
# delete old entry and replace with updated one
db.session.query(Files).filter_by(filepath=filepath).delete()
new_title = Files(
filepath = filepath,
library = library,
folder = filedir,
filename = file_info["filename"],
title_id = file_info["title_id"],
app_id = file_info["app_id"],
type = file_info["type"],
version = file_info["version"],
extension = file_info["extension"],
size = file_info["size"],
identification = file_info["identification"],
)
db.session.add(new_title)
db.session.commit()
def update_file_path(library, old_path, new_path):
try:
# Find the file entry in the database using the old_path
file_entry = Files.query.filter_by(filepath=old_path).one()
# Extract the new folder and filename from the new_path
folder = os.path.dirname(new_path)
if os.path.normpath(library) == os.path.normpath(folder):
# file is at the root of the library
new_folder = ''
else:
new_folder = folder.replace(library, '')
new_folder = '/' + new_folder if not new_folder.startswith('/') else new_folder
filename = os.path.basename(new_path)
# Update the file entry with the new path values
file_entry.filename = filename
file_entry.filepath = new_path
file_entry.folder = new_folder
file_entry.library = library
# Commit the changes to the database
db.session.commit()
logger.info(f"File path updated successfully from {old_path} to {new_path}.")
except NoResultFound:
logger.warning(f"No file entry found for the path: {old_path}.")
except Exception as e:
db.session.rollback() # Roll back the session in case of an error
logger.error(f"An error occurred while updating the file path: {str(e)}")
def get_all_titles_from_db():
# results = db.session.query(Files.title_id).distinct()
# return [row[0] for row in results]
results = db.session.query(Files).all()
return [to_dict(r) for r in results]
def get_all_title_files(title_id):
title_id = title_id.upper()
results = db.session.query(Files).filter_by(title_id=title_id).all()
return [to_dict(r) for r in results]
def get_all_files_with_identification(identification):
results = db.session.query(Files).filter_by(identification=identification).all()
return[to_dict(r)['filepath'] for r in results]
def delete_files_by_library(library_path):
success = True
errors = []
try:
# Find all files with the given library
files_to_delete = Files.query.filter_by(library=library_path).all()
# Delete each file
for file in files_to_delete:
db.session.delete(file)
# Commit the changes
db.session.commit()
logger.info(f"All entries with library '{library_path}' have been deleted.")
return success, errors
except Exception as e:
# If there's an error, rollback the session
db.session.rollback()
logger.error(f"An error occurred: {e}")
success = False
errors.append({
'path': 'library/paths',
'error': f"An error occurred: {e}"
})
return success, errors
def delete_file_by_filepath(filepath):
try:
# Find file with the given filepath
file_to_delete = Files.query.filter_by(filepath=filepath).one()
# Delete file
db.session.delete(file_to_delete)
# Commit the changes
db.session.commit()
logger.info(f"File '{filepath}' has been deleted.")
except Exception as e:
# If there's an error, rollback the session
db.session.rollback()
logger.error(f"An error occurred while deleting the file path: {str(e)}")
def remove_missing_files():
try:
# Query all entries in the Files table
files = Files.query.all()
# List to keep track of IDs to be deleted
ids_to_delete = []
for file_entry in files:
# Check if the file exists on disk
if not os.path.exists(file_entry.filepath):
# If the file does not exist, mark this entry for deletion
ids_to_delete.append(file_entry.id)
logger.debug(f"File not found, marking file for deletion: {file_entry.filepath}")
# Delete all marked entries from the database
if ids_to_delete:
Files.query.filter(Files.id.in_(ids_to_delete)).delete(synchronize_session=False)
db.session.commit()
logger.info(f"Deleted {len(ids_to_delete)} files from the database.")
else:
logger.debug("No files were deleted. All files are present on disk.")
except Exception as e:
db.session.rollback() # Rollback in case of an error
logger.error(f"An error occurred while removing missing files: {str(e)}")