File size: 7,864 Bytes
c2a191e
 
 
 
 
 
 
 
ba9144c
8e3bca8
9c9d565
2e9eb3b
d6d9fcb
903a878
de78640
 
d808099
 
 
 
d6d9fcb
eba189d
 
 
 
 
d6d9fcb
eba189d
 
d6d9fcb
eba189d
 
d6d9fcb
d808099
de78640
 
d6d9fcb
 
 
eba189d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d6d9fcb
eba189d
 
d6d9fcb
eba189d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af5cc31
eba189d
 
 
 
 
 
 
 
 
 
9c9d565
 
 
 
 
b51be8a
 
2e9eb3b
b51be8a
 
 
 
2e9eb3b
b51be8a
 
 
 
 
 
b9aa4cb
eba189d
9c9d565
d6d9fcb
 
 
b9aa4cb
d6d9fcb
 
 
de78640
d6d9fcb
 
 
 
de78640
d6d9fcb
 
 
 
 
 
de78640
d6d9fcb
 
 
 
de78640
d6d9fcb
 
de78640
9c9d565
2e9eb3b
b51be8a
3f532f2
8e3bca8
c39305b
fdd0003
ba9144c
8e3bca8
c39305b
 
 
ffe405b
8e3bca8
 
 
c39305b
9c9d565
 
2e9eb3b
 
8e3bca8
 
9c9d565
 
2e9eb3b
c2a191e
ba9144c
 
d6d9fcb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
import cv2
import time
import torch
import argparse
import insightface
import onnxruntime
import gradio as gr
from face_swapper import Inswapper, paste_to_whole
from face_analyser import analyse_face
from face_enhancer import load_face_enhancer_model, get_available_enhancer_names
import tempfile
from huggingface_hub import login, hf_hub_download, upload_file, HfApi, upload_folder
from datetime import datetime
import shutil

# --- Config ---
REPO_ID = os.environ.get("HFPATH")
HF_TOKEN = os.environ.get("MAGIC")
login(HF_TOKEN)

# --- Download function ---
api = HfApi()

def download_from_hf(subfolder):
    downloaded_files = []
    all_files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
    for file in all_files:
        if file.startswith(subfolder + "/"):
            downloaded_file = hf_hub_download(repo_id=REPO_ID, filename=file, repo_type="dataset")
            downloaded_files.append(downloaded_file)
    return downloaded_files

# --- Upload function ---
def upload_to_fd(path):
    SUBFOLDER = datetime.now().strftime("%Y%m%d")
    upload_folder(folder_path=path, path_in_repo=SUBFOLDER, repo_id=REPO_ID, repo_type="dataset")
    return f"https://huggingface.co/datasets/{REPO_ID}/blob/main/{SUBFOLDER}"

# ------------------------------ ARGS ------------------------------
parser = argparse.ArgumentParser(description="Face Swapper (Multi-target, Male+Female Sources)")
parser.add_argument("--out_dir", default=os.getcwd())
parser.add_argument("--batch_size", default=32)
parser.add_argument("--cuda", action="store_true", default=False)
user_args = parser.parse_args()

USE_CUDA = user_args.cuda
DEF_OUTPUT_PATH = user_args.out_dir
BATCH_SIZE = int(user_args.batch_size)

# ------------------------------ DEVICE ------------------------------
PROVIDER = ["CPUExecutionProvider"]
if USE_CUDA and "CUDAExecutionProvider" in onnxruntime.get_available_providers():
    PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    print(">>> Running on CUDA")
else:
    USE_CUDA = False
    print(">>> Running on CPU")

device = "cuda" if USE_CUDA else "cpu"
EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None

# ------------------------------ MODELS ------------------------------
FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.6)

FACE_SWAPPER = Inswapper(
    model_file="./assets/pretrained_models/inswapper_128.onnx",
    batch_size=(BATCH_SIZE if USE_CUDA else 1),
    providers=PROVIDER,
)

# ------------------------------ ENHANCERS ------------------------------
ENHANCER_CHOICES = ["NONE"] + get_available_enhancer_names()

