# -*- coding: utf-8 -*-
"""
TTS synthesis pipeline (per-line => stitched track)
- Language-aware text preprocessing (Chinese-only normalizations gated by target language)
- Backend dispatch to XTTS / CosyVoice / EdgeTTS / Higgs
- Precise timing via time-stretch with bounds
- Deterministic language support checks with unified language codes
"""

import os
import re
import json
import librosa
import numpy as np
from functools import lru_cache
from loguru import logger

from .utils import save_wav, save_wav_norm
from .cn_tx import TextNorm
from audiostretchy.stretch import stretch_audio

# TTS backends (each must expose: tts(text, output_path, speaker_wav, ...))
from .step042_tts_xtts import tts as xtts_tts
from .step043_tts_cosyvoice import tts as cosyvoice_tts
from .step044_tts_edge_tts import tts as edge_tts
# NEW: Higgs/Boson TTS (OpenAI-compatible)
from .step041_tts_higgs import tts as higgs_tts  # ensure this file exists

# -----------------------
# Constants / globals
# -----------------------
SR = 24000
EPS = 1e-8  # tiny guard for divides
normalizer = TextNorm()

# Precompiled regexes
_RE_CAP_SPLIT = re.compile(r'(?<!^)([A-Z])')
_RE_ALNUM_GAP = re.compile(r'(?<=[a-zA-Z])(?=\d)|(?<=\d)(?=[a-zA-Z])')
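# Illustrative behavior of the patterns above:
#   _RE_CAP_SPLIT.sub(r' \1', 'OpenAI')  -> 'Open A I'  (space before each non-leading capital)
#   _RE_ALNUM_GAP.sub(' ', 'GPT4 v2x')   -> 'GPT 4 v 2 x'  (space at letter<->digit boundaries)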

# -----------------------
# Unified language normalization
#   Accepts labels or codes; returns canonical codes:
#   'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
# -----------------------
_LANG_ALIASES = {
    # Simplified Chinese
    "zh-cn": "zh-cn", "zh_cn": "zh-cn", "cn": "zh-cn",
    "chinese (中文)": "zh-cn", "chinese": "zh-cn", "中文": "zh-cn",
    "simplified chinese (简体中文)": "zh-cn", "simplified chinese": "zh-cn", "简体中文": "zh-cn",

    # Traditional Chinese
    "zh-tw": "zh-tw", "zh_tw": "zh-tw", "tw": "zh-tw",
    "traditional chinese (繁体中文)": "zh-tw", "traditional chinese": "zh-tw", "繁体中文": "zh-tw",

    # English
    "en": "en", "english": "en",

    # Korean
    "ko": "ko", "korean": "ko", "한국어": "ko",

    # Japanese
    "ja": "ja", "japanese": "ja", "日本語": "ja",

    # Spanish
    "es": "es", "spanish": "es", "español": "es",

    # French
    "fr": "fr", "french": "fr", "français": "fr",

    # Polish (XTTS supports it)
    "pl": "pl", "polish": "pl",
}

_ALLOWED_CODES = {"zh-cn", "zh-tw", "en", "ko", "ja", "es", "fr", "pl"}

@lru_cache(maxsize=128)
def normalize_lang_to_code(lang: str) -> str:
    if not lang:
        raise ValueError("target_language is empty/None")
    key = str(lang).strip().lower()
    code = _LANG_ALIASES.get(key, key)
    if code not in _ALLOWED_CODES:
        raise ValueError(f"Unrecognized/unsupported language: {lang} -> {code}")
    return code

def is_chinese_code(code: str) -> bool:
    return code in ("zh-cn", "zh-tw")
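
# Quick sanity checks (based on the alias table above):
#   normalize_lang_to_code('Simplified Chinese (简体中文)')  -> 'zh-cn'
#   normalize_lang_to_code('FR')                             -> 'fr'
#   normalize_lang_to_code('klingon')                        -> raises ValueError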


# -----------------------
# Preprocessing
# -----------------------
@lru_cache(maxsize=4096)
def preprocess_text(text: str, target_lang_code: str) -> str:
    """
    Minimal, language-aware text normalization.
    Only apply Chinese-specific rules when target is Chinese (zh-cn/zh-tw).
    """
    t = text or ""

    if is_chinese_code(target_lang_code):
        t = t.replace('AI', '人工智能')            # legacy preference
        t = _RE_CAP_SPLIT.sub(r' \1', t)         # split camel-case-ish caps
        t = normalizer(t)                        # Chinese text normalization

    # Language-agnostic: space between letters and digits
    t = _RE_ALNUM_GAP.sub(' ', t)
    return t
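
# Illustrative output (the Chinese path also runs TextNorm, so exact output may differ):
#   preprocess_text('AI v2', 'zh-cn')  -> roughly '人工智能 v 2'
#   preprocess_text('AI v2', 'en')     -> 'AI v 2'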


# -----------------------
# Time & audio helpers
# -----------------------
def adjust_audio_length(
    wav_path: str,
    desired_length: float,
    sample_rate: int = SR,
    min_speed_factor: float = 0.5,
    max_speed_factor: float = 1.2
):
    """
    Load synthesized audio (wav or mp3), time-stretch to fit desired_length,
    then crop to the exact slot if needed. Returns (audio, new_length_sec).
    """
    # Load (fall back to a sibling .mp3 if the .wav fails to load)
    try:
        wav, sample_rate = librosa.load(wav_path, sr=sample_rate)
    except Exception:
        alt = wav_path.replace('.wav', '.mp3') if wav_path.endswith('.wav') else wav_path
        wav, sample_rate = librosa.load(alt, sr=sample_rate)
        wav_path = alt  # stretch_audio below must read the file that actually loaded

    current_length = len(wav) / max(sample_rate, 1)
    if current_length <= 1e-6 or desired_length <= 0:
        return np.zeros(0, dtype=np.float32), 0.0

    speed_factor = max(min(desired_length / (current_length + EPS), max_speed_factor), min_speed_factor)
    logger.info(f"[TTS] stretch ratio={speed_factor:.3f}")

    # output path for stretched version
    if wav_path.endswith('.wav'):
        target_path = wav_path.replace('.wav', '_adjusted.wav')
    elif wav_path.endswith('.mp3'):
        target_path = wav_path.replace('.mp3', '_adjusted.wav')
    else:
        target_path = wav_path + '_adjusted.wav'

    # stretch + reload
    stretch_audio(wav_path, target_path, ratio=speed_factor, sample_rate=sample_rate)
    wav, sample_rate = librosa.load(target_path, sr=sample_rate)

    new_len = min(desired_length, len(wav) / max(sample_rate, 1))
    return wav[:int(new_len * sample_rate)].astype(np.float32), new_len
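
# Worked example of the ratio clamp (defaults: min 0.5, max 1.2):
#   2.0 s clip into a 3.0 s slot -> raw ratio 1.5, clamped to 1.2 -> ~2.4 s output;
#   the caller pads the remaining 0.6 s with silence before the next line.
#   2.0 s clip into a 0.8 s slot -> raw ratio 0.4, clamped to 0.5 -> ~1.0 s output,
#   then cropped to 0.8 s via new_len = min(desired_length, actual length).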


# -----------------------
# Backend support map (codes)
# -----------------------
tts_support_languages = {
    # XTTS supports more languages; this is the safe subset used in this project
    'xtts':      {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'pl', 'es'},
    # EdgeTTS: the chosen voice determines the exact locale; these codes act as hints
    'EdgeTTS':   {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es', 'pl'},
    # CosyVoice (common distributions): typically no Spanish/Polish
    'cosyvoice': {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr'},
    # Higgs: includes Spanish, French, etc.
    'Higgs':     {'zh-cn', 'zh-tw', 'en', 'ja', 'ko', 'fr', 'es'},
}
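# e.g. 'cosyvoice' lists 'fr' but not 'es', so requesting Spanish with CosyVoice
# fails fast in generate_wavs below rather than at synthesis time.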

# If a backend needs a specific token instead of the unified code, adapt here.
_BACKEND_LANG_ADAPTER = {
    'xtts': {
        # XTTS accepts the unified codes as-is (common TTS community convention).
        # Identity mapping; add overrides here if a given build expects other tokens.
    },
    'EdgeTTS': {
        # EdgeTTS derives the locale from the chosen voice; the code is passed for
        # completeness. Identity mapping is fine; the voice wins in the Edge backend.
    },
    'cosyvoice': {
        # Identity for supported codes; Cantonese is not used here.
    },
    'Higgs': {
        # Higgs/OpenAI-compatible endpoints accept ISO-style codes.
    }
}

def _adapt_lang_for_backend(method: str, code: str) -> str:
    # If adapter table has a mapping, use it; otherwise default to the code itself.
    table = _BACKEND_LANG_ADAPTER.get(method, {})
    return table.get(code, code)
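
# Hypothetical override: if a CosyVoice build expects bare 'zh', set
#   _BACKEND_LANG_ADAPTER['cosyvoice']['zh-cn'] = 'zh'
# and _adapt_lang_for_backend('cosyvoice', 'zh-cn') will return 'zh'.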


# -----------------------
# Backend dispatcher
# -----------------------
def _synthesize_one_line(method: str, text: str, out_path: str, speaker_wav: str,
                         target_lang_code: str, voice: str):
    """
    Dispatch to the selected backend. Backends write WAV to out_path.
    target_lang_code is one of: 'zh-cn','zh-tw','en','ko','ja','es','fr','pl'
    """
    lang = _adapt_lang_for_backend(method, target_lang_code)

    if method == 'xtts':
        xtts_tts(text, out_path, speaker_wav, target_language=lang)
    elif method == 'cosyvoice':
        cosyvoice_tts(text, out_path, speaker_wav, target_language=lang)
    elif method == 'EdgeTTS':
        edge_tts(text, out_path, target_language=lang, voice=voice)
    elif method == 'Higgs':
        higgs_tts(text, out_path, speaker_wav, voice_type=voice, target_language=lang)
    else:
        raise ValueError(f"Unknown TTS method: {method}")


# -----------------------
# Small I/O helper
# -----------------------
def _atomic_write_json(path: str, obj):
    tmp = f"{path}.tmp"
    with open(tmp, 'w', encoding='utf-8') as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)
    os.replace(tmp, path)
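
# os.replace is atomic on POSIX (and replaces atomically on Windows for paths on the
# same filesystem), so a crash mid-dump leaves the previous translation.json intact.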


# -----------------------
# Main per-folder synthesis
# -----------------------
def generate_wavs(method: str, folder: str, target_language: str = "en", voice: str = 'zh-CN-XiaoxiaoNeural'):
    """
    Generate per-line WAVs and the combined track for one video's folder.

    RETURNS (strictly two values):
        (combined_wav_path, original_audio_path)
    """
    # Normalize & validate language for this backend (to code)
    lang_code = normalize_lang_to_code(target_language)
    supported = tts_support_languages.get(method, set())
    if supported and lang_code not in supported:
        raise ValueError(
            f"TTS method '{method}' does not support target language '{target_language}' "
            f"(normalized code='{lang_code}')"
        )

    transcript_path = os.path.join(folder, 'translation.json')
    if not os.path.exists(transcript_path):
        raise FileNotFoundError(f"translation.json not found in {folder}")

    with open(transcript_path, 'r', encoding='utf-8') as f:
        transcript = json.load(f)

    # Create output directory
    output_folder = os.path.join(folder, 'wavs')
    os.makedirs(output_folder, exist_ok=True)

    # Collect speakers (for info)
    speakers = {line.get('speaker', 'SPEAKER_00') for line in transcript}
    logger.info(f'[TTS] Found {len(speakers)} speakers')

    # Build combined wav via chunk list to avoid repeated reallocations
    chunks: list[np.ndarray] = []
    current_time = 0.0  # in seconds

    for i, line in enumerate(transcript):
        speaker = line.get('speaker', 'SPEAKER_00')
        raw_text = (line.get('translation') or '').strip()

        if raw_text:
            text = preprocess_text(raw_text, lang_code)
        else:
            logger.warning(f'[TTS] Empty translation for line {i}; inserting silence.')
            text = ""

        out_path = os.path.join(output_folder, f'{str(i).zfill(4)}.wav')
        speaker_wav = os.path.join(folder, 'SPEAKER', f'{speaker}.wav')

        # Synthesize only when there is text; empty lines become silence below.
        # Idempotency: skip if the file already exists and is non-trivially sized.
        if text and not (os.path.exists(out_path) and os.path.getsize(out_path) > 1024):
            _synthesize_one_line(method, text, out_path, speaker_wav, lang_code, voice)

        # Desired slot timing from transcript
        start = float(line['start'])
        end = float(line['end'])
        length = max(0.0, end - start)

        # Pad any gap between the current timeline position and the desired start
        if start > current_time:
            pad_len = int((start - current_time) * SR)
            if pad_len > 0:
                chunks.append(np.zeros((pad_len,), dtype=np.float32))
            current_time = start  # keep bookkeeping exact even for sub-sample gaps

        # Avoid overlap with next line
        if i < len(transcript) - 1:
            next_start = float(transcript[i + 1]['start'])
            end = min(current_time + length, next_start)
        else:
            end = current_time + length

        # Stretch/crop the synthesized line to fit the slot; pure silence for empty lines
        if text:
            wav_seg, adj_len = adjust_audio_length(out_path, end - current_time, sample_rate=SR)
        else:
            adj_len = max(0.0, end - current_time)
            wav_seg = np.zeros(int(adj_len * SR), dtype=np.float32)
        chunks.append(wav_seg.astype(np.float32))

        # Write back updated timing
        line['start'] = current_time
        line['end'] = current_time + adj_len
        current_time = line['end']

    # Concatenate once
    full_wav = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)

    # Match energy with original vocals
    vocal_path = os.path.join(folder, 'audio_vocals.wav')
    if os.path.exists(vocal_path):
        vocal_wav, _sr = librosa.load(vocal_path, sr=SR)
        peak_vocal = float(np.max(np.abs(vocal_wav))) if vocal_wav.size else 1.0
        peak_tts = float(np.max(np.abs(full_wav))) if full_wav.size else 0.0
        if peak_vocal > 0 and peak_tts > 0:
            full_wav = full_wav / (peak_tts + EPS) * peak_vocal

    # Save TTS-only track and write back timing updates
    tts_path = os.path.join(folder, 'audio_tts.wav')
    save_wav(full_wav, tts_path)
    _atomic_write_json(transcript_path, transcript)

    # Mix with instruments
    inst_path = os.path.join(folder, 'audio_instruments.wav')
    if os.path.exists(inst_path):
        instruments_wav, _sr = librosa.load(inst_path, sr=SR)
    else:
        instruments_wav = np.zeros_like(full_wav)

    # Length align
    len_full = len(full_wav)
    len_inst = len(instruments_wav)
    if len_full > len_inst:
        instruments_wav = np.pad(instruments_wav, (0, len_full - len_inst), mode='constant')
    elif len_inst > len_full:
        full_wav = np.pad(full_wav, (0, len_inst - len_full), mode='constant')

    combined = full_wav + instruments_wav
    combined_path = os.path.join(folder, 'audio_combined.wav')
    save_wav_norm(combined, combined_path)
    logger.info(f'[TTS] Generated {combined_path}')

    # Return strictly two values (EXPECTED by callers)
    return combined_path, os.path.join(folder, 'audio.wav')


def generate_all_wavs_under_folder(root_folder: str, method: str,
                                   target_language: str = 'en',
                                   voice: str = 'zh-CN-XiaoxiaoNeural'):
    """
    Walk `root_folder`, generate TTS where needed.

    RETURNS (strictly three values):
        (status_text, combined_wav_path_or_None, original_audio_path_or_None)
    """
    wav_combined, wav_ori = None, None
    for root, dirs, files in os.walk(root_folder):
        if 'translation.json' in files and 'audio_combined.wav' not in files:
            wav_combined, wav_ori = generate_wavs(method, root, target_language, voice)
        elif 'audio_combined.wav' in files:
            wav_combined = os.path.join(root, 'audio_combined.wav')
            wav_ori = os.path.join(root, 'audio.wav')
            logger.info(f'[TTS] Wavs already generated in {root}')

    return f'Generated all wavs under {root_folder}', wav_combined, wav_ori
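
# Example batch run (hypothetical layout: videos/<uploader>/<video>/translation.json):
#   status, combined, original = generate_all_wavs_under_folder(
#       'videos', method='EdgeTTS', target_language='zh-cn')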


if __name__ == '__main__':
    # Quick manual test (hypothetical folder; adjust to your layout):
    # folder = r'videos/ExampleUploader/20240805 Demo Video'
    # combined_path, original_path = generate_wavs('xtts', folder, target_language='en')
    # print(combined_path, original_path)
    pass