# -*- coding: utf-8 -*-
"""
tools/do_everything.py

End-to-end pipeline with post-TTS Emotion control automated by
Higgs-understanding (windowed + crossfaded), using ONLY the auto batch.

UI values supported:
  - "natural" -> skip emotion shaping
  - "happy"   -> treated as "auto-happy"
  - "sad"     -> treated as "auto-sad"
  - "angry"   -> treated as "auto-angry"
  - "auto-*"  -> respected as-is (e.g., "auto-happy", "auto-sad", "auto-angry")

Requires:
  tools/step045_emotion_auto_batch.py
"""

import json
import os
import shutil
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import torch
from loguru import logger

from .step000_video_downloader import (
    get_info_list_from_url,
    download_single_video,
    get_target_folder,
)
from .step010_demucs_vr import separate_all_audio_under_folder, init_demucs, release_model
from .step020_asr import transcribe_all_audio_under_folder
# from .step021_asr_whisperx import init_whisperx, init_diarize
from .step022_asr_funasr import init_funasr
from .step030_translation import translate_all_transcript_under_folder
from .step040_tts import generate_all_wavs_under_folder
from .step042_tts_xtts import init_TTS
from .step043_tts_cosyvoice import init_cosyvoice
from .step050_synthesize_video import synthesize_all_video_under_folder

# ONLY import the auto emotion batch
# NOTE(review): the module docstring names step045_emotion_auto_batch.py while
# this import targets step047_emotion_auto_batch — confirm which one is correct.
from .step047_emotion_auto_batch import auto_tune_emotion_all_wavs_under_folder

# Track which heavy models were initialized (process lifetime)
models_initialized = {
    "demucs": False,
    "xtts": False,
    "cosyvoice": False,
    "diarize": False,
    "funasr": False,
    # Higgs ASR/TTS are API-based; kept out of init gating intentionally
}

# ------------------------------------------------------------------------------------
# Unified language normalization
# Accept BOTH UI labels and codes; normalize to codes: zh-cn, zh-tw, en, ko, es, fr
# ------------------------------------------------------------------------------------

# Rich alias tables -> language code
_TRANSLATION_ALIASES = {
    # Simplified Chinese
    "simplified chinese (简体中文)": "zh-cn",
    "简体中文": "zh-cn",
    "simplified chinese": "zh-cn",
    "chinese (simplified)": "zh-cn",
    "zh-cn": "zh-cn",
    "cn": "zh-cn",
    # Traditional Chinese
    "traditional chinese (繁体中文)": "zh-tw",
    "繁体中文": "zh-tw",
    "traditional chinese": "zh-tw",
    "chinese (traditional)": "zh-tw",
    "zh-tw": "zh-tw",
    "tw": "zh-tw",
    # English
    "english": "en",
    "en": "en",
    # Korean
    "korean": "ko",
    "한국어": "ko",
    "ko": "ko",
    # Spanish
    "spanish": "es",
    "español": "es",
    "es": "es",
}

_TTS_ALIASES = {
    # Chinese (generic UI label) -> use Simplified by default unless caller passed zh-tw explicitly
    "chinese (中文)": "zh-cn",
    "中文": "zh-cn",
    "chinese": "zh-cn",
    "zh": "zh-cn",
    "zh-cn": "zh-cn",
    # Traditional Chinese explicit
    "traditional chinese": "zh-tw",
    "繁体中文": "zh-tw",
    "zh-tw": "zh-tw",
    # English
    "english": "en",
    "en": "en",
    # Korean
    "korean": "ko",
    "한국어": "ko",
    "ko": "ko",
    # Spanish
    "spanish": "es",
    "español": "es",
    "es": "es",
    # French
    "french": "fr",
    "français": "fr",
    "fr": "fr",
}

# Codes accepted after normalization. French is accepted for TTS only.
_ALLOWED_SUB_LANGS = {"zh-cn", "zh-tw", "en", "ko", "es"}
_ALLOWED_TTS_LANGS = {"zh-cn", "zh-tw", "en", "ko", "es", "fr"}