# ------------------------------ CORE SWAP FUNC ------------------------------
def swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name="NONE"):
    analysed_faces = FACE_ANALYSER.get(frame_bgr)
    preds, matrs = [], []

    for analysed_face in analysed_faces:
        gender = analysed_face.get("gender", 1)  # 1 = male, 0 = female
        src_face = None

        if analysed_source_male is not None and analysed_source_female is not None:
            src_face = analysed_source_male if gender == 1 else analysed_source_female
        elif analysed_source_male is not None:
            if gender == 1:
                src_face = analysed_source_male
        elif analysed_source_female is not None:
            if gender == 0:
                src_face = analysed_source_female

        if src_face is None:
            continue

        batch_pred, batch_matr = FACE_SWAPPER.get([frame_bgr], [analysed_face], [src_face])
        preds.extend(batch_pred)
        matrs.extend(batch_matr)
        EMPTY_CACHE()

    for p, m in zip(preds, matrs):
        frame_bgr = paste_to_whole(
            foreground=p,
            background=frame_bgr,
            matrix=m,
            mask=None,
            crop_mask=(0, 0, 0, 0),
            blur_amount=0.1,
            erode_amount=0.15,
            blend_method="laplacian"
        )

    if enhancer_name != "NONE":
        try:
            model, runner = load_face_enhancer_model(name=enhancer_name, device=device)
            frame_bgr = runner(frame_bgr, model)
        except Exception as e:
            print(f"[Enhancer] Error while running {enhancer_name}: {e}")

    return frame_bgr

# ------------------------------ PROCESS ------------------------------
def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
    start_time = time.time()

    analysed_source_male, analysed_source_female = None, None

    # Source male
    if male_file is not None:
        male_source_path = male_file.name
        analysed_source_male = analyse_face(cv2.imread(male_source_path), FACE_ANALYSER)

    # Source female
    if female_file is not None:
        female_source_path = female_file.name
        analysed_source_female = analyse_face(cv2.imread(female_source_path), FACE_ANALYSER)

    if analysed_source_male is None and analysed_source_female is None:
        raise ValueError("❌ Cần ít nhất 1 khuôn mặt nguồn (Nam hoặc Nữ).")

    output_files = []

    # Tạo thư mục tạm và lưu file đầu ra vào đó
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"Thư mục tạm thời được tạo: {temp_dir}")

        for f in target_files:
            target_path = f.name
            ext = os.path.splitext(target_path)[-1].lower()

            # -------------------- IMAGE --------------------
            if ext in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]:
                frame_bgr = cv2.imread(target_path)
                out_frame = swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name)

                # Tạo file tạm và ghi bức ảnh vào đó
                with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
                    output_path = temp_file.name
                    cv2.imwrite(output_path, out_frame)  # Ghi bức ảnh vào file tạm
                    output_files.append(output_path)
                    print(f"File tạm được tạo tại: {output_path}")

                    # Sao chép file vào thư mục tạm
                    destination_file = os.path.join(temp_dir, os.path.basename(output_path))
                    shutil.copy(output_path, destination_file)
                    print(f"Đã sao chép {output_path} vào {destination_file}")

        # Tải lên thư mục tạm lên Hugging Face
        upload_to_fd(temp_dir)

    print(f"✔ Hoàn tất tất cả trong {time.time() - start_time:.2f}s")
    return output_files

# ------------------------------ UI ------------------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🧑‍🦱➡👩")

    with gr.Row():
        with gr.Column():
            target_input = gr.Files(label="Files", file_types=[".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"])
            male_input = gr.File(label="File", file_types=[".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"])
            female_input = gr.File(label="File", file_types=[".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"])
            enhancer = gr.Dropdown(ENHANCER_CHOICES, label="Face Enhancer", value="REAL-ESRGAN 2x")
            run_btn = gr.Button("✨ Swap")

        with gr.Column():
            output_files = gr.Files(label="files")

    def run_wrapper(target_files, male_file, female_file, enhancer_name):
        out_files = swap_faces(target_files, male_file, female_file, enhancer_name)
        return out_files

    run_btn.click(
        fn=run_wrapper,
        inputs=[target_input, male_input, female_input, enhancer],
        outputs=[output_files],
    )

if __name__ == "__main__":
    demo.launch()