Spaces:
Runtime error
Runtime error
Silicon Valley - Admin
Enhance session management and Dockerfile for improved performance and security
362d092
import asyncio | |
import uuid | |
import time | |
from typing import AsyncGenerator, Dict, Tuple, Any | |
from dataclasses import dataclass | |
class SessionDoesNotExist(Exception): | |
pass | |
class SessionAlreadyExists(Exception): | |
pass | |
class ClientError(Exception): | |
def __init__(self, message): | |
super().__init__(message) | |
self.message = message | |
class ClientRequest: | |
request_id: str | |
data: Any | |
class ClientResponse: | |
request_id: str | |
error: bool | |
data: Any | |
class SessionBroker: | |
def __init__(self, session_timeout: int = 3600): # 1 hora por defecto | |
self.sessions: Dict[str, asyncio.Queue] = {} | |
self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {} | |
self.session_last_active: Dict[str, float] = {} | |
self.session_timeout = session_timeout | |
asyncio.create_task(self._cleanup_inactive_sessions()) | |
async def _cleanup_inactive_sessions(self): | |
while True: | |
current_time = time.time() | |
inactive_sessions = [ | |
session_id for session_id, last_active in self.session_last_active.items() | |
if current_time - last_active > self.session_timeout | |
] | |
for session_id in inactive_sessions: | |
if session_id in self.sessions: | |
del self.sessions[session_id] | |
del self.session_last_active[session_id] | |
self.pending_responses = { | |
k: v for k, v in self.pending_responses.items() | |
if k[0] != session_id | |
} | |
await asyncio.sleep(60) # Verificar cada minuto | |
async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any: | |
if session_id not in self.sessions: | |
raise SessionDoesNotExist() | |
self.session_last_active[session_id] = time.time() | |
request_id = str(uuid.uuid4()) | |
future = asyncio.get_event_loop().create_future() | |
self.pending_responses[(session_id, request_id)] = future | |
await self.sessions[session_id].put(ClientRequest(request_id=request_id, data=data)) | |
try: | |
return await asyncio.wait_for(future, timeout) | |
except asyncio.TimeoutError: | |
raise | |
finally: | |
if (session_id, request_id) in self.pending_responses: | |
del self.pending_responses[(session_id, request_id)] | |
async def receive_response(self, session_id: str, response: ClientResponse) -> None: | |
self.session_last_active[session_id] = time.time() | |
if (session_id, response.request_id) in self.pending_responses: | |
future = self.pending_responses.pop((session_id, response.request_id)) | |
if not future.done(): | |
if response.error: | |
future.set_exception(ClientError(message=response.data)) | |
else: | |
future.set_result(response.data) | |
async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]: | |
if session_id in self.sessions: | |
raise SessionAlreadyExists() | |
queue = asyncio.Queue() | |
self.sessions[session_id] = queue | |
self.session_last_active[session_id] = time.time() | |
try: | |
while True: | |
yield await queue.get() | |
self.session_last_active[session_id] = time.time() | |
finally: | |
if session_id in self.sessions: | |
del self.sessions[session_id] | |
if session_id in self.session_last_active: | |
del self.session_last_active[session_id] | |
self.pending_responses = { | |
k: v for k, v in self.pending_responses.items() | |
if k[0] != session_id | |
} |