Spaces:
Runtime error
Runtime error
from flask import Flask, send_from_directory, request, jsonify, render_template | |
from flask_cors import CORS | |
import os | |
from pathlib import Path | |
from audio_utils import AudioUtils | |
import logging | |
import google.generativeai as genai | |
import yaml | |
from inference import InferenceManager | |
from chat_log import ChatLogger | |
# Configurar logging | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
app = Flask(__name__) | |
CORS(app) | |
# Cargar configuraci贸n | |
with open('config.yaml', 'r') as file: | |
config = yaml.safe_load(file) | |
# Inicializar componentes con mensajes | |
try: | |
logger.info("Iniciando sistema...") | |
logger.info("Configurando API de Google...") | |
genai.configure(api_key=config.get('GOOGLE_API_KEY', '')) | |
logger.info("Inicializando sistema de inferencia...") | |
inference = InferenceManager(config) | |
logger.info("Inicializando registro de chat...") | |
chat_logger = ChatLogger() | |
logger.info("Inicializando sistema de audio...") | |
audio_utils = AudioUtils() | |
logger.info("Sistema iniciado correctamente") | |
except Exception as e: | |
logger.error(f"Error cr铆tico inicializando el sistema: {e}") | |
raise e | |
# Configurar el directorio de archivos temporales | |
TEMP_AUDIO_DIR = Path("static/temp_audio") | |
TEMP_AUDIO_DIR.mkdir(parents=True, exist_ok=True) | |
def after_request(response): | |
# Agregar headers necesarios para ngrok | |
response.headers.add('Accept-Ranges', 'bytes') | |
response.headers.add('Access-Control-Allow-Origin', '*') | |
logger.debug(f"Response headers: {dict(response.headers)}") | |
return response | |
def serve_audio(filename): | |
try: | |
logger.info(f"Attempting to serve audio file: {filename}") | |
file_path = TEMP_AUDIO_DIR / filename | |
if not file_path.exists(): | |
logger.error(f"File not found: {file_path}") | |
return jsonify({'error': 'File not found'}), 404 | |
if not filename.endswith(('.wav', '.mp3')): | |
logger.error(f"Invalid file format: {filename}") | |
return jsonify({'error': 'Invalid file format'}), 400 | |
logger.info(f"Serving file from: {file_path}") | |
response = send_from_directory(str(TEMP_AUDIO_DIR), filename) | |
response.headers['Content-Type'] = 'audio/wav' if filename.endswith('.wav') else 'audio/mpeg' | |
response.headers['Cache-Control'] = 'no-cache' | |
return response | |
except Exception as e: | |
logger.error(f"Error serving audio file: {e}", exc_info=True) | |
return jsonify({'error': str(e)}), 500 | |
def generate_audio(): | |
try: | |
text = request.json.get('text') | |
model = request.json.get('model', 'EDGE') # EDGE como default | |
logger.info(f"Generando audio para texto: '{text}' usando modelo: {model}") | |
if not text: | |
logger.error("No se proporcion贸 texto") | |
return jsonify({'error': 'No text provided'}), 400 | |
try: | |
temp_audio_utils = AudioUtils(model_name=model) | |
audio_file = temp_audio_utils.text_to_speech(text, return_file=True) | |
if not audio_file: | |
logger.error(f"Fallo al generar audio con {model}, intentando con EDGE") | |
temp_audio_utils = AudioUtils(model_name='EDGE') | |
audio_file = temp_audio_utils.text_to_speech(text, return_file=True) | |
except Exception as e: | |
logger.error(f"Error con modelo {model}, usando EDGE: {e}") | |
temp_audio_utils = AudioUtils(model_name='EDGE') | |
audio_file = temp_audio_utils.text_to_speech(text, return_file=True) | |
if audio_file: | |
audio_url = f'/temp_audio/{audio_file}' | |
logger.info(f"Audio generado: {audio_url}") | |
file_path = TEMP_AUDIO_DIR / audio_file | |
if not file_path.exists(): | |
logger.error(f"Archivo no encontrado: {file_path}") | |
return jsonify({ | |
'error': 'Generated file not found', | |
'details': 'El archivo de audio no se gener贸 correctamente' | |
}), 500 | |
return jsonify({ | |
'audio_url': audio_url, | |
'model_used': temp_audio_utils.current_model | |
}) | |
return jsonify({ | |
'error': 'Failed to generate audio', | |
'details': 'No se pudo generar el audio con ning煤n modelo' | |
}), 500 | |
except Exception as e: | |
logger.error(f"Error cr铆tico generando audio: {e}", exc_info=True) | |
return jsonify({ | |
'error': str(e), | |
'details': 'Error interno del servidor al generar audio' | |
}), 500 | |
def send_static(path): | |
logger.debug(f"Serving static file: {path}") | |
return send_from_directory('static', path) | |
def home(): | |
# Agregar mensajes iniciales | |
initial_messages = [{ | |
'role': 'assistant', | |
'content': 'Sistema iniciado. Todos los modelos cargados correctamente.' | |
}] | |
return render_template('chat.html', messages=initial_messages) | |
def chat(): | |
try: | |
message = request.json.get('message') | |
mode = request.json.get('mode', 'seguros') # Modo por defecto | |
if not message: | |
return jsonify({'error': 'No message provided'}), 400 | |
# Registrar mensaje del usuario | |
chat_logger.log_message(message, is_user=True) | |
# Obtener respuesta usando el sistema de inferencia | |
response = inference.get_response(message, mode) | |
if response: | |
# Registrar respuesta del bot | |
chat_logger.log_message(response, is_user=False) | |
return jsonify({ | |
'response': response | |
}) | |
return jsonify({'error': 'No response generated'}), 500 | |
except Exception as e: | |
logger.error(f"Error in chat endpoint: {e}", exc_info=True) | |
return jsonify({'error': str(e)}), 500 | |
def change_mode(): | |
try: | |
mode = request.json.get('mode') | |
if mode not in ['seguros', 'creditos', 'cobranza']: | |
return jsonify({'error': 'Invalid mode'}), 400 | |
# Actualizar modo en el sistema de inferencia | |
inference.mode = mode | |
return jsonify({'success': True}) | |
except Exception as e: | |
logger.error(f"Error changing mode: {e}") | |
return jsonify({'error': str(e)}), 500 | |
def change_model(): | |
try: | |
model = request.json.get('model') | |
if model not in ['Gemini 8b', 'Mixtral 7b']: | |
return jsonify({'error': 'Invalid model'}), 400 | |
inference.current_model = model | |
return jsonify({'success': True}) | |
except Exception as e: | |
logger.error(f"Error changing model: {e}") | |
return jsonify({'error': str(e)}), 500 | |
def change_tts(): | |
try: | |
model = request.json.get('model') | |
if model not in ['EDGE', 'VITS', 'gTTS']: | |
return jsonify({'error': 'Invalid TTS model'}), 400 | |
# Actualizar modelo TTS | |
audio_utils = AudioUtils(model_name=model) | |
return jsonify({'success': True}) | |
except Exception as e: | |
logger.error(f"Error changing TTS model: {e}") | |
return jsonify({'error': str(e)}), 500 | |
if __name__ == "__main__": | |
try: | |
logger.info("Iniciando servidor...") | |
# Cambiar para que funcione en Hugging Face Spaces | |
app.run(host='0.0.0.0', port=7860) # Puerto est谩ndar de Spaces | |
except Exception as e: | |
logger.error(f"Error al iniciar la aplicaci贸n: {e}", exc_info=True) | |