|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
import cv2 |
|
import torch |
|
import numpy as np |
|
import mediapipe as mp |
|
from torchvision import models, transforms |
|
from tempfile import NamedTemporaryFile |
|
from pathlib import Path |
|
import logging |
|
from typing import Tuple, Optional |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class DeepfakeDetector: |
|
def __init__(self, detection_confidence: float = 0.5, max_faces: int = 1): |
|
"""Initialize the DeepfakeDetector with MediaPipe and ResNet model.""" |
|
self.mp_face_detection = mp.solutions.face_detection |
|
self.mp_face_mesh = mp.solutions.face_mesh |
|
|
|
|
|
self.face_detection = self.mp_face_detection.FaceDetection( |
|
model_selection=1, |
|
min_detection_confidence=detection_confidence |
|
) |
|
self.face_mesh = self.mp_face_mesh.FaceMesh( |
|
static_image_mode=False, |
|
max_num_faces=max_faces, |
|
min_detection_confidence=detection_confidence |
|
) |
|
|
|
|
|
self.model = self._create_model() |
|
self.transform = self._create_transform() |
|
|
|
@staticmethod |
|
def _create_model() -> torch.nn.Module: |
|
"""Create and configure the ResNet model.""" |
|
model = models.resnet34(weights=None) |
|
model.fc = torch.nn.Linear(model.fc.in_features, 2) |
|
model.eval() |
|
return model |
|
|
|
@staticmethod |
|
def _create_transform() -> transforms.Compose: |
|
"""Create the image transformation pipeline.""" |
|
return transforms.Compose([ |
|
transforms.ToPILImage(), |
|
transforms.Resize((224, 224)), |
|
transforms.ToTensor(), |
|
transforms.Normalize( |
|
mean=[0.485, 0.456, 0.406], |
|
std=[0.229, 0.224, 0.225] |
|
) |
|
]) |
|
|
|
def get_face_bbox(self, landmarks, frame_shape: Tuple[int, int]) -> Tuple[int, int, int, int]: |
|
"""Extract face bounding box from landmarks.""" |
|
h, w = frame_shape[:2] |
|
xs = [lm.x * w for lm in landmarks.landmark] |
|
ys = [lm.y * h for lm in landmarks.landmark] |
|
return ( |
|
max(0, int(min(xs))), |
|
max(0, int(min(ys))), |
|
min(w, int(max(xs))), |
|
min(h, int(max(ys))) |
|
) |
|
|
|
def process_frame(self, frame: np.ndarray) -> np.ndarray: |
|
"""Process a single frame to detect deepfakes.""" |
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
detection_results = self.face_detection.process(rgb_frame) |
|
if not detection_results.detections: |
|
return frame |
|
|
|
|
|
for detection in detection_results.detections: |
|
mesh_results = self.face_mesh.process(rgb_frame) |
|
if not mesh_results.multi_face_landmarks: |
|
continue |
|
|
|
for face_landmarks in mesh_results.multi_face_landmarks: |
|
frame = self._analyze_face(frame, rgb_frame, face_landmarks) |
|
|
|
return frame |
|
|
|
def _analyze_face(self, frame: np.ndarray, rgb_frame: np.ndarray, |
|
face_landmarks) -> np.ndarray: |
|
"""Analyze a single face and draw results on frame.""" |
|
|
|
x_min, y_min, x_max, y_max = self.get_face_bbox( |
|
face_landmarks, frame.shape |
|
) |
|
|
|
|
|
face_crop = rgb_frame[y_min:y_max, x_min:x_max] |
|
if face_crop.size == 0: |
|
return frame |
|
|
|
|
|
try: |
|
face_tensor = self.transform(face_crop).unsqueeze(0) |
|
with torch.no_grad(): |
|
output = torch.softmax(self.model(face_tensor), dim=1) |
|
fake_confidence = output[0, 1].item() * 100 |
|
except Exception as e: |
|
logger.error(f"Error during inference: {str(e)}") |
|
return frame |
|
|
|
|
|
label = "Fake" if fake_confidence > 50 else "Real" |
|
color = (0, 0, 255) if label == "Fake" else (0, 255, 0) |
|
label_text = f"{label} ({fake_confidence:.2f}%)" |
|
|
|
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2) |
|
cv2.putText(frame, label_text, (x_min, y_min - 10), |
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) |
|
|
|
return frame |
|
|
|
def process_video(self, video_path: str) -> Optional[str]: |
|
"""Process a video file and return path to processed video.""" |
|
try: |
|
cap = cv2.VideoCapture(video_path) |
|
if not cap.isOpened(): |
|
logger.error("Error opening video file") |
|
return None |
|
|
|
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
fps = int(cap.get(cv2.CAP_PROP_FPS)) |
|
|
|
|
|
output_path = str(Path(video_path).with_suffix('')) + "_processed.mp4" |
|
output_video = cv2.VideoWriter( |
|
output_path, |
|
cv2.VideoWriter_fourcc(*'mp4v'), |
|
fps, |
|
(width, height) |
|
) |
|
|
|
|
|
while cap.isOpened(): |
|
ret, frame = cap.read() |
|
if not ret: |
|
break |
|
|
|
processed_frame = self.process_frame(frame) |
|
output_video.write(processed_frame) |
|
|
|
|
|
cap.release() |
|
output_video.release() |
|
|
|
return output_path |
|
|
|
except Exception as e: |
|
logger.error(f"Error processing video: {str(e)}") |
|
return None |
|
|
|
def gradio_interface(video_file): |
|
"""Gradio interface function.""" |
|
if video_file is None: |
|
return "Error: No video uploaded." |
|
|
|
detector = DeepfakeDetector() |
|
|
|
with NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: |
|
temp_file_path = temp_file.name |
|
with open(video_file, "rb") as uploaded_file: |
|
temp_file.write(uploaded_file.read()) |
|
|
|
output_path = detector.process_video(temp_file_path) |
|
if output_path is None: |
|
return "Error processing video" |
|
|
|
return output_path |
|
|
|
|
|
iface = gr.Interface( |
|
fn=gradio_interface, |
|
inputs=gr.Video(label="Upload Video"), |
|
outputs=gr.Video(label="Processed Video"), |
|
title="Deepfake Detection", |
|
description="Upload a video to detect deepfakes", |
|
examples=[], |
|
) |
|
|
|
if __name__ == "__main__": |
|
iface.launch( |
|
server_name="0.0.0.0", |
|
share=True, |
|
debug=True |
|
) |