|
import os |
|
import cv2 |
|
import time |
|
import torch |
|
import argparse |
|
import insightface |
|
import onnxruntime |
|
import gradio as gr |
|
from face_swapper import Inswapper, paste_to_whole |
|
from face_analyser import analyse_face |
|
from face_enhancer import load_face_enhancer_model, get_available_enhancer_names |
|
import tempfile |
|
from huggingface_hub import login, hf_hub_download, upload_file, HfApi, upload_folder |
|
from datetime import datetime |
|
import shutil |
|
|
|
|
|
# Hugging Face dataset repo id and access token come from the environment;
# each is None when its variable is unset.
REPO_ID = os.getenv("HFPATH")
HF_TOKEN = os.getenv("MAGIC")

# Authenticate this process against the Hub before any API call is made.
login(token=HF_TOKEN)

# Shared Hub client, used below to list the files of the dataset repo.
api = HfApi()
|
|
|
def download_from_hf(subfolder):
    """Download every file stored under ``subfolder`` of the dataset repo.

    The full file listing of the dataset repository ``REPO_ID`` is fetched
    and each entry whose path starts with ``subfolder + "/"`` is downloaded.

    Args:
        subfolder: Folder path inside the repo, without a trailing slash.

    Returns:
        list: The value returned by ``hf_hub_download`` for each matching
        file, in listing order.
    """
    repo_files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
    return [
        hf_hub_download(repo_id=REPO_ID, filename=name, repo_type="dataset")
        for name in repo_files
        if name.startswith(subfolder + "/")
    ]
|
|
|
|
|
def upload_to_fd(path):
    """Upload a local folder into a date-named folder of the dataset repo.

    Args:
        path: Local directory whose contents are uploaded.

    Returns:
        str: Browser URL of the destination folder in repo ``REPO_ID``.
    """
    # One destination folder per calendar day (local time), e.g. "20240131".
    subfolder = datetime.now().strftime("%Y%m%d")
    upload_folder(
        folder_path=path,
        path_in_repo=subfolder,
        repo_id=REPO_ID,
        repo_type="dataset",
    )
    # Folders are browsed under /tree/; /blob/ addresses single files, so the
    # previous /blob/ link did not point at the uploaded folder.
    return f"https://huggingface.co/datasets/{REPO_ID}/tree/main/{subfolder}"
|
|
|
|
|
# Command-line options.
# NOTE: arguments are parsed at import time, so importing this module from a
# process started with unrelated arguments makes argparse exit.
parser = argparse.ArgumentParser(description="Face Swapper (Multi-target, Male+Female Sources)")
parser.add_argument(
    "--out_dir",
    default=os.getcwd(),
    help="Output directory (default: current working directory).",
)
parser.add_argument(
    "--batch_size",
    type=int,  # reject non-integers with a usage error instead of a traceback
    default=32,
    help="Face-swapper batch size used when running on CUDA (default: 32).",
)
parser.add_argument(
    "--cuda",
    action="store_true",
    default=False,
    help="Use the CUDA execution provider when it is available.",
)
user_args = parser.parse_args()

USE_CUDA = user_args.cuda
DEF_OUTPUT_PATH = user_args.out_dir
BATCH_SIZE = int(user_args.batch_size)
|
|
|
|
|
# CUDA is used only when it was requested AND onnxruntime actually offers it;
# otherwise the request is dropped and everything runs on the CPU.
USE_CUDA = USE_CUDA and "CUDAExecutionProvider" in onnxruntime.get_available_providers()

if USE_CUDA:
    PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    print(">>> Running on CUDA")
else:
    PROVIDER = ["CPUExecutionProvider"]
    print(">>> Running on CPU")

device = "cuda" if USE_CUDA else "cpu"


def EMPTY_CACHE():
    """Release cached CUDA memory held by torch; does nothing on CPU."""
    if device == "cuda":
        return torch.cuda.empty_cache()
    return None
|
|
|
|
|
# Face detector/analyser (insightface "buffalo_l" model pack), created once at
# import time and shared by every request.
FACE_ANALYSER = insightface.app.FaceAnalysis(providers=PROVIDER, name="buffalo_l")
FACE_ANALYSER.prepare(ctx_id=0, det_thresh=0.6, det_size=(640, 640))

