Spaces:
Running
Running
import os | |
from tempfile import NamedTemporaryFile | |
import streamlit as st | |
import cv2 | |
import numpy as np | |
import insightface | |
from insightface.app import FaceAnalysis | |
import time | |
import requests | |
app = '' | |
swapper = '' | |
st.set_page_config(page_title="FaceSwap App by Adil Khan") | |
def download_model(): | |
url = "https://cdn.adikhanofficial.com/python/insightface/models/inswapper_128.onnx" | |
filename = url.split('/')[-1] | |
filepath = os.path.join(os.path.dirname(__file__),filename) | |
if not os.path.exists(filepath): | |
print(f"Downloading {filename}...") | |
response = requests.get(url) | |
with open(filepath, 'wb') as file: | |
file.write(response.content) | |
print(f"{filename} downloaded successfully.") | |
else: | |
print(f"{filename} already exists in the directory.") | |
def swap_faces(target_image, target_face, source_face): | |
try: | |
return swapper.get(target_image, target_face, source_face, paste_back=True) | |
except Exception as e: | |
st.error(f"Error during swaping: {e}") | |
def image_faceswap_app(): | |
st.title("Face Swapper for Image") | |
source_image = st.file_uploader("Upload Source Image", type=["jpg", "jpeg", "png"]) | |
target_image = st.file_uploader("Upload Target Image", type=["jpg", "jpeg", "png"]) | |
if source_image and target_image: | |
with st.spinner("Swapping... Please wait."): | |
try: | |
source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) | |
target_image = cv2.imdecode(np.frombuffer(target_image.read(), np.uint8), -1) | |
source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB) | |
target_image = cv2.cvtColor(target_image, cv2.COLOR_BGR2RGB) | |
source_faces = app.get(source_image) | |
source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) | |
if len(source_faces) == 0: | |
raise ValueError("No faces found in the source image.") | |
source_face = source_faces[0] | |
target_faces = app.get(target_image) | |
target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) | |
if len(target_faces) == 0: | |
raise ValueError("No faces found in the target image.") | |
target_face = target_faces[0] | |
swapped_image = swap_faces(target_image, target_face, source_face) | |
message_placeholder = st.empty() | |
message_placeholder.success("Swapped Successfully!") | |
col1, col2, col3 = st.columns([1, 1, 1]) | |
with col1: | |
st.image(source_image, caption="Source Image", use_column_width=True) | |
with col2: | |
st.image(target_image, caption="Target Image", use_column_width=True) | |
with col3: | |
st.image(swapped_image, caption="Swapped Image", use_column_width=True) | |
except Exception as e: | |
st.error(f"Error during image processing: {e}") | |
def process_video(source_img, video_path, output_video_path): | |
try: | |
cap = cv2.VideoCapture(video_path) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) | |
source_faces = app.get(source_img) | |
source_faces = sorted(source_faces, key=lambda x: x.bbox[0]) | |
if len(source_faces) == 0: | |
raise ValueError("No faces found in the source image.") | |
source_face = source_faces[0] | |
progress_placeholder = st.empty() | |
frame_count = 0 | |
start_time = time.time() | |
while True: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
target_faces = app.get(frame) | |
target_faces = sorted(target_faces, key=lambda x: x.bbox[0]) | |
if len(target_faces) > 0: | |
frame = swap_faces(frame, target_faces[0], source_face) | |
out.write(frame) | |
elapsed_time = time.time() - start_time | |
frames_per_second = frame_count / elapsed_time if elapsed_time > 0 else 0 | |
remaining_time_seconds = max(0, (total_frames - frame_count) / frames_per_second) if frames_per_second > 0 else 0 | |
remaining_minutes, remaining_seconds = divmod(remaining_time_seconds, 60) | |
elapsed_minutes, elapsed_seconds = divmod(elapsed_time, 60) | |
progress_placeholder.text( | |
f"Processed Frames: {frame_count}/{total_frames} | Elapsed Time: {int(elapsed_minutes)}m {int(elapsed_seconds)}s | Remaining Time: {int(remaining_minutes)}m {int(remaining_seconds)}s") | |
frame_count += 1 | |
cap.release() | |
out.release() | |
except Exception as e: | |
st.error(f"Error during video processing: {e}") | |
def video_faceswap_app(): | |
st.title("Face Swapper for Video") | |
source_image = st.file_uploader("Upload Source Face Image", type=["jpg", "jpeg", "png"]) | |
if source_image is not None: | |
source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1) | |
target_video = st.file_uploader("Upload Target Video", type=["mp4"]) | |
if target_video is not None: | |
temp_video = NamedTemporaryFile(delete=False, suffix=".mp4") | |
temp_video.write(target_video.read()) | |
output_video_path = os.path.splitext(temp_video.name)[0] + '_output.mp4' | |
status_placeholder = st.empty() | |
try: | |
with st.spinner("Processing... This may take a while."): | |
process_video(source_image, temp_video.name, output_video_path) | |
status_placeholder.success("Processing complete!") | |
st.subheader("Your video is ready:") | |
st.video(output_video_path) | |
except Exception as e: | |
st.error(f"Error during video processing: {e}") | |
def main(): | |
app_selection = st.sidebar.radio("Select App", ("Image Face Swapping", "Video Face Swapping")) | |
if app_selection == "Image Face Swapping": | |
image_faceswap_app() | |
elif app_selection == "Video Face Swapping": | |
video_faceswap_app() | |
if __name__ == "__main__": | |
app = FaceAnalysis(name='buffalo_l') | |
app.prepare(ctx_id=0, det_size=(640, 640)) | |
download_model() #download model if not available | |
swapper = insightface.model_zoo.get_model('inswapper_128.onnx', root=os.path.dirname(__file__)) | |
main() | |