import asyncio import uuid import time from typing import AsyncGenerator, Dict, Tuple, Any from dataclasses import dataclass class SessionDoesNotExist(Exception): pass class SessionAlreadyExists(Exception): pass class ClientError(Exception): def __init__(self, message): super().__init__(message) self.message = message @dataclass class ClientRequest: request_id: str data: Any @dataclass class ClientResponse: request_id: str error: bool data: Any class SessionBroker: def __init__(self, session_timeout: int = 3600): # 1 hora por defecto self.sessions: Dict[str, asyncio.Queue] = {} self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {} self.session_last_active: Dict[str, float] = {} self.session_timeout = session_timeout asyncio.create_task(self._cleanup_inactive_sessions()) async def _cleanup_inactive_sessions(self): while True: current_time = time.time() inactive_sessions = [ session_id for session_id, last_active in self.session_last_active.items() if current_time - last_active > self.session_timeout ] for session_id in inactive_sessions: if session_id in self.sessions: del self.sessions[session_id] del self.session_last_active[session_id] self.pending_responses = { k: v for k, v in self.pending_responses.items() if k[0] != session_id } await asyncio.sleep(60) # Verificar cada minuto async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any: if session_id not in self.sessions: raise SessionDoesNotExist() self.session_last_active[session_id] = time.time() request_id = str(uuid.uuid4()) future = asyncio.get_event_loop().create_future() self.pending_responses[(session_id, request_id)] = future await self.sessions[session_id].put(ClientRequest(request_id=request_id, data=data)) try: return await asyncio.wait_for(future, timeout) except asyncio.TimeoutError: raise finally: if (session_id, request_id) in self.pending_responses: del self.pending_responses[(session_id, request_id)] async def receive_response(self, session_id: str, response: ClientResponse) -> None: self.session_last_active[session_id] = time.time() if (session_id, response.request_id) in self.pending_responses: future = self.pending_responses.pop((session_id, response.request_id)) if not future.done(): if response.error: future.set_exception(ClientError(message=response.data)) else: future.set_result(response.data) async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]: if session_id in self.sessions: raise SessionAlreadyExists() queue = asyncio.Queue() self.sessions[session_id] = queue self.session_last_active[session_id] = time.time() try: while True: yield await queue.get() self.session_last_active[session_id] = time.time() finally: if session_id in self.sessions: del self.sessions[session_id] if session_id in self.session_last_active: del self.session_last_active[session_id] self.pending_responses = { k: v for k, v in self.pending_responses.items() if k[0] != session_id }