# Captured page header (not part of the program): Spaces status "Runtime error";
# File size: 18,702 Bytes; d2c7775 (presumably the revision hash).
# The viewer's line-number gutter (1-468) that followed was removed.
import pygame
import tempfile
import uuid
import threading
import asyncio
from pathlib import Path
from TTS.api import TTS
from gtts import gTTS
import edge_tts
import logging
import time
import os
class TTSUtils:
    """Text-to-speech helper with layered fallbacks and pygame playback.

    Synthesis is attempted with the selected model first (local VITS or
    Microsoft Edge voices), then Edge TTS, then gTTS as a last resort.
    Generated files live in a ``chatbot_audio`` folder inside the system
    temporary directory and are purged when older than one hour.
    """

    # Catalogue of selectable voices. 'edge' entries carry a speaking `rate`;
    # the local VITS entry carries a `config` dict instead (and no `rate`).
    AVAILABLE_MODELS = {
        'EDGE': {
            'name': "es-MX-JorgeNeural",
            'description': "Voz masculina de Microsoft Edge (MX)",
            'type': 'edge',
            'rate': '+25%'
        },
        'EDGE_ES': {
            'name': "es-ES-AlvaroNeural",
            'description': "Voz masculina de Microsoft Edge (ES)",
            'type': 'edge',
            'rate': '+25%'
        },
        'VITS': {
            'name': "tts_models/es/css10/vits",
            'description': "Voz masculina de VITS (ES)",
            'type': 'local',
            'config': {
                'speed': 1.25,
                'model_path': "tts_models/es/css10/vits"
            }
        }
    }

    def __init__(self, model_name='EDGE', elevenlabs_api_key=None):
        """Set up playback state, the pygame mixer and the selected model.

        Args:
            model_name: Key identifying the voice/model to use.
            elevenlabs_api_key: Accepted for interface compatibility; it is
                not used anywhere in this class.
        """
        self.is_speaking = False
        self.should_stop = False
        self.temp_dir = Path(tempfile.gettempdir()) / "chatbot_audio"
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        self.tts = None
        self.audio_initialized = False
        self.current_model = model_name
        print(f"Inicializando TTS con modelo: {model_name}")
        try:
            # Restart the mixer so our own settings take effect.
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=16000, size=-16, channels=1, buffer=2048)
            pygame.mixer.music.set_volume(0.8)
            self.audio_initialized = True
            print("Audio inicializado correctamente")
        except Exception as e:
            # A missing audio device must not prevent construction.
            print(f"Error inicializando audio: {str(e)}")
            self.audio_initialized = False
        self.play_lock = threading.Lock()
        self.clock = pygame.time.Clock()
        self.init_audio()
        # Purge stale temporary files left by earlier runs.
        self._cleanup_old_files()

    def _cleanup_old_files(self, max_age_hours=1):
        """Delete temporary audio files older than ``max_age_hours`` (best effort)."""
        try:
            cutoff = time.time() - max_age_hours * 3600  # hours -> seconds
            for entry in self.temp_dir.glob("*"):
                try:
                    if entry.is_file() and entry.stat().st_mtime < cutoff:
                        entry.unlink()
                except OSError:
                    # File vanished or is in use; skip it and keep sweeping.
                    pass
        except Exception as e:
            print(f"Error limpiando archivos temporales: {e}")

    def _verify_audio_system(self):
        """Return True if the mixer is usable, re-initialising it if needed.

        Note that the recovery path uses 44.1 kHz stereo at full volume,
        which differs from the settings applied in ``__init__``.
        """
        if self.audio_initialized and pygame.mixer.get_init():
            return True
        try:
            if pygame.mixer.get_init():
                pygame.mixer.quit()
            pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
            pygame.mixer.music.set_volume(1.0)
            self.audio_initialized = True
            return True
        except Exception as e:
            print(f"Error reinicializando audio: {e}")
            return False

    def init_audio(self):
        """Load the currently selected TTS model.

        Only the local VITS model needs loading; for any other model this
        is a no-op that returns True. If VITS cannot be loaded,
        ``current_model`` is switched to 'EDGE'.

        Returns:
            True on success, False if the model failed to load.
        """
        try:
            if self.current_model == 'VITS':
                model_info = self.AVAILABLE_MODELS[self.current_model]
                print(f"Cargando modelo VITS: {model_info['name']}")
                try:
                    self.tts = TTS(model_name=model_info['name'])
                    synthesizer = getattr(self.tts, 'synthesizer', None)
                    if synthesizer is not None and hasattr(synthesizer, 'tts_config'):
                        # NOTE(review): confirm that tts_config.update()
                        # accepts the 'speed' / 'model_path' keys; a failure
                        # here triggers the Edge fallback below.
                        synthesizer.tts_config.update(model_info['config'])
                        print("Modelo VITS cargado correctamente")
                        return True
                    print("Error: El modelo VITS no tiene la estructura esperada")
                    self.current_model = 'EDGE'  # fall back to Edge
                    return False
                except Exception as vits_error:
                    print(f"Error cargando modelo VITS: {vits_error}")
                    self.current_model = 'EDGE'  # fall back to Edge
                    return False
            return True
        except Exception as e:
            print(f"Error inicializando audio: {e}")
            self.current_model = 'EDGE'  # fall back to Edge
            return False

    def _number_to_words(self, number):
        """Spell an integer between -99 and 99 in Spanish.

        Args:
            number: An int, or anything ``int()`` accepts (e.g. "42").

        Returns:
            The Spanish words for the number. Values with a magnitude of
            100 or more come back as a digit string, and input that cannot
            be converted to an int is returned unchanged.
        """
        UNITS = ['', 'uno', 'dos', 'tres', 'cuatro', 'cinco', 'seis', 'siete', 'ocho', 'nueve']
        TEENS = ['diez', 'once', 'doce', 'trece', 'catorce', 'quince',
                 'dieciséis', 'diecisiete', 'dieciocho', 'diecinueve']
        # 20-29 are written as a single word in Spanish ("veintiuno", not
        # "veinte y uno"), so they need their own table.
        TWENTIES = ['veinte', 'veintiuno', 'veintidós', 'veintitrés', 'veinticuatro',
                    'veinticinco', 'veintiséis', 'veintisiete', 'veintiocho', 'veintinueve']
        TENS = ['', 'diez', 'veinte', 'treinta', 'cuarenta', 'cincuenta',
                'sesenta', 'setenta', 'ochenta', 'noventa']
        try:
            num = int(number)
        except (ValueError, TypeError, OverflowError):
            return number  # not a number: hand the input back untouched
        if num == 0:
            return 'cero'
        if num < 0:
            return f"menos {self._number_to_words(abs(num))}"
        if num < 10:
            return UNITS[num]
        if num < 20:
            return TEENS[num - 10]
        if num < 30:
            return TWENTIES[num - 20]
        if num < 100:
            tens, units = divmod(num, 10)
            if units == 0:
                return TENS[tens]
            return f"{TENS[tens]} y {UNITS[units]}"
        return str(num)  # 100 and above stay as digits

    def _spell_number_token(self, token):
        """Spell one numeric token from ``_clean_text`` (e.g. "5", "5.", "3.05")."""
        # A trailing period is sentence punctuation, not a decimal point.
        core = token.rstrip('.')
        trailing = token[len(core):]
        if '.' not in core:
            return f"{self._number_to_words(core)}{trailing}"
        parts = core.split('.')
        if len(parts) != 2 or not parts[1].isdigit():
            return token  # e.g. "1.2.3": not a plain decimal, leave as is
        whole, fraction = parts
        whole_words = self._number_to_words(whole) if whole else 'cero'
        # int() would drop leading zeros ("05" -> 5), so read them out loud.
        significant = fraction.lstrip('0')
        fraction_words = ['cero'] * (len(fraction) - len(significant))
        if significant:
            fraction_words.append(str(self._number_to_words(significant)))
        return f"{whole_words} punto {' '.join(fraction_words)}{trailing}"

    def _clean_text(self, text):
        """Prepare text for synthesis.

        Strips markdown-style symbols, replaces some symbols with Spanish
        words, spells out small numbers and collapses whitespace.

        Returns:
            The cleaned text; falsy input is returned unchanged.
        """
        if not text:
            return text
        # Order matters: longer sequences ('...', '---') must be replaced
        # before their shorter prefixes ('..', '--').
        # NOTE(review): '$' becomes ' pesos' in place, so "$5" turns into
        # "pesos5"; left unchanged here.
        replacements = {
            '*': '',
            '#': '',
            '`': '',
            '~': '',
            '|': '',
            '>': '',
            '<': '',
            '\\': '',
            '&': 'y',
            '_': ' ',
            '...': ',',
            '..': ',',
            '---': ',',
            '--': ',',
            '%': ' por ciento',
            '$': ' pesos',
            '=': ' igual a ',
            '+': ' más ',
            '@': ' arroba ',
        }
        cleaned_text = text
        for symbol, replacement in replacements.items():
            cleaned_text = cleaned_text.replace(symbol, replacement)
        # Spell out tokens made only of digits, dots and hyphens.
        words = []
        for word in cleaned_text.split():
            if word.replace('.', '').replace('-', '').isdigit():
                words.append(self._spell_number_token(word))
            else:
                words.append(word)
        # Join and normalise whitespace.
        return ' '.join(' '.join(words).split())

    def text_to_speech(self, text, save_path=None):
        """Synthesize ``text`` into an audio file.

        Args:
            text: Text to speak; it is cleaned with ``_clean_text`` first.
            save_path: Optional destination path. Defaults to a new
                uniquely named ``.mp3`` in the temporary directory.

        Returns:
            Path of the generated file, or None if the text is empty, the
            audio system is unavailable, or every engine failed.
        """
        if not text:
            return None
        text = self._clean_text(text)
        if not text:
            return None
        if not self._verify_audio_system():
            print("Sistema de audio no disponible")
            return None
        try:
            temp_file = save_path or str(self.temp_dir / f"{uuid.uuid4()}.mp3")
            print(f"Generando audio para modelo: {self.current_model}")
            try:
                if self.current_model == 'VITS':
                    print("Usando modelo VITS")
                    if not self.tts:
                        print("Inicializando modelo VITS...")
                        if not self.init_audio():
                            print("Fallback a Edge debido a error en inicialización de VITS")
                            return self.fallback_to_edge(text, temp_file)
                    try:
                        self.tts.tts_to_file(
                            text=text,
                            file_path=temp_file,
                            speed=self.AVAILABLE_MODELS['VITS']['config']['speed']
                        )
                        if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                            print(f"Audio generado correctamente con VITS: {os.path.getsize(temp_file)} bytes")
                            return temp_file
                        raise Exception("Archivo de audio VITS inválido")
                    except Exception as vits_error:
                        print(f"Error generando audio con VITS: {vits_error}")
                        return self.fallback_to_edge(text, temp_file)
                if self.current_model in ('EDGE', 'EDGE_ES'):
                    return self.fallback_to_edge(text, temp_file)
                # Any other model name has no dedicated engine here; use the
                # last-resort engine instead of returning a path to a file
                # that was never written.
                return self.fallback_to_gtts(text, temp_file)
            except Exception as primary_error:
                print(f"Error con el modelo primario {self.current_model}: {primary_error}")
                return self.fallback_to_gtts(text, temp_file)
        except Exception as e:
            print(f"Error en text_to_speech: {e}")
            return None
        finally:
            self._cleanup_old_files()

    def _edge_voice(self):
        """Return the Edge voice to use: the current model's if it is an Edge model, else the default."""
        model = self.AVAILABLE_MODELS.get(self.current_model, {})
        if model.get('type') == 'edge':
            return model['name']
        return self.AVAILABLE_MODELS['EDGE']['name']

    def _edge_rate(self):
        """Return the Edge speaking rate, defaulting to the 'EDGE' entry when the current model has none."""
        model = self.AVAILABLE_MODELS.get(self.current_model, {})
        return model.get('rate', self.AVAILABLE_MODELS['EDGE']['rate'])

    def _run_edge_tts(self, text, voice, output_file, timeout):
        """Run ``edge_tts_speak`` to completion, bounded by ``timeout`` seconds.

        Uses ``asyncio.run``, so it must not be called from a thread that
        already has a running event loop.
        """
        async def tts_with_timeout():
            return await asyncio.wait_for(
                self.edge_tts_speak(text, voice, output_file),
                timeout=timeout
            )
        asyncio.run(tts_with_timeout())

    def fallback_to_edge(self, text, temp_file):
        """Synthesize with Edge TTS, retrying up to three times.

        Falls through to gTTS when all attempts fail.

        Returns:
            ``temp_file`` on success, otherwise whatever ``fallback_to_gtts``
            returns.
        """
        try:
            voice = self._edge_voice()
            print(f"Usando voz Edge como respaldo: {voice}")
            for attempt in range(3):
                try:
                    self._run_edge_tts(text, voice, temp_file, timeout=15.0)
                    if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                        print(f"Audio generado correctamente con Edge: {os.path.getsize(temp_file)} bytes")
                        return temp_file
                    raise Exception("Archivo de audio Edge inválido")
                except Exception as e:
                    print(f"Intento {attempt + 1} fallido con Edge: {e}")
                    if attempt == 2:
                        return self.fallback_to_gtts(text, temp_file)
                    time.sleep(2 ** attempt)  # back off: 1 s, then 2 s
        except Exception as edge_error:
            print(f"Error con Edge TTS: {edge_error}")
            return self.fallback_to_gtts(text, temp_file)

    def fallback_to_gtts(self, text, temp_file):
        """Last-resort synthesis with gTTS, sped up 1.25x through ffmpeg.

        If the speed-up step fails (ffmpeg missing or erroring) the
        normal-speed gTTS audio is used instead of giving up.

        Returns:
            ``temp_file`` on success, None on failure.
        """
        print("Usando gTTS como último respaldo")
        temp_normal = None
        try:
            tts = gTTS(text=text, lang='es', slow=False)
            temp_normal = str(self.temp_dir / f"temp_normal_{uuid.uuid4()}.mp3")
            tts.save(temp_normal)
            try:
                import ffmpeg
                # atempo is an audio *filter*; it cannot be passed as an
                # output option.
                faster = ffmpeg.input(temp_normal).audio.filter('atempo', 1.25)
                stream = ffmpeg.output(faster, temp_file, acodec='libmp3lame')
                ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
            except Exception as speed_error:
                print(f"No se pudo acelerar el audio de gTTS, se usa velocidad normal: {speed_error}")
                Path(temp_file).write_bytes(Path(temp_normal).read_bytes())
            if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                print(f"Audio generado correctamente con gTTS: {os.path.getsize(temp_file)} bytes")
                return temp_file
            raise Exception("Archivo de audio gTTS inválido")
        except Exception as gtts_error:
            print(f"Error con gTTS: {gtts_error}")
            return None
        finally:
            # Always drop the intermediate file, including on failure.
            if temp_normal is not None:
                try:
                    os.remove(temp_normal)
                except OSError:
                    pass

    async def edge_tts_speak(self, text, voice, output_file):
        """Generate ``output_file`` with edge-tts using ``voice``.

        Returns:
            True on success.

        Raises:
            Exception: Any edge-tts error is logged and re-raised.
        """
        try:
            print(f"Generando audio con voz: {voice}")
            # The current model may be VITS (no 'rate' key) when this runs
            # as a fallback, so resolve the rate defensively.
            rate = self._edge_rate()
            print(f"Usando rate: {rate}")
            communicate = edge_tts.Communicate(text, voice, rate=rate)
            await communicate.save(str(output_file))
            print(f"Audio generado correctamente con {voice}")
            return True
        except Exception as e:
            print(f"Error generando audio con edge-tts: {e}")
            raise

    def stop_speaking(self):
        """Stop the current playback, if any, and reset the playback flags."""
        if not self.is_speaking:
            return
        try:
            self.should_stop = True
            pygame.mixer.music.stop()
            pygame.mixer.music.unload()
            print("Reproducción detenida por interrupción")
        except Exception as e:
            print(f"Error al detener el audio: {e}")
        finally:
            self.is_speaking = False
            self.should_stop = False

    def change_model(self, model_name):
        """Switch to another entry of ``AVAILABLE_MODELS``.

        Returns:
            True if the model is now active; False if the name is unknown
            or the model failed to load (in which case ``init_audio`` has
            already reverted ``current_model`` to 'EDGE').
        """
        if model_name not in self.AVAILABLE_MODELS:
            print(f"Modelo {model_name} no disponible")
            return False
        try:
            print(f"Cambiando a modelo {model_name}...")
            self.current_model = model_name
            return bool(self.init_audio())
        except Exception as e:
            print(f"Error cambiando modelo: {e}")
            return False

    def is_currently_speaking(self):
        """Return True while audio is being played."""
        return self.is_speaking

    def create_audio_file(self, text, output_file):
        """Synthesize ``text`` into ``output_file`` with the current model.

        Unlike ``text_to_speech`` the text is not cleaned and no fallback
        engine is tried.

        Returns:
            ``str(output_file)`` on success, None on error.
        """
        try:
            if 'EDGE' in self.current_model:
                voice = self.AVAILABLE_MODELS[self.current_model]['name']
                self._run_edge_tts(text, voice, output_file, timeout=5.0)
            elif self.current_model == 'gTTS':
                tts = gTTS(text=text, lang='es', slow=False)
                tts.save(str(output_file))
            else:  # VITS
                self.tts.tts_to_file(
                    text=text,
                    file_path=str(output_file),
                    speaker_wav=None,
                    split_sentences=False
                )
            return str(output_file)
        except Exception as e:
            print(f"Error creando archivo de audio: {e}")
            return None

    def play_audio(self, file_path):
        """Play an audio file and wait until it finishes or is stopped.

        While audio plays, an attached voice detector (see
        ``set_voice_detector``) is switched to high-threshold mode; normal
        mode is always restored on exit.

        Raises:
            Exception: If the audio system is unavailable, or on any
                playback error (missing or empty file included), after
                logging it.
        """
        if not self._verify_audio_system():
            raise Exception("Sistema de audio no disponible")
        try:
            audio_path = Path(file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
            if not audio_path.stat().st_size > 0:
                raise ValueError("Archivo de audio vacío o corrupto")
            # NOTE(review): the lock guards only start-up so that a second
            # call can interrupt the current playback; the original
            # indentation was lost, so confirm this was the intended scope.
            with self.play_lock:
                if self.is_speaking:
                    self.stop_speaking()
                self.is_speaking = True
                pygame.mixer.music.load(file_path)
                pygame.mixer.music.play()
            if hasattr(self, 'voice_detector'):
                # Tell the detector we are producing sound, and keep it
                # listening with a higher threshold meanwhile.
                self.voice_detector.update_last_audio_output()
                self.voice_detector.set_high_threshold_mode(True)
            # Poll the mixer instead of the pygame event queue: the queue
            # needs the display subsystem, and the end-of-music event can
            # arrive after get_busy() has already turned False.
            while pygame.mixer.music.get_busy() and not self.should_stop:
                self.clock.tick(30)
            if self.should_stop:
                self.stop_speaking()
        except Exception as e:
            print(f"Error reproduciendo audio: {e}")
            # Force a mixer re-initialisation on the next call.
            self.audio_initialized = False
            raise
        finally:
            self.is_speaking = False
            self.should_stop = False
            # Restore the normal threshold on every exit path.
            if hasattr(self, 'voice_detector'):
                self.voice_detector.set_high_threshold_mode(False)

    def set_voice_detector(self, voice_detector):
        """Attach the voice detector used to coordinate interruptions."""
        self.voice_detector = voice_detector

    def __del__(self):
        """Shut the mixer down and remove the temporary directory (best effort).

        This empties the shared ``chatbot_audio`` directory, including
        files created by other instances.
        """
        try:
            pygame.mixer.quit()
            if self.temp_dir.exists():
                for file in self.temp_dir.glob("*"):
                    try:
                        file.unlink()
                    except Exception:
                        pass
                self.temp_dir.rmdir()
        except Exception:
            # Finalizers must never raise; globals may already be gone.
            pass