Spaces:
Sleeping
Sleeping
| #modules/database/sql_db.py | |
| from .database_init import get_sql_containers | |
| from datetime import datetime, timezone | |
| import logging | |
| import bcrypt | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
| def get_user(username, role=None): | |
| user_container, _, _ = get_sql_containers() | |
| try: | |
| query = f"SELECT * FROM c WHERE c.id = '{username}'" | |
| if role: | |
| query += f" AND c.role = '{role}'" | |
| items = list(user_container.query_items(query=query, enable_cross_partition_query=True)) | |
| return items[0] if items else None | |
| except Exception as e: | |
| logger.error(f"Error al obtener usuario {username}: {str(e)}") | |
| return None | |
| def get_admin_user(username): | |
| return get_user(username, role='Administrador') | |
| def get_student_user(username): | |
| return get_user(username, role='Estudiante') | |
| def get_teacher_user(username): | |
| return get_user(username, role='Profesor') | |
| def create_user(username, password, role, additional_info=None): | |
| user_container, _, _ = get_sql_containers() | |
| try: | |
| hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
| user_data = { | |
| 'id': username, | |
| 'password': hashed_password, | |
| 'role': role, | |
| 'timestamp': datetime.now(timezone.utc).isoformat(), | |
| 'additional_info': additional_info or {} | |
| } | |
| user_container.create_item(body=user_data) | |
| logger.info(f"Usuario {role} creado: {username}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error al crear usuario {role}: {str(e)}") | |
| return False | |
| def create_student_user(username, password, additional_info=None): | |
| return create_user(username, password, 'Estudiante', additional_info) | |
| def create_teacher_user(username, password, additional_info=None): | |
| return create_user(username, password, 'Profesor', additional_info) | |
| def create_admin_user(username, password, additional_info=None): | |
| return create_user(username, password, 'Administrador', additional_info) | |
| ############-- Funciones de control del tiempo de sesi贸n ################## | |
| def record_login(username): | |
| """Registra el inicio de sesi贸n de un usuario""" | |
| try: | |
| container = get_container("user_sessions") | |
| if not container: | |
| return None | |
| session_doc = { | |
| "id": str(uuid.uuid4()), | |
| "type": "session", | |
| "username": username, | |
| "loginTime": datetime.now(timezone.utc).isoformat(), | |
| "partitionKey": username | |
| } | |
| result = container.create_item(body=session_doc) | |
| return result['id'] | |
| except Exception as e: | |
| logger.error(f"Error registrando login: {str(e)}") | |
| return None | |
| def record_logout(username, session_id): | |
| """Registra el cierre de sesi贸n y calcula la duraci贸n""" | |
| try: | |
| container = get_container("user_sessions") | |
| if not container: | |
| return False | |
| # Obtener la sesi贸n actual | |
| query = "SELECT * FROM c WHERE c.id = @id AND c.username = @username" | |
| params = [ | |
| {"name": "@id", "value": session_id}, | |
| {"name": "@username", "value": username} | |
| ] | |
| sessions = list(container.query_items( | |
| query=query, | |
| parameters=params, | |
| enable_cross_partition_query=True | |
| )) | |
| if not sessions: | |
| return False | |
| session = sessions[0] | |
| login_time = datetime.fromisoformat(session['loginTime'].rstrip('Z')) | |
| logout_time = datetime.now(timezone.utc) | |
| duration = int((logout_time - login_time).total_seconds()) | |
| # Actualizar el documento | |
| session.update({ | |
| "logoutTime": logout_time.isoformat(), | |
| "sessionDuration": duration | |
| }) | |
| container.upsert_item(body=session) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error registrando logout: {str(e)}") | |
| return False | |
| def get_recent_sessions(limit=10): | |
| """Obtiene las sesiones m谩s recientes""" | |
| try: | |
| container = get_container("user_sessions") | |
| if not container: | |
| return [] | |
| query = """ | |
| SELECT c.username, c.loginTime, c.logoutTime, c.sessionDuration | |
| FROM c | |
| WHERE c.type = 'session' | |
| AND c.logoutTime != null | |
| ORDER BY c.loginTime DESC | |
| OFFSET 0 LIMIT @limit | |
| """ | |
| params = [{"name": "@limit", "value": limit}] | |
| sessions = container.query_items( | |
| query=query, | |
| parameters=params, | |
| enable_cross_partition_query=True | |
| ) | |
| return list(sessions) | |
| except Exception as e: | |
| logger.error(f"Error obteniendo sesiones recientes: {str(e)}") | |
| return [] | |
| def get_user_total_time(username): | |
| """Obtiene el tiempo total que un usuario ha pasado en la plataforma""" | |
| try: | |
| container = get_container("user_sessions") | |
| if not container: | |
| return None | |
| query = """ | |
| SELECT VALUE SUM(c.sessionDuration) | |
| FROM c | |
| WHERE c.type = 'session' | |
| AND c.username = @username | |
| AND c.logoutTime != null | |
| """ | |
| params = [{"name": "@username", "value": username}] | |
| result = list(container.query_items( | |
| query=query, | |
| parameters=params, | |
| enable_cross_partition_query=True | |
| )) | |
| return result[0] if result else 0 | |
| except Exception as e: | |
| logger.error(f"Error obteniendo tiempo total: {str(e)}") | |
| return None | |
| ########################################################### | |
| def update_student_user(username, new_info): | |
| user_container, _, _ = get_sql_containers() | |
| try: | |
| user = get_student_user(username) | |
| if user: | |
| user['additional_info'].update(new_info) | |
| user_container.upsert_item(body=user) | |
| logger.info(f"Informaci贸n del estudiante actualizada: {username}") | |
| return True | |
| else: | |
| logger.warning(f"Intento de actualizar estudiante no existente: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error al actualizar informaci贸n del estudiante {username}: {str(e)}") | |
| return False | |
| def delete_student_user(username): | |
| user_container, _, _ = get_sql_containers() | |
| try: | |
| user = get_student_user(username) | |
| if user: | |
| user_container.delete_item(item=user['id'], partition_key=username) | |
| logger.info(f"Estudiante eliminado: {username}") | |
| return True | |
| else: | |
| logger.warning(f"Intento de eliminar estudiante no existente: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error al eliminar estudiante {username}: {str(e)}") | |
| return False | |
| def store_application_request(name, lastname, email, institution, current_role, desired_role, reason): | |
| _, application_requests_container, _ = get_sql_containers() | |
| try: | |
| application_request = { | |
| "id": str(uuid.uuid4()), | |
| "name": name, | |
| "lastname": lastname, | |
| "email": email, | |
| "institution": institution, | |
| "current_role": current_role, | |
| "desired_role": desired_role, | |
| "reason": reason, | |
| "requestDate": datetime.utcnow().isoformat() | |
| } | |
| application_requests_container.create_item(body=application_request) | |
| logger.info(f"Solicitud de aplicaci贸n almacenada para el email: {email}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error al almacenar la solicitud de aplicaci贸n: {str(e)}") | |
| return False | |
| def store_student_feedback(username, name, email, feedback): | |
| _, _, user_feedback_container = get_sql_containers() | |
| try: | |
| feedback_item = { | |
| "id": str(uuid.uuid4()), | |
| "username": username, | |
| "name": name, | |
| "email": email, | |
| "feedback": feedback, | |
| "role": "Estudiante", | |
| 'timestamp': datetime.now(timezone.utc).isoformat(), | |
| } | |
| result = user_feedback_container.create_item(body=feedback_item) | |
| logger.info(f"Feedback de estudiante almacenado con ID: {result['id']} para el usuario: {username}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error al almacenar el feedback del estudiante {username}: {str(e)}") | |
| return False |