|
import os |
|
from PIL import Image |
|
from retinaface import RetinaFace |
|
from concurrent.futures import ProcessPoolExecutor |
|
|
|
Image.MAX_IMAGE_PIXELS = 200000000 |
|
|
|
DATA = [ |
|
((512, 512), r"G:\man dataset\man_4502_imgs_raw"), |
|
((1024, 1024), r"G:\man dataset\man_4502_imgs_raw") |
|
] |
|
|
|
NUM_THREADS = 1 |
|
OUTPUT_ROOT = r"H:\man dataset\resize_faces" |
|
|
|
|
|
def resize_face(args, resize_enabled=False): |
|
try: |
|
input_path, resolution, output_root = args |
|
except ValueError as e: |
|
print(f"Error unpacking arguments: {e}") |
|
return |
|
|
|
try: |
|
image = Image.open(input_path) |
|
except Exception as e: |
|
print(f"Error loading image: {e}") |
|
return |
|
|
|
try: |
|
|
|
faces = RetinaFace.detect_faces(input_path) |
|
|
|
if not faces: |
|
print(f"No face detected in {input_path}") |
|
return |
|
|
|
|
|
face = next(iter(faces.values())) |
|
x, y, x2, y2 = face['facial_area'] |
|
face_image = image.crop((x, y, x2, y2)) |
|
|
|
if resize_enabled: |
|
|
|
face_image = face_image.resize(resolution, Image.LANCZOS) |
|
except Exception as e: |
|
print(f"Error in face detection or resizing: {e}") |
|
return |
|
|
|
try: |
|
output_folder = os.path.join(output_root, f"{resolution[0]}x{resolution[1]}" if resize_enabled else "no_resize") |
|
if not os.path.exists(output_folder): |
|
os.makedirs(output_folder) |
|
|
|
output_path = os.path.join(output_folder, os.path.basename(input_path)) |
|
face_image.save(output_path, quality=100) |
|
except Exception as e: |
|
print(f"Error saving image: {e}") |
|
|
|
def process_folder(input_folder, resolution, resize_enabled=False): |
|
image_paths = [os.path.join(input_folder, fname) for fname in os.listdir(input_folder) if fname.lower().endswith(('png', 'jpg', 'jpeg'))] |
|
|
|
total_images = len(image_paths) |
|
processed_count = 0 |
|
|
|
with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor: |
|
for _ in executor.map(resize_face, [(path, resolution, OUTPUT_ROOT) for path in image_paths], [resize_enabled] * len(image_paths)): |
|
processed_count += 1 |
|
print(f"Processed {processed_count}/{total_images} images for resolution {resolution[0]}x{resolution[1]} (Resize: {'Enabled' if resize_enabled else 'Disabled'})...") |
|
|
|
if __name__ == "__main__": |
|
resize_enabled = False |
|
for resolution, folder in DATA: |
|
process_folder(folder, resolution, resize_enabled) |
|
print("Processing complete!") |
|
|