import os |
import shutil |
import glob |
import re |
import subprocess |
import random |
import yaml |
from pathlib import Path |
import torch |
import gradio as gr |
import threading |
import time |
import librosa |
import soundfile as sf |
import numpy as np |
import requests |
import json |
import locale |
from datetime import datetime |
import yt_dlp |
import validators |
from pytube import YouTube |
import io |
import math |
import hashlib |
import gc |
import psutil |
import concurrent.futures |
from tqdm import tqdm |
import tempfile |
from urllib.parse import urlparse, quote |
import argparse |
from tqdm.auto import tqdm |
import torch.nn as nn |
from model import get_model_config, MODEL_CONFIGS |
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
INPUT_DIR = os.path.join(BASE_DIR, "input") |
OUTPUT_DIR = os.path.join(BASE_DIR, "output") |
OLD_OUTPUT_DIR = os.path.join(BASE_DIR, "old_output") |
AUTO_ENSEMBLE_TEMP = os.path.join(BASE_DIR, "auto_ensemble_temp") |
AUTO_ENSEMBLE_OUTPUT = os.path.join(BASE_DIR, "ensemble_folder") |
VIDEO_TEMP = os.path.join(BASE_DIR, "video_temp") |
ENSEMBLE_DIR = os.path.join(BASE_DIR, "ensemble") |
COOKIE_PATH = os.path.join(BASE_DIR, "cookies.txt") |
INFERENCE_SCRIPT_PATH = os.path.join(BASE_DIR, "inference.py") |
os.makedirs(directory, exist_ok=True) |
class IndentDumper(yaml.Dumper): |
def increase_indent(self, flow=False, indentless=False): |
return super(IndentDumper, self).increase_indent(flow, False) |
def tuple_constructor(loader, node): |
"""Loads a tuple from a YAML sequence.""" |
values = loader.construct_sequence(node) |
return tuple(values) |
yaml.SafeLoader.add_constructor('tag:yaml.org,2002:python/tuple', tuple_constructor) |
def update_model_dropdown(category): |
"""Kategoriye göre model dropdown'ını günceller.""" |
return gr.Dropdown(choices=list(MODEL_CONFIGS[category].keys()), label="Model") |
def handle_file_upload(uploaded_file, file_path, is_auto_ensemble=False): |
clear_temp_folder("/tmp", exclude_items=["gradio", "config.json"]) |
clear_directory(INPUT_DIR) |
os.makedirs(INPUT_DIR, exist_ok=True) |
clear_directory(INPUT_DIR) |
if uploaded_file: |
target_path = save_uploaded_file(uploaded_file, is_input=True) |
return target_path, target_path |
elif file_path and os.path.exists(file_path): |
target_path = os.path.join(INPUT_DIR, os.path.basename(file_path)) |
shutil.copy(file_path, target_path) |
return target_path, target_path |
return None, None |
def clear_directory(directory): |
"""Deletes all files in the given directory.""" |
files = glob.glob(os.path.join(directory, '*')) |
for f in files: |
try: |
os.remove(f) |
except Exception as e: |
print(f"{f} could not be deleted: {e}") |
def clear_temp_folder(folder_path, exclude_items=None): |
"""Safely clears contents of a directory while preserving specified items.""" |
try: |
if not os.path.exists(folder_path): |
print(f"⚠️ Directory does not exist: {folder_path}") |
return False |
if not os.path.isdir(folder_path): |
print(f"⚠️ Path is not a directory: {folder_path}") |
return False |
exclude_items = exclude_items or [] |
for item_name in os.listdir(folder_path): |
item_path = os.path.join(folder_path, item_name) |
if item_name in exclude_items: |
continue |
try: |
if os.path.isfile(item_path) or os.path.islink(item_path): |
os.unlink(item_path) |
elif os.path.isdir(item_path): |
shutil.rmtree(item_path) |
except Exception as e: |
print(f"⚠️ Error deleting {item_path}: {str(e)}") |
return True |
except Exception as e: |
print(f"❌ Critical error: {str(e)}") |
return False |
def clear_old_output(): |
old_output_folder = os.path.join(BASE_DIR, 'old_output') |
try: |
if not os.path.exists(old_output_folder): |
return "❌ Old output folder does not exist" |
shutil.rmtree(old_output_folder) |
os.makedirs(old_output_folder, exist_ok=True) |
return "✅ Old outputs successfully cleared!" |
except Exception as e: |
return f"🔥 Error: {str(e)}" |
def shorten_filename(filename, max_length=30): |
"""Shortens a filename to a specified maximum length.""" |
base, ext = os.path.splitext(filename) |
if len(base) <= max_length: |
return filename |
return base[:15] + "..." + base[-10:] + ext |
def clean_filename(title): |
"""Removes special characters from a filename.""" |
return re.sub(r'[^\w\-_\. ]', '', title).strip() |
def convert_to_wav(file_path): |
"""Converts the audio file to WAV format.""" |
original_filename = os.path.basename(file_path) |
filename, ext = os.path.splitext(original_filename) |
if ext.lower() == '.wav': |
return file_path |
wav_output = os.path.join(ENSEMBLE_DIR, f"{filename}.wav") |
try: |
command = [ |
'ffmpeg', '-y', '-i', file_path, |
'-acodec', 'pcm_s16le', '-ar', '44100', wav_output |
] |
subprocess.run(command, check=True, capture_output=True) |
return wav_output |
except subprocess.CalledProcessError as e: |
print(f"FFmpeg Error ({e.returncode}): {e.stderr.decode()}") |
return None |
def generate_random_port(): |
"""Generates a random port number.""" |
return random.randint(1000, 9000) |
def update_file_list(): |
output_files = glob.glob(os.path.join(OUTPUT_DIR, "*.wav")) |
old_output_files = glob.glob(os.path.join(OLD_OUTPUT_DIR, "*.wav")) |
files = output_files + old_output_files |
return gr.Dropdown(choices=files) |
def save_uploaded_file(uploaded_file, is_input=False, target_dir=None): |
"""Saves an uploaded file to the specified directory.""" |
media_extensions = ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.mp4'] |
target_dir = target_dir or (INPUT_DIR if is_input else OUTPUT_DIR) |
timestamp_patterns = [ |
r'_\d{8}_\d{6}_\d{6}$', r'_\d{14}$', r'_\d{10}$', r'_\d+$' |
] |
if hasattr(uploaded_file, 'name'): |
original_filename = os.path.basename(uploaded_file.name) |
else: |
original_filename = os.path.basename(str(uploaded_file)) |
if is_input: |
base_filename = original_filename |
for pattern in timestamp_patterns: |
base_filename = re.sub(pattern, '', base_filename) |
for ext in media_extensions: |
base_filename = base_filename.replace(ext, '') |
file_ext = next( |
(ext for ext in media_extensions if original_filename.lower().endswith(ext)), |
'.wav' |
) |
clean_filename = f"{base_filename.strip('_- ')}{file_ext}" |
else: |
clean_filename = original_filename |
target_path = os.path.join(target_dir, clean_filename) |
os.makedirs(target_dir, exist_ok=True) |
if os.path.exists(target_path): |
os.remove(target_path) |
if hasattr(uploaded_file, 'read'): |
with open(target_path, "wb") as f: |
f.write(uploaded_file.read()) |
else: |
shutil.copy(uploaded_file, target_path) |
print(f"File saved successfully: {os.path.basename(target_path)}") |
return target_path |
def move_old_files(output_folder): |
"""Moves old files to the old_output directory.""" |
os.makedirs(OLD_OUTPUT_DIR, exist_ok=True) |
for filename in os.listdir(output_folder): |
file_path = os.path.join(output_folder, filename) |
if os.path.isfile(file_path): |
new_filename = f"{os.path.splitext(filename)[0]}_old{os.path.splitext(filename)[1]}" |
new_file_path = os.path.join(OLD_OUTPUT_DIR, new_filename) |
shutil.move(file_path, new_file_path) |