# Batching is only enabled on CUDA; on CPU the swapper gets a batch size of 1.
FACE_SWAPPER = Inswapper(
    providers=PROVIDER,
    batch_size=BATCH_SIZE if USE_CUDA else 1,
    model_file="./assets/pretrained_models/inswapper_128.onnx",
)

# Enhancer names offered to the user; "NONE" disables enhancement.
ENHANCER_CHOICES = ["NONE", *get_available_enhancer_names()]
|
|
|
|
|
# Single-slot cache holding the most recently used enhancer, keyed by name.
_ENHANCER_CACHE = {}


def _get_enhancer(name):
    """Return the ``(model, runner)`` pair for ``name``, loading it on first use."""
    if name not in _ENHANCER_CACHE:
        # Drop the previously cached enhancer first so that switching between
        # enhancers does not keep several models alive at once.
        _ENHANCER_CACHE.clear()
        _ENHANCER_CACHE[name] = load_face_enhancer_model(name=name, device=device)
    return _ENHANCER_CACHE[name]


def _select_source(gender, source_male, source_female):
    """Pick the source face for a detected face, or None to leave it untouched.

    With both sources available, gender 1 maps to the male source and any
    other value to the female source. With a single source, only faces whose
    gender matches it (1 for male, 0 for female) are swapped.
    """
    if source_male is not None and source_female is not None:
        return source_male if gender == 1 else source_female
    if gender == 1:
        return source_male
    if gender == 0:
        return source_female
    return None


def swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name="NONE"):
    """Swap the faces found in one image and optionally enhance the result.

    Args:
        frame_bgr: Image to process, as returned by ``cv2.imread``.
        analysed_source_male: Analysed source face used for faces whose
            gender is 1, or None.
        analysed_source_female: Analysed source face used for faces whose
            gender is 0, or None.
        enhancer_name: Enhancer to run on the final image; "NONE" skips it.

    Returns:
        The processed image. Faces without a matching source are left as-is.

    Raises:
        ValueError: If ``frame_bgr`` is None (e.g. the image failed to decode).
    """
    if frame_bgr is None:
        raise ValueError("frame_bgr is None: the image could not be decoded")

    preds, matrs = [], []
    for analysed_face in FACE_ANALYSER.get(frame_bgr):
        # Faces without a "gender" entry are treated as gender 1.
        src_face = _select_source(
            analysed_face.get("gender", 1),
            analysed_source_male,
            analysed_source_female,
        )
        if src_face is None:
            continue

        batch_pred, batch_matr = FACE_SWAPPER.get([frame_bgr], [analysed_face], [src_face])
        preds.extend(batch_pred)
        matrs.extend(batch_matr)
        EMPTY_CACHE()

    # Every swap was computed against the untouched input frame; the results
    # are now pasted back one after another.
    for pred, matrix in zip(preds, matrs):
        frame_bgr = paste_to_whole(
            foreground=pred,
            background=frame_bgr,
            matrix=matrix,
            mask=None,
            crop_mask=(0, 0, 0, 0),
            blur_amount=0.1,
            erode_amount=0.15,
            blend_method="laplacian"
        )

    if enhancer_name != "NONE":
        # Enhancement is best-effort: on failure the swapped but un-enhanced
        # frame is returned and the error is only reported.
        try:
            # Reuse the loaded enhancer instead of reloading it for every image.
            model, runner = _get_enhancer(enhancer_name)
            frame_bgr = runner(frame_bgr, model)
        except Exception as e:
            print(f"[Enhancer] Error while running {enhancer_name}: {e}")

    return frame_bgr
|
|
|
|
|
# Target files with any other extension are ignored.
_TARGET_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp")


def _upload_path(file_obj):
    """Return the filesystem path of an upload (object with ``.name`` or plain path)."""
    return getattr(file_obj, "name", file_obj)


def _analyse_source(file_obj):
    """Analyse the face in a source upload; returns None when no file was given."""
    if file_obj is None:
        return None
    source_path = _upload_path(file_obj)
    image = cv2.imread(source_path)
    if image is None:
        # cv2.imread signals a missing/undecodable file by returning None.
        raise ValueError(f"❌ Không đọc được ảnh nguồn: {source_path}")
    return analyse_face(image, FACE_ANALYSER)


