# chatbot-web-app / tts_utils.py
# Uploaded by salomonsky with huggingface_hub (commit d2c7775, 18.7 kB).
import pygame
import tempfile
import uuid
import threading
import asyncio
from pathlib import Path
from TTS.api import TTS
from gtts import gTTS
import edge_tts
import logging
import time
import os
class TTSUtils:
    """Text-to-speech helper with layered fallbacks and pygame playback.

    Synthesis is attempted with the selected model first (local VITS or a
    Microsoft Edge neural voice). When that fails the class degrades to Edge
    TTS and finally to gTTS. Generated files live in a ``chatbot_audio``
    directory under the system temp directory and are pruned by age.
    """

    # Catalogue of selectable voices. 'edge' entries carry a speaking 'rate';
    # 'local' entries carry a 'config' dict instead (and therefore no 'rate').
    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",
            'description': "Voz masculina de Microsoft Edge (MX)",
            'type': 'edge',
            'rate': '+25%'
        },
        'EDGE_ES': {
            'name': "es-ES-AlvaroNeural",
            'description': "Voz masculina de Microsoft Edge (ES)",
            'type': 'edge',
            'rate': '+25%'
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina de VITS (ES)",
            'type': 'local',
            'config': {
                'speed': 1.25,
                'model_path': "tts_models/es/css10/vits"
            }
        }
    }

    def __init__(self, model_name='EDGE', elevenlabs_api_key=None):
        """Initialise the audio mixer, the temp directory and the TTS model.

        Args:
            model_name: Key of ``AVAILABLE_MODELS`` to start with.
            elevenlabs_api_key: Accepted for interface compatibility; it is
                not used anywhere in this class.
        """
        self.is_speaking = False
        self.should_stop = False
        self.temp_dir = Path(tempfile.gettempdir()) / "chatbot_audio"
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.tts = None
        self.audio_initialized = False
        self.current_model = model_name
        # Optional collaborator, attached later through set_voice_detector().
        self.voice_detector = None
        print(f"Inicializando TTS con modelo: {model_name}")
        try:
            # Restart the mixer so the requested settings actually apply.
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=16000, size=-16, channels=1, buffer=2048)
            pygame.mixer.music.set_volume(0.8)
            self.audio_initialized = True
            print("Audio inicializado correctamente")
        except Exception as e:
            print(f"Error inicializando audio: {str(e)}")
            self.audio_initialized = False
        self.play_lock = threading.Lock()
        self.clock = pygame.time.Clock()
        self.init_audio()
        # Remove stale temporary files left by earlier runs.
        self._cleanup_old_files()

    def _cleanup_old_files(self, max_age_hours=1):
        """Delete files in the temp directory older than ``max_age_hours``.

        Best effort: a file that cannot be removed is skipped silently.
        """
        try:
            max_age_seconds = max_age_hours * 3600
            now = time.time()
            for entry in self.temp_dir.glob("*"):
                if not entry.is_file():
                    continue
                if now - entry.stat().st_mtime > max_age_seconds:
                    try:
                        entry.unlink()
                    except OSError:
                        # File vanished or is still in use; try again later.
                        pass
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")

    def _verify_audio_system(self):
        """Return True when the pygame mixer is usable, re-initialising it if needed."""
        if self.audio_initialized and pygame.mixer.get_init():
            return True
        try:
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
            pygame.mixer.music.set_volume(1.0)
            self.audio_initialized = True
            return True
        except Exception as e:
            print(f"Error reinicializando audio: {e}")
            return False

    def init_audio(self):
        """Load the selected TTS model.

        Only the local VITS model needs loading; Edge voices are resolved at
        synthesis time. When VITS cannot be loaded, ``current_model`` is
        switched to ``'EDGE'``.

        Returns:
            True if the current model is ready, False if VITS failed to load.
        """
        try:
            if self.current_model != 'VITS':
                return True
            model_info = self.AVAILABLE_MODELS[self.current_model]
            print(f"Cargando modelo VITS: {model_info['name']}")
            try:
                self.tts = TTS(model_name=model_info['name'])
                synthesizer = getattr(self.tts, 'synthesizer', None)
                if synthesizer is not None and hasattr(synthesizer, 'tts_config'):
                    synthesizer.tts_config.update(model_info['config'])
                    print("Modelo VITS cargado correctamente")
                    return True
                print("Error: El modelo VITS no tiene la estructura esperada")
            except Exception as vits_error:
                print(f"Error cargando modelo VITS: {vits_error}")
            self.current_model = 'EDGE'  # Fall back to Edge on any VITS problem.
            return False
        except Exception as e:
            print(f"Error inicializando audio: {e}")
            self.current_model = 'EDGE'  # Fall back to Edge on any problem.
            return False

    def _edge_settings(self):
        """Return ``(voice, rate)`` for Edge TTS.

        Uses the current model when it is an Edge voice; otherwise (e.g. when
        falling back from VITS, which has no 'rate') the default 'EDGE' entry.
        """
        default = self.AVAILABLE_MODELS['EDGE']
        selected = self.AVAILABLE_MODELS.get(self.current_model, default)
        if selected.get('type') != 'edge':
            selected = default
        return selected['name'], selected.get('rate', default['rate'])

    @staticmethod
    def _is_valid_audio(path):
        """Return True when ``path`` exists and is not empty."""
        return os.path.exists(path) and os.path.getsize(path) > 0

    def _number_to_words(self, number):
        """Spell an integer between -99 and 99 in Spanish.

        Args:
            number: An int, or anything ``int()`` accepts.

        Returns:
            The Spanish words for the number. Magnitudes above 99 are returned
            as digit strings, and input that cannot be converted to an integer
            is returned unchanged.
        """
        UNITS = ['', 'uno', 'dos', 'tres', 'cuatro', 'cinco', 'seis', 'siete', 'ocho', 'nueve']
        TEENS = ['diez', 'once', 'doce', 'trece', 'catorce', 'quince', 'dieciséis', 'diecisiete', 'dieciocho', 'diecinueve']
        # 21-29 are single fused words in Spanish, not "veinte y ...".
        TWENTIES = ['veinte', 'veintiuno', 'veintidós', 'veintitrés', 'veinticuatro', 'veinticinco', 'veintiséis', 'veintisiete', 'veintiocho', 'veintinueve']
        TENS = ['', 'diez', 'veinte', 'treinta', 'cuarenta', 'cincuenta', 'sesenta', 'setenta', 'ochenta', 'noventa']
        try:
            num = int(number)
        except (ValueError, TypeError, OverflowError):
            return number  # Not a number: hand the input back untouched.
        if num == 0:
            return 'cero'
        if num < 0:
            return f"menos {self._number_to_words(abs(num))}"
        if num < 10:
            return UNITS[num]
        if num < 20:
            return TEENS[num - 10]
        if num < 30:
            return TWENTIES[num - 20]
        if num < 100:
            tens, units = divmod(num, 10)
            if units == 0:
                return TENS[tens]
            return f"{TENS[tens]} y {UNITS[units]}"
        return str(num)  # Above 99 the digits are kept as they are.

    def _fraction_to_words(self, fraction):
        """Spell the digits after a decimal point, keeping leading zeros."""
        significant = fraction.lstrip("0")
        spoken = ["cero"] * (len(fraction) - len(significant))
        if significant:
            spoken.append(self._number_to_words(significant))
        return " ".join(spoken)

    def _number_token_to_words(self, token):
        """Spell a whitespace-delimited token if it is an integer or decimal.

        Trailing sentence punctuation is preserved; tokens that are not plain
        numbers (e.g. ``555-1234``, ``1.2.3``) are returned unchanged.
        """
        core = token.rstrip(".,;:!?")
        trailing = token[len(core):]
        negative = core.startswith("-")
        whole, dot, fraction = (core[1:] if negative else core).partition(".")
        if not whole.isdecimal() or (dot and not fraction.isdecimal()):
            return token
        spoken = self._number_to_words(whole)
        if dot:
            spoken = f"{spoken} punto {self._fraction_to_words(fraction)}"
        if negative:
            spoken = f"menos {spoken}"
        return f"{spoken}{trailing}"

    def _clean_text(self, text):
        """Prepare text for synthesis.

        Strips markdown-like symbols, replaces some symbols with Spanish
        words, spells out small numbers and collapses repeated whitespace.
        Falsy input is returned unchanged.
        """
        if not text:
            return text
        # Order matters: longer runs ('...', '---') must precede shorter ones.
        replacements = {
            '*': '',
            '#': '',
            '`': '',
            '~': '',
            '|': '',
            '>': '',
            '<': '',
            '\\': '',
            '&': 'y',
            '_': ' ',
            '...': ',',
            '..': ',',
            '---': ',',
            '--': ',',
            '%': ' por ciento',
            '$': ' pesos',
            '=': ' igual a ',
            '+': ' más ',
            '@': ' arroba ',
        }
        cleaned_text = text
        for symbol, replacement in replacements.items():
            cleaned_text = cleaned_text.replace(symbol, replacement)
        # split() + join also collapses any run of whitespace to one space.
        return ' '.join(
            self._number_token_to_words(token) for token in cleaned_text.split()
        )

    def text_to_speech(self, text, save_path=None):
        """Synthesize ``text`` to an audio file.

        Args:
            text: Text to speak; it is cleaned with ``_clean_text`` first.
            save_path: Destination file. When omitted, a uniquely named
                ``.mp3`` path inside the temp directory is used.

        Returns:
            The path of the generated file, or None when there is nothing to
            say, the audio system is unavailable, or every engine failed.
        """
        if not text:
            return None
        text = self._clean_text(text)
        if not text:
            return None
        if not self._verify_audio_system():
            print("Sistema de audio no disponible")
            return None
        try:
            temp_file = save_path or str(self.temp_dir / f"{uuid.uuid4()}.mp3")
            print(f"Generando audio para modelo: {self.current_model}")
            try:
                if self.current_model == 'VITS':
                    return self._synthesize_with_vits(text, temp_file)
                if self.current_model in ('EDGE', 'EDGE_ES'):
                    return self.fallback_to_edge(text, temp_file)
                # Unknown model: synthesize with the last-resort engine rather
                # than returning a path to a file that was never written.
                return self.fallback_to_gtts(text, temp_file)
            except Exception as primary_error:
                print(f"Error con el modelo primario {self.current_model}: {primary_error}")
                return self.fallback_to_gtts(text, temp_file)
        except Exception as e:
            print(f"Error en text_to_speech: {e}")
            return None
        finally:
            self._cleanup_old_files()

    def _synthesize_with_vits(self, text, temp_file):
        """Synthesize with the local VITS model, degrading to Edge on failure."""
        print("Usando modelo VITS")
        if not self.tts:
            print("Inicializando modelo VITS...")
            if not self.init_audio():
                print("Fallback a Edge debido a error en inicialización de VITS")
                return self.fallback_to_edge(text, temp_file)
        try:
            self.tts.tts_to_file(
                text=text,
                file_path=temp_file,
                speed=self.AVAILABLE_MODELS['VITS']['config']['speed']
            )
            if self._is_valid_audio(temp_file):
                print(f"Audio generado correctamente con VITS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise RuntimeError("Archivo de audio VITS inválido")
        except Exception as vits_error:
            print(f"Error generando audio con VITS: {vits_error}")
            return self.fallback_to_edge(text, temp_file)

    def fallback_to_edge(self, text, temp_file):
        """Synthesize with Edge TTS, retrying up to three times.

        The voice of the current model is used when it is an Edge voice;
        otherwise the default 'EDGE' voice. Retries wait 1 s and then 2 s.
        After the third failure the work is handed to ``fallback_to_gtts``.

        Returns:
            The path of the generated file, or None if gTTS also failed.
        """
        try:
            voice, _ = self._edge_settings()
            print(f"Usando voz Edge como respaldo: {voice}")
            for attempt in range(3):
                try:
                    async def tts_with_timeout():
                        return await asyncio.wait_for(
                            self.edge_tts_speak(text, voice, temp_file),
                            timeout=15.0
                        )
                    asyncio.run(tts_with_timeout())
                    if self._is_valid_audio(temp_file):
                        print(f"Audio generado correctamente con Edge: {os.path.getsize(temp_file)} bytes")
                        return temp_file
                    raise RuntimeError("Archivo de audio Edge inválido")
                except Exception as e:
                    print(f"Intento {attempt + 1} fallido con Edge: {e}")
                    if attempt == 2:
                        return self.fallback_to_gtts(text, temp_file)
                    time.sleep(2 ** attempt)  # Exponential back-off.
        except Exception as edge_error:
            print(f"Error con Edge TTS: {edge_error}")
            return self.fallback_to_gtts(text, temp_file)

    def fallback_to_gtts(self, text, temp_file):
        """Last-resort synthesis with gTTS, sped up to 1.25x through ffmpeg.

        If the ffmpeg step fails (package or binary missing, encoding error)
        the normal-speed gTTS audio is used instead of giving up.

        Returns:
            The path of the generated file, or None on failure.
        """
        print("Usando gTTS como último respaldo")
        temp_normal = str(self.temp_dir / f"temp_normal_{uuid.uuid4()}.mp3")
        try:
            gTTS(text=text, lang='es', slow=False).save(temp_normal)
            try:
                import ffmpeg
                stream = ffmpeg.input(temp_normal)
                # atempo is an audio *filter*, so it has to be passed as
                # '-filter:a atempo=1.25'; ffmpeg has no '-atempo' option.
                stream = ffmpeg.output(
                    stream,
                    str(temp_file),
                    acodec='libmp3lame',
                    **{'filter:a': 'atempo=1.25'}
                )
                ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
            except Exception as speed_error:
                import shutil
                print(f"No se pudo acelerar el audio de gTTS, usando velocidad normal: {speed_error}")
                shutil.copyfile(temp_normal, temp_file)
            if self._is_valid_audio(temp_file):
                print(f"Audio generado correctamente con gTTS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise RuntimeError("Archivo de audio gTTS inválido")
        except Exception as gtts_error:
            print(f"Error con gTTS: {gtts_error}")
            return None
        finally:
            # The intermediate file is never needed once we get here.
            try:
                os.remove(temp_normal)
            except OSError:
                pass

    async def edge_tts_speak(self, text, voice, output_file):
        """Generate ``output_file`` with edge-tts using ``voice``.

        The speaking rate comes from the current model when it is an Edge
        voice, otherwise from the default 'EDGE' entry.

        Returns:
            True on success.

        Raises:
            Exception: Whatever edge-tts raised, after logging it.
        """
        try:
            print(f"Generando audio con voz: {voice}")
            _, rate = self._edge_settings()
            print(f"Usando rate: {rate}")
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            await communicate.save(str(output_file))
            print(f"Audio generado correctamente con {voice}")
            return True
        except Exception as e:
            print(f"Error generando audio con edge-tts: {e}")
            raise

    def stop_speaking(self):
        """Stop the current playback, if any, and reset the speaking flags."""
        if not self.is_speaking:
            return
        try:
            self.should_stop = True
            pygame.mixer.music.stop()
            pygame.mixer.music.unload()
            print("Reproducción detenida por interrupción")
        except Exception as e:
            print(f"Error al detener el audio: {e}")
        finally:
            self.is_speaking = False
            self.should_stop = False

    def change_model(self, model_name):
        """Switch to another entry of ``AVAILABLE_MODELS``.

        Returns:
            True when the requested model is active. False when the name is
            unknown or the model could not be loaded; in the latter case
            ``init_audio`` has already switched ``current_model`` to 'EDGE'.
        """
        if model_name not in self.AVAILABLE_MODELS:
            print(f"Modelo {model_name} no disponible")
            return False
        try:
            print(f"Cambiando a modelo {model_name}...")
            self.current_model = model_name
            return bool(self.init_audio())
        except Exception as e:
            print(f"Error cambiando modelo: {e}")
            return False

    def is_currently_speaking(self):
        """Return True while audio is being played."""
        return self.is_speaking

    def create_audio_file(self, text, output_file):
        """Synthesize ``text`` into ``output_file`` with the current model.

        Unlike ``text_to_speech`` there is no text cleaning and no fallback
        chain here.

        Returns:
            ``str(output_file)`` on success, None on any error.
        """
        try:
            if 'EDGE' in self.current_model:
                voice = self.AVAILABLE_MODELS[self.current_model]['name']

                async def tts_with_timeout():
                    return await asyncio.wait_for(
                        self.edge_tts_speak(text, voice, output_file),
                        timeout=5.0
                    )
                asyncio.run(tts_with_timeout())
            elif self.current_model == 'gTTS':
                gTTS(text=text, lang='es', slow=False).save(str(output_file))
            else:  # VITS
                if self.tts is None:
                    raise RuntimeError("Modelo VITS no inicializado")
                self.tts.tts_to_file(
                    text=text,
                    file_path=str(output_file),
                    speaker_wav=None,
                    split_sentences=False
                )
            return str(output_file)
        except Exception as e:
            print(f"Error creando archivo de audio: {e}")
            return None

    def play_audio(self, file_path):
        """Play ``file_path`` and block until it ends or is interrupted.

        While audio plays, an attached voice detector is put in
        high-threshold mode; normal mode is always restored on the way out.

        Raises:
            RuntimeError: The audio system is unavailable.
            FileNotFoundError: ``file_path`` does not exist.
            ValueError: ``file_path`` is empty.
            Exception: Any playback error, re-raised after logging.
        """
        if not self._verify_audio_system():
            raise RuntimeError("Sistema de audio no disponible")
        detector = getattr(self, 'voice_detector', None)
        try:
            audio_path = Path(file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
            if not audio_path.stat().st_size > 0:
                raise ValueError("Archivo de audio vacío o corrupto")
            with self.play_lock:
                if self.is_speaking:
                    self.stop_speaking()
                self.is_speaking = True
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
                if detector is not None:
                    # Tell the detector we are producing sound and keep it
                    # listening, but with a higher threshold.
                    detector.update_last_audio_output()
                    detector.set_high_threshold_mode(True)
            # Poll the mixer instead of the pygame event queue: the end event
            # could be missed when get_busy() flips first, and event.get()
            # needs the video system, which this class never initialises.
            while pygame.mixer.music.get_busy() and not self.should_stop:
                self.clock.tick(30)
            if self.should_stop:
                self.stop_speaking()
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")
            if isinstance(e, pygame.error):
                # Only a mixer failure justifies forcing a re-initialisation.
                self.audio_initialized = False
            raise
        finally:
            self.is_speaking = False
            self.should_stop = False
            if detector is not None:
                # Restore normal listening on every exit path.
                detector.set_high_threshold_mode(False)

    def set_voice_detector(self, voice_detector):
        """Attach the voice detector used to coordinate interruptions."""
        self.voice_detector = voice_detector

    def __del__(self):
        """Shut the mixer down and remove the temp directory (best effort)."""
        try:
            pygame.mixer.quit()
            if self.temp_dir.exists():
                for entry in self.temp_dir.glob("*"):
                    try:
                        entry.unlink()
                    except Exception:
                        pass
                self.temp_dir.rmdir()
        except Exception:
            # Never let a finalizer raise, e.g. during interpreter shutdown.
            pass