forked from arsenetar/dupeguru
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresults.py
434 lines (385 loc) · 15.8 KB
/
results.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# Created By: Virgil Dupras
# Created On: 2006/02/23
# Copyright 2015 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "GPLv3" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.gnu.org/licenses/gpl-3.0.html
import logging
import re
import os
import os.path as op
from xml.etree import ElementTree as ET
from hscommon.jobprogress.job import nulljob
from hscommon.conflict import get_conflicted_name
from hscommon.util import flatten, nonone, FileOrPath, format_size
from hscommon.trans import tr
from . import engine
from .markable import Markable
class Results(Markable):
"""Manages a collection of duplicate :class:`~core.engine.Group`.
This class takes care or marking, sorting and filtering duplicate groups.
.. attribute:: groups
The list of :class:`~core.engine.Group` contained managed by this instance.
.. attribute:: dupes
A list of all duplicates (:class:`~core.fs.File` instances), without ref, contained in the
currently managed :attr:`groups`.
"""
# ---Override
def __init__(self, app):
Markable.__init__(self)
self.__groups = []
self.__group_of_duplicate = {}
self.__groups_sort_descriptor = None # This is a tuple (key, asc)
self.__dupes = None
self.__dupes_sort_descriptor = None # This is a tuple (key, asc, delta)
self.__filters = None
self.__filtered_dupes = None
self.__filtered_groups = None
self.__recalculate_stats()
self.__marked_size = 0
self.app = app
self.problems = [] # (dupe, error_msg)
self.is_modified = False
def _did_mark(self, dupe):
self.__marked_size += dupe.size
def _did_unmark(self, dupe):
self.__marked_size -= dupe.size
def _get_markable_count(self):
return self.__total_count
def _is_markable(self, dupe):
if dupe.is_ref:
return False
g = self.get_group_of_duplicate(dupe)
if not g:
return False
if dupe is g.ref:
return False
if self.__filtered_dupes and dupe not in self.__filtered_dupes:
return False
return True
def mark_all(self):
if self.__filters:
self.mark_multiple(self.__filtered_dupes)
else:
Markable.mark_all(self)
def mark_invert(self):
if self.__filters:
self.mark_toggle_multiple(self.__filtered_dupes)
else:
Markable.mark_invert(self)
def mark_none(self):
if self.__filters:
self.unmark_multiple(self.__filtered_dupes)
else:
Markable.mark_none(self)
# ---Private
def __get_dupe_list(self):
if self.__dupes is None:
self.__dupes = flatten(group.dupes for group in self.groups)
if None in self.__dupes:
# This is debug logging to try to figure out #44
logging.warning(
"There is a None value in the Results' dupe list. dupes: %r groups: %r",
self.__dupes,
self.groups,
)
if self.__filtered_dupes:
self.__dupes = [
dupe for dupe in self.__dupes if dupe in self.__filtered_dupes
]
sd = self.__dupes_sort_descriptor
if sd:
self.sort_dupes(sd[0], sd[1], sd[2])
return self.__dupes
def __get_groups(self):
if self.__filtered_groups is None:
return self.__groups
else:
return self.__filtered_groups
def __get_stat_line(self):
if self.__filtered_dupes is None:
mark_count = self.mark_count
marked_size = self.__marked_size
total_count = self.__total_count
total_size = self.__total_size
else:
mark_count = len(
[dupe for dupe in self.__filtered_dupes if self.is_marked(dupe)]
)
marked_size = sum(
dupe.size for dupe in self.__filtered_dupes if self.is_marked(dupe)
)
total_count = len(
[dupe for dupe in self.__filtered_dupes if self.is_markable(dupe)]
)
total_size = sum(
dupe.size for dupe in self.__filtered_dupes if self.is_markable(dupe)
)
if self.mark_inverted:
marked_size = self.__total_size - marked_size
result = tr("%d / %d (%s / %s) duplicates marked.") % (
mark_count,
total_count,
format_size(marked_size, 2),
format_size(total_size, 2),
)
if self.__filters:
result += tr(" filter: %s") % " --> ".join(self.__filters)
return result
def __recalculate_stats(self):
self.__total_size = 0
self.__total_count = 0
for group in self.groups:
markable = [dupe for dupe in group.dupes if self._is_markable(dupe)]
self.__total_count += len(markable)
self.__total_size += sum(dupe.size for dupe in markable)
def __set_groups(self, new_groups):
self.mark_none()
self.__groups = new_groups
self.__group_of_duplicate = {}
for g in self.__groups:
for dupe in g:
self.__group_of_duplicate[dupe] = g
if not hasattr(dupe, "is_ref"):
dupe.is_ref = False
self.is_modified = bool(self.__groups)
old_filters = nonone(self.__filters, [])
self.apply_filter(None)
for filter_str in old_filters:
self.apply_filter(filter_str)
# ---Public
def apply_filter(self, filter_str):
"""Applies a filter ``filter_str`` to :attr:`groups`
When you apply the filter, only dupes with the filename matching ``filter_str`` will be in
in the results. To cancel the filter, just call apply_filter with ``filter_str`` to None,
and the results will go back to normal.
If call apply_filter on a filtered results, the filter will be applied
*on the filtered results*.
:param str filter_str: a string containing a regexp to filter dupes with.
"""
if not filter_str:
self.__filtered_dupes = None
self.__filtered_groups = None
self.__filters = None
else:
if not self.__filters:
self.__filters = []
try:
filter_re = re.compile(filter_str, re.IGNORECASE)
except re.error:
return # don't apply this filter.
self.__filters.append(filter_str)
if self.__filtered_dupes is None:
self.__filtered_dupes = flatten(g[:] for g in self.groups)
self.__filtered_dupes = set(
dupe
for dupe in self.__filtered_dupes
if filter_re.search(str(dupe.path))
)
filtered_groups = set()
for dupe in self.__filtered_dupes:
filtered_groups.add(self.get_group_of_duplicate(dupe))
self.__filtered_groups = list(filtered_groups)
self.__recalculate_stats()
sd = self.__groups_sort_descriptor
if sd:
self.sort_groups(sd[0], sd[1])
self.__dupes = None
def get_group_of_duplicate(self, dupe):
"""Returns :class:`~core.engine.Group` in which ``dupe`` belongs.
"""
try:
return self.__group_of_duplicate[dupe]
except (TypeError, KeyError):
return None
is_markable = _is_markable
def load_from_xml(self, infile, get_file, j=nulljob):
"""Load results from ``infile``.
:param infile: a file or path pointing to an XML file created with :meth:`save_to_xml`.
:param get_file: a function f(path) returning a :class:`~core.fs.File` wrapping the path.
:param j: A :ref:`job progress instance <jobs>`.
"""
def do_match(ref_file, other_files, group):
if not other_files:
return
for other_file in other_files:
group.add_match(engine.get_match(ref_file, other_file))
do_match(other_files[0], other_files[1:], group)
self.apply_filter(None)
root = ET.parse(infile).getroot()
group_elems = list(root.iter("group"))
groups = []
marked = set()
for group_elem in j.iter_with_progress(group_elems, every=100):
group = engine.Group()
dupes = []
for file_elem in group_elem.iter("file"):
path = file_elem.get("path")
words = file_elem.get("words", "")
if not path:
continue
file = get_file(path)
if file is None:
continue
file.words = words.split(",")
file.is_ref = file_elem.get("is_ref") == "y"
dupes.append(file)
if file_elem.get("marked") == "y":
marked.add(file)
for match_elem in group_elem.iter("match"):
try:
attrs = match_elem.attrib
first_file = dupes[int(attrs["first"])]
second_file = dupes[int(attrs["second"])]
percentage = int(attrs["percentage"])
group.add_match(engine.Match(first_file, second_file, percentage))
except (IndexError, KeyError, ValueError):
# Covers missing attr, non-int values and indexes out of bounds
pass
if (not group.matches) and (len(dupes) >= 2):
do_match(dupes[0], dupes[1:], group)
group.prioritize(lambda x: dupes.index(x))
if len(group):
groups.append(group)
j.add_progress()
self.groups = groups
for dupe_file in marked:
self.mark(dupe_file)
self.is_modified = False
def make_ref(self, dupe):
"""Make ``dupe`` take the :attr:`~core.engine.Group.ref` position of its group.
"""
g = self.get_group_of_duplicate(dupe)
r = g.ref
if not g.switch_ref(dupe):
return False
self._remove_mark_flag(dupe)
if not r.is_ref:
self.__total_count += 1
self.__total_size += r.size
if not dupe.is_ref:
self.__total_count -= 1
self.__total_size -= dupe.size
self.__dupes = None
self.is_modified = True
return True
def perform_on_marked(self, func, remove_from_results):
"""Performs ``func`` on all marked dupes.
If an ``EnvironmentError`` is raised during the call, the problematic dupe is added to
self.problems.
:param bool remove_from_results: If true, dupes which had ``func`` applied and didn't cause
any problem.
"""
self.problems = []
to_remove = []
marked = (dupe for dupe in self.dupes if self.is_marked(dupe))
for dupe in marked:
try:
func(dupe)
to_remove.append(dupe)
except (EnvironmentError, UnicodeEncodeError) as e:
self.problems.append((dupe, str(e)))
if remove_from_results:
self.remove_duplicates(to_remove)
self.mark_none()
for dupe, _ in self.problems:
self.mark(dupe)
def remove_duplicates(self, dupes):
"""Remove ``dupes`` from their respective :class:`~core.engine.Group`.
Also, remove the group from :attr:`groups` if it ends up empty.
"""
affected_groups = set()
for dupe in dupes:
group = self.get_group_of_duplicate(dupe)
if dupe not in group.dupes:
return
ref = group.ref
group.remove_dupe(dupe, False)
del self.__group_of_duplicate[dupe]
self._remove_mark_flag(dupe)
self.__total_count -= 1
self.__total_size -= dupe.size
if not group:
del self.__group_of_duplicate[ref]
self.__groups.remove(group)
if self.__filtered_groups:
self.__filtered_groups.remove(group)
else:
affected_groups.add(group)
for group in affected_groups:
group.discard_matches()
self.__dupes = None
self.is_modified = bool(self.__groups)
def save_to_xml(self, outfile):
"""Save results to ``outfile`` in XML.
:param outfile: file object or path.
"""
self.apply_filter(None)
root = ET.Element("results")
for g in self.groups:
group_elem = ET.SubElement(root, "group")
dupe2index = {}
for index, d in enumerate(g):
dupe2index[d] = index
try:
words = engine.unpack_fields(d.words)
except AttributeError:
words = ()
file_elem = ET.SubElement(group_elem, "file")
try:
file_elem.set("path", str(d.path))
file_elem.set("words", ",".join(words))
except ValueError: # If there's an invalid character, just skip the file
file_elem.set("path", "")
file_elem.set("is_ref", ("y" if d.is_ref else "n"))
file_elem.set("marked", ("y" if self.is_marked(d) else "n"))
for match in g.matches:
match_elem = ET.SubElement(group_elem, "match")
match_elem.set("first", str(dupe2index[match.first]))
match_elem.set("second", str(dupe2index[match.second]))
match_elem.set("percentage", str(int(match.percentage)))
tree = ET.ElementTree(root)
def do_write(outfile):
with FileOrPath(outfile, "wb") as fp:
tree.write(fp, encoding="utf-8")
try:
do_write(outfile)
except IOError as e:
# If our IOError is because dest is already a directory, we want to handle that. 21 is
# the code we get on OS X and Linux, 13 is what we get on Windows.
if e.errno in {21, 13}:
p = str(outfile)
dirname, basename = op.split(p)
otherfiles = os.listdir(dirname)
newname = get_conflicted_name(otherfiles, basename)
do_write(op.join(dirname, newname))
else:
raise
self.is_modified = False
def sort_dupes(self, key, asc=True, delta=False):
"""Sort :attr:`dupes` according to ``key``.
:param str key: key attribute name to sort with.
:param bool asc: If false, sorting is reversed.
:param bool delta: If true, sorting occurs using :ref:`delta values <deltavalues>`.
"""
if not self.__dupes:
self.__get_dupe_list()
keyfunc = lambda d: self.app._get_dupe_sort_key(
d, lambda: self.get_group_of_duplicate(d), key, delta
)
self.__dupes.sort(key=keyfunc, reverse=not asc)
self.__dupes_sort_descriptor = (key, asc, delta)
def sort_groups(self, key, asc=True):
"""Sort :attr:`groups` according to ``key``.
The :attr:`~core.engine.Group.ref` of each group is used to extract values for sorting.
:param str key: key attribute name to sort with.
:param bool asc: If false, sorting is reversed.
"""
keyfunc = lambda g: self.app._get_group_sort_key(g, key)
self.groups.sort(key=keyfunc, reverse=not asc)
self.__groups_sort_descriptor = (key, asc)
# ---Properties
dupes = property(__get_dupe_list)
groups = property(__get_groups, __set_groups)
stat_line = property(__get_stat_line)