|
import tensorflow as tf |
|
import numpy as np |
|
import cv2 |
|
import os |
|
import gradio as gr |
|
from keras.models import load_model |
|
from pytube import YouTube |
|
import pickle |
|
|
|
|
|
def load_video_from_url(youtube_url):
    """Download a YouTube video, sample frames from it, and prepare UI widgets.

    Downloads the first mp4 stream of *youtube_url*, samples frames from it,
    and pickles the resulting tensor to ``frames_tf.pkl`` as a handoff to
    ``detect_deepfake`` (which reads and deletes that file).

    Parameters:
        youtube_url: URL string accepted by ``pytube.YouTube``.

    Returns:
        Tuple of (title, example_frames, predVideoBtn, predOutput) where
        ``title`` is the stream title (or an error message on failure),
        ``example_frames`` is a list of three RGB preview frames, and the
        last two are Gradio components whose visibility reflects success.
    """
    visible = True
    try:
        yt = YouTube(youtube_url)
        yt_stream = yt.streams.filter(file_extension='mp4').first()
        title = yt_stream.title
        src = yt_stream.download()
        capture = cv2.VideoCapture(src)

        sampled_frames = sample_frames_from_video_file(capture)

        # Preview frames at roughly 1/4, 1/2 and 3/4 of the sampled sequence.
        example_frames = [
            sampled_frames[len(sampled_frames) // 4],
            sampled_frames[len(sampled_frames) // 2],
            sampled_frames[3 * len(sampled_frames) // 4],
        ]

        # OpenCV decodes frames as BGR; Gradio expects RGB for display.
        example_frames = swap_color_channels(example_frames)

        # Remove the downloaded video file once frames are extracted.
        if os.path.exists(src):
            os.remove(src)

        # Add a leading batch dimension: (1, num_frames, H, W, C).
        frames_tensor = tf.expand_dims(
            tf.convert_to_tensor(sampled_frames, dtype=tf.float32), axis=0)

        # FIX: use a context manager so the pickle file handle is closed
        # (the previous pickle.dump(..., open(...)) leaked the handle).
        with open("frames_tf.pkl", "wb") as pkl_file:
            pickle.dump(frames_tensor, pkl_file)

    except Exception as e:
        # Surface the failure in the title and hide the prediction widgets.
        title = "Error while loading video: " + str(e)
        visible = False
        example_frames = [np.zeros((256, 256, 3)) for _ in range(3)]

    predVideoBtn = gr.Button(value="Classify Video", visible=visible)

    predOutput = gr.Label(
        label="DETECTED LABEL (AND CONFIDENCE LEVEL)",
        num_top_classes=2,
        visible=visible
    )

    return title, example_frames, predVideoBtn, predOutput
|
|
|
|
|
def detect_deepfake():
    """Classify the previously-sampled video frames as REAL or FAKE.

    Reads the frame tensor pickled by ``load_video_from_url`` from
    ``frames_tf.pkl``, deletes that file, and runs the MesonetRNN model
    on it.

    Returns:
        Tuple of (confidence_dict, flagBtn): a ``{"FAKE": p, "REAL": q}``
        mapping for the Gradio Label component, and a visible flag button.
    """
    # NOTE(review): pickle.load on an arbitrary file is unsafe on untrusted
    # input; here the file is produced locally by load_video_from_url.
    # FIX: context manager closes the handle (was pickle.load(open(...)),
    # which leaked it), and the file is removed only after loading succeeds.
    with open("frames_tf.pkl", "rb") as pkl_file:
        frames_tf = pickle.load(pkl_file)

    if os.path.exists("frames_tf.pkl"):
        os.remove("frames_tf.pkl")

    loaded_model = load_model("MesonetRNN.keras")

    # Model emits a single sigmoid score; index [0][0] is the "REAL"
    # probability for the one-video batch — TODO confirm against training.
    out = loaded_model.predict(frames_tf)
    real_confidence = out[0][0]
    fake_confidence = 1 - real_confidence
    confidence_dict = {"FAKE": fake_confidence, "REAL": real_confidence}

    flagBtn = gr.Button(value="Flag Output", visible=True)

    return confidence_dict, flagBtn
|
|
|
|
|
def sample_frames_from_video_file(capture, sample_count=10, frames_per_sample=10, frame_step=10,
                                  output_size=(256, 256)):
    """Sample evenly-spaced groups of frames from an open video capture.

    Takes ``sample_count`` evenly-spaced starting points across the video;
    from each, reads ``frames_per_sample`` frames spaced ``frame_step``
    frames apart, resizing each via ``format_frames``. Releases *capture*
    before returning.

    Parameters:
        capture: an opened ``cv2.VideoCapture``.
        sample_count: number of starting points across the video.
        frames_per_sample: frames collected per starting point.
        frame_step: gap (in frames) between collected frames.
        output_size: (height, width) passed to ``format_frames``.

    Returns:
        np.ndarray of shape (sample_count * frames_per_sample, H, W, C);
        frames past the end of the video are zero-filled.
    """
    result = []

    # FIX: CAP_PROP_FRAME_COUNT is returned as a float — cast to int so
    # all subsequent position arithmetic stays integral.
    video_length = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

    # Frames spanned by one sample: first frame plus (n-1) strides.
    need_length = 1 + (frames_per_sample - 1) * frame_step

    # FIX: clamp to zero so short videos don't produce a negative start
    # position (previously CAP_PROP_POS_FRAMES could be set negative).
    max_start = max(0, video_length - need_length)

    sample_starts = [int(max_start * sample / sample_count)
                     for sample in range(sample_count)]

    for start in sample_starts:
        capture.set(cv2.CAP_PROP_POS_FRAMES, start)

        # FIX: guard the first read too — previously a failed read passed
        # frame=None into format_frames and crashed.
        ret, frame = capture.read()
        if ret:
            result.append(format_frames(frame, output_size))
        else:
            result.append(np.zeros((output_size[0], output_size[1], 3), dtype=np.float32))

        for _ in range(frames_per_sample - 1):
            # Skip frame_step frames; keep only the last one read.
            for _ in range(frame_step):
                ret, frame = capture.read()
            if ret:
                frame = format_frames(frame, output_size)
                result.append(frame)
            else:
                # Pad with a zero frame when the video runs out.
                result.append(np.zeros_like(result[0]))
    capture.release()

    return np.array(result)
|
|
|
|
|
def swap_color_channels(frames):
    """Return the given frames with their BGR channels reordered to RGB."""
    converted = []
    for bgr_frame in frames:
        converted.append(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
    return converted
|
|
|
|
|
def format_frames(frame, output_size):
    """Convert a frame to float32 in [0, 1] and letterbox-resize it.

    Parameters:
        frame: image array/tensor accepted by tf.image.
        output_size: (target_height, target_width) pair.

    Returns:
        A float32 tensor of shape (target_height, target_width, channels).
    """
    target_height, target_width = output_size
    as_float = tf.image.convert_image_dtype(frame, tf.float32)
    padded = tf.image.resize_with_pad(as_float, target_height, target_width)
    return padded
|
|