File size: 5,256 Bytes
4340657 7ef8110 77a25bb 4340657 7ef8110 4340657 ecf0cbb 7ef8110 4340657 7ef8110 ecf0cbb 7ef8110 ecf0cbb 7ef8110 ecf0cbb 7ef8110 ecf0cbb 7ef8110 ecf0cbb 7ef8110 ecf0cbb 4340657 ecf0cbb 4340657 ecf0cbb 4340657 ecf0cbb 4340657 7ef8110 ecf0cbb 4340657 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
#/modules/database/semantic_mongo_db.py
# Importaciones estándar
import io
import base64
from datetime import datetime, timezone
import logging
# Importaciones de terceros
import matplotlib.pyplot as plt
# Importaciones locales
from .mongo_db import (
insert_document,
find_documents,
update_document,
delete_document
)
# Configuración del logger
logger = logging.getLogger(__name__) # Cambiado de name a __name__
COLLECTION_NAME = 'student_semantic_analysis'
def store_student_semantic_result(username, text, analysis_result):
"""
Guarda el resultado del análisis semántico en MongoDB.
Args:
username: Nombre del usuario
text: Texto analizado
analysis_result: Resultado del análisis
Returns:
bool: True si se guardó correctamente, False en caso contrario
"""
try:
# Convertir gráfico conceptual a formato base64
concept_graph_data = None
if 'concept_graph' in analysis_result and analysis_result['concept_graph'] is not None:
buf = io.BytesIO()
try:
analysis_result['concept_graph'].savefig(buf, format='png', dpi=300, bbox_inches='tight')
buf.seek(0)
concept_graph_data = base64.b64encode(buf.getvalue()).decode('utf-8')
plt.close(analysis_result['concept_graph']) # Cerrar la figura
except Exception as e:
logger.error(f"Error al convertir gráfico conceptual: {str(e)}")
# Convertir gráfico de entidades a formato base64
entity_graph_data = None
if 'entity_graph' in analysis_result and analysis_result['entity_graph'] is not None:
buf = io.BytesIO()
try:
analysis_result['entity_graph'].savefig(buf, format='png', dpi=300, bbox_inches='tight')
buf.seek(0)
entity_graph_data = base64.b64encode(buf.getvalue()).decode('utf-8')
plt.close(analysis_result['entity_graph']) # Cerrar la figura
except Exception as e:
logger.error(f"Error al convertir gráfico de entidades: {str(e)}")
# Crear documento para MongoDB
analysis_document = {
'username': username,
'timestamp': datetime.now(timezone.utc).isoformat(),
'text': text,
'analysis_type': 'semantic',
'key_concepts': analysis_result.get('key_concepts', []),
'concept_graph': concept_graph_data,
'entities': analysis_result.get('entities', {}),
'entity_graph': entity_graph_data
}
# Insertar en MongoDB
result = insert_document(COLLECTION_NAME, analysis_document)
if result:
logger.info(f"Análisis semántico guardado con ID: {result} para el usuario: {username}")
return True
logger.error("No se pudo insertar el documento en MongoDB")
return False
except Exception as e:
logger.error(f"Error al guardar el análisis semántico: {str(e)}")
return False
finally:
# Asegurarnos de cerrar cualquier figura pendiente
plt.close('all')
def get_student_semantic_analysis(username, limit=10):
"""
Recupera los análisis semánticos de un estudiante.
Args:
username: Nombre del usuario
limit: Número máximo de análisis a retornar
Returns:
list: Lista de análisis semánticos
"""
query = {"username": username, "analysis_type": "semantic"}
return find_documents(COLLECTION_NAME, query, sort=[("timestamp", -1)], limit=limit)
def update_student_semantic_analysis(analysis_id, update_data):
"""
Actualiza un análisis semántico existente.
Args:
analysis_id: ID del análisis a actualizar
update_data: Datos a actualizar
"""
query = {"_id": analysis_id}
update = {"$set": update_data}
return update_document(COLLECTION_NAME, query, update)
def delete_student_semantic_analysis(analysis_id):
"""
Elimina un análisis semántico.
Args:
analysis_id: ID del análisis a eliminar
"""
query = {"_id": analysis_id}
return delete_document(COLLECTION_NAME, query)
def get_student_semantic_data(username):
"""
Obtiene todos los análisis semánticos de un estudiante.
Args:
username: Nombre del usuario
Returns:
dict: Diccionario con todos los análisis del estudiante
"""
analyses = get_student_semantic_analysis(username, limit=None)
formatted_analyses = []
for analysis in analyses:
formatted_analysis = {
'timestamp': analysis['timestamp'],
'text': analysis['text'],
'key_concepts': analysis['key_concepts'],
'entities': analysis['entities']
# No incluimos los gráficos en el resumen general
}
formatted_analyses.append(formatted_analysis)
return {
'entries': formatted_analyses
}
# Exportar las funciones necesarias
__all__ = [
'store_student_semantic_result',
'get_student_semantic_analysis',
'update_student_semantic_analysis',
'delete_student_semantic_analysis',
'get_student_semantic_data'
] |