def _canon(s: Optional[str]) -> Optional[str]:
    """Return *s* stringified, stripped and lower-cased; ``None`` stays ``None``."""
    if s is None:
        return None
    return str(s).strip().lower()


def _norm_translation_lang(ui_label_or_code: str) -> str:
    """Normalize a subtitle/translation target (UI label or code) to a code.

    Raises:
        ValueError: if the value maps to nothing in ``_ALLOWED_SUB_LANGS``.
    """
    key = _canon(ui_label_or_code)
    code = _TRANSLATION_ALIASES.get(key, key)
    if code not in _ALLOWED_SUB_LANGS:
        raise ValueError(f"Unrecognized subtitle/translation language: {ui_label_or_code}")
    return code


def _norm_tts_lang(ui_label_or_code: str) -> str:
    """Normalize a TTS target (UI label or code) to a code.

    Raises:
        ValueError: if the value maps to nothing in ``_ALLOWED_TTS_LANGS``.
    """
    key = _canon(ui_label_or_code)
    code = _TTS_ALIASES.get(key, key)
    if code not in _ALLOWED_TTS_LANGS:
        raise ValueError(f"Unrecognized TTS language: {ui_label_or_code}")
    return code


def _coerce_int_or_none(x):
    """Return ``int(x)``, or ``None`` for ``None``/``""``/``"None"`` or unconvertible input."""
    if x in (None, "", "None"):
        return None
    try:
        return int(x)
    except Exception:
        # Deliberately best-effort: UI widgets may hand over arbitrary values.
        return None


def _info_title(info):
    """Return a printable label for *info*: its ``"title"`` if a dict, else *info* itself.

    Uses ``.get`` so that a dict without a title yields ``None`` rather than
    raising ``KeyError`` in the middle of a log/error path.
    """
    if isinstance(info, dict):
        return info.get("title")
    return info


def _split_urls(url: Optional[str]) -> list:
    """Split a user-supplied URL blob into a list of non-empty URLs.

    Spaces are removed, and newlines, ASCII commas and full-width (Chinese)
    commas all act as separators. ``None`` or empty input gives ``[]``.
    """
    normalized = (url or "").replace(" ", "").replace(",", "\n").replace("，", "\n")
    # strip() also drops stray "\r"/tabs left over from pasted text
    pieces = (piece.strip() for piece in normalized.split("\n"))
    return [piece for piece in pieces if piece]


def get_available_gpu_memory() -> float:
    """Return available GPU memory in GiB (0 if CUDA is unavailable or an error occurs).

    Computed for device 0 as total device memory minus
    ``torch.cuda.memory_allocated(0)``.
    """
    try:
        if torch.cuda.is_available():
            total = torch.cuda.get_device_properties(0).total_memory
            used = torch.cuda.memory_allocated(0)
            return (total - used) / (1024 ** 3)
        return 0.0
    except Exception:
        return 0.0