def swap_faces(target_files, male_file, female_file, enhancer_name="NONE"):
    """Swap faces in every target image and upload the results to the Hub.

    Args:
        target_files: Uploaded target images (objects with ``.name`` or plain
            paths). None is treated as an empty list.
        male_file: Upload holding the male source face, or None.
        female_file: Upload holding the female source face, or None.
        enhancer_name: Enhancer passed on to ``swap_on_frame``.

    Returns:
        list[str]: Paths of the written result images. These files are created
        with ``delete=False`` and therefore outlive this call.

    Raises:
        ValueError: If a given source image cannot be read, or if neither
            source yields an analysed face.
    """
    start_time = time.time()

    analysed_source_male = _analyse_source(male_file)
    analysed_source_female = _analyse_source(female_file)
    if analysed_source_male is None and analysed_source_female is None:
        raise ValueError("❌ Cần ít nhất 1 khuôn mặt nguồn (Nam hoặc Nữ).")

    output_files = []

    # temp_dir only stages copies for the upload and is removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"Thư mục tạm thời được tạo: {temp_dir}")

        for f in target_files or []:
            target_path = _upload_path(f)
            ext = os.path.splitext(target_path)[-1].lower()
            if ext not in _TARGET_IMAGE_EXTENSIONS:
                continue

            frame_bgr = cv2.imread(target_path)
            if frame_bgr is None:
                # One unreadable file must not abort the whole batch.
                print(f"⚠ Bỏ qua file không đọc được: {target_path}")
                continue

            out_frame = swap_on_frame(frame_bgr, analysed_source_male, analysed_source_female, enhancer_name)

            # Reserve a persistent file name, then close the handle before
            # OpenCV writes to that path.
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as temp_file:
                output_path = temp_file.name
            try:
                written = cv2.imwrite(output_path, out_frame)
            except cv2.error as exc:
                print(f"[imwrite] {exc}")
                written = False
            if not written:
                os.remove(output_path)  # do not leave an empty placeholder behind
                print(f"⚠ Không ghi được ảnh kết quả: {output_path}")
                continue

            output_files.append(output_path)
            print(f"File tạm được tạo tại: {output_path}")

            destination_file = os.path.join(temp_dir, os.path.basename(output_path))
            shutil.copy(output_path, destination_file)
            print(f"Đã sao chép {output_path} vào {destination_file}")

        # NOTE(review): every result is uploaded to the dataset repo without
        # any notice in the UI - confirm this is intended.
        if output_files:  # skip the upload when nothing was produced
            upload_to_fd(temp_dir)

    print(f"✔ Hoàn tất tất cả trong {time.time() - start_time:.2f}s")
    return output_files
|
|
|
|
|
# File extensions accepted by the three upload widgets.
_UPLOAD_FILE_TYPES = [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"]

with gr.Blocks() as demo:
    gr.Markdown("## 🧑🦱➡👩 Face Swapper (Upload nhiều file target + nguồn nam/nữ) + Enhancer")

    with gr.Row():
        # Left column: inputs and the trigger button.
        with gr.Column():
            target_input = gr.Files(
                label="Files đích (ảnh)",
                file_types=list(_UPLOAD_FILE_TYPES),
            )
            male_input = gr.File(
                label="File nguồn Nam (ảnh)",
                file_types=list(_UPLOAD_FILE_TYPES),
            )
            female_input = gr.File(
                label="File nguồn Nữ (ảnh)",
                file_types=list(_UPLOAD_FILE_TYPES),
            )
            enhancer = gr.Dropdown(
                ENHANCER_CHOICES,
                label="Face Enhancer",
                value="REAL-ESRGAN 2x",
            )
            run_btn = gr.Button("✨ Swap")

        # Right column: resulting images.
        with gr.Column():
            output_files = gr.Files(label="Kết quả ảnh")

    def run_wrapper(target_files, male_file, female_file, enhancer_name):
        """Click handler: forward the widget values to ``swap_faces``."""
        return swap_faces(target_files, male_file, female_file, enhancer_name)

    run_btn.click(
        fn=run_wrapper,
        inputs=[target_input, male_input, female_input, enhancer],
        outputs=[output_files],
    )


# Start the web UI only when executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|