# -*- coding: utf-8 -*-
"""
TTS synthesis pipeline (per-line => stitched track)
- Language-aware text preprocessing (Chinese-only normalizations gated by target language)
- Backend dispatch to XTTS / CosyVoice / EdgeTTS / Higgs
- Precise timing via time-stretch with bounds
- Deterministic language support checks with unified language codes
"""
import os
import re
import json
import librosa
import numpy as np
from loguru import logger

from .utils import save_wav, save_wav_norm
from .cn_tx import TextNorm
from audiostretchy.stretch import stretch_audio

# TTS backends (each exposes a tts(...) entry point; exact signatures differ per backend)
from .step042_tts_xtts import tts as xtts_tts
from .step043_tts_cosyvoice import tts as cosyvoice_tts
from .step044_tts_edge_tts import tts as edge_tts
# NEW: Higgs/Boson TTS (OpenAI-compatible)
from .step041_tts_higgs import tts as higgs_tts  # ensure this file exists
# -----------------------
# Constants / globals
# -----------------------
SR = 24000
EPS = 1e-8  # tiny guard for divides

normalizer = TextNorm()

# Precompiled regexes
_RE_CAP_SPLIT = re.compile(r'(?<!^)([A-Z])')
_RE_ALNUM_GAP = re.compile(r'(?<=[a-zA-Z])(?=\d)|(?<=\d)(?=[a-zA-Z])')
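# Illustrative behavior of the two patterns:
#   _RE_CAP_SPLIT.sub(r' \1', 'OpenAI')  -> 'Open A I'    (space before non-leading capitals)
#   _RE_ALNUM_GAP.sub(' ', 'GPT4 v2x')   -> 'GPT 4 v 2 x' (space at letter<->digit boundaries)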
# -----------------------
# Unified language normalization
# Accepts labels or codes; returns canonical codes:
#   'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
# -----------------------
_LANG_ALIASES = {
    # Simplified Chinese
    "zh-cn": "zh-cn", "zh_cn": "zh-cn", "cn": "zh-cn",
    "chinese (中文)": "zh-cn", "chinese": "zh-cn", "中文": "zh-cn",
    "simplified chinese (简体中文)": "zh-cn", "simplified chinese": "zh-cn", "简体中文": "zh-cn",
    # Traditional Chinese
    "zh-tw": "zh-tw", "zh_tw": "zh-tw", "tw": "zh-tw",
    "traditional chinese (繁体中文)": "zh-tw", "traditional chinese": "zh-tw", "繁体中文": "zh-tw",
    # English
    "en": "en", "english": "en",
    # Korean
    "ko": "ko", "korean": "ko", "한국어": "ko",
    # Japanese
    "ja": "ja", "japanese": "ja", "日本語": "ja",
    # Spanish
    "es": "es", "spanish": "es", "español": "es",
    # French
    "fr": "fr", "french": "fr", "français": "fr",
    # Polish (XTTS supports it)
    "pl": "pl", "polish": "pl",
}

_ALLOWED_CODES = {"zh-cn", "zh-tw", "en", "ko", "ja", "es", "fr", "pl"}
def normalize_lang_to_code(lang: str) -> str:
    if not lang:
        raise ValueError("target_language is empty/None")
    key = str(lang).strip().lower()
    code = _LANG_ALIASES.get(key, key)
    if code not in _ALLOWED_CODES:
        raise ValueError(f"Unrecognized/unsupported language: {lang} -> {code}")
    return code


def is_chinese_code(code: str) -> bool:
    return code in ("zh-cn", "zh-tw")
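# Example round-trips, per the alias table above:
#   normalize_lang_to_code('English')  -> 'en'
#   normalize_lang_to_code('简体中文')  -> 'zh-cn'
#   normalize_lang_to_code('ZH_TW')    -> 'zh-tw'
#   normalize_lang_to_code('de')       -> ValueError (not in _ALLOWED_CODES)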
# -----------------------
# Preprocessing
# -----------------------
def preprocess_text(text: str, target_lang_code: str) -> str:
    """
    Minimal, language-aware text normalization.
    Only apply Chinese-specific rules when the target is Chinese (zh-cn/zh-tw).
    """
    t = text or ""
    if is_chinese_code(target_lang_code):
        t = t.replace('AI', '人工智能')   # legacy preference
        t = _RE_CAP_SPLIT.sub(r' \1', t)  # split camel-case-ish caps
        t = normalizer(t)                 # Chinese text normalization
    # Language-agnostic: space between letters and digits
    t = _RE_ALNUM_GAP.sub(' ', t)
    return t
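# Example: for a non-Chinese target only the letter/digit padding runs,
# e.g. preprocess_text('GPT4', 'en') -> 'GPT 4'. For zh-cn/zh-tw, 'AI' is
# first swapped for '人工智能', remaining capitals are spaced out, and the
# result then goes through TextNorm's Chinese normalization.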
# -----------------------
# Time & audio helpers
# -----------------------
def adjust_audio_length(
    wav_path: str,
    desired_length: float,
    sample_rate: int = SR,
    min_speed_factor: float = 0.5,
    max_speed_factor: float = 1.2
):
    """
    Load synthesized audio (wav or mp3), time-stretch it to fit desired_length,
    then crop to the exact slot if needed. Returns (audio, new_length_sec).
    """
    # Load; fall back to an .mp3 sibling if the .wav is missing/unreadable,
    # and remember which path actually loaded so we stretch the right file.
    src_path = wav_path
    try:
        wav, sample_rate = librosa.load(src_path, sr=sample_rate)
    except Exception:
        src_path = wav_path.replace('.wav', '.mp3') if wav_path.endswith('.wav') else wav_path
        wav, sample_rate = librosa.load(src_path, sr=sample_rate)

    current_length = len(wav) / max(sample_rate, 1)
    if current_length <= 1e-6 or desired_length <= 0:
        return np.zeros(0, dtype=np.float32), 0.0

    # Bounded stretch ratio: >1 slows playback (longer audio), <1 speeds it up
    speed_factor = max(min(desired_length / (current_length + EPS), max_speed_factor), min_speed_factor)
    logger.info(f"[TTS] stretch ratio={speed_factor:.3f}")

    # Output path for the stretched version
    if src_path.endswith('.wav'):
        target_path = src_path.replace('.wav', '_adjusted.wav')
    elif src_path.endswith('.mp3'):
        target_path = src_path.replace('.mp3', '_adjusted.wav')
    else:
        target_path = src_path + '_adjusted.wav'

    # Stretch + reload, then crop to the slot
    stretch_audio(src_path, target_path, ratio=speed_factor, sample_rate=sample_rate)
    wav, sample_rate = librosa.load(target_path, sr=sample_rate)
    new_len = min(desired_length, len(wav) / max(sample_rate, 1))
    return wav[:int(new_len * sample_rate)].astype(np.float32), new_len
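# Worked example: a 3.0 s line that must fit a 2.4 s slot gets
# ratio = 2.4 / 3.0 = 0.8 (played ~25% faster). A 1.0 s line in a 3.0 s slot
# would want ratio 3.0 but is clamped to max_speed_factor=1.2 (-> 1.2 s of
# audio); generate_wavs() fills the remainder of the slot with silence.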
# -----------------------
# Backend support map (codes)
# -----------------------
tts_support_languages = {
    # XTTS supports many languages; we keep the safe subset used in this project
    'xtts': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'pl', 'es'},
    # EdgeTTS: the chosen voice determines the exact locale; these codes serve as hints
    'EdgeTTS': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es', 'pl'},
    # CosyVoice (common distributions): typically no Spanish/Polish
    'cosyvoice': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr'},
    # Higgs: includes Spanish, French, etc.
    'Higgs': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es'},
}
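# Example: 'es' is absent from tts_support_languages['cosyvoice'], so
# generate_wavs('cosyvoice', folder, target_language='Spanish') raises
# ValueError before any synthesis starts.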
# If a backend needs a specific token instead of the unified code, adapt here.
_BACKEND_LANG_ADAPTER = {
    'xtts': {
        # XTTS accepts the codes below (common TTS community convention).
        # Identity mapping; override here if your XTTS build expects different tokens.
    },
    'EdgeTTS': {
        # EdgeTTS uses the voice to pick the locale, but we pass the code for completeness.
        # Identity mapping is fine; the voice wins in the Edge backend.
    },
    'cosyvoice': {
        # Identity for supported codes; Cantonese not used here.
    },
    'Higgs': {
        # Higgs/OpenAI-compatible endpoints accept ISO-ish codes per prior usage.
    }
}


def _adapt_lang_for_backend(method: str, code: str) -> str:
    # Use the adapter table's mapping if present; otherwise return the code unchanged.
    table = _BACKEND_LANG_ADAPTER.get(method, {})
    return table.get(code, code)
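# Example: with the identity tables above, _adapt_lang_for_backend('xtts', 'zh-cn')
# returns 'zh-cn'. To remap, add an entry such as {'zh-cn': 'zh'} under 'xtts'
# (a hypothetical token; only do this if your XTTS build expects it).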
# -----------------------
# Backend dispatcher
# -----------------------
def _synthesize_one_line(method: str, text: str, out_path: str, speaker_wav: str,
                         target_lang_code: str, voice: str):
    """
    Dispatch to the selected backend. Backends write WAV to out_path.
    target_lang_code is one of: 'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
    """
    lang = _adapt_lang_for_backend(method, target_lang_code)
    if method == 'xtts':
        xtts_tts(text, out_path, speaker_wav, target_language=lang)
    elif method == 'cosyvoice':
        cosyvoice_tts(text, out_path, speaker_wav, target_language=lang)
    elif method == 'EdgeTTS':
        edge_tts(text, out_path, target_language=lang, voice=voice)
    elif method == 'Higgs':
        higgs_tts(text, out_path, speaker_wav, voice_type=voice, target_language=lang)
    else:
        raise ValueError(f"Unknown TTS method: {method}")
# -----------------------
# Small I/O helper
# -----------------------
def _atomic_write_json(path: str, obj):
    tmp = f"{path}.tmp"
    with open(tmp, 'w', encoding='utf-8') as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)
    os.replace(tmp, path)
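# Why the temp file: os.replace is atomic on POSIX (and effectively so on
# Windows for same-volume paths), so a crash mid-write never leaves a
# half-written translation.json behind.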
# -----------------------
# Main per-folder synthesis
# -----------------------
def generate_wavs(method: str, folder: str, target_language: str = "en", voice: str = 'zh-CN-XiaoxiaoNeural'):
    """
    Generate per-line WAVs and the combined track for one video's folder.
    RETURNS (strictly two values):
        (combined_wav_path, original_audio_path)
    """
    # Normalize & validate the language for this backend (to a canonical code)
    lang_code = normalize_lang_to_code(target_language)
    supported = tts_support_languages.get(method, set())
    if supported and lang_code not in supported:
        raise ValueError(
            f"TTS method '{method}' does not support target language '{target_language}' "
            f"(normalized code='{lang_code}')"
        )

    transcript_path = os.path.join(folder, 'translation.json')
    if not os.path.exists(transcript_path):
        raise FileNotFoundError(f"translation.json not found in {folder}")
    with open(transcript_path, 'r', encoding='utf-8') as f:
        transcript = json.load(f)

    # Create the output directory
    output_folder = os.path.join(folder, 'wavs')
    os.makedirs(output_folder, exist_ok=True)

    # Collect speakers (for info)
    speakers = {line.get('speaker', 'SPEAKER_00') for line in transcript}
    logger.info(f'[TTS] Found {len(speakers)} speakers')

    # Build the combined wav via a chunk list to avoid repeated reallocations
    chunks: list[np.ndarray] = []
    current_time = 0.0  # in seconds

    for i, line in enumerate(transcript):
        speaker = line.get('speaker', 'SPEAKER_00')
        raw_text = (line.get('translation') or '').strip()
        if not raw_text:
            logger.warning(f'[TTS] Empty translation for line {i}, inserting silence.')
            text = ""
        else:
            text = preprocess_text(raw_text, lang_code)

        out_path = os.path.join(output_folder, f'{str(i).zfill(4)}.wav')
        speaker_wav = os.path.join(folder, 'SPEAKER', f'{speaker}.wav')

        # Optional idempotency: skip synthesis if the file already exists & is
        # non-empty; empty lines are skipped entirely (silence is inserted below).
        if text and not (os.path.exists(out_path) and os.path.getsize(out_path) > 1024):
            _synthesize_one_line(method, text, out_path, speaker_wav, lang_code, voice)

        # Desired slot timing from the transcript
        start = float(line['start'])
        end = float(line['end'])
        length = max(0.0, end - start)

        # Pad any gap between the current timeline and the desired start
        if start > current_time:
            pad_len = int((start - current_time) * SR)
            if pad_len > 0:
                chunks.append(np.zeros((pad_len,), dtype=np.float32))
            current_time = start

        # Avoid overlap with the next line
        if i < len(transcript) - 1:
            next_start = float(transcript[i + 1]['start'])
            end = min(current_time + length, next_start)
        else:
            end = current_time + length
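        # Worked slot example: a line with start=10.0, end=13.0 whose successor
        # starts at 12.5 gets its slot cropped to [current_time, 12.5]; the
        # synthesized audio is then stretched/cropped into that window below.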
        # Stretch/crop the synthesized line to fit the slot; empty lines get silence
        if text:
            wav_seg, adj_len = adjust_audio_length(out_path, end - current_time, sample_rate=SR)
        else:
            adj_len = max(0.0, end - current_time)
            wav_seg = np.zeros(int(adj_len * SR), dtype=np.float32)
        chunks.append(wav_seg.astype(np.float32))

        # Write back the updated timing
        line['start'] = current_time
        line['end'] = current_time + adj_len
        current_time = line['end']

    # Concatenate once
    full_wav = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)

    # Match energy with the original vocals
    vocal_path = os.path.join(folder, 'audio_vocals.wav')
    if os.path.exists(vocal_path):
        vocal_wav, _sr = librosa.load(vocal_path, sr=SR)
        peak_vocal = float(np.max(np.abs(vocal_wav))) if vocal_wav.size else 1.0
        peak_tts = float(np.max(np.abs(full_wav))) if full_wav.size else 0.0
        if peak_vocal > 0 and peak_tts > 0:
            full_wav = full_wav / (peak_tts + EPS) * peak_vocal

    # Save the TTS-only track and write back the timing updates
    tts_path = os.path.join(folder, 'audio_tts.wav')
    save_wav(full_wav, tts_path)
    _atomic_write_json(transcript_path, transcript)

    # Mix with instruments
    inst_path = os.path.join(folder, 'audio_instruments.wav')
    if os.path.exists(inst_path):
        instruments_wav, _sr = librosa.load(inst_path, sr=SR)
    else:
        instruments_wav = np.zeros_like(full_wav)

    # Length align
    len_full = len(full_wav)
    len_inst = len(instruments_wav)
    if len_full > len_inst:
        instruments_wav = np.pad(instruments_wav, (0, len_full - len_inst), mode='constant')
    elif len_inst > len_full:
        full_wav = np.pad(full_wav, (0, len_inst - len_full), mode='constant')

    combined = full_wav + instruments_wav
    combined_path = os.path.join(folder, 'audio_combined.wav')
    save_wav_norm(combined, combined_path)
    logger.info(f'[TTS] Generated {combined_path}')

    # Return strictly two values (EXPECTED by callers)
    return combined_path, os.path.join(folder, 'audio.wav')
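# Example (illustrative path; the folder must contain translation.json and
# per-speaker reference audio under SPEAKER/):
#   combined, original = generate_wavs('xtts', 'videos/Uploader/SomeVideo',
#                                      target_language='en')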
def generate_all_wavs_under_folder(root_folder: str, method: str,
                                   target_language: str = 'en',
                                   voice: str = 'zh-CN-XiaoxiaoNeural'):
    """
    Walk `root_folder` and generate TTS where needed.
    RETURNS (strictly three values):
        (status_text, combined_wav_path_or_None, original_audio_path_or_None)
    """
    wav_combined, wav_ori = None, None
    for root, dirs, files in os.walk(root_folder):
        if 'translation.json' in files and 'audio_combined.wav' not in files:
            wav_combined, wav_ori = generate_wavs(method, root, target_language, voice)
        elif 'audio_combined.wav' in files:
            wav_combined = os.path.join(root, 'audio_combined.wav')
            wav_ori = os.path.join(root, 'audio.wav')
            logger.info(f'[TTS] Wavs already generated in {root}')
    return f'Generated all wavs under {root_folder}', wav_combined, wav_ori
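# Example (illustrative root; label and code forms both work for target_language):
#   status, combined, original = generate_all_wavs_under_folder(
#       'videos', method='EdgeTTS',
#       target_language='Simplified Chinese (简体中文)',  # normalizes to 'zh-cn'
#       voice='zh-CN-XiaoxiaoNeural')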
if __name__ == '__main__':
    # Example quick test
    # folder = r'videos/ExampleUploader/20240805 Demo Video'
    # print(generate_wavs('xtts', folder))
    pass