import os
import shutil
import subprocess
import time

import cv2
import numpy as np
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

from utils import mask_overlay


def draw_landmarks_on_image(rgb_image, detection_result):
    """Draw the face-mesh tesselation, contours, and irises on a copy of the frame."""
    face_landmarks_list = detection_result.face_landmarks
    annotated_image = np.copy(rgb_image)

    # Loop through the detected faces to visualize.
    for idx in range(len(face_landmarks_list)):
        face_landmarks = face_landmarks_list[idx]

        face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        face_landmarks_proto.landmark.extend(
            [landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
             for landmark in face_landmarks]
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_TESSELATION,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_tesselation_style()
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_contours_style()
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_IRISES,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_iris_connections_style()
        )
    return annotated_image


def mediapipe_config():
    """Create a FaceLandmarker that runs in VIDEO mode."""
    model_path = "face_landmarker.task"

    BaseOptions = mp.tasks.BaseOptions
    FaceLandmarker = mp.tasks.vision.FaceLandmarker
    FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.VIDEO,
    )
    landmarker = FaceLandmarker.create_from_options(options)
    return landmarker


landmarker = mediapipe_config()


def face_point(results, frame):
    """Convert normalized landmarks to [id, x, y] pixel coordinates, one list per face."""
    ih, iw, ic = frame.shape
    faces = []
    if results.face_landmarks:
        for face_landmarks in results.face_landmarks:
            face = []
            for id, lm in enumerate(face_landmarks):
                x, y = int(lm.x * iw), int(lm.y * ih)
                face.append([id, x, y])
            # Append inside the outer loop so every detected face is captured.
            faces.append(face)
    return faces
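
# `mask_overlay` is imported from utils.py, which is not part of this file. As a rough
# illustration only (the real implementation may differ), it is assumed to take the
# BGR frame, the per-face [id, x, y] landmark lists from face_point(), the up/down
# padding values from the sliders, and a mask name, and to return the frame with a
# mask image pasted over each face. A minimal sketch under those assumptions, kept
# commented out so it never shadows the real import:
#
# def mask_overlay(frame, faces, mask_up, mask_down, mask_name):
#     for face in faces:
#         xs = [p[1] for p in face]
#         ys = [p[2] for p in face]
#         x1, x2 = max(min(xs), 0), min(max(xs), frame.shape[1])
#         y1 = max(min(ys) - mask_up, 0)
#         y2 = min(max(ys) + mask_down, frame.shape[0])
#         mask_img = cv2.imread(f"masks/{mask_name}.png")  # hypothetical asset path
#         if mask_img is not None and x2 > x1 and y2 > y1:
#             frame[y1:y2, x1:x2] = cv2.resize(mask_img, (x2 - x1, y2 - y1))
#     return frame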

def letterbox(image, target_width, target_height):
    """Resize image keeping aspect ratio, pad with black to fit target size."""
    ih, iw = image.shape[:2]
    scale = min(target_width / iw, target_height / ih)
    nw, nh = int(iw * scale), int(ih * scale)
    resized = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_AREA)
    canvas = np.zeros((target_height, target_width, 3), dtype=np.uint8)
    x_offset = (target_width - nw) // 2
    y_offset = (target_height - nh) // 2
    canvas[y_offset:y_offset + nh, x_offset:x_offset + nw] = resized
    return canvas


def add_audio(input_video, mask_video, save_video="final.mp4"):
    """Copy the audio track from input_video onto mask_video via ffmpeg."""
    try:
        os.makedirs("./temp", exist_ok=True)
        audio_file = os.path.abspath("./temp/temp_audio.wav")

        # Normalize all paths for ffmpeg (Windows safe).
        input_video = os.path.normpath(os.path.abspath(input_video))
        mask_video = os.path.normpath(os.path.abspath(mask_video))
        save_video = os.path.normpath(os.path.abspath(save_video))

        # Step 1: Extract WAV audio.
        extract_cmd = [
            "ffmpeg", "-y",
            "-i", input_video,
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "44100",
            "-ac", "2",
            audio_file,
            "-hide_banner", "-loglevel", "error"
        ]
        subprocess.run(extract_cmd, check=True)

        # Validate the extraction.
        if not os.path.exists(audio_file) or os.path.getsize(audio_file) == 0:
            raise Exception("No audio track extracted")

        # Step 2: Merge WAV + video.
        merge_cmd = [
            "ffmpeg", "-y",
            "-i", mask_video,
            "-i", audio_file,
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            save_video,
            "-hide_banner", "-loglevel", "error"
        ]
        subprocess.run(merge_cmd, check=True)

        os.remove(audio_file)
        return True

    except Exception as e:
        print("⚠️ Audio merge failed:", e)
        try:
            shutil.copy(mask_video, save_video)  # fallback: ship the silent video
        except Exception as e2:
            print("❌ Fallback copy failed:", e2)
        return False
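
# add_audio shells out to the ffmpeg CLI, so ffmpeg must be on PATH. A quick
# startup guard using the standard-library shutil.which:
if shutil.which("ffmpeg") is None:
    print("⚠️ ffmpeg not found on PATH; output videos will have no audio.")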

def add_mask(upload_video, mask_name="Blue Mask", mask_up=10, mask_down=10):
    output_video = "./temp/mask.mp4"
    os.makedirs("./temp", exist_ok=True)

    cap = cv2.VideoCapture(upload_video)
    if not cap.isOpened():
        print("❌ Cannot access video file")
        return None, None

    input_fps = int(cap.get(cv2.CAP_PROP_FPS))
    if input_fps <= 0 or input_fps > 120:  # sanity check
        input_fps = 25  # default fallback

    OUTPUT_WIDTH = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    OUTPUT_HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_video, fourcc, input_fps, (OUTPUT_WIDTH, OUTPUT_HEIGHT))

    # For more stable FPS calculation.
    frame_count = 0
    fps = 0
    fps_start_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.flip(frame, 1)
        raw_frame = frame.copy()
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
        # detect_for_video requires monotonically increasing timestamps.
        timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
        results = landmarker.detect_for_video(mp_image, timestamp_ms)

        # Create the mesh visualization.
        visualized_image = draw_landmarks_on_image(frame_rgb, results)
        visualized_image = cv2.cvtColor(visualized_image, cv2.COLOR_RGB2BGR)

        # Create the mask overlay image.
        faces = face_point(results, frame)
        if len(faces) > 0:
            masked_frame = mask_overlay(frame, faces, mask_up, mask_down, mask_name)
        else:
            masked_frame = frame

        out.write(masked_frame)

        # Optional live preview with FPS counter (disabled for headless runs):
        # frame_count += 1
        # if time.time() - fps_start_time >= 1.0:
        #     fps = frame_count / (time.time() - fps_start_time)
        #     frame_count = 0
        #     fps_start_time = time.time()
        # fps_text = f"FPS: {fps:.2f}"
        # cv2.putText(masked_frame, fps_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # cv2.putText(visualized_image, fps_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # SCREEN_W, SCREEN_H = 480, 270
        # left = letterbox(raw_frame, SCREEN_W, SCREEN_H)
        # middle = letterbox(visualized_image, SCREEN_W, SCREEN_H)
        # right = letterbox(masked_frame, SCREEN_W, SCREEN_H)
        # combined_image = np.hstack((left, middle, right))
        # cv2.imshow("Face Mesh and Mask Overlay", combined_image)
        # if cv2.waitKey(1) & 0xFF == ord("q"):
        #     break

    print("Releasing resources...")
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # Use the basename so an absolute upload path still lands inside ./temp.
    save_video_path = "./temp/" + os.path.splitext(os.path.basename(upload_video))[0] + "_mask.mp4"
    success = add_audio(upload_video, output_video, save_video_path)
    if success:
        print(f"✅ Masked video saved to {save_video_path}")
        return save_video_path, save_video_path
    else:
        print("❌ Failed to save masked video.")
        return output_video, output_video


# Example direct call (bypassing the UI):
# add_mask("input.mp4", mask_name="Blue Mask", mask_up=10, mask_down=10)

import gradio as gr


def ui():
    with gr.Blocks() as demo:
        gr.Markdown("## Hide Face Using Squid Game Masks")
        mask_names = ["Front Man Mask", "Guards Mask", "Red Mask", "Blue Mask"]
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video")
                mask_selector = gr.Dropdown(choices=mask_names, label="Select Mask")
                submit_btn = gr.Button("Apply Mask")
                with gr.Accordion('Mask Settings', open=False):
                    mask_up = gr.Slider(minimum=0, maximum=100, label="Mask Up", value=10)
                    mask_down = gr.Slider(minimum=0, maximum=100, label="Mask Down", value=10)
            with gr.Column():
                output_video = gr.Video(label="Output Video")
                download_video = gr.File(label="Download Video")

        inputs = [video_input, mask_selector, mask_up, mask_down]
        outputs = [output_video, download_video]
        submit_btn.click(add_mask, inputs=inputs, outputs=outputs)
    return demo


demo = ui()
demo.launch()
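
# Setup notes (assumptions, not verified by this script):
# - Requires opencv-python, mediapipe, numpy, and gradio installed, plus ffmpeg on PATH.
# - mediapipe_config() expects the FaceLandmarker model file "face_landmarker.task"
#   in the working directory; it can be downloaded from the MediaPipe models page.
# - utils.py must provide mask_overlay() along with whatever mask image assets it loads.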