# Spaces: Runtime error
import asyncio
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from pathlib import Path

import edge_tts
import pygame
from gtts import gTTS
from TTS.api import TTS
class TTSUtils:
    """Synthesize Spanish speech to audio files and play them through pygame.

    Synthesis walks a chain of engines: the selected model (Coqui VITS or a
    Microsoft Edge voice), then Edge, then gTTS as the last resort. Generated
    files are written to a ``chatbot_audio`` folder inside the system
    temporary directory and are pruned once they are older than one hour.
    """

    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",
            'description': "Voz masculina de Microsoft Edge (MX)",
            'type': 'edge',
            'rate': '+25%'
        },
        'EDGE_ES': {
            'name': "es-ES-AlvaroNeural",
            'description': "Voz masculina de Microsoft Edge (ES)",
            'type': 'edge',
            'rate': '+25%'
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina de VITS (ES)",
            'type': 'local',
            'config': {
                'speed': 1.25,
                'model_path': "tts_models/es/css10/vits"
            }
        }
    }

    # One set of mixer parameters for both the first init and any recovery,
    # so a re-initialisation does not silently change sample rate or volume.
    MIXER_SETTINGS = {'frequency': 16000, 'size': -16, 'channels': 1, 'buffer': 2048}
    MIXER_VOLUME = 0.8

    # Edge entry used whenever the current model is not itself an Edge voice.
    DEFAULT_EDGE_MODEL = 'EDGE'
    EDGE_ATTEMPTS = 3
    EDGE_TIMEOUT = 15.0
    EDGE_FILE_TIMEOUT = 5.0

    # Playback speed applied to gTTS output (gTTS has no rate control).
    GTTS_SPEED = 1.25

    _UNITS = ('', 'uno', 'dos', 'tres', 'cuatro', 'cinco', 'seis', 'siete',
              'ocho', 'nueve')
    _TEENS = ('diez', 'once', 'doce', 'trece', 'catorce', 'quince',
              'dieciséis', 'diecisiete', 'dieciocho', 'diecinueve')
    _TWENTIES = ('veinte', 'veintiuno', 'veintidós', 'veintitrés',
                 'veinticuatro', 'veinticinco', 'veintiséis', 'veintisiete',
                 'veintiocho', 'veintinueve')
    _TENS = ('', 'diez', 'veinte', 'treinta', 'cuarenta', 'cincuenta',
             'sesenta', 'setenta', 'ochenta', 'noventa')

    # Single characters that are dropped or spelled out before synthesis.
    _CHAR_REPLACEMENTS = str.maketrans({
        '*': '', '#': '', '`': '', '~': '', '|': '', '>': '', '<': '',
        '\\': '',
        '&': ' y ',
        '_': ' ',
        '%': ' por ciento',
        '$': ' pesos',
        '=': ' igual a ',
        '+': ' más ',
        '@': ' arroba ',
    })
    # Applied in order: longer runs first so "..." is not consumed as "..".
    _MULTI_CHAR_REPLACEMENTS = (('...', ','), ('..', ','), ('---', ','), ('--', ','))
    # "$100" -> "100 pesos": the amount must precede the currency word.
    _CURRENCY_PREFIX = re.compile(r'\$\s*([0-9]+(?:\.[0-9]+)?)')
    # Optional opening punctuation, sign, integer, optional decimals, and
    # optional closing punctuation (so "3." is a number plus a full stop).
    _NUMBER_TOKEN = re.compile(
        r'([¿¡(\[]*)(-?)([0-9]+)(?:\.([0-9]+))?([.,;:!?)\]]*)'
    )

    def __init__(self, model_name='EDGE', elevenlabs_api_key=None):
        """Set up the temp directory, the pygame mixer and the chosen model.

        Args:
            model_name: Key of ``AVAILABLE_MODELS`` to start with.
            elevenlabs_api_key: Accepted for backward compatibility; this
                class does not use it.
        """
        self.is_speaking = False
        self.should_stop = False
        self.temp_dir = Path(tempfile.gettempdir()) / "chatbot_audio"
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.tts = None
        self.audio_initialized = False
        self.current_model = model_name
        self.play_lock = threading.Lock()
        print(f"Inicializando TTS con modelo: {model_name}")
        try:
            self._init_mixer()
            self.audio_initialized = True
            print("Audio inicializado correctamente")
        except Exception as e:
            print(f"Error inicializando audio: {str(e)}")
            self.audio_initialized = False
        self.clock = pygame.time.Clock()
        self.init_audio()
        # Drop stale files left behind by earlier runs.
        self._cleanup_old_files()

    def _init_mixer(self):
        """(Re)open the pygame mixer with ``MIXER_SETTINGS``; raises on failure."""
        if pygame.mixer.get_init():
            pygame.mixer.quit()
        pygame.mixer.init(**self.MIXER_SETTINGS)
        pygame.mixer.music.set_volume(self.MIXER_VOLUME)

    @staticmethod
    def _is_valid_audio(path):
        """Return True when *path* is an existing, non-empty file."""
        candidate = Path(path)
        return candidate.is_file() and candidate.stat().st_size > 0

    def _cleanup_old_files(self, max_age_hours=1):
        """Delete files in the temp directory older than *max_age_hours*."""
        try:
            max_age_seconds = max_age_hours * 3600
            now = time.time()
            for entry in self.temp_dir.glob("*"):
                if not entry.is_file():
                    continue
                if now - entry.stat().st_mtime > max_age_seconds:
                    try:
                        entry.unlink()
                    except OSError:
                        # Best effort: the file may be in use or already gone.
                        pass
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")

    def _verify_audio_system(self):
        """Ensure the mixer is usable, re-initialising it if needed.

        Returns:
            True when the mixer is ready, False when it could not be opened.
        """
        if self.audio_initialized and pygame.mixer.get_init():
            return True
        try:
            self._init_mixer()
        except Exception as e:
            print(f"Error reinicializando audio: {e}")
            self.audio_initialized = False
            return False
        self.audio_initialized = True
        return True

    def init_audio(self):
        """Load the engine for the current model.

        Only VITS needs loading. On any VITS failure ``current_model`` is
        switched to ``'EDGE'``.

        Returns:
            True when the current model is ready, False when VITS failed to
            load and the instance fell back to Edge.
        """
        if self.current_model != 'VITS':
            return True
        try:
            model_info = self.AVAILABLE_MODELS['VITS']
            print(f"Cargando modelo VITS: {model_info['name']}")
            engine = TTS(model_name=model_info['name'])
            synthesizer = getattr(engine, 'synthesizer', None)
            config = getattr(synthesizer, 'tts_config', None)
            if config is None:
                print("Error: El modelo VITS no tiene la estructura esperada")
                self.current_model = 'EDGE'
                return False
            # NOTE(review): update() may reject keys the model config does not
            # define ('speed', 'model_path') — confirm against the installed
            # TTS/coqpit version.
            config.update(model_info['config'])
        except Exception as vits_error:
            print(f"Error cargando modelo VITS: {vits_error}")
            self.current_model = 'EDGE'
            return False
        self.tts = engine
        print("Modelo VITS cargado correctamente")
        return True

    def _number_to_words(self, number):
        """Spell an integer between -99 and 99 in Spanish.

        Args:
            number: An int or anything ``int()`` accepts.

        Returns:
            The Spanish words; the decimal digits as a string when the
            magnitude is 100 or more; *number* unchanged when it is not an
            integer.
        """
        try:
            num = int(number)
        except (TypeError, ValueError, OverflowError):
            return number
        if num < 0:
            return f"menos {self._number_to_words(-num)}"
        if num == 0:
            return 'cero'
        if num < 10:
            return self._UNITS[num]
        if num < 20:
            return self._TEENS[num - 10]
        if num < 30:
            # 21-29 are single words in Spanish, not "veinte y uno".
            return self._TWENTIES[num - 20]
        if num < 100:
            tens, units = divmod(num, 10)
            if units == 0:
                return self._TENS[tens]
            return f"{self._TENS[tens]} y {self._UNITS[units]}"
        return str(num)

    def _decimals_to_words(self, digits):
        """Spell the digits after a decimal point, keeping leading zeros."""
        significant = digits.lstrip('0')
        spoken = ['cero'] * (len(digits) - len(significant))
        if significant:
            spoken.append(self._number_to_words(significant))
        return ' '.join(spoken)

    def _speak_token(self, token):
        """Spell out *token* when it is a number; return it unchanged otherwise."""
        match = self._NUMBER_TOKEN.fullmatch(token)
        if match is None:
            return token
        opening, sign, whole, decimals, closing = match.groups()
        spoken = self._number_to_words(whole)
        if sign:
            spoken = f"menos {spoken}"
        if decimals:
            spoken = f"{spoken} punto {self._decimals_to_words(decimals)}"
        return f"{opening}{spoken}{closing}"

    def _clean_text(self, text):
        """Prepare *text* for synthesis.

        Strips markup characters, spells out symbols and small numbers, and
        collapses whitespace. Empty or ``None`` input is returned unchanged.
        """
        if not text:
            return text
        cleaned = self._CURRENCY_PREFIX.sub(r'\1 pesos', text)
        cleaned = cleaned.translate(self._CHAR_REPLACEMENTS)
        for sequence, replacement in self._MULTI_CHAR_REPLACEMENTS:
            cleaned = cleaned.replace(sequence, replacement)
        # split()/join also collapses any run of whitespace to one space.
        return ' '.join(self._speak_token(token) for token in cleaned.split())

    def text_to_speech(self, text, save_path=None):
        """Synthesize *text* to an audio file.

        Args:
            text: Text to speak; it is cleaned with ``_clean_text`` first.
            save_path: Destination file. A unique file in the temp directory
                is used when omitted.

        Returns:
            The path of the generated file, or None when there is nothing to
            say, the audio system is unavailable, or every engine failed.
        """
        if not text:
            return None
        text = self._clean_text(text)
        if not text:
            return None
        # NOTE(review): synthesis itself does not use the mixer; this gate
        # makes the method return None on hosts without an audio device.
        if not self._verify_audio_system():
            print("Sistema de audio no disponible")
            return None
        try:
            temp_file = save_path or str(self.temp_dir / f"{uuid.uuid4()}.mp3")
            print(f"Generando audio para modelo: {self.current_model}")
            try:
                if self.current_model == 'VITS':
                    return self._synthesize_vits(text, temp_file)
                if self.current_model == 'gTTS':
                    return self.fallback_to_gtts(text, temp_file)
                # Edge voices, and any unknown model name, go through Edge so
                # a path to a file that was never written is not returned.
                return self.fallback_to_edge(text, temp_file)
            except Exception as primary_error:
                print(f"Error con el modelo primario {self.current_model}: {primary_error}")
                return self.fallback_to_gtts(text, temp_file)
        except Exception as e:
            print(f"Error en text_to_speech: {e}")
            return None
        finally:
            self._cleanup_old_files()

    def _synthesize_vits(self, text, temp_file):
        """Generate audio with VITS, handing over to Edge on any failure."""
        print("Usando modelo VITS")
        if not self.tts:
            print("Inicializando modelo VITS...")
            if not self.init_audio():
                print("Fallback a Edge debido a error en inicialización de VITS")
                return self.fallback_to_edge(text, temp_file)
        try:
            self.tts.tts_to_file(
                text=text,
                file_path=temp_file,
                speed=self.AVAILABLE_MODELS['VITS']['config']['speed']
            )
            if self._is_valid_audio(temp_file):
                print(f"Audio generado correctamente con VITS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise RuntimeError("Archivo de audio VITS inválido")
        except Exception as vits_error:
            print(f"Error generando audio con VITS: {vits_error}")
            return self.fallback_to_edge(text, temp_file)

    def _edge_settings(self):
        """Return ``(voice, rate)`` for Edge synthesis.

        Uses the current model when it is an Edge voice and the default Edge
        entry otherwise, so Edge also works as a fallback for VITS.
        """
        default = self.AVAILABLE_MODELS[self.DEFAULT_EDGE_MODEL]
        selected = self.AVAILABLE_MODELS.get(self.current_model, {})
        if selected.get('type') != 'edge':
            selected = default
        return selected['name'], selected.get('rate', default['rate'])

    def _run_edge_tts(self, text, voice, output_file, timeout):
        """Run ``edge_tts_speak`` to completion from synchronous code.

        Raises whatever the synthesis raises, including
        ``asyncio.TimeoutError`` after *timeout* seconds.
        """
        async def bounded():
            return await asyncio.wait_for(
                self.edge_tts_speak(text, voice, output_file),
                timeout=timeout
            )

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: the plain entry point is safe.
            return asyncio.run(bounded())

        # asyncio.run() refuses to start inside a running loop, so do the
        # work on a helper thread with its own loop and wait for it.
        outcome = {}

        def worker():
            try:
                outcome['value'] = asyncio.run(bounded())
            except Exception as exc:
                outcome['error'] = exc

        runner = threading.Thread(target=worker, daemon=True)
        runner.start()
        runner.join()
        if 'error' in outcome:
            raise outcome['error']
        return outcome.get('value')

    def fallback_to_edge(self, text, temp_file):
        """Generate audio with Edge TTS, retrying with exponential backoff.

        Returns:
            *temp_file* on success, otherwise the result of
            ``fallback_to_gtts``.
        """
        try:
            voice, _ = self._edge_settings()
            print(f"Usando voz Edge como respaldo: {voice}")
            for attempt in range(self.EDGE_ATTEMPTS):
                try:
                    self._run_edge_tts(text, voice, temp_file, self.EDGE_TIMEOUT)
                    if self._is_valid_audio(temp_file):
                        print(f"Audio generado correctamente con Edge: {os.path.getsize(temp_file)} bytes")
                        return temp_file
                    raise RuntimeError("Archivo de audio Edge inválido")
                except Exception as e:
                    print(f"Intento {attempt + 1} fallido con Edge: {e}")
                    if attempt < self.EDGE_ATTEMPTS - 1:
                        time.sleep(2 ** attempt)
        except Exception as edge_error:
            print(f"Error con Edge TTS: {edge_error}")
        return self.fallback_to_gtts(text, temp_file)

    @staticmethod
    def _change_tempo(source, target, tempo):
        """Re-encode *source* into *target* at *tempo* times the speed."""
        import ffmpeg  # optional dependency, only needed on this path

        # atempo is an audio filter; passing it as an output option makes
        # ffmpeg reject the command line.
        sped_up = ffmpeg.input(source).audio.filter('atempo', tempo)
        output = ffmpeg.output(sped_up, target, acodec='libmp3lame')
        ffmpeg.run(output, overwrite_output=True, capture_stdout=True, capture_stderr=True)

    def fallback_to_gtts(self, text, temp_file):
        """Last-resort synthesis with gTTS.

        The audio is sped up with ffmpeg when possible; if that step fails
        the normal-speed audio is used instead of giving up.

        Returns:
            *temp_file* on success, None on failure.
        """
        print("Usando gTTS como último respaldo")
        temp_normal = str(self.temp_dir / f"temp_normal_{uuid.uuid4()}.mp3")
        try:
            gTTS(text=text, lang='es', slow=False).save(temp_normal)
            try:
                self._change_tempo(temp_normal, temp_file, self.GTTS_SPEED)
            except Exception as tempo_error:
                print(f"No se pudo acelerar el audio de gTTS: {tempo_error}")
                # Overwrite explicitly: temp_file may hold a partial file
                # from an earlier failed engine.
                Path(temp_file).write_bytes(Path(temp_normal).read_bytes())
            if self._is_valid_audio(temp_file):
                print(f"Audio generado correctamente con gTTS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise RuntimeError("Archivo de audio gTTS inválido")
        except Exception as gtts_error:
            print(f"Error con gTTS: {gtts_error}")
            return None
        finally:
            try:
                os.remove(temp_normal)
            except OSError:
                pass

    async def edge_tts_speak(self, text, voice, output_file):
        """Synthesize *text* with edge-tts into *output_file*.

        Returns:
            True on success. Any edge-tts error is logged and re-raised.
        """
        try:
            print(f"Generando audio con voz: {voice}")
            # Resolved through _edge_settings so a non-Edge current model
            # (e.g. VITS falling back) still gets a valid rate.
            _, rate = self._edge_settings()
            print(f"Usando rate: {rate}")
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            await communicate.save(str(output_file))
            print(f"Audio generado correctamente con {voice}")
            return True
        except Exception as e:
            print(f"Error generando audio con edge-tts: {e}")
            raise

    def stop_speaking(self):
        """Stop the current playback, if any, and reset the playback flags."""
        if not self.is_speaking:
            return
        try:
            self.should_stop = True
            pygame.mixer.music.stop()
            pygame.mixer.music.unload()
            print("Reproducción detenida por interrupción")
        except Exception as e:
            print(f"Error al detener el audio: {e}")
        finally:
            self.is_speaking = False
            self.should_stop = False

    def change_model(self, model_name):
        """Switch to another entry of ``AVAILABLE_MODELS``.

        Returns:
            True when the requested model is active. False when the name is
            unknown or the model failed to load; in the latter case
            ``init_audio`` has already switched the instance to Edge.
        """
        if model_name not in self.AVAILABLE_MODELS:
            print(f"Modelo {model_name} no disponible")
            return False
        try:
            print(f"Cambiando a modelo {model_name}...")
            self.current_model = model_name
            return self.init_audio()
        except Exception as e:
            print(f"Error cambiando modelo: {e}")
            return False

    def is_currently_speaking(self):
        """Return True while audio is being played."""
        return self.is_speaking

    def create_audio_file(self, text, output_file):
        """Synthesize *text* into *output_file* with the current model only.

        Unlike ``text_to_speech`` the text is used as given and no other
        engine is tried on failure.

        Returns:
            ``str(output_file)`` on success, None on failure.
        """
        try:
            if 'EDGE' in self.current_model:
                voice, _ = self._edge_settings()
                self._run_edge_tts(text, voice, output_file, self.EDGE_FILE_TIMEOUT)
            elif self.current_model == 'gTTS':
                gTTS(text=text, lang='es', slow=False).save(str(output_file))
            else:  # VITS
                if not self.tts:
                    self.init_audio()
                if not self.tts:
                    raise RuntimeError("Modelo VITS no disponible")
                self.tts.tts_to_file(
                    text=text,
                    file_path=str(output_file),
                    speaker_wav=None,
                    split_sentences=False
                )
            if not self._is_valid_audio(output_file):
                raise RuntimeError("Archivo de audio inválido")
            return str(output_file)
        except Exception as e:
            print(f"Error creando archivo de audio: {e}")
            return None

    def play_audio(self, file_path):
        """Play *file_path* and block until it ends or is interrupted.

        While playing, the attached voice detector (if any) is put in
        high-threshold mode; normal mode is always restored afterwards.

        Raises:
            RuntimeError: The audio system is unavailable.
            FileNotFoundError: *file_path* does not exist.
            ValueError: *file_path* is empty.
            Exception: Any playback error, re-raised after logging.
        """
        if not self._verify_audio_system():
            raise RuntimeError("Sistema de audio no disponible")
        detector = getattr(self, 'voice_detector', None)
        try:
            audio_path = Path(file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
            if audio_path.stat().st_size <= 0:
                raise ValueError("Archivo de audio vacío o corrupto")
            with self.play_lock:
                if self.is_speaking:
                    self.stop_speaking()
                self.is_speaking = True
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
                if detector is not None:
                    detector.update_last_audio_output()
                    detector.set_high_threshold_mode(True)
                # Poll the mixer instead of reading the pygame event queue:
                # the event functions need the video subsystem and would
                # swallow events that belong to the host application.
                while pygame.mixer.music.get_busy() and not self.should_stop:
                    self.clock.tick(30)
                if self.should_stop:
                    self.stop_speaking()
        except (FileNotFoundError, ValueError) as e:
            # A bad file says nothing about the mixer; keep it initialised.
            print(f"Error reproduciendo audio: {e}")
            raise
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")
            self.audio_initialized = False
            raise
        finally:
            self.is_speaking = False
            self.should_stop = False
            if detector is not None:
                detector.set_high_threshold_mode(False)

    def set_voice_detector(self, voice_detector):
        """Attach the voice detector that is notified around playback."""
        self.voice_detector = voice_detector

    def __del__(self):
        """Best-effort teardown: close the mixer and empty the temp directory.

        NOTE(review): the mixer and the temp directory are shared by every
        instance in the process, so this also affects other live instances.
        """
        try:
            pygame.mixer.quit()
            if self.temp_dir.exists():
                for leftover in self.temp_dir.glob("*"):
                    try:
                        leftover.unlink()
                    except OSError:
                        pass
                self.temp_dir.rmdir()
        except Exception:
            # Never raise from a finalizer (attributes or modules may already
            # be gone during interpreter shutdown).
            pass