# -*- coding: utf-8 -*-
"""
TTS synthesis pipeline (per-line => stitched track)
- Language-aware text preprocessing (Chinese-only normalizations gated by target language)
- Backend dispatch to XTTS / CosyVoice / EdgeTTS / Higgs
- Precise timing via time-stretch with bounds
- Deterministic language support checks with unified language codes
"""
import os
import re
import json
import librosa
import numpy as np
from functools import lru_cache
from loguru import logger
from .utils import save_wav, save_wav_norm
from .cn_tx import TextNorm
from audiostretchy.stretch import stretch_audio
# TTS backends (each must expose: tts(text, output_path, speaker_wav, ...))
from .step042_tts_xtts import tts as xtts_tts
from .step043_tts_cosyvoice import tts as cosyvoice_tts
from .step044_tts_edge_tts import tts as edge_tts
# NEW: Higgs/Boson TTS (OpenAI-compatible)
from .step041_tts_higgs import tts as higgs_tts # ensure this file exists
# -----------------------
# Constants / globals
# -----------------------
SR = 24000
EPS = 1e-8 # tiny guard for divides
normalizer = TextNorm()
# Precompiled regexes
_RE_CAP_SPLIT = re.compile(r'(?<!^)([A-Z])')
_RE_ALNUM_GAP = re.compile(r'(?<=[a-zA-Z])(?=\d)|(?<=\d)(?=[a-zA-Z])')
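# Examples (comment-only): _RE_CAP_SPLIT turns 'OpenAI' into 'Open A I';
# _RE_ALNUM_GAP turns 'v2' into 'v 2' and 'GPT4' into 'GPT 4'.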
# -----------------------
# Unified language normalization
# Accepts labels or codes; returns canonical codes:
# 'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
# -----------------------
_LANG_ALIASES = {
# Simplified Chinese
"zh-cn": "zh-cn", "zh_cn": "zh-cn", "cn": "zh-cn",
"chinese (中文)": "zh-cn", "chinese": "zh-cn", "中文": "zh-cn",
"simplified chinese (简体中文)": "zh-cn", "simplified chinese": "zh-cn", "简体中文": "zh-cn",
# Traditional Chinese
"zh-tw": "zh-tw", "zh_tw": "zh-tw", "tw": "zh-tw",
"traditional chinese (繁体中文)": "zh-tw", "traditional chinese": "zh-tw", "繁体中文": "zh-tw",
# English
"en": "en", "english": "en",
# Korean
"ko": "ko", "korean": "ko", "한국어": "ko",
# Japanese
"ja": "ja", "japanese": "ja", "日本語": "ja",
# Spanish
"es": "es", "spanish": "es", "español": "es",
# French
"fr": "fr", "french": "fr", "français": "fr",
# Polish (XTTS supports it)
"pl": "pl", "polish": "pl",
}
_ALLOWED_CODES = {"zh-cn", "zh-tw", "en", "ko", "ja", "es", "fr", "pl"}
@lru_cache(maxsize=128)
def normalize_lang_to_code(lang: str) -> str:
if not lang:
raise ValueError("target_language is empty/None")
key = str(lang).strip().lower()
code = _LANG_ALIASES.get(key, key)
if code not in _ALLOWED_CODES:
raise ValueError(f"Unrecognized/unsupported language: {lang} -> {code}")
return code
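# Behavior sketch (comment-only): labels and codes collapse to one canonical
# code, e.g. normalize_lang_to_code('English') -> 'en' and
# normalize_lang_to_code('简体中文') -> 'zh-cn'; anything outside
# _ALLOWED_CODES (say, 'de') raises ValueError rather than passing through.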
def is_chinese_code(code: str) -> bool:
return code in ("zh-cn", "zh-tw")
# -----------------------
# Preprocessing
# -----------------------
@lru_cache(maxsize=4096)
def preprocess_text(text: str, target_lang_code: str) -> str:
"""
Minimal, language-aware text normalization.
Only apply Chinese-specific rules when target is Chinese (zh-cn/zh-tw).
"""
t = text or ""
if is_chinese_code(target_lang_code):
        t = re.sub(r'\bAI\b', '人工智能', t)  # legacy preference; word-bounded so e.g. 'MAIN' is untouched
t = _RE_CAP_SPLIT.sub(r' \1', t) # split camel-case-ish caps
t = normalizer(t) # Chinese text normalization
# Language-agnostic: space between letters and digits
t = _RE_ALNUM_GAP.sub(' ', t)
return t
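# Example (comment-only): for a non-Chinese target such as 'en', only the
# letter/digit spacing applies ('GPT4' -> 'GPT 4'); for 'zh-cn'/'zh-tw' the
# acronym swap, capital splitting, and TextNorm normalization run as well.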
# -----------------------
# Time & audio helpers
# -----------------------
def adjust_audio_length(
wav_path: str,
desired_length: float,
sample_rate: int = SR,
min_speed_factor: float = 0.5,
max_speed_factor: float = 1.2
):
"""
Load synthesized audio (wav or mp3), time-stretch to fit desired_length,
then crop to the exact slot if needed. Returns (audio, new_length_sec).
"""
# Load (fallback to .mp3 if needed)
try:
wav, sample_rate = librosa.load(wav_path, sr=sample_rate)
    except Exception:
        alt = wav_path.replace('.wav', '.mp3') if wav_path.endswith('.wav') else wav_path
        wav, sample_rate = librosa.load(alt, sr=sample_rate)
        wav_path = alt  # stretch the file that actually loaded, not the unreadable .wav
current_length = len(wav) / max(sample_rate, 1)
if current_length <= 1e-6 or desired_length <= 0:
return np.zeros(0, dtype=np.float32), 0.0
speed_factor = max(min(desired_length / (current_length + EPS), max_speed_factor), min_speed_factor)
logger.info(f"[TTS] stretch ratio={speed_factor:.3f}")
# output path for stretched version
if wav_path.endswith('.wav'):
target_path = wav_path.replace('.wav', '_adjusted.wav')
elif wav_path.endswith('.mp3'):
target_path = wav_path.replace('.mp3', '_adjusted.wav')
else:
target_path = wav_path + '_adjusted.wav'
# stretch + reload
stretch_audio(wav_path, target_path, ratio=speed_factor, sample_rate=sample_rate)
wav, sample_rate = librosa.load(target_path, sr=sample_rate)
new_len = min(desired_length, len(wav) / max(sample_rate, 1))
return wav[:int(new_len * sample_rate)].astype(np.float32), new_len
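# Ratio math (comment-only): a 4.0 s synthesis aimed at a 2.0 s slot gives
# ratio 2.0/4.0 = 0.5, right at min_speed_factor; a 1.0 s synthesis aimed at
# a 2.0 s slot wants ratio 2.0 but is clamped to max_speed_factor (1.2), so
# the leftover gap is absorbed by the caller's timeline padding.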
# -----------------------
# Backend support map (codes)
# -----------------------
tts_support_languages = {
    # XTTS supports many languages; this is the safe subset used in this project
    'xtts': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'pl', 'es'},
    # EdgeTTS: the chosen voice determines the exact locale; these codes act as hints
    'EdgeTTS': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es', 'pl'},
    # CosyVoice (common distributions): typically no Spanish or Polish
    'cosyvoice': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr'},
    # Higgs: covers the CJK/English set plus French and Spanish
    'Higgs': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es'},
}
# If a backend needs a specific token instead of the unified code, adapt here.
_BACKEND_LANG_ADAPTER = {
    'xtts': {
        # XTTS accepts the unified codes directly (common TTS community convention).
        # Identity mapping; add overrides here if a given XTTS build expects other tokens.
    },
    'EdgeTTS': {
        # EdgeTTS derives the locale from the voice; the code is passed for completeness.
        # Identity mapping is fine; the voice wins in the Edge backend.
    },
    'cosyvoice': {
        # Identity for the supported codes; Cantonese is not used here.
    },
    'Higgs': {
        # Higgs/OpenAI-compatible endpoints accept ISO-style codes.
    }
}
def _adapt_lang_for_backend(method: str, code: str) -> str:
# If adapter table has a mapping, use it; otherwise default to the code itself.
table = _BACKEND_LANG_ADAPTER.get(method, {})
return table.get(code, code)
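# Override sketch (hypothetical): if some CosyVoice build expected plain 'zh'
# instead of 'zh-cn', adding {'zh-cn': 'zh'} under 'cosyvoice' above would be
# enough; with the tables empty, the unified code passes through unchanged.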
# -----------------------
# Backend dispatcher
# -----------------------
def _synthesize_one_line(method: str, text: str, out_path: str, speaker_wav: str,
target_lang_code: str, voice: str):
"""
Dispatch to the selected backend. Backends write WAV to out_path.
target_lang_code is one of: 'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
"""
lang = _adapt_lang_for_backend(method, target_lang_code)
if method == 'xtts':
xtts_tts(text, out_path, speaker_wav, target_language=lang)
elif method == 'cosyvoice':
cosyvoice_tts(text, out_path, speaker_wav, target_language=lang)
elif method == 'EdgeTTS':
edge_tts(text, out_path, target_language=lang, voice=voice)
elif method == 'Higgs':
higgs_tts(text, out_path, speaker_wav, voice_type=voice, target_language=lang)
else:
raise ValueError(f"Unknown TTS method: {method}")
# -----------------------
# Small I/O helper
# -----------------------
def _atomic_write_json(path: str, obj):
tmp = f"{path}.tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(obj, f, indent=2, ensure_ascii=False)
os.replace(tmp, path)
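# os.replace swaps the temp file in atomically (a POSIX guarantee; effectively
# the same on Windows), so a crash mid-dump never leaves translation.json
# half-written; at worst an orphaned .tmp file remains.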
# -----------------------
# Main per-folder synthesis
# -----------------------
def generate_wavs(method: str, folder: str, target_language: str = "en", voice: str = 'zh-CN-XiaoxiaoNeural'):
"""
Generate per-line WAVs and the combined track for one video's folder.
RETURNS (strictly two values):
(combined_wav_path, original_audio_path)
"""
# Normalize & validate language for this backend (to code)
lang_code = normalize_lang_to_code(target_language)
supported = tts_support_languages.get(method, set())
if supported and lang_code not in supported:
raise ValueError(
f"TTS method '{method}' does not support target language '{target_language}' "
f"(normalized code='{lang_code}')"
)
transcript_path = os.path.join(folder, 'translation.json')
if not os.path.exists(transcript_path):
raise FileNotFoundError(f"translation.json not found in {folder}")
with open(transcript_path, 'r', encoding='utf-8') as f:
transcript = json.load(f)
# Create output directory
output_folder = os.path.join(folder, 'wavs')
os.makedirs(output_folder, exist_ok=True)
# Collect speakers (for info)
speakers = {line.get('speaker', 'SPEAKER_00') for line in transcript}
logger.info(f'[TTS] Found {len(speakers)} speakers')
# Build combined wav via chunk list to avoid repeated reallocations
chunks: list[np.ndarray] = []
current_time = 0.0 # in seconds
for i, line in enumerate(transcript):
speaker = line.get('speaker', 'SPEAKER_00')
raw_text = (line.get('translation') or '').strip()
if not raw_text:
logger.warning(f'[TTS] Empty translation for line {i}, inserting silence.')
text = ""
else:
text = preprocess_text(raw_text, lang_code)
out_path = os.path.join(output_folder, f'{str(i).zfill(4)}.wav')
speaker_wav = os.path.join(folder, 'SPEAKER', f'{speaker}.wav')
        # Optional idempotency: skip synthesis if the file already exists and is non-trivial (>1 KiB)
        if not (os.path.exists(out_path) and os.path.getsize(out_path) > 1024):
            if text:
                _synthesize_one_line(method, text, out_path, speaker_wav, lang_code, voice)
            else:
                # Empty translation: write a short silent placeholder instead of
                # asking a backend to synthesize an empty string.
                save_wav(np.zeros(int(0.1 * SR), dtype=np.float32), out_path)
# Desired slot timing from transcript
start = float(line['start'])
end = float(line['end'])
length = max(0.0, end - start)
# Pad any gap between current timeline and desired start
if start > current_time:
pad_len = int((start - current_time) * SR)
if pad_len > 0:
chunks.append(np.zeros((pad_len,), dtype=np.float32))
current_time = start
# Avoid overlap with next line
if i < len(transcript) - 1:
next_start = float(transcript[i + 1]['start'])
end = min(current_time + length, next_start)
else:
end = current_time + length
# Stretch/crop synthesized line to fit the slot
wav_seg, adj_len = adjust_audio_length(out_path, end - current_time, sample_rate=SR)
chunks.append(wav_seg.astype(np.float32))
# Write back updated timing
line['start'] = current_time
line['end'] = current_time + adj_len
current_time = line['end']
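    # Worked example (comment-only): given lines at [0.0, 2.0] and [1.5, 3.0],
    # the first slot is capped at 1.5 s (the next line's start), the line's
    # start/end are rewritten to the actual synthesized placement, and the
    # second line pads silence only if the timeline hasn't already reached it.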
# Concatenate once
full_wav = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)
# Match energy with original vocals
vocal_path = os.path.join(folder, 'audio_vocals.wav')
if os.path.exists(vocal_path):
vocal_wav, _sr = librosa.load(vocal_path, sr=SR)
peak_vocal = float(np.max(np.abs(vocal_wav))) if vocal_wav.size else 1.0
peak_tts = float(np.max(np.abs(full_wav))) if full_wav.size else 0.0
if peak_vocal > 0 and peak_tts > 0:
full_wav = full_wav / (peak_tts + EPS) * peak_vocal
# Save TTS-only track and write back timing updates
tts_path = os.path.join(folder, 'audio_tts.wav')
save_wav(full_wav, tts_path)
_atomic_write_json(transcript_path, transcript)
# Mix with instruments
inst_path = os.path.join(folder, 'audio_instruments.wav')
if os.path.exists(inst_path):
instruments_wav, _sr = librosa.load(inst_path, sr=SR)
else:
instruments_wav = np.zeros_like(full_wav)
# Length align
len_full = len(full_wav)
len_inst = len(instruments_wav)
if len_full > len_inst:
instruments_wav = np.pad(instruments_wav, (0, len_full - len_inst), mode='constant')
elif len_inst > len_full:
full_wav = np.pad(full_wav, (0, len_inst - len_full), mode='constant')
combined = full_wav + instruments_wav
combined_path = os.path.join(folder, 'audio_combined.wav')
save_wav_norm(combined, combined_path)
logger.info(f'[TTS] Generated {combined_path}')
# Return strictly two values (EXPECTED by callers)
return combined_path, os.path.join(folder, 'audio.wav')
def generate_all_wavs_under_folder(root_folder: str, method: str,
target_language: str = 'en',
voice: str = 'zh-CN-XiaoxiaoNeural'):
"""
Walk `root_folder`, generate TTS where needed.
RETURNS (strictly three values):
(status_text, combined_wav_path_or_None, original_audio_path_or_None)
"""
wav_combined, wav_ori = None, None
for root, dirs, files in os.walk(root_folder):
if 'translation.json' in files and 'audio_combined.wav' not in files:
wav_combined, wav_ori = generate_wavs(method, root, target_language, voice)
elif 'audio_combined.wav' in files:
wav_combined = os.path.join(root, 'audio_combined.wav')
wav_ori = os.path.join(root, 'audio.wav')
logger.info(f'[TTS] Wavs already generated in {root}')
return f'Generated all wavs under {root_folder}', wav_combined, wav_ori
if __name__ == '__main__':
# Example quick test
# folder = r'videos/ExampleUploader/20240805 Demo Video'
# print(generate_wavs('xtts', folder))
pass
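    # A minimal CLI sketch (assumes the folder layout this module expects:
    # translation.json plus SPEAKER/*.wav under each per-video folder).
    import argparse
    parser = argparse.ArgumentParser(description='Batch TTS for processed video folders')
    parser.add_argument('root', help='root folder containing per-video subfolders')
    parser.add_argument('--method', default='xtts',
                        choices=['xtts', 'cosyvoice', 'EdgeTTS', 'Higgs'])
    parser.add_argument('--language', default='en',
                        help="target language label or code, e.g. 'en' or '简体中文'")
    parser.add_argument('--voice', default='zh-CN-XiaoxiaoNeural',
                        help='EdgeTTS voice name or Higgs voice type')
    args = parser.parse_args()
    status, _combined, _original = generate_all_wavs_under_folder(
        args.root, args.method, target_language=args.language, voice=args.voice)
    logger.info(status)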