def initialize_models(tts_method: str, asr_method: str, diarization: bool) -> None:
    """Initialize required models exactly once per process.

    Initializers are submitted to a small thread pool for parallel cold-start
    and then awaited, so any initializer exception surfaces here.

    Args:
        tts_method: ``"xtts"`` and ``"cosyvoice"`` load a local model;
            ``"Higgs"`` only logs (API-based).
        asr_method: ``"FunASR"`` loads a local model; ``"Higgs"`` only logs.
        diarization: currently unused, because the WhisperX/diarization
            initializers are disabled (their import is commented out).

    Raises:
        Exception: re-raises whatever an initializer raised, after resetting
            every flag in ``models_initialized`` and calling ``release_model()``.
    """
    pending = []  # (display label, future) for every initializer submitted here
    try:
        with ThreadPoolExecutor(max_workers=2) as executor:
            # Demucs
            if models_initialized["demucs"]:
                logger.info("Demucs already initialized — skipping")
            else:
                pending.append(("Demucs", executor.submit(init_demucs)))
                models_initialized["demucs"] = True

            # TTS
            if tts_method == "xtts":
                if not models_initialized["xtts"]:
                    pending.append(("XTTS", executor.submit(init_TTS)))
                    models_initialized["xtts"] = True
            elif tts_method == "cosyvoice":
                if not models_initialized["cosyvoice"]:
                    pending.append(("CosyVoice", executor.submit(init_cosyvoice)))
                    models_initialized["cosyvoice"] = True
            elif tts_method == "Higgs":
                # API-based; nothing to init locally
                logger.info("TTS 'Higgs' selected — API-based")

            # ASR (local initializers when applicable).
            # WhisperX and diarization initializers are intentionally disabled.
            if asr_method == "FunASR":
                if not models_initialized["funasr"]:
                    pending.append(("FunASR", executor.submit(init_funasr)))
                    models_initialized["funasr"] = True
            elif asr_method == "Higgs":
                # API-based; no local model to init
                logger.info("ASR 'Higgs' selected — API-based, no local initialization required")

            # Wait for each initializer; report success only once it has
            # actually finished, and let any init exception propagate.
            for label, future in pending:
                future.result()
                logger.info(f"Initialized {label}")
    except Exception as e:
        stack_trace = traceback.format_exc()
        logger.error(f"Failed to initialize models: {e}\n{stack_trace}")
        # Reset flags to allow retry and free any partially loaded state.
        # Mutate in place so every holder of the dict sees the reset.
        for key in models_initialized:
            models_initialized[key] = False
        release_model()
        raise


