Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| # modules/database/discourse_mongo_db.py | |
| from .mongo_db import insert_document, find_documents, update_document, delete_document | |
| from datetime import datetime, timezone | |
| import logging | |
| import io | |
| import base64 | |
| logger = logging.getLogger(__name__) | |
| COLLECTION_NAME = 'student_discourse_analysis' | |
| def store_student_discourse_result(username, text1, text2, analysis_result): | |
| """ | |
| Guarda el resultado del análisis de discurso comparativo en MongoDB. | |
| Args: | |
| username: Nombre del usuario | |
| text1: Primer texto analizado (patrón) | |
| text2: Segundo texto analizado (comparación) | |
| analysis_result: Resultado del análisis | |
| """ | |
| try: | |
| # Convertir gráficos a formato base64 si existen | |
| graph1_data = None | |
| graph2_data = None | |
| if 'graph1' in analysis_result: | |
| buf = io.BytesIO() | |
| analysis_result['graph1'].savefig(buf, format='png') | |
| buf.seek(0) | |
| graph1_data = base64.b64encode(buf.getvalue()).decode('utf-8') | |
| if 'graph2' in analysis_result: | |
| buf = io.BytesIO() | |
| analysis_result['graph2'].savefig(buf, format='png') | |
| buf.seek(0) | |
| graph2_data = base64.b64encode(buf.getvalue()).decode('utf-8') | |
| # Crear documento para MongoDB | |
| analysis_document = { | |
| 'username': username, | |
| 'timestamp': datetime.now(timezone.utc).isoformat(), | |
| 'text1': text1, | |
| 'text2': text2, | |
| 'analysis_type': 'discourse', | |
| 'key_concepts1': analysis_result.get('key_concepts1', []), | |
| 'key_concepts2': analysis_result.get('key_concepts2', []), | |
| 'graph1': graph1_data, | |
| 'graph2': graph2_data | |
| } | |
| # Insertar en MongoDB | |
| result = insert_document(COLLECTION_NAME, analysis_document) | |
| if result: | |
| logger.info(f"Análisis del discurso guardado con ID: {result} para el usuario: {username}") | |
| return True | |
| logger.error("No se pudo insertar el documento en MongoDB") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error al guardar el análisis del discurso: {str(e)}") | |
| return False | |
| def get_student_discourse_analysis(username, limit=10): | |
| """ | |
| Recupera los análisis del discurso de un estudiante. | |
| """ | |
| try: | |
| query = { | |
| "username": username, | |
| "analysis_type": "discourse" | |
| } | |
| return find_documents(COLLECTION_NAME, query, sort=[("timestamp", -1)], limit=limit) | |
| except Exception as e: | |
| logger.error(f"Error al recuperar análisis del discurso: {str(e)}") | |
| return [] | |
| def get_student_discourse_data(username): | |
| """ | |
| Obtiene un resumen de los análisis del discurso de un estudiante. | |
| """ | |
| try: | |
| analyses = get_student_discourse_analysis(username, limit=None) | |
| formatted_analyses = [] | |
| for analysis in analyses: | |
| formatted_analysis = { | |
| 'timestamp': analysis['timestamp'], | |
| 'text1': analysis.get('text1', ''), | |
| 'text2': analysis.get('text2', ''), | |
| 'key_concepts1': analysis.get('key_concepts1', []), | |
| 'key_concepts2': analysis.get('key_concepts2', []) | |
| } | |
| formatted_analyses.append(formatted_analysis) | |
| return {'entries': formatted_analyses} | |
| except Exception as e: | |
| logger.error(f"Error al obtener datos del discurso: {str(e)}") | |
| return {'entries': []} | |
| def update_student_discourse_analysis(analysis_id, update_data): | |
| """ | |
| Actualiza un análisis del discurso existente. | |
| """ | |
| try: | |
| query = {"_id": analysis_id} | |
| update = {"$set": update_data} | |
| return update_document(COLLECTION_NAME, query, update) | |
| except Exception as e: | |
| logger.error(f"Error al actualizar análisis del discurso: {str(e)}") | |
| return False | |
| def delete_student_discourse_analysis(analysis_id): | |
| """ | |
| Elimina un análisis del discurso. | |
| """ | |
| try: | |
| query = {"_id": analysis_id} | |
| return delete_document(COLLECTION_NAME, query) | |
| except Exception as e: | |
| logger.error(f"Error al eliminar análisis del discurso: {str(e)}") | |
| return False | 
