Spaces:
Runtime error
Runtime error
from flask import render_template, request, jsonify, session, abort | |
import base64 | |
import os | |
import yaml | |
import logging | |
from datetime import datetime, timedelta | |
import traceback | |
class Routes: | |
def __init__(self, app, chatbot): | |
self.app = app | |
self.chatbot = chatbot | |
# Crear directorios necesarios | |
self.temp_dir = os.path.join(os.path.dirname(__file__), 'temp') | |
self.session_dir = os.path.join(os.path.dirname(__file__), 'sessions') | |
self.log_dir = os.path.join(os.path.dirname(__file__), 'logs') | |
# Asegurar que existan los directorios | |
for directory in [self.temp_dir, self.session_dir, self.log_dir]: | |
os.makedirs(directory, exist_ok=True) | |
# Configurar el manejo de sesiones | |
self.app.config['SESSION_TYPE'] = 'filesystem' | |
self.app.config['SESSION_FILE_DIR'] = self.session_dir | |
self.app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=2) | |
# Inicializar logger | |
self.setup_logger() | |
# Configurar rutas | |
self.setup_routes() | |
def setup_logger(self): | |
"""Configurar el logger para la aplicaci贸n""" | |
log_file = os.path.join(self.log_dir, 'routes.log') | |
self.logger = logging.getLogger('routes') | |
self.logger.setLevel(logging.DEBUG) | |
handler = logging.FileHandler(log_file) | |
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) | |
self.logger.addHandler(handler) | |
self.logger.info("Rutas inicializadas") | |
def setup_routes(self): | |
def index(): | |
try: | |
# Asegurar que existe la sesi贸n | |
session_id = self.chatbot.session_manager.get_session() | |
self.logger.info(f"Nueva sesi贸n iniciada: {session_id}") | |
# Crear directorio de sesi贸n si no existe | |
session_path = os.path.join(self.session_dir, session_id) | |
os.makedirs(session_path, exist_ok=True) | |
return render_template('index.html', config=self.app.config) | |
except Exception as e: | |
self.logger.error(f"Error en index: {str(e)}") | |
return render_template('error.html', error="Error al cargar la p谩gina") | |
def inferencia(): | |
try: | |
if not request.is_json: | |
abort(400, "Se requiere contenido JSON") | |
data = request.get_json() | |
session_id = self.chatbot.session_manager.get_session() | |
mensaje = data.get('mensaje') | |
if not mensaje: | |
return jsonify({ | |
'success': False, | |
'error': 'Mensaje vac铆o' | |
}), 400 | |
# Obtener respuesta del modelo | |
respuesta = self.chatbot.procesar_mensaje(mensaje) | |
if not respuesta: | |
return jsonify({ | |
'success': False, | |
'error': 'Error al procesar el mensaje' | |
}), 500 | |
# Guardar mensajes en el historial | |
self.chatbot.session_manager.add_message_to_history(session_id, mensaje, 'user') | |
self.chatbot.session_manager.add_message_to_history(session_id, respuesta, 'bot') | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'session_id': session_id | |
}) | |
except Exception as e: | |
self.logger.error(f"Error en inferencia: {str(e)}\n{traceback.format_exc()}") | |
return jsonify({ | |
'success': False, | |
'error': 'Error interno del servidor' | |
}), 500 | |
def generar_audio(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
texto = data.get('texto', '') | |
tts_model = data.get('tts', 'EDGE') | |
if not texto: | |
return jsonify({ | |
'success': False, | |
'error': 'No se recibi贸 texto para generar audio' | |
}) | |
# Crear directorio temporal si no existe | |
temp_dir = os.path.join(os.path.dirname(__file__), 'temp') | |
os.makedirs(temp_dir, exist_ok=True) | |
# Generar nombre 煤nico para el archivo temporal | |
temp_file = os.path.join(temp_dir, f"temp_audio_{os.urandom(4).hex()}.mp3") | |
# Generar el audio | |
print(f"Generando audio para: {texto[:50]}...") | |
audio_path = self.chatbot.tts.text_to_speech(texto, save_path=temp_file) | |
if not audio_path or not os.path.exists(audio_path): | |
raise Exception('No se pudo generar el archivo de audio') | |
# Verificar tama帽o del archivo | |
if os.path.getsize(audio_path) == 0: | |
raise Exception('El archivo de audio generado est谩 vac铆o') | |
# Convertir audio a base64 | |
try: | |
with open(audio_path, 'rb') as audio_file: | |
audio_data = base64.b64encode(audio_file.read()).decode('utf-8') | |
finally: | |
# Limpiar archivo temporal | |
try: | |
os.remove(audio_path) | |
except Exception as e: | |
print(f"Error eliminando archivo temporal: {e}") | |
# Actualizar estado del audio en la sesi贸n | |
self.chatbot.session_manager.update_audio_state( | |
session_id, | |
is_playing=True, | |
current_audio=audio_data | |
) | |
print("Audio generado correctamente") | |
return jsonify({ | |
'success': True, | |
'audio': audio_data, | |
'audio_type': 'mp3' | |
}) | |
except Exception as e: | |
print(f"Error generando audio: {str(e)}") | |
return jsonify({ | |
'success': False, | |
'error': str(e) | |
}) | |
def chat(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
# Obtener todos los par谩metros necesarios | |
mensaje = data.get('mensaje', '') | |
mode = data.get('mode', 'soporte') | |
model = data.get('model', 'gemini') | |
tts = data.get('tts', 'EDGE') | |
self.logger.info(f"Chat - Sesi贸n: {session_id} | Mensaje: {mensaje} | Modo: {mode}") | |
if not mensaje: | |
return jsonify({ | |
'success': False, | |
'error': 'No se recibi贸 mensaje para procesar' | |
}) | |
# Crear directorio de sesi贸n si no existe | |
session_path = os.path.join(self.session_dir, session_id) | |
os.makedirs(session_path, exist_ok=True) | |
# Guardar mensaje en archivo de historial | |
history_file = os.path.join(session_path, 'chat_history.txt') | |
with open(history_file, 'a', encoding='utf-8') as f: | |
f.write(f"Usuario ({datetime.now()}): {mensaje}\n") | |
# Actualizar modo y modelo en la sesi贸n | |
self.chatbot.session_manager.update_session(session_id, 'mode', mode) | |
self.chatbot.session_manager.update_session(session_id, 'model', model) | |
self.chatbot.session_manager.update_session(session_id, 'tts', tts) | |
# Obtener respuesta del modelo | |
respuesta = self.chatbot.procesar_mensaje(mensaje) | |
if not respuesta: | |
return jsonify({ | |
'success': False, | |
'error': 'No se pudo obtener respuesta del modelo' | |
}) | |
print(f"Respuesta generada: {respuesta}") | |
# Guardar mensajes en el historial | |
self.chatbot.session_manager.add_message_to_history(session_id, mensaje, 'user') | |
self.chatbot.session_manager.add_message_to_history(session_id, respuesta, 'bot') | |
# Generar el audio | |
try: | |
temp_dir = os.path.join(os.path.dirname(__file__), 'temp') | |
os.makedirs(temp_dir, exist_ok=True) | |
temp_file = os.path.join(temp_dir, f"temp_audio_{os.urandom(4).hex()}.wav") | |
print("Generando audio para la respuesta...") | |
audio_path = self.chatbot.tts.text_to_speech(respuesta, save_path=temp_file) | |
if not audio_path or not os.path.exists(audio_path): | |
print("No se pudo generar el archivo de audio") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'session_id': session_id | |
}) | |
with open(audio_path, 'rb') as audio_file: | |
audio_data = base64.b64encode(audio_file.read()).decode('utf-8') | |
try: | |
os.remove(audio_path) | |
except: | |
pass | |
self.chatbot.session_manager.update_audio_state( | |
session_id, | |
is_playing=True, | |
current_audio=audio_data | |
) | |
print("Audio generado correctamente") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'audio': audio_data, | |
'audio_type': 'wav', | |
'session_id': session_id | |
}) | |
except Exception as audio_error: | |
print(f"Error generando audio: {audio_error}") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'session_id': session_id | |
}) | |
except Exception as e: | |
print(f"Error en chat: {str(e)}") | |
return jsonify({ | |
'success': False, | |
'error': str(e) | |
}) | |
def procesar_voz(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
# Obtener todos los par谩metros necesarios | |
texto = data.get('texto', '') | |
mode = data.get('mode', 'soporte') | |
model = data.get('model', 'gemini') | |
tts = data.get('tts', 'EDGE') | |
self.logger.info(f"Voz - Sesi贸n: {session_id} | Texto: {texto} | Modo: {mode}") | |
if not texto: | |
return jsonify({ | |
'success': False, | |
'error': 'No se recibi贸 texto para procesar' | |
}) | |
# Crear directorio de sesi贸n si no existe | |
session_path = os.path.join(self.session_dir, session_id) | |
os.makedirs(session_path, exist_ok=True) | |
# Guardar mensaje en archivo de historial | |
history_file = os.path.join(session_path, 'chat_history.txt') | |
with open(history_file, 'a', encoding='utf-8') as f: | |
f.write(f"Voz ({datetime.now()}): {texto}\n") | |
# Actualizar modo y modelo en la sesi贸n | |
self.chatbot.session_manager.update_session(session_id, 'mode', mode) | |
self.chatbot.session_manager.update_session(session_id, 'model', model) | |
self.chatbot.session_manager.update_session(session_id, 'tts', tts) | |
# Obtener respuesta del modelo | |
respuesta = self.chatbot.procesar_mensaje(texto) | |
if not respuesta: | |
self.logger.error(f"No se pudo obtener respuesta para: {texto}") | |
return jsonify({ | |
'success': False, | |
'error': 'No se pudo obtener respuesta del modelo' | |
}) | |
self.logger.info(f"Respuesta generada: {respuesta}") | |
# Guardar respuesta en historial | |
with open(history_file, 'a', encoding='utf-8') as f: | |
f.write(f"Bot ({datetime.now()}): {respuesta}\n") | |
# Guardar mensajes en el historial de sesi贸n | |
self.chatbot.session_manager.add_message_to_history(session_id, texto, 'user') | |
self.chatbot.session_manager.add_message_to_history(session_id, respuesta, 'bot') | |
# Generar el audio | |
try: | |
temp_file = os.path.join(self.temp_dir, f"temp_audio_{os.urandom(4).hex()}.wav") | |
self.logger.info("Generando audio para la respuesta de voz...") | |
audio_path = self.chatbot.tts.text_to_speech(respuesta, save_path=temp_file) | |
if not audio_path or not os.path.exists(audio_path): | |
self.logger.error("No se pudo generar el archivo de audio") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'session_id': session_id | |
}) | |
with open(audio_path, 'rb') as audio_file: | |
audio_data = base64.b64encode(audio_file.read()).decode('utf-8') | |
try: | |
os.remove(audio_path) | |
except Exception as e: | |
self.logger.warning(f"Error limpiando archivo temporal: {e}") | |
self.chatbot.session_manager.update_audio_state( | |
session_id, | |
is_playing=True, | |
current_audio=audio_data | |
) | |
self.logger.info("Audio generado correctamente") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'audio': audio_data, | |
'audio_type': 'wav', | |
'session_id': session_id | |
}) | |
except Exception as audio_error: | |
self.logger.error(f"Error generando audio: {audio_error}") | |
return jsonify({ | |
'success': True, | |
'texto': respuesta, | |
'session_id': session_id | |
}) | |
except Exception as e: | |
self.logger.error(f"Error en procesar_voz: {str(e)}") | |
return jsonify({ | |
'success': False, | |
'error': str(e) | |
}) | |
def cambiar_modo(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
mode = data.get('mode') | |
if not mode: | |
return jsonify({ | |
'success': False, | |
'error': 'No se especific贸 el modo' | |
}) | |
self.logger.info(f"Cambiando modo - Sesi贸n: {session_id} | Modo: {mode}") | |
if mode in self.chatbot.flow_bot.FLOWS: | |
self.chatbot.session_manager.update_session(session_id, 'mode', mode) | |
mensaje = self.chatbot.flow_bot.get_success_message(mode) | |
# Guardar cambio de modo en historial | |
session_path = os.path.join(self.session_dir, session_id) | |
os.makedirs(session_path, exist_ok=True) | |
history_file = os.path.join(session_path, 'chat_history.txt') | |
with open(history_file, 'a', encoding='utf-8') as f: | |
f.write(f"Sistema ({datetime.now()}): Cambio a modo {mode}\n") | |
f.write(f"Bot ({datetime.now()}): {mensaje}\n") | |
# Generar audio para el mensaje | |
try: | |
temp_file = os.path.join(self.temp_dir, f"temp_audio_{os.urandom(4).hex()}.wav") | |
self.logger.info("Generando audio para cambio de modo...") | |
audio_path = self.chatbot.tts.text_to_speech(mensaje, save_path=temp_file) | |
if not audio_path or not os.path.exists(audio_path): | |
self.logger.error("No se pudo generar el archivo de audio para el modo") | |
return jsonify({ | |
'success': True, | |
'texto': mensaje | |
}) | |
with open(audio_path, 'rb') as audio_file: | |
audio_data = base64.b64encode(audio_file.read()).decode('utf-8') | |
try: | |
os.remove(audio_path) | |
except Exception as e: | |
self.logger.warning(f"Error limpiando archivo temporal: {e}") | |
self.logger.info("Audio de modo generado correctamente") | |
return jsonify({ | |
'success': True, | |
'texto': mensaje, | |
'audio': audio_data, | |
'audio_type': 'wav' | |
}) | |
except Exception as audio_error: | |
self.logger.error(f"Error generando audio de modo: {audio_error}") | |
return jsonify({ | |
'success': True, | |
'texto': mensaje | |
}) | |
return jsonify({ | |
'success': False, | |
'error': 'Modo no v谩lido' | |
}) | |
except Exception as e: | |
self.logger.error(f"Error cambiando modo: {str(e)}") | |
return jsonify({ | |
'success': False, | |
'error': str(e) | |
}) | |
def cambiar_modelo(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
model = data.get('model', 'gemini') | |
self.chatbot.session_manager.update_session(session_id, 'model', model) | |
print(f"Modelo cambiado a: {model}") | |
return jsonify({'success': True}) | |
except Exception as e: | |
print(f"Error cambiando modelo: {str(e)}") | |
return jsonify({'success': False, 'error': str(e)}) | |
def cambiar_tts(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = request.get_json() | |
tts = data.get('model') | |
print(f"Solicitud de cambio de modelo TTS recibida: {tts}") | |
if not tts or tts not in ['EDGE', 'EDGE_ES', 'VITS']: | |
print(f"Modelo TTS inv谩lido: {tts}") | |
return jsonify({ | |
'error': 'Modelo de voz inv谩lido' | |
}), 400 | |
# Actualizar el modelo TTS | |
self.chatbot.tts.current_model = tts | |
self.chatbot.session_manager.update_session(session_id, 'tts', tts) | |
print(f"TTS cambiado exitosamente a: {tts}") | |
return jsonify({'success': True}) | |
except Exception as e: | |
print(f"Error cambiando TTS: {str(e)}") | |
return jsonify({'success': False, 'error': str(e)}) | |
def get_session_data(): | |
try: | |
session_id = self.chatbot.session_manager.get_session() | |
data = self.chatbot.session_manager.get_session_data(session_id) | |
return jsonify(data) | |
except Exception as e: | |
print(f"Error obteniendo datos de sesi贸n: {str(e)}") | |
return jsonify({'error': str(e)}) | |
def update_audio_state(): | |
session_id = self.chatbot.session_manager.get_session() | |
is_playing = request.json.get('is_playing', False) | |
current_audio = request.json.get('current_audio') | |
self.chatbot.session_manager.update_audio_state( | |
session_id, | |
is_playing=is_playing, | |
current_audio=current_audio | |
) | |
return jsonify({'success': True}) | |
def save_config(): | |
try: | |
config_data = request.json | |
# Actualizar el archivo config.yaml | |
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') | |
with open(config_path, 'r') as file: | |
current_config = yaml.safe_load(file) or {} | |
# Actualizar solo las claves de API | |
current_config['OPENAI_API_KEY'] = config_data.get('openai_key', '') | |
current_config['HUGGINGFACE_TOKEN'] = config_data.get('huggingface_key', '') | |
current_config['ELEVENLABS_API_KEY'] = config_data.get('elevenlabs_key', '') | |
with open(config_path, 'w') as file: | |
yaml.dump(current_config, file) | |
# Reiniciar los componentes que usan las APIs | |
self.chatbot.reinit_components() | |
return jsonify({'status': 'success'}) | |
except Exception as e: | |
return jsonify({'status': 'error', 'message': str(e)}), 500 | |
def get_config(): | |
try: | |
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') | |
with open(config_path, 'r') as file: | |
current_config = yaml.safe_load(file) or {} | |
return jsonify({ | |
'openai_key': current_config.get('OPENAI_API_KEY', ''), | |
'huggingface_key': current_config.get('HUGGINGFACE_TOKEN', ''), | |
'elevenlabs_key': current_config.get('ELEVENLABS_API_KEY', '') | |
}) | |
except Exception as e: | |
return jsonify({'status': 'error', 'message': str(e)}), 500 | |
def not_found_error(error): | |
return jsonify({'error': 'Recurso no encontrado'}), 404 | |
def internal_error(error): | |
return jsonify({'error': 'Error interno del servidor'}), 500 | |