def process_video(
    info,
    root_folder,
    resolution,
    demucs_model,
    device,
    shifts,
    asr_method,
    whisper_model,
    batch_size,
    diarization,
    whisper_min_speakers,
    whisper_max_speakers,
    translation_method,
    translation_target_language,  # may be label or code
    tts_method,
    tts_target_language,  # may be label or code
    voice,
    subtitles,
    speed_up,
    fps,
    background_music,
    bgm_volume,
    video_volume,
    target_resolution,
    max_retries,
    progress_callback=None,
    *,
    emotion: str = "natural",
    emotion_strength: float = 0.6,
):
    """Process a single video end-to-end with optional progress callback.

    Stages: download (or local copy) -> vocal separation -> ASR -> translation
    -> TTS -> emotion shaping (best-effort) -> video composition.

    Args:
        info: a path string ending in ``.mp4`` (local file mode) or a video
            info object handed to the downloader.
        progress_callback: optional
            ``progress_callback(progress_percent: int, status_message: str) -> None``.
        emotion: ``"natural"`` skips shaping; ``"happy"``/``"sad"``/``"angry"``
            are mapped to ``"auto-<name>"``; anything starting with ``"auto"``
            is passed through.
        emotion_strength: forwarded to the emotion batch as ``float``.
        max_retries: number of attempts for failures that escape the
            per-stage handlers.

    Returns:
        ``(success, output_video, message)``; on failure ``output_video`` is
        ``None`` and ``message`` describes the error.
    """
    # Progress stages: (label, weight_total_percent)
    stages = [
        ("Downloading video...", 10),
        ("Separating vocals...", 15),
        ("Speech recognition...", 20),
        ("Translating subtitles...", 25),
        ("Synthesizing speech...", 20),
        ("Compositing video...", 10),
    ]

    def _report(percent, message):
        if progress_callback:
            progress_callback(percent, message)

    def _enter_stage(index):
        # Progress at stage start is the summed weight of the stages before it.
        # Deriving it from the index keeps retries from inheriting stale state.
        label = stages[index][0]
        _report(sum(weight for _, weight in stages[:index]), label)

    _report(0, "Preparing...")

    for retry in range(max_retries):
        try:
            # Stage: Download
            _enter_stage(0)

            if isinstance(info, str) and info.endswith(".mp4"):
                # Local file mode: copy it to root_folder/<file stem>/download.mp4
                original_file_name = os.path.basename(info)
                folder_name = os.path.splitext(original_file_name)[0]
                folder = os.path.join(root_folder, folder_name)
                os.makedirs(folder, exist_ok=True)
                dest_path = os.path.join(folder, "download.mp4")
                shutil.copy(info, dest_path)
            else:
                folder = get_target_folder(info, root_folder)
                if folder is None:
                    error_msg = f"Unable to derive target folder: {_info_title(info)}"
                    logger.warning(error_msg)
                    return False, None, error_msg

                folder = download_single_video(info, root_folder, resolution)
                if folder is None:
                    error_msg = f"Download failed: {_info_title(info)}"
                    logger.warning(error_msg)
                    return False, None, error_msg

            logger.info(f"Processing video folder: {folder}")

            # Stage: Vocal separation
            _enter_stage(1)
            try:
                status, vocals_path, _ = separate_all_audio_under_folder(
                    folder, model_name=demucs_model, device=device, progress=True, shifts=shifts
                )
                logger.info(f"Vocal separation complete: {vocals_path}")
            except Exception as e:
                stack_trace = traceback.format_exc()
                error_msg = f"Vocal separation failed: {e}\n{stack_trace}"
                logger.error(error_msg)
                return False, None, error_msg

            # Stage: ASR
            _enter_stage(2)
            try:
                # Coerce radios to int/None if needed
                whisper_min_speakers_c = _coerce_int_or_none(whisper_min_speakers)
                whisper_max_speakers_c = _coerce_int_or_none(whisper_max_speakers)

                status, result_json = transcribe_all_audio_under_folder(
                    folder,
                    asr_method=asr_method,
                    whisper_model_name=whisper_model,  # ignored by Higgs path if implemented that way
                    device=device,
                    batch_size=batch_size,
                    diarization=diarization,
                    min_speakers=whisper_min_speakers_c,
                    max_speakers=whisper_max_speakers_c,
                )
                logger.info(f"ASR completed: {status}")
            except Exception as e:
                stack_trace = traceback.format_exc()
                error_msg = f"ASR failed: {e}\n{stack_trace}"
                logger.error(error_msg)
                return False, None, error_msg

            # Stage: Translation
            _enter_stage(3)
            try:
                # Normalize subtitle/translation target (label or code -> code)
                translation_target_language = _norm_translation_lang(translation_target_language)
                logger.info(f"Subtitle/Translation language (code): {translation_target_language}")

                msg, summary, translation = translate_all_transcript_under_folder(
                    folder, method=translation_method, target_language=translation_target_language
                )
                logger.info(f"Translation completed: {msg}")
            except Exception as e:
                stack_trace = traceback.format_exc()
                error_msg = f"Translation failed: {e}\n{stack_trace}"
                logger.error(error_msg)
                return False, None, error_msg

            # Stage: TTS
            _enter_stage(4)
            try:
                # Normalize TTS language (label or code -> code)
                tts_target_language = _norm_tts_lang(tts_target_language)
                logger.info(f"TTS target language (code): {tts_target_language}")

                status, synth_path, _ = generate_all_wavs_under_folder(
                    folder, method=tts_method, target_language=tts_target_language, voice=voice
                )
                logger.info(f"TTS completed: {synth_path}")
            except Exception as e:
                stack_trace = traceback.format_exc()
                error_msg = f"TTS failed: {e}\n{stack_trace}"
                logger.error(error_msg)
                return False, None, error_msg

            # Stage: Emotion shaping (auto via Higgs-understanding).
            # Best-effort: a failure here is logged and the pipeline continues.
            try:
                # Map "happy"|"sad"|"angry" to "auto-happy"|...; keep "natural" as skip
                _emotion = (emotion or "natural").strip().lower()
                if _emotion in ("happy", "sad", "angry"):
                    _emotion = f"auto-{_emotion}"

                if _emotion.startswith("auto"):
                    _lang_hint = tts_target_language or "en"  # already normalized code
                    _ok, emsg = auto_tune_emotion_all_wavs_under_folder(
                        folder,
                        emotion=_emotion,  # "auto-happy"/"auto-sad"/"auto-angry"/"auto"
                        strength=float(emotion_strength),
                        lang_hint=_lang_hint,
                        win_s=10.0,
                        hop_s=9.0,
                        xfade_ms=int(os.getenv("HIGGS_TTS_XFADE_MS", "28")),
                        latency_budget_s=0.5,
                        min_confidence=0.50,
                        max_iters=2,
                    )
                    logger.info(f"Emotion (AUTO) shaping: {emsg}")
                else:
                    logger.info("Emotion preset is natural — skipping.")
            except Exception as e:
                logger.warning(f"Emotion shaping step failed but continuing: {e}")

            # Stage: Synthesis (video)
            _enter_stage(5)
            try:
                status, output_video = synthesize_all_video_under_folder(
                    folder,
                    subtitles=subtitles,
                    speed_up=speed_up,
                    fps=fps,
                    resolution=target_resolution,
                    background_music=background_music,
                    bgm_volume=bgm_volume,
                    video_volume=video_volume,
                )
                logger.info(f"Video composition completed: {output_video}")
            except Exception as e:
                stack_trace = traceback.format_exc()
                error_msg = f"Video composition failed: {e}\n{stack_trace}"
                logger.error(error_msg)
                return False, None, error_msg

            # Done
            _report(100, "Completed!")
            return True, output_video, "Success"

        except Exception as e:
            # Only failures outside the per-stage handlers (download/copy,
            # progress callback) reach this point and are retried.
            stack_trace = traceback.format_exc()
            title = _info_title(info)
            error_msg = f"Error while processing {title}: {e}\n{stack_trace}"
            logger.error(error_msg)
            if retry < max_retries - 1:
                logger.info(f"Retrying {retry + 2}/{max_retries}...")
            else:
                return False, None, error_msg

    return False, None, f"Max retries reached: {max_retries}"


