import asyncio
import shutil
import threading
import time
import traceback
import uuid
from pathlib import Path

import edge_tts
import pygame
from gtts import gTTS


class AudioUtils:
    """Text-to-speech generation and playback helper.

    Speech is synthesized into MP3 files under ``static/temp_audio`` using
    Edge TTS (when the selected model is ``'EDGE'``) or gTTS (for every other
    model, and as the fallback when Edge TTS fails). Playback goes through
    ``pygame.mixer.music``.
    """

    # Catalogue of known models. Only the 'EDGE' entry's voice name is read by
    # the methods of this class; any model other than 'EDGE' is synthesized
    # with gTTS.
    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",  # Mexican Spanish voice
            'description': "Voz de Edge TTS",
            'type': 'cloud',
            'fallback': 'gTTS'
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina clara y natural",
            'type': 'local',
            'fallback': 'gTTS'
        },
        'gTTS': {
            'name': "google_tts",
            'description': "Google Text-to-Speech",
            'type': 'cloud'
        }
    }

    def __init__(self, model_name='EDGE'):
        """Create the temp directory, initialize audio and purge stale files.

        Args:
            model_name: Key selecting the synthesis backend. ``'EDGE'`` uses
                Edge TTS; any other value uses gTTS.
        """
        # Not updated by any method of this class; kept for external users.
        self.is_speaking = False
        # Set by stop_audio() and polled by play_audio().
        self.should_stop = False
        self.temp_dir = Path("static/temp_audio")
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.current_model = model_name
        # Serializes playback: only one play_audio() call runs at a time.
        self.play_lock = threading.Lock()
        self.init_audio()
        self.cleanup_old_files()

    def init_audio(self):
        """Initialize pygame and its mixer.

        Returns:
            True on success, False if initialization raised (the error is
            printed rather than propagated).
        """
        try:
            pygame.init()
            pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
            return True
        except Exception as e:
            print(f"Error inicializando audio: {e}")
            return False

    async def generate_edge_tts(self, text, output_file):
        """Synthesize ``text`` with Edge TTS and save it to ``output_file``.

        Returns:
            True if the file was saved, False if Edge TTS raised (the error is
            printed rather than propagated).
        """
        try:
            communicate = edge_tts.Communicate(text, self.AVAILABLE_MODELS['EDGE']['name'])
            await communicate.save(str(output_file))
            return True
        except Exception as e:
            print(f"Error con Edge TTS: {e}")
            return False

    def _generate_gtts(self, text, output_file):
        """Synthesize ``text`` in Spanish with gTTS and save it to ``output_file``."""
        tts = gTTS(text=text, lang='es', slow=False)
        tts.save(str(output_file))

    def text_to_speech(self, text, return_file=False):
        """Generate an MP3 file with the spoken version of ``text``.

        Args:
            text: Text to synthesize. A falsy value yields ``None``.
            return_file: When true, return only the file name (``str``);
                otherwise return the full ``Path`` of the generated file.

        Returns:
            The generated file (name or ``Path``, see ``return_file``), or
            ``None`` if ``text`` is empty or synthesis failed.
        """
        if not text:
            return None

        try:
            temp_file = (self.temp_dir / f"{uuid.uuid4()}").with_suffix('.mp3')
            print(f"Generando audio con modelo {self.current_model}")

            generated = False
            if self.current_model == 'EDGE':
                # asyncio.run() creates a private event loop and always closes
                # and un-registers it, even if the coroutine raises, so no
                # loop is leaked or left installed as the thread's current one.
                generated = asyncio.run(self.generate_edge_tts(text, temp_file))
                if not generated:
                    print("Fallback a gTTS")

            # gTTS is both the fallback for Edge TTS and the default backend.
            if not generated:
                self._generate_gtts(text, temp_file)

            if not temp_file.exists():
                raise Exception(f"El archivo no se generó: {temp_file}")

            print(f"Archivo generado exitosamente: {temp_file}")
            return temp_file.name if return_file else temp_file
        except Exception as e:
            print(f"Error crítico en text_to_speech: {e}")
            traceback.print_exc()
            return None

    def play_audio(self, file_path):
        """Play ``file_path`` and block until it finishes or is stopped.

        Playback is serialized through ``play_lock``. While the music is
        playing, ``should_stop`` is polled roughly ten times per second; when
        it becomes true the music is stopped. Errors are printed rather than
        propagated.
        """
        try:
            with self.play_lock:
                # Discard a stop request issued while nothing was playing;
                # otherwise it would cut this playback off immediately.
                self.should_stop = False
                try:
                    if pygame.mixer.music.get_busy():
                        pygame.mixer.music.stop()

                    pygame.mixer.music.load(str(file_path))
                    pygame.mixer.music.play()

                    # One clock for the whole wait loop instead of a new one
                    # per iteration.
                    clock = pygame.time.Clock()
                    while pygame.mixer.music.get_busy():
                        if self.should_stop:
                            pygame.mixer.music.stop()
                            break
                        clock.tick(10)
                finally:
                    # Always leave the flag cleared for the next playback,
                    # including when the track ended on its own or an error
                    # occurred.
                    self.should_stop = False
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")

    def stop_audio(self):
        """Request that the playback currently in progress be stopped.

        The request is honored by the polling loop in ``play_audio``; it is
        discarded if no playback is running when the next one starts.
        """
        self.should_stop = True

    def cleanup_old_files(self, max_age_minutes=5):
        """Delete files in the temp directory older than ``max_age_minutes``.

        Age is measured from each file's modification time. Failures are
        printed rather than propagated, and a failure on one file does not
        prevent the remaining files from being processed.
        """
        try:
            current_time = time.time()
            for path in self.temp_dir.glob("*"):
                try:
                    if not path.is_file():
                        continue
                    file_age_minutes = (current_time - path.stat().st_mtime) / 60
                    if file_age_minutes > max_age_minutes:
                        path.unlink()
                except OSError as e:
                    # E.g. the file vanished or is locked: report and move on.
                    print(f"Error limpiando archivos temporales: {e}")
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")