File size: 6,624 Bytes
1d6a80b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
from tempfile import NamedTemporaryFile
import streamlit as st
import cv2
import numpy as np
import insightface
from insightface.app import FaceAnalysis
import time
import requests

app = ''
swapper = ''
st.set_page_config(page_title="FaceSwap App by Adil Khan")

def download_model():
    url = "https://cdn.adikhanofficial.com/python/insightface/models/inswapper_128.onnx"
    filename = url.split('/')[-1]
    filepath = os.path.join(os.path.dirname(__file__),filename)
    
    if not os.path.exists(filepath):
        print(f"Downloading {filename}...")
        response = requests.get(url)
        with open(filepath, 'wb') as file:
            file.write(response.content)
        print(f"{filename} downloaded successfully.")
    else:
        print(f"{filename} already exists in the directory.")

def swap_faces(target_image, target_face, source_face):
    try:
        return swapper.get(target_image, target_face, source_face, paste_back=True)
    except Exception as e:
        st.error(f"Error during swaping: {e}")


def image_faceswap_app():
    st.title("Face Swapper for Image")
    source_image = st.file_uploader("Upload Source Image", type=["jpg", "jpeg", "png"])
    target_image = st.file_uploader("Upload Target Image", type=["jpg", "jpeg", "png"])
    if source_image and target_image:
        with st.spinner("Swapping... Please wait."):
            try:
                source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1)
                target_image = cv2.imdecode(np.frombuffer(target_image.read(), np.uint8), -1)
                source_image = cv2.cvtColor(source_image, cv2.COLOR_BGR2RGB)
                target_image = cv2.cvtColor(target_image, cv2.COLOR_BGR2RGB)                
                source_faces = app.get(source_image)
                source_faces = sorted(source_faces, key=lambda x: x.bbox[0])
                if len(source_faces) == 0:
                    raise ValueError("No faces found in the source image.")
                source_face = source_faces[0]
                target_faces = app.get(target_image)
                target_faces = sorted(target_faces, key=lambda x: x.bbox[0])
                if len(target_faces) == 0:
                    raise ValueError("No faces found in the target image.")
                target_face = target_faces[0]
                swapped_image = swap_faces(target_image, target_face, source_face)                
                message_placeholder = st.empty()
                message_placeholder.success("Swapped Successfully!")
                col1, col2, col3 = st.columns([1, 1, 1])
                with col1:
                    st.image(source_image, caption="Source Image", use_column_width=True)
                with col2:
                    st.image(target_image, caption="Target Image", use_column_width=True)
                with col3:
                    st.image(swapped_image, caption="Swapped Image", use_column_width=True)
            except Exception as e:
                st.error(f"Error during image processing: {e}")


def process_video(source_img, video_path, output_video_path):
    try:
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        source_faces = app.get(source_img)
        source_faces = sorted(source_faces, key=lambda x: x.bbox[0])
        if len(source_faces) == 0:
            raise ValueError("No faces found in the source image.")
        source_face = source_faces[0]
        progress_placeholder = st.empty()
        frame_count = 0
        start_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            target_faces = app.get(frame)
            target_faces = sorted(target_faces, key=lambda x: x.bbox[0])
            if len(target_faces) > 0:
                frame = swap_faces(frame, target_faces[0], source_face)
            out.write(frame)
            elapsed_time = time.time() - start_time
            frames_per_second = frame_count / elapsed_time if elapsed_time > 0 else 0
            remaining_time_seconds = max(0, (total_frames - frame_count) / frames_per_second) if frames_per_second > 0 else 0
            remaining_minutes, remaining_seconds = divmod(remaining_time_seconds, 60)
            elapsed_minutes, elapsed_seconds = divmod(elapsed_time, 60)
            progress_placeholder.text(
                f"Processed Frames: {frame_count}/{total_frames} | Elapsed Time: {int(elapsed_minutes)}m {int(elapsed_seconds)}s | Remaining Time: {int(remaining_minutes)}m {int(remaining_seconds)}s")
            frame_count += 1
        cap.release()
        out.release()
    except Exception as e:
        st.error(f"Error during video processing: {e}")


def video_faceswap_app():
    st.title("Face Swapper for Video")
    source_image = st.file_uploader("Upload Source Face Image", type=["jpg", "jpeg", "png"])
    if source_image is not None:
        source_image = cv2.imdecode(np.frombuffer(source_image.read(), np.uint8), -1)
    target_video = st.file_uploader("Upload Target Video", type=["mp4"])
    if target_video is not None:
        temp_video = NamedTemporaryFile(delete=False, suffix=".mp4")
        temp_video.write(target_video.read())
        output_video_path = os.path.splitext(temp_video.name)[0] + '_output.mp4'
        status_placeholder = st.empty()
        try:
            with st.spinner("Processing... This may take a while."):
                process_video(source_image, temp_video.name, output_video_path)
            status_placeholder.success("Processing complete!")
            st.subheader("Your video is ready:")
            st.video(output_video_path)
        except Exception as e:
            st.error(f"Error during video processing: {e}")


def main():
    app_selection = st.sidebar.radio("Select App", ("Image Face Swapping", "Video Face Swapping"))
    if app_selection == "Image Face Swapping":
        image_faceswap_app()
    elif app_selection == "Video Face Swapping":
        video_faceswap_app()


if __name__ == "__main__":
    app = FaceAnalysis(name='buffalo_l')
    app.prepare(ctx_id=0, det_size=(640, 640))
    download_model() #download model if not available
    swapper = insightface.model_zoo.get_model('inswapper_128.onnx', root=os.path.dirname(__file__))
    main()