Spaces:
Runtime error
Runtime error
File size: 3,847 Bytes
76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 76ca3e9 362d092 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
import asyncio
import uuid
import time
from typing import AsyncGenerator, Dict, Tuple, Any
from dataclasses import dataclass
class SessionDoesNotExist(Exception):
pass
class SessionAlreadyExists(Exception):
pass
class ClientError(Exception):
def __init__(self, message):
super().__init__(message)
self.message = message
@dataclass
class ClientRequest:
request_id: str
data: Any
@dataclass
class ClientResponse:
request_id: str
error: bool
data: Any
class SessionBroker:
def __init__(self, session_timeout: int = 3600): # 1 hora por defecto
self.sessions: Dict[str, asyncio.Queue] = {}
self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {}
self.session_last_active: Dict[str, float] = {}
self.session_timeout = session_timeout
asyncio.create_task(self._cleanup_inactive_sessions())
async def _cleanup_inactive_sessions(self):
while True:
current_time = time.time()
inactive_sessions = [
session_id for session_id, last_active in self.session_last_active.items()
if current_time - last_active > self.session_timeout
]
for session_id in inactive_sessions:
if session_id in self.sessions:
del self.sessions[session_id]
del self.session_last_active[session_id]
self.pending_responses = {
k: v for k, v in self.pending_responses.items()
if k[0] != session_id
}
await asyncio.sleep(60) # Verificar cada minuto
async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any:
if session_id not in self.sessions:
raise SessionDoesNotExist()
self.session_last_active[session_id] = time.time()
request_id = str(uuid.uuid4())
future = asyncio.get_event_loop().create_future()
self.pending_responses[(session_id, request_id)] = future
await self.sessions[session_id].put(ClientRequest(request_id=request_id, data=data))
try:
return await asyncio.wait_for(future, timeout)
except asyncio.TimeoutError:
raise
finally:
if (session_id, request_id) in self.pending_responses:
del self.pending_responses[(session_id, request_id)]
async def receive_response(self, session_id: str, response: ClientResponse) -> None:
self.session_last_active[session_id] = time.time()
if (session_id, response.request_id) in self.pending_responses:
future = self.pending_responses.pop((session_id, response.request_id))
if not future.done():
if response.error:
future.set_exception(ClientError(message=response.data))
else:
future.set_result(response.data)
async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]:
if session_id in self.sessions:
raise SessionAlreadyExists()
queue = asyncio.Queue()
self.sessions[session_id] = queue
self.session_last_active[session_id] = time.time()
try:
while True:
yield await queue.get()
self.session_last_active[session_id] = time.time()
finally:
if session_id in self.sessions:
del self.sessions[session_id]
if session_id in self.session_last_active:
del self.session_last_active[session_id]
self.pending_responses = {
k: v for k, v in self.pending_responses.items()
if k[0] != session_id
} |