import traceback |
import math |
import multiprocessing |
import operator |
import os |
import shutil |
import sys |
import time |
from pathlib import Path |
import cv2 |
import numpy as np |
from numpy import linalg as npla |
import facelib |
from core import imagelib |
from core import mathlib |
from facelib import FaceType, LandmarksProcessor |
from core.interact import interact as io |
from core.joblib import Subprocessor |
from core.leras import nn |
from core import pathex |
from core.cv2ex import * |
from DFLIMG import * |
DEBUG = False |
class ExtractSubprocessor(Subprocessor): |
class Data(object): |
def __init__(self, filepath=None, rects=None, landmarks = None, landmarks_accurate=True, manual=False, force_output_path=None, final_output_files = None): |
self.filepath = filepath |
self.rects = rects or [] |
self.rects_rotation = 0 |
self.landmarks_accurate = landmarks_accurate |
self.manual = manual |
self.landmarks = landmarks or [] |
self.force_output_path = force_output_path |
self.final_output_files = final_output_files or [] |
self.faces_detected = 0 |
class Cli(Subprocessor.Cli): |
def on_initialize(self, client_dict): |
self.type = client_dict['type'] |
self.image_size = client_dict['image_size'] |
self.jpeg_quality = client_dict['jpeg_quality'] |
self.face_type = client_dict['face_type'] |
self.max_faces_from_image = client_dict['max_faces_from_image'] |
self.device_idx = client_dict['device_idx'] |
self.cpu_only = client_dict['device_type'] == 'CPU' |
self.final_output_path = client_dict['final_output_path'] |
self.output_debug_path = client_dict['output_debug_path'] |
stdin_fd = client_dict['stdin_fd'] |
if stdin_fd is not None and DEBUG: |
sys.stdin = os.fdopen(stdin_fd) |
if self.cpu_only: |
device_config = nn.DeviceConfig.CPU() |
place_model_on_cpu = True |
else: |
device_config = nn.DeviceConfig.GPUIndexes ([self.device_idx]) |
place_model_on_cpu = device_config.devices[0].total_mem_gb < 4 |
if self.type == 'all' or 'rects' in self.type or 'landmarks' in self.type: |
nn.initialize (device_config) |
self.log_info (f"Running on {client_dict['device_name'] }") |
if self.type == 'all' or self.type == 'rects-s3fd' or 'landmarks' in self.type: |
self.rects_extractor = facelib.S3FDExtractor(place_model_on_cpu=place_model_on_cpu) |
if self.type == 'all' or 'landmarks' in self.type: |
self.landmarks_extractor = facelib.FANExtractor(landmarks_3D=self.face_type >= FaceType.HEAD, |
place_model_on_cpu=place_model_on_cpu) |
self.cached_image = (None, None) |
def process_data(self, data): |
if 'landmarks' in self.type and len(data.rects) == 0: |
return data |
filepath = data.filepath |
cached_filepath, image = self.cached_image |
if cached_filepath != filepath: |
image = cv2_imread( filepath ) |
if image is None: |
self.log_err (f'Failed to open {filepath}, reason: cv2_imread() fail.') |
return data |
image = imagelib.normalize_channels(image, 3) |
image = imagelib.cut_odd_image(image) |
self.cached_image = ( filepath, image ) |
h, w, c = image.shape |
if 'rects' in self.type or self.type == 'all': |
data = ExtractSubprocessor.Cli.rects_stage (data=data, |
image=image, |
max_faces_from_image=self.max_faces_from_image, |
rects_extractor=self.rects_extractor, |
) |
if 'landmarks' in self.type or self.type == 'all': |
data = ExtractSubprocessor.Cli.landmarks_stage (data=data, |
image=image, |
landmarks_extractor=self.landmarks_extractor, |
rects_extractor=self.rects_extractor, |
) |
if self.type == 'final' or self.type == 'all': |
data = ExtractSubprocessor.Cli.final_stage(data=data, |
image=image, |
face_type=self.face_type, |
image_size=self.image_size, |
jpeg_quality=self.jpeg_quality, |
output_debug_path=self.output_debug_path, |
final_output_path=self.final_output_path, |
) |
return data |
@staticmethod |
def rects_stage(data, |
image, |
max_faces_from_image, |
rects_extractor, |
): |
h,w,c = image.shape |
if min(h,w) < 128: |
data.rects = [] |
else: |
for rot in ([0, 90, 270, 180]): |
if rot == 0: |
rotated_image = image |
elif rot == 90: |
rotated_image = image.swapaxes( 0,1 )[:,::-1,:] |
elif rot == 180: |
rotated_image = image[::-1,::-1,:] |
elif rot == 270: |
rotated_image = image.swapaxes( 0,1 )[::-1,:,:] |
rects = data.rects = rects_extractor.extract (rotated_image, is_bgr=True) |
if len(rects) != 0: |
data.rects_rotation = rot |
break |
if max_faces_from_image is not None and \ |
max_faces_from_image > 0 and \ |
len(data.rects) > 0: |
data.rects = data.rects[0:max_faces_from_image] |
return data |
@staticmethod |
def landmarks_stage(data, |
image, |
landmarks_extractor, |
rects_extractor, |
): |
h, w, ch = image.shape |
if data.rects_rotation == 0: |
rotated_image = image |
elif data.rects_rotation == 90: |
rotated_image = image.swapaxes( 0,1 )[:,::-1,:] |
elif data.rects_rotation == 180: |
rotated_image = image[::-1,::-1,:] |
elif data.rects_rotation == 270: |
rotated_image = image.swapaxes( 0,1 )[::-1,:,:] |
data.landmarks = landmarks_extractor.extract (rotated_image, data.rects, rects_extractor if (data.landmarks_accurate) else None, is_bgr=True) |
if data.rects_rotation != 0: |
for i, (rect, lmrks) in enumerate(zip(data.rects, data.landmarks)): |
new_rect, new_lmrks = rect, lmrks |
(l,t,r,b) = rect |
if data.rects_rotation == 90: |
new_rect = ( t, h-l, b, h-r) |
if lmrks is not None: |
new_lmrks = lmrks[:,::-1].copy() |
new_lmrks[:,1] = h - new_lmrks[:,1] |
elif data.rects_rotation == 180: |
if lmrks is not None: |
new_rect = ( w-l, h-t, w-r, h-b) |
new_lmrks = lmrks.copy() |
new_lmrks[:,0] = w - new_lmrks[:,0] |
new_lmrks[:,1] = h - new_lmrks[:,1] |
elif data.rects_rotation == 270: |
new_rect = ( w-b, l, w-t, r ) |
if lmrks is not None: |
new_lmrks = lmrks[:,::-1].copy() |
new_lmrks[:,0] = w - new_lmrks[:,0] |
data.rects[i], data.landmarks[i] = new_rect, new_lmrks |
return data |
@staticmethod |
def final_stage(data, |
image, |
face_type, |
image_size, |
jpeg_quality, |
output_debug_path=None, |
final_output_path=None, |
): |
data.final_output_files = [] |
filepath = data.filepath |
rects = data.rects |
landmarks = data.landmarks |
if output_debug_path is not None: |
debug_image = image.copy() |
face_idx = 0 |
for rect, image_landmarks in zip( rects, landmarks ): |
if image_landmarks is None: |
continue |
rect = np.array(rect) |
if face_type == FaceType.MARK_ONLY: |
image_to_face_mat = None |
face_image = image |
face_image_landmarks = image_landmarks |
else: |
image_to_face_mat = LandmarksProcessor.get_transform_mat (image_landmarks, image_size, face_type) |
face_image = cv2.warpAffine(image, image_to_face_mat, (image_size, image_size), cv2.INTER_LANCZOS4) |
face_image_landmarks = LandmarksProcessor.transform_points (image_landmarks, image_to_face_mat) |
landmarks_bbox = LandmarksProcessor.transform_points ( [ (0,0), (0,image_size-1), (image_size-1, image_size-1), (image_size-1,0) ], image_to_face_mat, True) |
rect_area = mathlib.polygon_area(np.array(rect[[0,2,2,0]]).astype(np.float32), np.array(rect[[1,1,3,3]]).astype(np.float32)) |
landmarks_area = mathlib.polygon_area(landmarks_bbox[:,0].astype(np.float32), landmarks_bbox[:,1].astype(np.float32) ) |
if not data.manual and face_type <= FaceType.FULL_NO_ALIGN and landmarks_area > 4*rect_area: |
continue |
if output_debug_path is not None: |
LandmarksProcessor.draw_rect_landmarks (debug_image, rect, image_landmarks, face_type, image_size, transparent_mask=True) |
output_path = final_output_path |
if data.force_output_path is not None: |
output_path = data.force_output_path |
output_filepath = output_path / f"{filepath.stem}_{face_idx}.jpg" |
cv2_imwrite(output_filepath, face_image, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality ] ) |
dflimg = DFLJPG.load(output_filepath) |
dflimg.set_face_type(FaceType.toString(face_type)) |
dflimg.set_landmarks(face_image_landmarks.tolist()) |
dflimg.set_source_filename(filepath.name) |
dflimg.set_source_rect(rect) |
dflimg.set_source_landmarks(image_landmarks.tolist()) |
dflimg.set_image_to_face_mat(image_to_face_mat) |
dflimg.save() |
data.final_output_files.append (output_filepath) |
face_idx += 1 |
data.faces_detected = face_idx |
if output_debug_path is not None: |
cv2_imwrite( output_debug_path / (filepath.stem+'.jpg'), debug_image, [int(cv2.IMWRITE_JPEG_QUALITY), 50] ) |
return data |
def get_data_name (self, data): |
return data.filepath |
@staticmethod |
def get_devices_for_config (type, device_config): |
devices = device_config.devices |
cpu_only = len(devices) == 0 |
if 'rects' in type or \ |
'landmarks' in type or \ |
'all' in type: |
if not cpu_only: |
if type == 'landmarks-manual': |
devices = [devices.get_best_device()] |
result = [] |
for device in devices: |
count = 1 |
if count == 1: |
result += [ (device.index, 'GPU', device.name, device.total_mem_gb) ] |
else: |
for i in range(count): |
result += [ (device.index, 'GPU', f"{device.name} #{i}", device.total_mem_gb) ] |
return result |
else: |
if type == 'landmarks-manual': |
return [ (0, 'CPU', 'CPU', 0 ) ] |
else: |
return [ (i, 'CPU', 'CPU%d' % (i), 0 ) for i in range( min(8, multiprocessing.cpu_count() // 2) ) ] |
elif type == 'final': |
return [ (i, 'CPU', 'CPU%d' % (i), 0 ) for i in (range(min(8, multiprocessing.cpu_count())) if not DEBUG else [0]) ] |
def __init__(self, input_data, type, image_size=None, jpeg_quality=None, face_type=None, output_debug_path=None, manual_window_size=0, max_faces_from_image=0, final_output_path=None, device_config=None): |
if type == 'landmarks-manual': |
for x in input_data: |
x.manual = True |
self.input_data = input_data |
self.type = type |
self.image_size = image_size |
self.jpeg_quality = jpeg_quality |
self.face_type = face_type |
self.output_debug_path = output_debug_path |
self.final_output_path = final_output_path |
self.manual_window_size = manual_window_size |
self.max_faces_from_image = max_faces_from_image |
self.result = [] |
self.devices = ExtractSubprocessor.get_devices_for_config(self.type, device_config) |
super().__init__('Extractor', ExtractSubprocessor.Cli, |
999999 if type == 'landmarks-manual' or DEBUG else 120) |
def on_clients_initialized(self): |
if self.type == 'landmarks-manual': |
self.wnd_name = 'Manual pass' |
io.named_window(self.wnd_name) |
io.capture_mouse(self.wnd_name) |
io.capture_keys(self.wnd_name) |
self.cache_original_image = (None, None) |
self.cache_image = (None, None) |
self.cache_text_lines_img = (None, None) |
self.hide_help = False |
self.landmarks_accurate = True |
self.force_landmarks = False |
self.landmarks = None |
self.x = 0 |
self.y = 0 |
self.rect_size = 100 |
self.rect_locked = False |
self.extract_needed = True |
self.image = None |
self.image_filepath = None |
io.progress_bar (None, len (self.input_data)) |
def on_clients_finalized(self): |
if self.type == 'landmarks-manual': |
io.destroy_all_windows() |
io.progress_bar_close() |
def process_info_generator(self): |
base_dict = {'type' : self.type, |
'image_size': self.image_size, |
'jpeg_quality' : self.jpeg_quality, |
'face_type': self.face_type, |
'max_faces_from_image':self.max_faces_from_image, |
'output_debug_path': self.output_debug_path, |
'final_output_path': self.final_output_path, |
'stdin_fd': sys.stdin.fileno() } |
for (device_idx, device_type, device_name, device_total_vram_gb) in self.devices: |
client_dict = base_dict.copy() |
client_dict['device_idx'] = device_idx |
client_dict['device_name'] = device_name |
client_dict['device_type'] = device_type |
yield client_dict['device_name'], {}, client_dict |
def get_data(self, host_dict): |
if self.type == 'landmarks-manual': |
need_remark_face = False |
while len (self.input_data) > 0: |
data = self.input_data[0] |
filepath, data_rects, data_landmarks = data.filepath, data.rects, data.landmarks |
is_frame_done = False |
if self.image_filepath != filepath: |
self.image_filepath = filepath |
if self.cache_original_image[0] == filepath: |
self.original_image = self.cache_original_image[1] |
else: |
self.original_image = imagelib.normalize_channels( cv2_imread( filepath ), 3 ) |
self.cache_original_image = (filepath, self.original_image ) |
(h,w,c) = self.original_image.shape |
self.view_scale = 1.0 if self.manual_window_size == 0 else self.manual_window_size / ( h * (16.0/9.0) ) |
if self.cache_image[0] == (h,w,c) + (self.view_scale,filepath): |
self.image = self.cache_image[1] |
else: |
self.image = cv2.resize (self.original_image, ( int(w*self.view_scale), int(h*self.view_scale) ), interpolation=cv2.INTER_LINEAR) |
self.cache_image = ( (h,w,c) + (self.view_scale,filepath), self.image ) |
(h,w,c) = self.image.shape |
sh = (0,0, w, min(100, h) ) |
if self.cache_text_lines_img[0] == sh: |
self.text_lines_img = self.cache_text_lines_img[1] |
else: |
self.text_lines_img = (imagelib.get_draw_text_lines ( self.image, sh, |
[ '[L Mouse click] - lock/unlock selection. [Mouse wheel] - change rect', |
'[R Mouse Click] - manual face rectangle', |
'[Enter] / [Space] - confirm / skip frame', |
'[,] [.]- prev frame, next frame. [Q] - skip remaining frames', |
'[a] - accuracy on/off (more fps)', |
'[h] - hide this help' |
], (1, 1, 1) )*255).astype(np.uint8) |
self.cache_text_lines_img = (sh, self.text_lines_img) |
if need_remark_face: |
need_remark_face = False |
if len(data_rects) != 0: |
self.rect = data_rects.pop() |
self.landmarks = data_landmarks.pop() |
data_rects.clear() |
data_landmarks.clear() |
self.rect_locked = True |
self.rect_size = ( self.rect[2] - self.rect[0] ) / 2 |
self.x = ( self.rect[0] + self.rect[2] ) / 2 |
self.y = ( self.rect[1] + self.rect[3] ) / 2 |
self.redraw() |
if len(data_rects) == 0: |
(h,w,c) = self.image.shape |
while True: |
io.process_messages(0.0001) |
if not self.force_landmarks: |
new_x = self.x |
new_y = self.y |
new_rect_size = self.rect_size |
mouse_events = io.get_mouse_events(self.wnd_name) |
for ev in mouse_events: |
(x, y, ev, flags) = ev |
if ev == io.EVENT_MOUSEWHEEL and not self.rect_locked: |
mod = 1 if flags > 0 else -1 |
diff = 1 if new_rect_size <= 40 else np.clip(new_rect_size / 10, 1, 10) |
new_rect_size = max (5, new_rect_size + diff*mod) |
elif ev == io.EVENT_LBUTTONDOWN: |
if self.force_landmarks: |
self.x = new_x |
self.y = new_y |
self.force_landmarks = False |
self.rect_locked = True |
self.redraw() |
else: |
self.rect_locked = not self.rect_locked |
self.extract_needed = True |
elif ev == io.EVENT_RBUTTONDOWN: |
self.force_landmarks = not self.force_landmarks |
if self.force_landmarks: |
self.rect_locked = False |
elif not self.rect_locked: |
new_x = np.clip (x, 0, w-1) / self.view_scale |
new_y = np.clip (y, 0, h-1) / self.view_scale |
key_events = io.get_key_events(self.wnd_name) |
key, chr_key, ctrl_pressed, alt_pressed, shift_pressed = key_events[-1] if len(key_events) > 0 else (0,0,False,False,False) |
if key == ord('\r') or key == ord('\n'): |
is_frame_done = True |
data_rects.append (self.rect) |
data_landmarks.append (self.landmarks) |
break |
elif key == ord(' '): |
is_frame_done = True |
break |
elif key == ord(',') and len(self.result) > 0: |
if self.rect_locked: |
self.rect_locked = False |
data_rects.append (self.rect) |
data_landmarks.append (self.landmarks) |
self.input_data.insert(0, self.result.pop() ) |
io.progress_bar_inc(-1) |
need_remark_face = True |
break |
elif key == ord('.'): |
if self.rect_locked: |
self.rect_locked = False |
data_rects.append (self.rect) |
data_landmarks.append (self.landmarks) |
need_remark_face = True |
is_frame_done = True |
break |
elif key == ord('q'): |
if self.rect_locked: |
self.rect_locked = False |
data_rects.append (self.rect) |
data_landmarks.append (self.landmarks) |
while len(self.input_data) > 0: |
self.result.append( self.input_data.pop(0) ) |
io.progress_bar_inc(1) |
break |
elif key == ord('h'): |
self.hide_help = not self.hide_help |
break |
elif key == ord('a'): |
self.landmarks_accurate = not self.landmarks_accurate |
break |
if self.force_landmarks: |
pt2 = np.float32([new_x, new_y]) |
pt1 = np.float32([self.x, self.y]) |
pt_vec_len = npla.norm(pt2-pt1) |
pt_vec = pt2-pt1 |
if pt_vec_len != 0: |
pt_vec /= pt_vec_len |
self.rect_size = pt_vec_len |
self.rect = ( int(self.x-self.rect_size), |
int(self.y-self.rect_size), |
int(self.x+self.rect_size), |
int(self.y+self.rect_size) ) |
if pt_vec_len > 0: |
lmrks = np.concatenate ( (np.zeros ((17,2), np.float32), LandmarksProcessor.landmarks_2D), axis=0 ) |
lmrks -= lmrks[30:31,:] |
mat = cv2.getRotationMatrix2D( (0, 0), -np.arctan2( pt_vec[1], pt_vec[0] )*180/math.pi , pt_vec_len) |
mat[:, 2] += (self.x, self.y) |
self.landmarks = LandmarksProcessor.transform_points(lmrks, mat ) |
self.redraw() |
elif self.x != new_x or \ |
self.y != new_y or \ |
self.rect_size != new_rect_size or \ |
self.extract_needed: |
self.x = new_x |
self.y = new_y |
self.rect_size = new_rect_size |
self.rect = ( int(self.x-self.rect_size), |
int(self.y-self.rect_size), |
int(self.x+self.rect_size), |
int(self.y+self.rect_size) ) |
return ExtractSubprocessor.Data (filepath, rects=[self.rect], landmarks_accurate=self.landmarks_accurate) |
else: |
is_frame_done = True |
if is_frame_done: |
self.result.append ( data ) |
self.input_data.pop(0) |
io.progress_bar_inc(1) |
self.extract_needed = True |
self.rect_locked = False |
else: |
if len (self.input_data) > 0: |
return self.input_data.pop(0) |
return None |
def on_data_return (self, host_dict, data): |
if not self.type != 'landmarks-manual': |
self.input_data.insert(0, data) |
def redraw(self): |
(h,w,c) = self.image.shape |
if not self.hide_help: |
image = cv2.addWeighted (self.image,1.0,self.text_lines_img,1.0,0) |
else: |
image = self.image.copy() |
view_rect = (np.array(self.rect) * self.view_scale).astype(np.int).tolist() |
view_landmarks = (np.array(self.landmarks) * self.view_scale).astype(np.int).tolist() |
if self.rect_size <= 40: |
scaled_rect_size = h // 3 if w > h else w // 3 |
p1 = (self.x - self.rect_size, self.y - self.rect_size) |
p2 = (self.x + self.rect_size, self.y - self.rect_size) |
p3 = (self.x - self.rect_size, self.y + self.rect_size) |
wh = h if h < w else w |
np1 = (w / 2 - wh / 4, h / 2 - wh / 4) |
np2 = (w / 2 + wh / 4, h / 2 - wh / 4) |
np3 = (w / 2 - wh / 4, h / 2 + wh / 4) |
mat = cv2.getAffineTransform( np.float32([p1,p2,p3])*self.view_scale, np.float32([np1,np2,np3]) ) |
image = cv2.warpAffine(image, mat,(w,h) ) |
view_landmarks = LandmarksProcessor.transform_points (view_landmarks, mat) |
landmarks_color = (255,255,0) if self.rect_locked else (0,255,0) |
LandmarksProcessor.draw_rect_landmarks (image, view_rect, view_landmarks, self.face_type, self.image_size, landmarks_color=landmarks_color) |
self.extract_needed = False |
io.show_image (self.wnd_name, image) |
def on_result (self, host_dict, data, result): |
if self.type == 'landmarks-manual': |
filepath, landmarks = result.filepath, result.landmarks |
if len(landmarks) != 0 and landmarks[0] is not None: |
self.landmarks = landmarks[0] |
self.redraw() |
else: |
self.result.append ( result ) |
io.progress_bar_inc(1) |
def get_result(self): |
return self.result |
class DeletedFilesSearcherSubprocessor(Subprocessor): |
class Cli(Subprocessor.Cli): |
def on_initialize(self, client_dict): |
self.debug_paths_stems = client_dict['debug_paths_stems'] |
return None |
def process_data(self, data): |
input_path_stem = Path(data[0]).stem |
return any ( [ input_path_stem == d_stem for d_stem in self.debug_paths_stems] ) |
def get_data_name (self, data): |
return data[0] |
def __init__(self, input_paths, debug_paths ): |
self.input_paths = input_paths |
self.debug_paths_stems = [ Path(d).stem for d in debug_paths] |
self.result = [] |
super().__init__('DeletedFilesSearcherSubprocessor', DeletedFilesSearcherSubprocessor.Cli, 60) |
def process_info_generator(self): |
for i in range(min(multiprocessing.cpu_count(), 8)): |
yield 'CPU%d' % (i), {}, {'debug_paths_stems' : self.debug_paths_stems} |
def on_clients_initialized(self): |
io.progress_bar ("Searching deleted files", len (self.input_paths)) |
def on_clients_finalized(self): |
io.progress_bar_close() |
def get_data(self, host_dict): |
if len (self.input_paths) > 0: |
return [self.input_paths.pop(0)] |
return None |
def on_data_return (self, host_dict, data): |
self.input_paths.insert(0, data[0]) |
def on_result (self, host_dict, data, result): |
if result == False: |
self.result.append( data[0] ) |
io.progress_bar_inc(1) |
def get_result(self): |
return self.result |
def main(detector=None, |
input_path=None, |
output_path=None, |
output_debug=None, |
manual_fix=False, |
manual_output_debug_fix=False, |
manual_window_size=1368, |
face_type='full_face', |
max_faces_from_image=None, |
image_size=None, |
jpeg_quality=None, |
cpu_only = False, |
force_gpu_idxs = None, |
): |
if not input_path.exists(): |
io.log_err ('Input directory not found. Please ensure it exists.') |
return |
if not output_path.exists(): |
output_path.mkdir(parents=True, exist_ok=True) |
if face_type is not None: |
face_type = FaceType.fromString(face_type) |
if face_type is None: |
if manual_output_debug_fix: |
files = pathex.get_image_paths(output_path) |
if len(files) != 0: |
dflimg = DFLIMG.load(Path(files[0])) |
if dflimg is not None and dflimg.has_data(): |
face_type = FaceType.fromString ( dflimg.get_face_type() ) |
input_image_paths = pathex.get_image_unique_filestem_paths(input_path, verbose_print_func=io.log_info) |
output_images_paths = pathex.get_image_paths(output_path) |
output_debug_path = output_path.parent / (output_path.name + '_debug') |
continue_extraction = False |
if not manual_output_debug_fix and len(output_images_paths) > 0: |
if len(output_images_paths) > 128: |
continue_extraction = io.input_bool ("Continue extraction?", True, help_message="Extraction can be continued, but you must specify the same options again.") |
if len(output_images_paths) > 128 and continue_extraction: |
try: |
input_image_paths = input_image_paths[ [ Path(x).stem for x in input_image_paths ].index ( Path(output_images_paths[-128]).stem.split('_')[0] ) : ] |
except: |
io.log_err("Error in fetching the last index. Extraction cannot be continued.") |
return |
elif input_path != output_path: |
io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n") |
for filename in output_images_paths: |
Path(filename).unlink() |
device_config = nn.DeviceConfig.GPUIndexes( force_gpu_idxs or nn.ask_choose_device_idxs(choose_only_one=detector=='manual', suggest_all_gpu=True) ) \ |
if not cpu_only else nn.DeviceConfig.CPU() |
if face_type is None: |
face_type = io.input_str ("Face type", 'wf', ['f','wf','head'], help_message="Full face / whole face / head. 'Whole face' covers full area of face include forehead. 'head' covers full head, but requires XSeg for src and dst faceset.").lower() |
face_type = {'f' : FaceType.FULL, |
'wf' : FaceType.WHOLE_FACE, |
'head' : FaceType.HEAD}[face_type] |
if max_faces_from_image is None: |
max_faces_from_image = io.input_int(f"Max number of faces from image", 0, help_message="If you extract a src faceset that has frames with a large number of faces, it is advisable to set max faces to 3 to speed up extraction. 0 - unlimited") |
if image_size is None: |
image_size = io.input_int(f"Image size", 512 if face_type < FaceType.HEAD else 768, valid_range=[256,2048], help_message="Output image size. The higher image size, the worse face-enhancer works. Use higher than 512 value only if the source image is sharp enough and the face does not need to be enhanced.") |
if jpeg_quality is None: |
jpeg_quality = io.input_int(f"Jpeg quality", 90, valid_range=[1,100], help_message="Jpeg quality. The higher jpeg quality the larger the output file size.") |
if detector is None: |
io.log_info ("Choose detector type.") |
io.log_info ("[0] S3FD") |
io.log_info ("[1] manual") |
detector = {0:'s3fd', 1:'manual'}[ io.input_int("", 0, [0,1]) ] |
if output_debug is None: |
output_debug = io.input_bool (f"Write debug images to {output_debug_path.name}?", False) |
if output_debug: |
output_debug_path.mkdir(parents=True, exist_ok=True) |
if manual_output_debug_fix: |
if not output_debug_path.exists(): |
io.log_err(f'{output_debug_path} not found. Re-extract faces with "Write debug images" option.') |
return |
else: |
detector = 'manual' |
io.log_info('Performing re-extract frames which were deleted from _debug directory.') |
input_image_paths = DeletedFilesSearcherSubprocessor (input_image_paths, pathex.get_image_paths(output_debug_path) ).run() |
input_image_paths = sorted (input_image_paths) |
io.log_info('Found %d images.' % (len(input_image_paths))) |
else: |
if not continue_extraction and output_debug_path.exists(): |
for filename in pathex.get_image_paths(output_debug_path): |
Path(filename).unlink() |
images_found = len(input_image_paths) |
faces_detected = 0 |
if images_found != 0: |
if detector == 'manual': |
io.log_info ('Performing manual extract...') |
data = ExtractSubprocessor ([ ExtractSubprocessor.Data(Path(filename)) for filename in input_image_paths ], 'landmarks-manual', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, manual_window_size=manual_window_size, device_config=device_config).run() |
io.log_info ('Performing 3rd pass...') |
data = ExtractSubprocessor (data, 'final', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, final_output_path=output_path, device_config=device_config).run() |
else: |
io.log_info ('Extracting faces...') |
data = ExtractSubprocessor ([ ExtractSubprocessor.Data(Path(filename)) for filename in input_image_paths ], |
'all', |
image_size, |
jpeg_quality, |
face_type, |
output_debug_path if output_debug else None, |
max_faces_from_image=max_faces_from_image, |
final_output_path=output_path, |
device_config=device_config).run() |
faces_detected += sum([d.faces_detected for d in data]) |
if manual_fix: |
if all ( np.array ( [ d.faces_detected > 0 for d in data] ) == True ): |
io.log_info ('All faces are detected, manual fix not needed.') |
else: |
fix_data = [ ExtractSubprocessor.Data(d.filepath) for d in data if d.faces_detected == 0 ] |
io.log_info ('Performing manual fix for %d images...' % (len(fix_data)) ) |
fix_data = ExtractSubprocessor (fix_data, 'landmarks-manual', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, manual_window_size=manual_window_size, device_config=device_config).run() |
fix_data = ExtractSubprocessor (fix_data, 'final', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, final_output_path=output_path, device_config=device_config).run() |
faces_detected += sum([d.faces_detected for d in fix_data]) |
io.log_info ('-------------------------') |
io.log_info ('Images found: %d' % (images_found) ) |
io.log_info ('Faces detected: %d' % (faces_detected) ) |
io.log_info ('-------------------------') |