# chatbot-web-app / tts_utils.py
# Uploaded by salomonsky with huggingface_hub (commit d2c7775, verified)
import asyncio
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from pathlib import Path

import edge_tts
import pygame
from gtts import gTTS
from TTS.api import TTS
class TTSUtils:
    """Spanish text-to-speech helper with layered engine fallbacks.

    Synthesis is attempted with the selected model (Edge voices or a local
    VITS model); on failure it degrades to Edge TTS and finally to gTTS.
    Playback goes through ``pygame.mixer.music``. Generated files live in a
    ``chatbot_audio`` directory under the system temp directory.
    """

    # Catalogue of selectable voices. 'type' is 'edge' for Microsoft Edge
    # voices and 'local' for models loaded through the TTS package.
    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",
            'description': "Voz masculina de Microsoft Edge (MX)",
            'type': 'edge',
            'rate': '+25%'
        },
        'EDGE_ES': {
            'name': "es-ES-AlvaroNeural",
            'description': "Voz masculina de Microsoft Edge (ES)",
            'type': 'edge',
            'rate': '+25%'
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina de VITS (ES)",
            'type': 'local',
            'config': {
                'speed': 1.25,
                'model_path': "tts_models/es/css10/vits"
            }
        }
    }

    # Spanish number words used by _number_to_words.
    _UNITS = ('', 'uno', 'dos', 'tres', 'cuatro', 'cinco',
              'seis', 'siete', 'ocho', 'nueve')
    _TEENS = ('diez', 'once', 'doce', 'trece', 'catorce', 'quince',
              'dieciséis', 'diecisiete', 'dieciocho', 'diecinueve')
    # 20-29 are written as a single word in Spanish ("veintiuno", not
    # "veinte y uno").
    _TWENTIES = ('veinte', 'veintiuno', 'veintidós', 'veintitrés',
                 'veinticuatro', 'veinticinco', 'veintiséis', 'veintisiete',
                 'veintiocho', 'veintinueve')
    _TENS = ('', 'diez', 'veinte', 'treinta', 'cuarenta', 'cincuenta',
             'sesenta', 'setenta', 'ochenta', 'noventa')

    # "$50" / "$ 3.50" -> amount first, so it is read as "50 pesos".
    _MONEY_RE = re.compile(r'\$\s*(\d+(?:[.,]\d+)*)')

    # Applied in insertion order: longer sequences ('...', '---') must be
    # replaced before their shorter prefixes ('..', '--').
    _TEXT_REPLACEMENTS = {
        '*': '',
        '#': '',
        '`': '',
        '~': '',
        '|': '',
        '>': '',
        '<': '',
        '\\': '',
        '&': 'y',
        '_': ' ',
        '...': ',',
        '..': ',',
        '---': ',',
        '--': ',',
        '%': ' por ciento',
        '$': ' pesos',
        '=': ' igual a ',
        '+': ' más ',
        '@': ' arroba ',
    }

    def __init__(self, model_name='EDGE', elevenlabs_api_key=None):
        """Set up the mixer, the temp directory and the selected TTS model.

        Args:
            model_name: Key of the model to use (see ``AVAILABLE_MODELS``).
            elevenlabs_api_key: Accepted for backward compatibility; not
                used by this class.
        """
        self.is_speaking = False
        self.should_stop = False
        self.temp_dir = Path(tempfile.gettempdir()) / "chatbot_audio"
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.tts = None
        self.audio_initialized = False
        self.current_model = model_name
        print(f"Inicializando TTS con modelo: {model_name}")
        try:
            # Restart the mixer so our settings take effect even if another
            # component initialised it first.
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=16000, size=-16, channels=1, buffer=2048)
            pygame.mixer.music.set_volume(0.8)
            self.audio_initialized = True
            print("Audio inicializado correctamente")
        except Exception as e:
            # A missing audio device must not prevent construction.
            print(f"Error inicializando audio: {str(e)}")
            self.audio_initialized = False
        self.play_lock = threading.Lock()
        self.clock = pygame.time.Clock()
        self.init_audio()
        # Remove stale temp files left by earlier runs.
        self._cleanup_old_files()

    def _cleanup_old_files(self, max_age_hours=1):
        """Delete files in the temp directory older than ``max_age_hours``."""
        try:
            cutoff = time.time() - max_age_hours * 3600  # hours -> seconds
            for file in self.temp_dir.glob("*"):
                try:
                    if file.is_file() and file.stat().st_mtime < cutoff:
                        file.unlink()
                except OSError:
                    # Best effort: the file may be in use or already gone.
                    pass
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")

    def _verify_audio_system(self):
        """Return True if the mixer is usable, re-initialising it if needed."""
        if not self.audio_initialized or not pygame.mixer.get_init():
            try:
                if pygame.mixer.get_init():
                    pygame.mixer.quit()
                # NOTE(review): these settings differ from the ones used in
                # __init__ (16 kHz mono, volume 0.8); presumably a more
                # widely supported fallback configuration - confirm.
                pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
                pygame.mixer.music.set_volume(1.0)
                self.audio_initialized = True
                return True
            except Exception as e:
                print(f"Error reinicializando audio: {e}")
                return False
        return True

    def init_audio(self):
        """Load the selected TTS model.

        Only the VITS model needs loading. If loading fails the current
        model is switched to 'EDGE'.

        Returns:
            True when the selected model is ready, False when VITS could
            not be loaded and the instance fell back to 'EDGE'.
        """
        try:
            if self.current_model == 'VITS':
                model_info = self.AVAILABLE_MODELS[self.current_model]
                print(f"Cargando modelo VITS: {model_info['name']}")
                try:
                    self.tts = TTS(model_name=model_info['name'])
                    if hasattr(self.tts, 'synthesizer') and hasattr(self.tts.synthesizer, 'tts_config'):
                        # NOTE(review): whether tts_config.update accepts
                        # keys it does not already define depends on the
                        # TTS package version - confirm.
                        self.tts.synthesizer.tts_config.update(model_info['config'])
                        print("Modelo VITS cargado correctamente")
                        return True
                    else:
                        print("Error: El modelo VITS no tiene la estructura esperada")
                        self.current_model = 'EDGE'  # fall back to Edge
                        return False
                except Exception as vits_error:
                    print(f"Error cargando modelo VITS: {vits_error}")
                    self.current_model = 'EDGE'  # fall back to Edge
                    return False
            return True
        except Exception as e:
            print(f"Error inicializando audio: {e}")
            self.current_model = 'EDGE'  # fall back to Edge
            return False

    def _number_to_words(self, number):
        """Spell an integer between -99 and 99 in Spanish.

        Values outside that range are returned as their digit string, and
        anything that cannot be converted to ``int`` is returned unchanged.
        """
        try:
            num = int(number)
        except (ValueError, TypeError, OverflowError):
            return number
        if num == 0:
            return 'cero'
        if num < 0:
            return f"menos {self._number_to_words(abs(num))}"
        if num < 10:
            return self._UNITS[num]
        if num < 20:
            return self._TEENS[num - 10]
        if num < 30:
            return self._TWENTIES[num - 20]
        if num < 100:
            tens, units = divmod(num, 10)
            if units == 0:
                return self._TENS[tens]
            return f"{self._TENS[tens]} y {self._UNITS[units]}"
        # Numbers above 99 are left as digits for the engine to read.
        return str(num)

    @staticmethod
    def _is_ascii_digits(value):
        """Return True if ``value`` is a non-empty run of ASCII digits."""
        return value.isascii() and value.isdigit()

    def _decimals_to_words(self, digits):
        """Spell the fractional digits of a number, keeping leading zeros."""
        significant = digits.lstrip('0')
        # Each leading zero is spoken so that "05" and "5" sound different.
        spoken = ['cero'] * (len(digits) - len(significant))
        if significant:
            spoken.append(self._number_to_words(significant))
        return ' '.join(spoken)

    def _number_token_to_words(self, token):
        """Spell a whitespace-delimited token if it is a plain number.

        Handles an optional leading '-', one optional decimal point and
        trailing sentence punctuation; any other token is returned as is.
        """
        core = token.rstrip('.,;:!?')
        suffix = token[len(core):]
        negative = core.startswith('-')
        digits = core[1:] if negative else core
        integer_part, dot, decimal_part = digits.partition('.')
        if not self._is_ascii_digits(integer_part):
            return token
        if dot and not self._is_ascii_digits(decimal_part):
            return token
        spoken = self._number_to_words(integer_part)
        if dot:
            spoken = f"{spoken} punto {self._decimals_to_words(decimal_part)}"
        if negative:
            spoken = f"menos {spoken}"
        return spoken + suffix

    def _clean_text(self, text):
        """Prepare text for synthesis.

        Strips markdown-like symbols, verbalises a few signs (%, $, =, +,
        @), spells out small numbers and collapses whitespace. Falsy input
        is returned unchanged.
        """
        if not text:
            return text
        # Put currency amounts before the unit while '$' is still present.
        cleaned_text = self._MONEY_RE.sub(r'\1 pesos', text)
        for char, replacement in self._TEXT_REPLACEMENTS.items():
            cleaned_text = cleaned_text.replace(char, replacement)
        # split()/join also collapses runs of whitespace.
        return ' '.join(
            self._number_token_to_words(word) for word in cleaned_text.split()
        )

    def _edge_model_config(self):
        """Return the Edge voice entry to use for the current model.

        When the current model is not an Edge voice (for example while
        falling back from VITS) the default 'EDGE' entry is used.
        """
        config = self.AVAILABLE_MODELS.get(self.current_model)
        if not config or config.get('type') != 'edge':
            config = self.AVAILABLE_MODELS['EDGE']
        return config

    def _run_edge_tts(self, text, voice, output_file, timeout):
        """Run ``edge_tts_speak`` to completion with a timeout in seconds."""
        async def tts_with_timeout():
            return await asyncio.wait_for(
                self.edge_tts_speak(text, voice, output_file),
                timeout=timeout
            )
        asyncio.run(tts_with_timeout())

    def _synthesize_vits(self, text, temp_file):
        """Generate audio with VITS, falling back to Edge on any failure."""
        print("Usando modelo VITS")
        if not self.tts:
            print("Inicializando modelo VITS...")
            if not self.init_audio():
                print("Fallback a Edge debido a error en inicialización de VITS")
                return self.fallback_to_edge(text, temp_file)
        try:
            self.tts.tts_to_file(
                text=text,
                file_path=temp_file,
                speed=self.AVAILABLE_MODELS['VITS']['config']['speed']
            )
            if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                print(f"Audio generado correctamente con VITS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise Exception("Archivo de audio VITS inválido")
        except Exception as vits_error:
            print(f"Error generando audio con VITS: {vits_error}")
            return self.fallback_to_edge(text, temp_file)

    def text_to_speech(self, text, save_path=None):
        """Synthesize ``text`` to an audio file.

        Args:
            text: Text to speak; it is cleaned with ``_clean_text`` first.
            save_path: Destination file. When omitted a uniquely named
                ``.mp3`` file is created in the temp directory.

        Returns:
            Path of the generated file, or None if the text is empty, the
            audio system is unavailable or every engine failed.
        """
        if not text:
            return None
        text = self._clean_text(text)
        if not text:
            return None
        if not self._verify_audio_system():
            print("Sistema de audio no disponible")
            return None
        try:
            temp_file = save_path or str(self.temp_dir / f"{uuid.uuid4()}.mp3")
            print(f"Generando audio para modelo: {self.current_model}")
            try:
                if self.current_model == 'VITS':
                    return self._synthesize_vits(text, temp_file)
                if self.current_model in ('EDGE', 'EDGE_ES'):
                    return self.fallback_to_edge(text, temp_file)
            except Exception as primary_error:
                print(f"Error con el modelo primario {self.current_model}: {primary_error}")
                return self.fallback_to_gtts(text, temp_file)
            # Unrecognised model: still produce audio rather than return a
            # path to a file that was never written.
            return self.fallback_to_gtts(text, temp_file)
        except Exception as e:
            print(f"Error en text_to_speech: {e}")
            return None
        finally:
            self._cleanup_old_files()

    def fallback_to_edge(self, text, temp_file):
        """Generate audio with Edge TTS, retrying before falling back to gTTS.

        Uses the current model's voice when it is an Edge voice, otherwise
        the default 'EDGE' voice. Makes up to three attempts with a growing
        pause between them.

        Returns:
            Path of the generated file, or the result of
            ``fallback_to_gtts`` when every attempt fails.
        """
        try:
            voice = self._edge_model_config()['name']
            print(f"Usando voz Edge como respaldo: {voice}")
            for attempt in range(3):
                try:
                    self._run_edge_tts(text, voice, temp_file, 15.0)
                    if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                        print(f"Audio generado correctamente con Edge: {os.path.getsize(temp_file)} bytes")
                        return temp_file
                    raise Exception("Archivo de audio Edge inválido")
                except Exception as e:
                    print(f"Intento {attempt + 1} fallido con Edge: {e}")
                    if attempt == 2:
                        return self.fallback_to_gtts(text, temp_file)
                    time.sleep(2 ** attempt)  # 1 s, then 2 s
        except Exception as edge_error:
            print(f"Error con Edge TTS: {edge_error}")
            return self.fallback_to_gtts(text, temp_file)

    def fallback_to_gtts(self, text, temp_file):
        """Last-resort synthesis with gTTS, sped up 1.25x through ffmpeg.

        If the ffmpeg step fails (module or binary missing, conversion
        error) the normal-speed gTTS audio is used instead.

        Returns:
            Path of the generated file, or None on failure.
        """
        print("Usando gTTS como último respaldo")
        temp_normal = str(self.temp_dir / f"temp_normal_{uuid.uuid4()}.mp3")
        try:
            gTTS(text=text, lang='es', slow=False).save(temp_normal)
            try:
                import ffmpeg
                # atempo is an audio filter, not an output option.
                stream = ffmpeg.input(temp_normal).audio.filter('atempo', 1.25)
                stream = ffmpeg.output(stream, temp_file, acodec='libmp3lame')
                ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
            except Exception as speed_error:
                print(f"No se pudo acelerar el audio de gTTS, usando velocidad normal: {speed_error}")
                Path(temp_file).write_bytes(Path(temp_normal).read_bytes())
            if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                print(f"Audio generado correctamente con gTTS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise Exception("Archivo de audio gTTS inválido")
        except Exception as gtts_error:
            print(f"Error con gTTS: {gtts_error}")
            return None
        finally:
            # Always remove the intermediate file, on success or failure.
            try:
                os.remove(temp_normal)
            except OSError:
                pass

    async def edge_tts_speak(self, text, voice, output_file):
        """Generate audio with edge-tts and save it to ``output_file``.

        The speaking rate comes from the current model when it is an Edge
        voice, otherwise from the default 'EDGE' entry.

        Returns:
            True on success.

        Raises:
            Exception: Whatever edge-tts raised, after logging it.
        """
        try:
            print(f"Generando audio con voz: {voice}")
            rate = self._edge_model_config()['rate']
            print(f"Usando rate: {rate}")
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            await communicate.save(str(output_file))
            print(f"Audio generado correctamente con {voice}")
            return True
        except Exception as e:
            print(f"Error generando audio con edge-tts: {e}")
            raise

    def stop_speaking(self):
        """Stop the current playback, if any."""
        if self.is_speaking:
            try:
                self.should_stop = True
                pygame.mixer.music.stop()
                pygame.mixer.music.unload()
                self.is_speaking = False
                print("Reproducción detenida por interrupción")
            except Exception as e:
                print(f"Error al detener el audio: {e}")
            finally:
                self.is_speaking = False
                self.should_stop = False

    def change_model(self, model_name):
        """Switch to another model from ``AVAILABLE_MODELS``.

        Returns:
            True if the requested model is now active. False if the name is
            unknown or the model could not be loaded (in which case
            ``init_audio`` has fallen back to 'EDGE').
        """
        if model_name not in self.AVAILABLE_MODELS:
            print(f"Modelo {model_name} no disponible")
            return False
        try:
            print(f"Cambiando a modelo {model_name}...")
            self.current_model = model_name
            return bool(self.init_audio())
        except Exception as e:
            print(f"Error cambiando modelo: {e}")
            return False

    def is_currently_speaking(self):
        """Return True while audio is being played."""
        return self.is_speaking

    def create_audio_file(self, text, output_file):
        """Synthesize ``text`` into ``output_file`` with the current model.

        Unlike ``text_to_speech`` this does not clean the text and does not
        fall back to other engines.

        Returns:
            ``str(output_file)`` on success, None on failure.
        """
        try:
            if 'EDGE' in self.current_model:
                voice = self.AVAILABLE_MODELS[self.current_model]['name']
                self._run_edge_tts(text, voice, output_file, 5.0)
            elif self.current_model == 'gTTS':
                tts = gTTS(text=text, lang='es', slow=False)
                tts.save(str(output_file))
            else:  # VITS
                if self.tts is None:
                    raise RuntimeError("Modelo VITS no cargado")
                self.tts.tts_to_file(
                    text=text,
                    file_path=str(output_file),
                    speaker_wav=None,
                    split_sentences=False
                )
            return str(output_file)
        except Exception as e:
            print(f"Error creando archivo de audio: {e}")
            return None

    def _set_detector_threshold(self, high):
        """Toggle the voice detector's high-threshold mode, if one is set."""
        detector = getattr(self, 'voice_detector', None)
        if detector is not None:
            detector.set_high_threshold_mode(high)

    def play_audio(self, file_path):
        """Play an audio file and block until it ends or is interrupted.

        While playing, the attached voice detector (if any) is put in
        high-threshold mode; normal mode is restored when playback stops
        for any reason.

        Raises:
            Exception: If the audio system is unavailable.
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file is empty.
        """
        if not self._verify_audio_system():
            raise Exception("Sistema de audio no disponible")
        try:
            audio_path = Path(file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
            if not audio_path.stat().st_size > 0:
                raise ValueError("Archivo de audio vacío o corrupto")
            with self.play_lock:
                if self.is_speaking:
                    self.stop_speaking()
                self.is_speaking = True
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
                # Tell the voice detector that output has started and keep
                # it listening with a higher threshold during playback.
                detector = getattr(self, 'voice_detector', None)
                if detector is not None:
                    detector.update_last_audio_output()
                    detector.set_high_threshold_mode(True)
                pygame.mixer.music.set_endevent(pygame.USEREVENT)
                pygame.event.set_allowed(pygame.USEREVENT)
                # NOTE(review): pygame.event.get() may require the display
                # module to be initialised elsewhere in the app - confirm.
                while pygame.mixer.music.get_busy() and not self.should_stop:
                    for event in pygame.event.get():
                        if event.type == pygame.USEREVENT:
                            # End-of-track event: playback finished.
                            self.is_speaking = False
                            self.should_stop = False
                            self._set_detector_threshold(False)
                    self.clock.tick(30)  # poll at most 30 times per second
                if self.should_stop:
                    self.stop_speaking()
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")
            self.is_speaking = False
            self.audio_initialized = False
            raise
        finally:
            self.is_speaking = False
            self.should_stop = False
            # The loop can exit before the end event is polled, so restore
            # the normal listening threshold here on every path.
            try:
                self._set_detector_threshold(False)
            except Exception as detector_error:
                print(f"Error restaurando el detector de voz: {detector_error}")

    def set_voice_detector(self, voice_detector):
        """Attach the voice detector used to coordinate interruptions."""
        self.voice_detector = voice_detector

    def __del__(self):
        """Best-effort shutdown: quit the mixer and empty the temp directory."""
        try:
            pygame.mixer.quit()
            if self.temp_dir.exists():
                for file in self.temp_dir.glob("*"):
                    try:
                        file.unlink()
                    except OSError:
                        pass
                self.temp_dir.rmdir()
        except Exception:
            # Never raise from a finalizer (modules may already be torn
            # down at interpreter exit).
            pass