def do_everything(
    root_folder,
    url,
    num_videos=5,
    resolution="1080p",
    demucs_model="htdemucs_ft",
    device="auto",
    shifts=5,
    asr_method="Higgs",  # <-- matches UI default
    whisper_model="large",
    batch_size=32,
    diarization=False,
    whisper_min_speakers=None,
    whisper_max_speakers=None,
    translation_method="LLM",
    translation_target_language="zh-cn",  # default code (was UI label)
    tts_method="Higgs",  # <-- matches UI default
    tts_target_language="zh-cn",  # default code (UI should override)
    voice="zh-CN-XiaoxiaoNeural",
    subtitles=True,
    speed_up=1.00,
    fps=30,
    background_music=None,
    bgm_volume=0.5,
    video_volume=1.0,
    target_resolution="1080p",
    max_workers=3,
    max_retries=5,
    progress_callback=None,
    *,
    emotion: str = "natural",  # "natural" | "happy" | "sad" | "angry" | "auto-*" | "auto"
    emotion_strength: float = 0.6,  # 0..1
):
    """Full pipeline entrypoint with an optional progress callback.

    Args:
        url: either a single path ending in ``.mp4`` (local file mode) or one
            or more URLs separated by newlines, commas or full-width commas.
        max_workers: accepted for interface compatibility; not used here.
        progress_callback: optional ``(percent: int, message: str) -> None``.
        (Remaining arguments are forwarded unchanged to ``process_video``.)

    Returns:
        ``(summary_text: str, last_output_video_path: Optional[str])``.
        Errors are reported through ``summary_text``; this function does not
        raise for pipeline failures.
    """
    try:
        success_list = []
        fail_list = []
        error_details = []

        # Normalize the possibly human-readable inputs to codes up-front
        try:
            translation_target_language = _norm_translation_lang(translation_target_language)
            tts_target_language = _norm_tts_lang(tts_target_language)
        except Exception as e:
            logger.error(f"Language normalization error: {e}")
            return f"Language normalization error: {e}", None

        logger.info("-" * 50)
        logger.info(f"Starting job: {url}")
        logger.info(f"Output folder={root_folder}, videos={num_videos}, download_res={resolution}")
        logger.info(f"Vocal separation: model={demucs_model}, device={device}, shifts={shifts}")
        logger.info(f"ASR: method={asr_method}, model={whisper_model}, batch_size={batch_size}, diarization={diarization}")
        logger.info(f"Translate: method={translation_method}, target_lang(code)={translation_target_language}")
        logger.info(f"TTS: method={tts_method}, target_lang(code)={tts_target_language}, voice={voice}")
        logger.info(f"Emotion(AUTO): preset={emotion}, strength={emotion_strength:.2f}")
        logger.info(f"Video compose: subtitles={subtitles}, speed={speed_up}, FPS={fps}, render_res={target_resolution}")
        logger.info("-" * 50)

        # Normalize multiline URL list; allow comma/Chinese comma separators
        urls = _split_urls(url)

        # Local file convenience: a single .mp4 path. Keep inner spaces (paths
        # may contain them) but drop surrounding whitespace; None-safe.
        local_path = url.strip() if isinstance(url, str) else ""
        is_local_file = local_path.endswith(".mp4")

        # Nothing to process: bail out before paying for model initialization.
        if not is_local_file and not urls:
            return "Failed to retrieve video info. Please check the URL(s).", None

        # Warm up models once
        try:
            if progress_callback:
                progress_callback(5, "Initializing models...")
            initialize_models(tts_method, asr_method, diarization)
        except Exception as e:
            stack_trace = traceback.format_exc()
            logger.error(f"Model initialization failed: {e}\n{stack_trace}")
            return f"Model initialization failed: {e}", None

        def _run_one(item):
            # Single place that forwards the full option set to process_video.
            return process_video(
                item,
                root_folder,
                resolution,
                demucs_model,
                device,
                shifts,
                asr_method,
                whisper_model,
                batch_size,
                diarization,
                whisper_min_speakers,
                whisper_max_speakers,
                translation_method,
                translation_target_language,
                tts_method,
                tts_target_language,
                voice,
                subtitles,
                speed_up,
                fps,
                background_music,
                bgm_volume,
                video_volume,
                target_resolution,
                max_retries,
                progress_callback,
                emotion=emotion,
                emotion_strength=emotion_strength,
            )

        out_video: Optional[str] = None

        if is_local_file:
            try:
                success, output_video, error_msg = _run_one(local_path)  # pass the actual file path
                if success:
                    logger.info(f"Local video processed successfully: {url}")
                    return "Success", output_video
                logger.error(f"Local video failed: {url}, error: {error_msg}")
                return f"Failed: {error_msg}", None
            except Exception as e:
                stack_trace = traceback.format_exc()
                logger.error(f"Failed to process local video: {e}\n{stack_trace}")
                return f"Failed to process local video: {e}", None

        # Remote URLs
        try:
            if progress_callback:
                progress_callback(10, "Fetching video info...")
            videos_info = list(get_info_list_from_url(urls, num_videos))
        except Exception as e:
            stack_trace = traceback.format_exc()
            logger.error(f"Failed to get video list: {e}\n{stack_trace}")
            return f"Failed to get video list: {e}", None

        if not videos_info:
            return "Failed to retrieve video info. Please check the URL(s).", None

        for info in videos_info:
            title = _info_title(info)
            try:
                success, output_video, error_msg = _run_one(info)
            except Exception as e:
                stack_trace = traceback.format_exc()
                fail_list.append(info)
                error_details.append(f"{title}: {e}")
                logger.error(f"Error: {title}, error: {e}\n{stack_trace}")
                continue

            if success:
                success_list.append(info)
                out_video = output_video
                logger.info(f"Processed: {title}")
            else:
                fail_list.append(info)
                error_details.append(f"{title}: {error_msg}")
                logger.error(f"Failed: {title}, error: {error_msg}")

        # Summary
        logger.info("-" * 50)
        logger.info(f"Done. success={len(success_list)}, failed={len(fail_list)}")
        if error_details:
            logger.info("Failure details:")
            for detail in error_details:
                logger.info(f" - {detail}")

        return f"Success: {len(success_list)}\nFailed: {len(fail_list)}", out_video

    except Exception as e:
        stack_trace = traceback.format_exc()
        error_msg = f"Pipeline error: {e}\n{stack_trace}"
        logger.error(error_msg)
        return error_msg, None