import asyncio
import logging
import os
import re
import shutil
import tempfile
import threading
import time
import uuid
from pathlib import Path

import edge_tts
import pygame
from gtts import gTTS
from TTS.api import TTS


class TTSUtils:
    """Spanish text-to-speech helper with layered fallbacks and playback.

    Audio is synthesised with the selected engine (Edge TTS voices or a
    local VITS model).  When an engine fails, synthesis falls back to Edge
    TTS and finally to gTTS.  Playback goes through ``pygame.mixer.music``.
    """

    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",
            'description': "Voz masculina de Microsoft Edge (MX)",
            'type': 'edge',
            'rate': '+25%',
        },
        'EDGE_ES': {
            'name': "es-ES-AlvaroNeural",
            'description': "Voz masculina de Microsoft Edge (ES)",
            'type': 'edge',
            'rate': '+25%',
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina de VITS (ES)",
            'type': 'local',
            'config': {
                'speed': 1.25,
                'model_path': "tts_models/es/css10/vits",
            },
        },
    }

    # Spanish number words for 0-99 (larger numbers are left as digits).
    _UNITS = ('', 'uno', 'dos', 'tres', 'cuatro', 'cinco', 'seis', 'siete',
              'ocho', 'nueve')
    _TEENS = ('diez', 'once', 'doce', 'trece', 'catorce', 'quince',
              'dieciséis', 'diecisiete', 'dieciocho', 'diecinueve')
    # 21-29 are single fused words in Spanish, not "veinte y uno".
    _TWENTIES = ('veinte', 'veintiuno', 'veintidós', 'veintitrés',
                 'veinticuatro', 'veinticinco', 'veintiséis', 'veintisiete',
                 'veintiocho', 'veintinueve')
    _TENS = ('', 'diez', 'veinte', 'treinta', 'cuarenta', 'cincuenta',
             'sesenta', 'setenta', 'ochenta', 'noventa')

    # Applied in insertion order: longer runs ('...', '---') must be
    # replaced before their shorter prefixes ('..', '--').
    _TEXT_REPLACEMENTS = {
        '*': '',
        '#': '',
        '`': '',
        '~': '',
        '|': '',
        '>': '',
        '<': '',
        '\\': '',
        '&': 'y',
        '_': ' ',
        '...': ',',
        '..': ',',
        '---': ',',
        '--': ',',
        '%': ' por ciento',
        '$': ' pesos',
        '=': ' igual a ',
        '+': ' más ',
        '@': ' arroba ',
    }

    # "$5", "$ 5.50", "$1,000": move the currency word after the amount.
    _CURRENCY_RE = re.compile(r'\$\s*([0-9][0-9.,]*[0-9]|[0-9])')

    # A whitespace-delimited token that is a number, optionally wrapped in
    # punctuation: "(3)", "-5", "3.14", "12,".
    _NUMBER_TOKEN_RE = re.compile(
        r'^(?P<lead>[(\[¿¡"\']*)'
        r'(?P<sign>-?)'
        r'(?P<integer>[0-9]+)'
        r'(?:\.(?P<fraction>[0-9]+))?'
        r'(?P<trail>[.,;:!?)\]"\']*)$'
    )

    # Tempo factor applied to gTTS output.
    _GTTS_TEMPO = 1.25

    def __init__(self, model_name='EDGE', elevenlabs_api_key=None):
        """Set up the mixer, the temp directory and the selected engine.

        Args:
            model_name: Key of ``AVAILABLE_MODELS`` to use.
            elevenlabs_api_key: Accepted for interface compatibility; it is
                not used anywhere in this class.
        """
        self.is_speaking = False
        self.should_stop = False
        self.temp_dir = Path(tempfile.gettempdir()) / "chatbot_audio"
        self.temp_dir.mkdir(exist_ok=True)
        self.tts = None
        self.audio_initialized = False
        self.current_model = model_name

        print(f"Inicializando TTS con modelo: {model_name}")

        try:
            # Restart the mixer so the settings below actually take effect.
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=16000, size=-16, channels=1, buffer=2048)
            pygame.mixer.music.set_volume(0.8)
            self.audio_initialized = True
            print("Audio inicializado correctamente")
        except Exception as e:
            print(f"Error inicializando audio: {str(e)}")
            self.audio_initialized = False

        self.play_lock = threading.Lock()
        self.clock = pygame.time.Clock()
        self.init_audio()

        # Remove stale temporary files left by earlier runs.
        self._cleanup_old_files()

    def _cleanup_old_files(self, max_age_hours=1):
        """Delete files in the temp directory older than ``max_age_hours``."""
        try:
            max_age_seconds = max_age_hours * 3600
            now = time.time()
            for entry in self.temp_dir.glob("*"):
                try:
                    if not entry.is_file():
                        continue
                    if now - entry.stat().st_mtime > max_age_seconds:
                        entry.unlink()
                except OSError:
                    # Best effort: the file may be in use or already gone.
                    pass
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")

    def _verify_audio_system(self):
        """Return True if the mixer is usable, re-initialising it if needed."""
        if self.audio_initialized and pygame.mixer.get_init():
            return True
        try:
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            # NOTE(review): these settings differ from the ones used in
            # __init__ (16 kHz mono, volume 0.8) - confirm this is intended.
            pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
            pygame.mixer.music.set_volume(1.0)
            self.audio_initialized = True
            return True
        except Exception as e:
            print(f"Error reinicializando audio: {e}")
            return False

    def init_audio(self):
        """Load the selected TTS model.

        Only the ``VITS`` model needs loading; for any other model this is a
        no-op that returns True.  If loading VITS fails, ``current_model`` is
        switched to ``'EDGE'`` and False is returned.
        """
        try:
            if self.current_model != 'VITS':
                return True

            model_info = self.AVAILABLE_MODELS[self.current_model]
            print(f"Cargando modelo VITS: {model_info['name']}")
            try:
                self.tts = TTS(model_name=model_info['name'])
                if hasattr(self.tts, 'synthesizer') and hasattr(self.tts.synthesizer, 'tts_config'):
                    self.tts.synthesizer.tts_config.update(model_info['config'])
                    print("Modelo VITS cargado correctamente")
                    return True
                print("Error: El modelo VITS no tiene la estructura esperada")
                self.current_model = 'EDGE'  # fall back to Edge on error
                return False
            except Exception as vits_error:
                print(f"Error cargando modelo VITS: {vits_error}")
                self.current_model = 'EDGE'  # fall back to Edge on error
                return False
        except Exception as e:
            print(f"Error inicializando audio: {e}")
            self.current_model = 'EDGE'  # fall back to Edge on error
            return False

    def _number_to_words(self, number):
        """Convert an integer in -99..99 to Spanish words.

        Args:
            number: Anything ``int()`` accepts (int, digit string, ...).

        Returns:
            The Spanish words for the number; the decimal digits as a string
            when its absolute value is above 99; or ``number`` unchanged when
            it cannot be converted to an int.
        """
        try:
            num = int(number)
        except (TypeError, ValueError, OverflowError):
            return number  # not a number: hand the input back untouched

        if num < 0:
            return f"menos {self._number_to_words(-num)}"
        if num == 0:
            return 'cero'
        if num < 10:
            return self._UNITS[num]
        if num < 20:
            return self._TEENS[num - 10]
        if num < 30:
            return self._TWENTIES[num - 20]
        if num < 100:
            tens, units = divmod(num, 10)
            if units == 0:
                return self._TENS[tens]
            return f"{self._TENS[tens]} y {self._UNITS[units]}"
        return str(num)  # above 99 the digits are kept as they are

    def _spell_number_token(self, token):
        """Spell out one whitespace-delimited token if it is a number.

        Surrounding punctuation is preserved ("3." -> "tres.") and leading
        zeros of the fractional part are read out ("3.05" -> "tres punto
        cero cinco").  Tokens that are not plain numbers are returned as is.
        """
        match = self._NUMBER_TOKEN_RE.match(token)
        if match is None:
            return token

        integer_digits = match.group('integer')
        fraction_digits = match.group('fraction')
        spoken = str(self._number_to_words(integer_digits))

        if fraction_digits is not None:
            significant = fraction_digits.lstrip('0')
            fraction_words = ['cero'] * (len(fraction_digits) - len(significant))
            if significant:
                fraction_words.append(str(self._number_to_words(significant)))
            spoken = f"{spoken} punto {' '.join(fraction_words)}"

        # Keep the sign for values such as "-0.5"; a bare "-0" stays "cero".
        is_nonzero = fraction_digits is not None or int(integer_digits) != 0
        if match.group('sign') and is_nonzero:
            spoken = f"menos {spoken}"

        return f"{match.group('lead')}{spoken}{match.group('trail')}"

    def _clean_text(self, text):
        """Prepare text for synthesis.

        Strips markup characters, replaces symbols with Spanish words,
        spells out numbers up to 99 and collapses repeated whitespace.
        Falsy input is returned unchanged.
        """
        if not text:
            return text

        # Put the currency word after the amount so the digits remain a
        # separate token that can be spelled out ("$5" -> "5 pesos").
        cleaned_text = self._CURRENCY_RE.sub(r'\1 pesos', text)

        for target, replacement in self._TEXT_REPLACEMENTS.items():
            cleaned_text = cleaned_text.replace(target, replacement)

        # split() with no argument also collapses runs of whitespace.
        return ' '.join(
            self._spell_number_token(word) for word in cleaned_text.split()
        )

    def text_to_speech(self, text, save_path=None):
        """Synthesise ``text`` into an audio file.

        Args:
            text: Text to speak; it is cleaned with ``_clean_text`` first.
            save_path: Destination file.  When omitted a uniquely named file
                in the temp directory is used.

        Returns:
            The path of the generated file, or None when the text is empty,
            the audio system is unavailable or every engine failed.
        """
        if not text:
            return None

        text = self._clean_text(text)
        if not text:
            return None

        if not self._verify_audio_system():
            print("Sistema de audio no disponible")
            return None

        try:
            temp_file = save_path or str(self.temp_dir / f"{uuid.uuid4()}.mp3")
            print(f"Generando audio para modelo: {self.current_model}")

            try:
                if self.current_model == 'VITS':
                    print("Usando modelo VITS")
                    if not self.tts:
                        print("Inicializando modelo VITS...")
                        if not self.init_audio():
                            print("Fallback a Edge debido a error en inicialización de VITS")
                            return self.fallback_to_edge(text, temp_file)
                    try:
                        self.tts.tts_to_file(
                            text=text,
                            file_path=temp_file,
                            speed=self.AVAILABLE_MODELS['VITS']['config']['speed'],
                        )
                        if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                            print(f"Audio generado correctamente con VITS: {os.path.getsize(temp_file)} bytes")
                            return temp_file
                        raise Exception("Archivo de audio VITS inválido")
                    except Exception as vits_error:
                        print(f"Error generando audio con VITS: {vits_error}")
                        return self.fallback_to_edge(text, temp_file)

                # Edge voices, plus any unrecognised model name: an unknown
                # model must not return a path that was never written.
                if self.current_model not in ('EDGE', 'EDGE_ES'):
                    print(f"Modelo {self.current_model} no soportado, usando Edge")
                return self.fallback_to_edge(text, temp_file)
            except Exception as primary_error:
                print(f"Error con el modelo primario {self.current_model}: {primary_error}")
                return self.fallback_to_gtts(text, temp_file)
        except Exception as e:
            print(f"Error en text_to_speech: {e}")
            return None
        finally:
            self._cleanup_old_files()

    def fallback_to_edge(self, text, temp_file):
        """Synthesise with Edge TTS, retrying up to three times.

        The voice of the current model is used when it is an Edge voice;
        otherwise the default ``EDGE`` voice is used.  After the third failed
        attempt the work is delegated to ``fallback_to_gtts``.

        Returns:
            ``temp_file`` on success, otherwise the gTTS fallback's result.
        """
        try:
            model_info = self.AVAILABLE_MODELS.get(self.current_model)
            if not model_info or model_info.get('type') != 'edge':
                model_info = self.AVAILABLE_MODELS['EDGE']
            voice = model_info['name']
            print(f"Usando voz Edge como respaldo: {voice}")

            async def tts_with_timeout():
                return await asyncio.wait_for(
                    self.edge_tts_speak(text, voice, temp_file),
                    timeout=15.0,
                )

            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    asyncio.run(tts_with_timeout())
                    if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                        print(f"Audio generado correctamente con Edge: {os.path.getsize(temp_file)} bytes")
                        return temp_file
                    raise Exception("Archivo de audio Edge inválido")
                except Exception as e:
                    print(f"Intento {attempt + 1} fallido con Edge: {e}")
                    if attempt == max_attempts - 1:
                        return self.fallback_to_gtts(text, temp_file)
                    time.sleep(2 ** attempt)  # exponential backoff: 1 s, 2 s
        except Exception as edge_error:
            print(f"Error con Edge TTS: {edge_error}")
            return self.fallback_to_gtts(text, temp_file)

    def _speed_up_audio(self, source_file, target_file, tempo):
        """Re-encode ``source_file`` to ``target_file`` at ``tempo`` speed.

        Uses the optional ``ffmpeg`` Python binding.  Returns True when a
        non-empty target file was produced, False when the binding or the
        ffmpeg run is unavailable or fails.
        """
        try:
            import ffmpeg
        except ImportError:
            return False

        try:
            stream = ffmpeg.input(source_file)
            # atempo is an audio *filter*, so it must go through -af rather
            # than being passed as an output option of its own.
            stream = ffmpeg.output(
                stream,
                target_file,
                acodec='libmp3lame',
                af=f'atempo={tempo}',
            )
            ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
        except Exception as ffmpeg_error:
            print(f"No se pudo acelerar el audio con ffmpeg: {ffmpeg_error}")
            return False

        return os.path.exists(target_file) and os.path.getsize(target_file) > 0

    def fallback_to_gtts(self, text, temp_file):
        """Last-resort synthesis using gTTS.

        The gTTS output is sped up with ffmpeg when possible; if ffmpeg is
        missing or fails, the normal-speed audio is used instead of giving
        up.  The intermediate file is always removed.

        Returns:
            ``temp_file`` on success, None on failure.
        """
        print("Usando gTTS como último respaldo")
        temp_normal = str(self.temp_dir / f"temp_normal_{uuid.uuid4()}.mp3")
        try:
            tts = gTTS(text=text, lang='es', slow=False)
            tts.save(temp_normal)

            if not self._speed_up_audio(temp_normal, temp_file, self._GTTS_TEMPO):
                # Slower speech is better than no speech at all.
                shutil.copyfile(temp_normal, temp_file)

            if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                print(f"Audio generado correctamente con gTTS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise Exception("Archivo de audio gTTS inválido")
        except Exception as gtts_error:
            print(f"Error con gTTS: {gtts_error}")
            return None
        finally:
            try:
                os.remove(temp_normal)
            except OSError:
                pass

    async def edge_tts_speak(self, text, voice, output_file):
        """Generate ``output_file`` with edge-tts using ``voice``.

        The speaking rate comes from the current model's configuration and
        defaults to the ``EDGE`` rate when the current model defines none
        (for example when VITS falls back to Edge).

        Returns:
            True on success.

        Raises:
            Exception: Whatever edge-tts raised, after logging it.
        """
        try:
            print(f"Generando audio con voz: {voice}")
            model_info = self.AVAILABLE_MODELS.get(self.current_model, {})
            rate = model_info.get('rate', self.AVAILABLE_MODELS['EDGE']['rate'])
            print(f"Usando rate: {rate}")

            communicate = edge_tts.Communicate(text, voice, rate=rate)
            await communicate.save(str(output_file))

            print(f"Audio generado correctamente con {voice}")
            return True
        except Exception as e:
            print(f"Error generando audio con edge-tts: {e}")
            raise

    def stop_speaking(self):
        """Stop the current playback, if any, and reset the state flags."""
        if not self.is_speaking:
            return
        try:
            self.should_stop = True
            pygame.mixer.music.stop()
            pygame.mixer.music.unload()
            self.is_speaking = False
            print("Reproducción detenida por interrupción")
        except Exception as e:
            print(f"Error al detener el audio: {e}")
        finally:
            self.is_speaking = False
            self.should_stop = False

    def change_model(self, model_name):
        """Switch to another entry of ``AVAILABLE_MODELS``.

        Returns:
            False when ``model_name`` is unknown or switching raised,
            True otherwise.
        """
        if model_name not in self.AVAILABLE_MODELS:
            print(f"Modelo {model_name} no disponible")
            return False

        try:
            print(f"Cambiando a modelo {model_name}...")
            self.current_model = model_name
            self.init_audio()
            return True
        except Exception as e:
            print(f"Error cambiando modelo: {e}")
            return False

    def is_currently_speaking(self):
        """Return True while audio is being played."""
        return self.is_speaking

    def create_audio_file(self, text, output_file):
        """Synthesise ``text`` straight into ``output_file``.

        Unlike ``text_to_speech`` this does not clean the text and does not
        fall back to another engine.

        Returns:
            ``str(output_file)`` on success, None on any error.
        """
        try:
            if 'EDGE' in self.current_model:
                voice = self.AVAILABLE_MODELS[self.current_model]['name']

                async def tts_with_timeout():
                    return await asyncio.wait_for(
                        self.edge_tts_speak(text, voice, output_file),
                        timeout=5.0,
                    )

                asyncio.run(tts_with_timeout())
            elif self.current_model == 'gTTS':
                tts = gTTS(text=text, lang='es', slow=False)
                tts.save(str(output_file))
            else:  # VITS
                self.tts.tts_to_file(
                    text=text,
                    file_path=str(output_file),
                    speaker_wav=None,
                    split_sentences=False,
                )

            return str(output_file)
        except Exception as e:
            print(f"Error creando archivo de audio: {e}")
            return None

    def play_audio(self, file_path):
        """Play an audio file and block until it ends or is interrupted.

        While the file plays, an attached voice detector (see
        ``set_voice_detector``) is switched to high-threshold mode; normal
        mode is restored however playback ends.

        Args:
            file_path: Path of the audio file to play.

        Raises:
            Exception: If the audio system is unavailable.
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file is empty.
        """
        if not self._verify_audio_system():
            raise Exception("Sistema de audio no disponible")

        detector = getattr(self, 'voice_detector', None)

        try:
            audio_path = Path(file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
            if not audio_path.stat().st_size > 0:
                raise ValueError("Archivo de audio vacío o corrupto")

            with self.play_lock:
                if self.is_speaking:
                    self.stop_speaking()

                self.is_speaking = True
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()

                if detector is not None:
                    # Tell the detector we are producing sound and keep it
                    # listening with a higher threshold.
                    detector.update_last_audio_output()
                    detector.set_high_threshold_mode(True)

                # Poll the mixer directly instead of the pygame event queue:
                # the queue needs the video subsystem, which this class
                # never initialises, and the end event could be missed once
                # get_busy() turns False.  State is restored in `finally`.
                while pygame.mixer.music.get_busy() and not self.should_stop:
                    self.clock.tick(30)

                if self.should_stop:
                    self.stop_speaking()
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")
            # A bad file says nothing about the mixer; only force a mixer
            # re-initialisation for other failures.
            if not isinstance(e, (FileNotFoundError, ValueError)):
                self.audio_initialized = False
            raise
        finally:
            self.is_speaking = False
            self.should_stop = False
            if detector is not None:
                try:
                    detector.set_high_threshold_mode(False)
                except Exception as detector_error:
                    # Never mask the original playback error.
                    print(f"Error restaurando detector de voz: {detector_error}")

    def set_voice_detector(self, voice_detector):
        """Attach the voice detector used to coordinate interruptions."""
        self.voice_detector = voice_detector

    def __del__(self):
        """Best-effort cleanup: stop the mixer and empty the temp directory."""
        try:
            pygame.mixer.quit()
            if self.temp_dir.exists():
                for entry in self.temp_dir.glob("*"):
                    try:
                        entry.unlink()
                    except OSError:
                        pass
                self.temp_dir.rmdir()
        except Exception:
            # Finalizers must never raise (e.g. during interpreter shutdown
            # or when __init__ failed before temp_dir was set).
            pass