forked from deepfakes/faceswap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconvert.py
227 lines (197 loc) · 10.7 KB
/
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
""" Converter for faceswap.py
Based on: https://gist.github.com/anonymous/d3815aba83a8f79779451262599b0955
found on https://www.reddit.com/r/deepfakes/ """
import logging
import cv2
import numpy as np
from lib.model import masks as model_masks
from plugins.plugin_loader import PluginLoader
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
class Converter():
""" Swap a source face with a target """
def __init__(self, output_dir, output_size, output_has_mask,
draw_transparent, pre_encode, arguments, configfile=None):
logger.debug("Initializing %s: (output_dir: '%s', output_size: %s, output_has_mask: %s, "
"draw_transparent: %s, pre_encode: %s, arguments: %s, configfile: %s)",
self.__class__.__name__, output_dir, output_size, output_has_mask,
draw_transparent, pre_encode, arguments, configfile)
self.output_dir = output_dir
self.draw_transparent = draw_transparent
self.writer_pre_encode = pre_encode
self.scale = arguments.output_scale / 100
self.output_size = output_size
self.output_has_mask = output_has_mask
self.args = arguments
self.configfile = configfile
self.adjustments = dict(box=None, mask=None, color=None, seamless=None, scaling=None)
self.load_plugins()
logger.debug("Initialized %s", self.__class__.__name__)
def reinitialize(self, config):
""" reinitialize converter """
logger.debug("Reinitializing converter")
self.adjustments = dict(box=None, mask=None, color=None, seamless=None, scaling=None)
self.load_plugins(config=config, disable_logging=True)
logger.debug("Reinitialized converter")
def load_plugins(self, config=None, disable_logging=False):
""" Load the requested adjustment plugins """
logger.debug("Loading plugins. config: %s", config)
self.adjustments["box"] = PluginLoader.get_converter(
"mask",
"box_blend",
disable_logging=disable_logging)("none",
self.output_size,
configfile=self.configfile,
config=config)
self.adjustments["mask"] = PluginLoader.get_converter(
"mask",
"mask_blend",
disable_logging=disable_logging)(self.args.mask_type,
self.output_size,
self.output_has_mask,
configfile=self.configfile,
config=config)
if self.args.color_adjustment != "none" and self.args.color_adjustment is not None:
self.adjustments["color"] = PluginLoader.get_converter(
"color",
self.args.color_adjustment,
disable_logging=disable_logging)(configfile=self.configfile, config=config)
if self.args.scaling != "none" and self.args.scaling is not None:
self.adjustments["scaling"] = PluginLoader.get_converter(
"scaling",
self.args.scaling,
disable_logging=disable_logging)(configfile=self.configfile, config=config)
logger.debug("Loaded plugins: %s", self.adjustments)
def process(self, in_queue, out_queue):
""" Process items from the queue """
logger.debug("Starting convert process. (in_queue: %s, out_queue: %s)",
in_queue, out_queue)
while True:
item = in_queue.get()
if item == "EOF":
logger.debug("EOF Received")
logger.debug("Patch queue finished")
# Signal EOF to other processes in pool
logger.debug("Putting EOF back to in_queue")
in_queue.put(item)
break
logger.trace("Patch queue got: '%s'", item["filename"])
try:
image = self.patch_image(item)
except Exception as err: # pylint: disable=broad-except
# Log error and output original frame
logger.error("Failed to convert image: '%s'. Reason: %s",
item["filename"], str(err))
image = item["image"]
# UNCOMMENT THIS CODE BLOCK TO PRINT TRACEBACK ERRORS
# import sys
# import traceback
# exc_info = sys.exc_info()
# traceback.print_exception(*exc_info)
logger.trace("Out queue put: %s", item["filename"])
out_queue.put((item["filename"], image))
logger.debug("Completed convert process")
def patch_image(self, predicted):
""" Patch the image """
logger.trace("Patching image: '%s'", predicted["filename"])
frame_size = (predicted["image"].shape[1], predicted["image"].shape[0])
new_image = self.get_new_image(predicted, frame_size)
patched_face = self.post_warp_adjustments(predicted, new_image)
patched_face = self.scale_image(patched_face)
patched_face = np.rint(patched_face * 255.0).astype("uint8")
if self.writer_pre_encode is not None:
patched_face = self.writer_pre_encode(patched_face)
logger.trace("Patched image: '%s'", predicted["filename"])
return patched_face
def get_new_image(self, predicted, frame_size):
""" Get the new face from the predictor and apply box manipulations """
logger.trace("Getting: (filename: '%s', faces: %s)",
predicted["filename"], len(predicted["swapped_faces"]))
placeholder = predicted["image"] / 255.0
placeholder = np.concatenate((placeholder,
np.zeros((frame_size[1], frame_size[0], 1))),
axis=-1).astype("float32")
for new_face, detected_face in zip(predicted["swapped_faces"],
predicted["detected_faces"]):
predicted_mask = new_face[:, :, -1] if new_face.shape[2] == 4 else None
new_face = new_face[:, :, :3]
src_face = detected_face.reference_face
interpolator = detected_face.reference_interpolators[1]
new_face = self.pre_warp_adjustments(src_face, new_face, detected_face, predicted_mask)
# Warp face with the mask
placeholder = cv2.warpAffine( # pylint: disable=no-member
new_face,
detected_face.reference_matrix,
frame_size,
placeholder,
flags=cv2.WARP_INVERSE_MAP | interpolator, # pylint: disable=no-member
borderMode=cv2.BORDER_TRANSPARENT) # pylint: disable=no-member
placeholder = np.clip(placeholder, 0.0, 1.0)
logger.trace("Got filename: '%s'. (placeholders: %s)",
predicted["filename"], placeholder.shape)
return placeholder
def pre_warp_adjustments(self, old_face, new_face, detected_face, predicted_mask):
""" Run the pre-warp adjustments """
logger.trace("old_face shape: %s, new_face shape: %s, predicted_mask shape: %s",
old_face.shape, new_face.shape,
predicted_mask.shape if predicted_mask is not None else None)
new_face = self.adjustments["box"].run(new_face)
new_face, raw_mask = self.get_image_mask(new_face, detected_face, predicted_mask)
if self.adjustments["color"] is not None:
new_face = self.adjustments["color"].run(old_face, new_face, raw_mask)
if self.adjustments["seamless"] is not None:
new_face = self.adjustments["seamless"].run(old_face, new_face, raw_mask)
logger.trace("returning: new_face shape %s", new_face.shape)
return new_face
def get_image_mask(self, new_face, detected_face, predicted_mask):
""" Get the image mask """
logger.trace("Getting mask. Image shape: %s", new_face.shape)
mask, raw_mask = self.adjustments["mask"].run(detected_face, predicted_mask)
if new_face.shape[2] == 4:
logger.trace("Combining mask with alpha channel box mask")
new_face[:, :, -1] = np.minimum(new_face[:, :, -1], mask.squeeze())
else:
logger.trace("Adding mask to alpha channel")
new_face = np.concatenate((new_face, mask), -1)
new_face = np.clip(new_face, 0.0, 1.0)
logger.trace("Got mask. Image shape: %s", new_face.shape)
return new_face, raw_mask
def post_warp_adjustments(self, predicted, new_image):
""" Apply fixes to the image after warping """
if self.adjustments["scaling"] is not None:
new_image = self.adjustments["scaling"].run(new_image)
mask = np.repeat(new_image[:, :, -1][:, :, np.newaxis], 3, axis=-1)
foreground = new_image[:, :, :3]
background = (predicted["image"][:, :, :3] / 255.0) * (1.0 - mask)
foreground *= mask
frame = foreground + background
frame = self.add_alpha_mask(frame, predicted)
np.clip(frame, 0.0, 1.0, out=frame)
return frame
def add_alpha_mask(self, frame, predicted):
""" Adding a 4th channel should happen after all other channel operations
Add the default mask as 4th channel for saving as image with alpha channel """
if not self.draw_transparent:
return frame
logger.trace("Creating transparent image: '%s'", predicted["filename"])
mask_type = getattr(model_masks, model_masks.get_default_mask())
final_mask = np.zeros(frame.shape[:2] + (1, ), dtype="float32")
for detected_face in predicted["detected_faces"]:
landmarks = detected_face.landmarks_as_xy
final_mask = cv2.bitwise_or(final_mask, # pylint: disable=no-member
mask_type(landmarks, frame, channels=1).mask)
final_mask = np.expand_dims(final_mask, axis=-1) if final_mask.ndim == 2 else final_mask
frame = np.concatenate((frame, final_mask), axis=-1)
logger.trace("Created transparent image: '%s'", predicted["filename"])
return frame
def scale_image(self, frame):
""" Scale the image if requested """
if self.scale == 1:
return frame
logger.trace("source frame: %s", frame.shape)
interp = cv2.INTER_CUBIC if self.scale > 1 else cv2.INTER_AREA # pylint: disable=no-member
dims = (round((frame.shape[1] / 2 * self.scale) * 2),
round((frame.shape[0] / 2 * self.scale) * 2))
frame = cv2.resize(frame, dims, interpolation=interp) # pylint: disable=no-member
logger.trace("resized frame: %s", frame.shape)
return frame