import os
import shutil
import subprocess
import time
import uuid

import cv2
import mediapipe as mp
import numpy as np
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

from utils import mask_overlay


def draw_landmarks_on_image(rgb_image, detection_result):
    """Draw the face mesh tesselation, contours, and irises on a copy of the frame."""
    face_landmarks_list = detection_result.face_landmarks
    annotated_image = np.copy(rgb_image)

    for idx in range(len(face_landmarks_list)):
        face_landmarks = face_landmarks_list[idx]

        # Convert the detected landmarks to the protobuf format the drawing utils expect.
        face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        face_landmarks_proto.landmark.extend(
            [landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
             for landmark in face_landmarks]
        )

        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_TESSELATION,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_tesselation_style()
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_contours_style()
        )
        solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_IRISES,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles.get_default_face_mesh_iris_connections_style()
        )

    return annotated_image


def mediapipe_config():
    """Create a FaceLandmarker configured for video-mode detection."""
    model_path = "face_landmarker.task"

    BaseOptions = mp.tasks.BaseOptions
    FaceLandmarker = mp.tasks.vision.FaceLandmarker
    FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = FaceLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.VIDEO,
    )
    landmarker = FaceLandmarker.create_from_options(options)
    return landmarker


landmarker = None


def reset_landmarker():
    """Close any existing landmarker and create a fresh one."""
    global landmarker
    try:
        if landmarker:
            landmarker.close()
    except Exception:
        pass
    landmarker = mediapipe_config()


def face_point(results, frame):
    """Convert normalized face landmarks into pixel coordinates as [id, x, y] triples."""
    ih, iw, ic = frame.shape
    faces = []
    if results.face_landmarks:
        for face_landmarks in results.face_landmarks:
            face = []
            for id, lm in enumerate(face_landmarks):
                x, y = int(lm.x * iw), int(lm.y * ih)
                face.append([id, x, y])
            faces.append(face)
    return faces


def letterbox(image, target_width, target_height):
    """Resize image keeping aspect ratio, pad with black to fit target size."""
    ih, iw = image.shape[:2]
    scale = min(target_width / iw, target_height / ih)
    nw, nh = int(iw * scale), int(ih * scale)
    resized = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_AREA)
    canvas = np.zeros((target_height, target_width, 3), dtype=np.uint8)
    x_offset = (target_width - nw) // 2
    y_offset = (target_height - nh) // 2
    canvas[y_offset:y_offset + nh, x_offset:x_offset + nw] = resized
    return canvas


def replace_audio_with_ffmpeg(video_path, audio_path):
    """Mux the audio track from audio_path into video_path via ffmpeg."""
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    os.makedirs("./save_video/", exist_ok=True)
    output_path = f"./save_video/{base_name}_.mp4"

    gpu = False
    if gpu:
        print("CUDA GPU is available. Running on GPU.")
    else:
        print("No CUDA GPU found. Falling back to CPU.")
    video_codec = "h264_nvenc" if gpu else "libx264"

    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", audio_path,
        "-c:v", video_codec,
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-shortest",
        output_path
    ]
    subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return os.path.abspath(output_path)


def add_audio(input_video, mask_video, save_video="final.mp4"):
    """
    Extract audio as WAV from input_video and add it to mask_video.
    If extraction fails, just copy mask_video to save_video.
    """
    try:
        os.makedirs("./temp", exist_ok=True)
        audio_file = os.path.abspath("./temp/temp_audio.wav")
        input_video = os.path.normpath(os.path.abspath(input_video))
        mask_video = os.path.normpath(os.path.abspath(mask_video))
        save_video = os.path.normpath(os.path.abspath(save_video))

        # Extract the audio track as 16-bit stereo PCM at 44.1 kHz.
        extract_cmd = [
            "ffmpeg", "-y",
            "-i", input_video,
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "44100",
            "-ac", "2",
            audio_file,
            "-hide_banner", "-loglevel", "error"
        ]
        subprocess.run(extract_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        if not os.path.exists(audio_file) or os.path.getsize(audio_file) == 0:
            raise Exception("No audio track extracted")

        gpu = False
        video_codec = "h264_nvenc" if gpu else "libx264"
        merge_cmd = [
            "ffmpeg", "-y",
            "-i", mask_video,
            "-i", audio_file,
            "-c:v", video_codec,
            "-c:a", "aac",
            "-map", "0:v:0",
            "-map", "1:a:0",
            "-shortest",
            save_video
        ]
        subprocess.run(merge_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        os.remove(audio_file)
        return True
    except Exception as e:
        print("Audio merge failed:", e)
        try:
            shutil.copy(mask_video, save_video)
        except Exception as e2:
            print("Fallback copy failed:", e2)
        return False


def is_camera_source(source):
    """Return True if source looks like a camera index rather than a video file path."""
    if isinstance(source, int):
        return True
    if isinstance(source, str) and not os.path.isfile(source):
        try:
            int(source)
            return True
        except ValueError:
            return False
    return False


def add_mask(upload_video, mask_name="Blue Mask", mask_up=10, mask_down=10, display=False):
    """Run face landmark detection on a video or camera source and write a masked copy."""
    reset_landmarker()
    output_video = "./temp/mask.mp4"
    os.makedirs("./temp", exist_ok=True)

    cap = cv2.VideoCapture(upload_video)
    if not cap.isOpened():
        print("Cannot access video file")
        return None, None

    input_fps = int(cap.get(cv2.CAP_PROP_FPS))
    if input_fps <= 0 or input_fps > 120:
        input_fps = 25

    OUTPUT_WIDTH = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    OUTPUT_HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_video, fourcc, input_fps, (OUTPUT_WIDTH, OUTPUT_HEIGHT))

    frame_count = 0
    fps = 0
    fps_start_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            print("✅ Video processing complete.")
            break

        # Flip frame
        # frame = cv2.flip(frame, 1)
        raw_frame = frame.copy()

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
        timestamp_ms = int(cap.get(cv2.CAP_PROP_POS_MSEC))
        results = landmarker.detect_for_video(mp_image, timestamp_ms)

        # Create the mesh visualization
        visualized_image = draw_landmarks_on_image(frame_rgb, results)
        visualized_image = cv2.cvtColor(visualized_image, cv2.COLOR_RGB2BGR)

        # Create the mask overlay image
        faces = face_point(results, frame)
        if len(faces) > 0:
            masked_frame = mask_overlay(frame, faces, mask_up, mask_down, mask_name)
        else:
            masked_frame = frame

        out.write(masked_frame)

        if display:
            frame_count += 1
            if time.time() - fps_start_time >= 1.0:
                fps = frame_count / (time.time() - fps_start_time)
                frame_count = 0
                fps_start_time = time.time()
            fps_text = f"FPS: {fps:.2f}"

            SCREEN_W, SCREEN_H = 480, 270
            cv2.putText(raw_frame, fps_text, (30, 60), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 2)
            # cv2.putText(middle, fps_text, (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            # cv2.putText(right, fps_text, (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            left = letterbox(raw_frame, SCREEN_W, SCREEN_H)
            middle = letterbox(visualized_image, SCREEN_W, SCREEN_H)
            right = letterbox(masked_frame, SCREEN_W, SCREEN_H)
            combined_image = np.hstack((left, middle, right))

            cv2.imshow("Background Preview", combined_image)
            cv2.imshow("OBS", masked_frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

    print("Releasing resources...")
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    random_str = str(uuid.uuid4())[:5]
    if is_camera_source(upload_video):
        print("Using Camera Index:", upload_video)
        return None, None

    save_video_path = ("./temp/" + os.path.basename(upload_video).split('.')[0]
                       + "_" + mask_name.replace(" ", "_") + "_" + random_str + ".mp4")
    success = add_audio(upload_video, output_video, save_video_path)
    if success:
        print(f"Masked video saved to {save_video_path}")
        return save_video_path, save_video_path
    else:
        print("Failed to save masked video.")
        return output_video, output_video


# mask_names = ["Front Man Mask", "Guards Mask", "Red Mask", "Blue Mask"]
# add_mask(0, mask_name=mask_names[0], mask_up=10, mask_down=10, display=True)
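
# Minimal file-based usage sketch (the filename "input.mp4" is illustrative, not part of the
# repo; it assumes the "face_landmarker.task" model and utils.mask_overlay are available):
#
# if __name__ == "__main__":
#     saved_path, _ = add_mask("input.mp4", mask_name="Red Mask", mask_up=10, mask_down=10)
#     if saved_path:
#         print("Masked video with original audio written to", saved_path)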