|
import os |
|
import time |
|
from PIL import Image |
|
from retinaface import RetinaFace |
|
from concurrent.futures import ProcessPoolExecutor |
|
|
|
# Raise Pillow's decompression-bomb threshold so very large source photos
# (up to 200 megapixels) can be opened without tripping the size guard.
Image.MAX_IMAGE_PIXELS = 200_000_000
|
|
|
# Work list: one (target_resolution, input_folder) pair per pass.
# target_resolution is (width, height) in pixels; the same folder may appear
# more than once to produce several output sizes from the same images.
DATA = [
    ((768, 768), "/kaggle/input/samikk"),
    ((1024, 1024), "/kaggle/input/samikk"),
]
|
|
|
|
|
|
|
# Worker count handed to ProcessPoolExecutor(max_workers=...); despite the
# name these are processes, not threads.
NUM_THREADS = 1

# Root directory under which one "<width>x<height>" sub-folder is created
# per target resolution.
OUTPUT_ROOT = "/kaggle/working/outputs"

# When True every output is written as PNG; otherwise the input file name
# (and therefore its extension/format) is kept.
Save_PNG = False

# Text file (relative to the working directory) that collects the paths of
# images in which no face was found.
LOG_FILE = "face_detection_failures.txt"
|
|
|
def _detect_primary_face(input_path):
    """Return the first detected face as ``(top, right, bottom, left)`` or ``None``.

    Detection is best-effort: any failure (detector error, unwritable log
    file, unexpected result shape) is printed and treated as "no face" so the
    caller can fall back to a centre crop. Images without a ``face_1`` entry
    are appended to ``LOG_FILE``.
    """
    try:
        faces = RetinaFace.detect_faces(input_path)

        if not faces or 'face_1' not in faces:
            with open(LOG_FILE, 'a') as log:
                log.write(f"Face not detected: {input_path}\n")
            return None

        # facial_area is unpacked as (x1, y1, x2, y2); reorder it into the
        # (top, right, bottom, left) layout used by the crop computation.
        x, y, x2, y2 = faces['face_1']['facial_area']
        return (y, x2, y2, x)
    except Exception as e:
        print(f"Error detecting faces: {e}")
        return None


def _crop_box(width, height, resolution, face_box=None):
    """Compute the ``(left, top, right, bottom)`` crop for the target aspect ratio.

    The crop is the largest rectangle with the aspect ratio of ``resolution``
    that fits inside a ``width`` x ``height`` image. Without ``face_box`` it
    is centred on the image; with ``face_box`` (``(top, right, bottom,
    left)``) it is centred on the face and then clamped to the image bounds.
    """
    desired_aspect_ratio = resolution[0] / resolution[1]

    if width / height > desired_aspect_ratio:
        # Image is too wide: keep full height, trim the sides.
        new_width = int(height * desired_aspect_ratio)
        new_height = height
    else:
        # Image is too tall (or already matches): keep full width.
        new_width = width
        new_height = int(width / desired_aspect_ratio)

    if face_box is None:
        # Integer offsets keep the crop exactly new_width x new_height;
        # float halves would be rounded independently per edge by Pillow.
        left = (width - new_width) // 2
        top = (height - new_height) // 2
    else:
        face_top, face_right, face_bottom, face_left = face_box
        face_center_x = (face_left + face_right) // 2
        face_center_y = (face_top + face_bottom) // 2
        left = min(max(0, face_center_x - new_width // 2), width - new_width)
        top = min(max(0, face_center_y - new_height // 2), height - new_height)

    return (left, top, left + new_width, top + new_height)


def resize_image(args):
    """Crop one image around its primary face and resize it to a target size.

    Designed to be used with ``executor.map``, hence the single tuple
    argument.

    Args:
        args: ``(input_path, resolution, output_root, save_as_png)`` where
            ``resolution`` is ``(width, height)``, ``output_root`` is the
            directory that receives a ``"<width>x<height>"`` sub-folder, and
            ``save_as_png`` forces PNG output instead of keeping the input
            file name.

    Returns:
        Elapsed processing time in seconds, or ``None`` if the arguments
        could not be unpacked, the image could not be opened, or
        cropping/resizing/saving failed (the error is printed).
    """
    start_time = time.perf_counter()

    try:
        input_path, resolution, output_root, save_as_png = args
    except (TypeError, ValueError) as e:
        print(f"Error unpacking arguments: {e}")
        return None

    try:
        image = Image.open(input_path)
    except Exception as e:
        print(f"Error loading image: {e}")
        return None

    # The context manager guarantees the underlying file handle is released
    # on every exit path, including the early return below.
    with image:
        face_box = _detect_primary_face(input_path)

        try:
            box = _crop_box(image.width, image.height, resolution, face_box)
            resized = image.crop(box).resize(resolution, Image.LANCZOS)

            output_folder = os.path.join(output_root, f"{resolution[0]}x{resolution[1]}")
            # exist_ok avoids a check-then-create race between worker processes.
            os.makedirs(output_folder, exist_ok=True)

            output_path = os.path.join(output_folder, os.path.basename(input_path))
            if save_as_png:
                output_path = os.path.splitext(output_path)[0] + '.png'
                resized.save(output_path, format='PNG')
            else:
                resized.save(output_path, quality=100)
        except Exception as e:
            print(f"Error processing or saving image: {e}")
            return None

    return time.perf_counter() - start_time
|
|
|
def process_folder(input_folder, resolution, save_as_png):
    """Resize every PNG/JPEG image in ``input_folder`` to ``resolution``.

    Work is distributed over a process pool of ``NUM_THREADS`` workers and a
    progress line with the running average time is printed after each image.

    Args:
        input_folder: Directory whose direct entries are scanned for files
            ending in ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive).
        resolution: Target ``(width, height)`` in pixels.
        save_as_png: Forwarded to ``resize_image``; forces PNG output.

    Returns:
        None. Output files are written under ``OUTPUT_ROOT``.
    """
    # Match on the full extension including the dot so names that merely end
    # in the letters "png"/"jpg" are not picked up; sort for a stable order.
    image_paths = [
        os.path.join(input_folder, fname)
        for fname in sorted(os.listdir(input_folder))
        if fname.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]

    total_images = len(image_paths)
    if not total_images:
        # Nothing to do; skip spawning worker processes.
        return

    tasks = [(path, resolution, OUTPUT_ROOT, save_as_png) for path in image_paths]
    processed_count = 0
    timed_count = 0
    total_time = 0.0

    with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
        for processing_time in executor.map(resize_image, tasks):
            processed_count += 1
            if processing_time is not None:
                total_time += processing_time
                timed_count += 1

            # Average only over images that reported a time; failed images
            # return None and contribute nothing to total_time.
            average_time = total_time / timed_count * 1000 if timed_count else 0.0
            print(f"Processed {processed_count}/{total_images} images for resolution {resolution[0]}x{resolution[1]}... Average processing time per image: {average_time:.2f} ms")
|
|
|
def main():
    """Run one resize pass per ``(resolution, folder)`` entry in ``DATA``."""
    for resolution, folder in DATA:
        process_folder(folder, resolution, Save_PNG)

    print("Processing complete!")


# The guard is required for ProcessPoolExecutor: worker processes may
# re-import this module and must not start the batch again.
if __name__ == "__main__":
    main()
|
|