kaiobash-server / broker.py
Silicon Valley - Admin
Enhance session management and Dockerfile for improved performance and security
362d092
import asyncio
import uuid
import time
from typing import AsyncGenerator, Dict, Tuple, Any
from dataclasses import dataclass
class SessionDoesNotExist(Exception):
    """Raised when a request targets a session id that is not registered."""
class SessionAlreadyExists(Exception):
    """Raised when subscribing with a session id that is already registered."""
class ClientError(Exception):
    """Error reported by the remote client in response to a request.

    Attributes:
        message: The payload the client sent back with its error response.
    """

    def __init__(self, message):
        # Keep the payload directly reachable as ``.message`` in addition to
        # the standard ``args`` tuple populated by ``Exception.__init__``.
        self.message = message
        super().__init__(message)
@dataclass
class ClientRequest:
    """Envelope for one request queued for a subscribed client."""

    # Identifier used to match the client's response to this request.
    request_id: str
    # Opaque request payload; not inspected by this module.
    data: Any
@dataclass
class ClientResponse:
    """Envelope for a client's answer to a previously queued request."""

    # Identifier of the request this response answers.
    request_id: str
    # True when the client reports a failure; ``data`` then carries the error.
    error: bool
    # Result payload on success, or the error payload when ``error`` is true.
    data: Any
class SessionBroker:
    """Route requests to subscribed client sessions and match their responses.

    Each subscribed session owns an ``asyncio.Queue`` of ``ClientRequest``
    objects. ``send_request`` enqueues a request and waits on a future that
    ``receive_response`` resolves. A background task evicts sessions that
    have shown no activity for longer than ``session_timeout`` seconds.

    Attributes:
        sessions: Maps session id to the queue feeding its subscriber.
        pending_responses: Maps ``(session_id, request_id)`` to the future
            awaited by the corresponding ``send_request`` call.
        session_last_active: Maps session id to the ``time.time()`` value of
            its most recent activity.
        session_timeout: Inactivity threshold, in seconds.
        cleanup_interval: Pause, in seconds, between inactivity sweeps.
    """

    def __init__(self, session_timeout: int = 3600, *, cleanup_interval: float = 60):
        """Create an empty broker.

        Args:
            session_timeout: Seconds of inactivity after which a session is
                evicted (one hour by default).
            cleanup_interval: Seconds between inactivity sweeps (one minute
                by default).
        """
        self.sessions: Dict[str, asyncio.Queue] = {}
        self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {}
        self.session_last_active: Dict[str, float] = {}
        self.session_timeout = session_timeout
        self.cleanup_interval = cleanup_interval
        # A strong reference is kept so the sweep task cannot be garbage
        # collected while it is still running.
        self._cleanup_task = None
        # The broker may be constructed before any event loop is running
        # (e.g. at import time); in that case the sweep task is started
        # lazily by the first subscribe()/send_request() call.
        self._ensure_cleanup_task()

    def _ensure_cleanup_task(self) -> None:
        """Start the inactivity sweep if a loop is running and it is not active."""
        task = self._cleanup_task
        if task is not None and not task.done():
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop yet; a later call will retry.
            return
        self._cleanup_task = loop.create_task(self._cleanup_inactive_sessions())

    def _fail_pending(self, session_id: str) -> None:
        """Fail every unresolved request of ``session_id`` with SessionDoesNotExist."""
        stale_keys = [key for key in self.pending_responses if key[0] == session_id]
        for key in stale_keys:
            future = self.pending_responses.pop(key)
            if not future.done():
                # Wake the waiting send_request() now instead of letting it
                # sit until its own timeout expires.
                future.set_exception(SessionDoesNotExist())

    def _evict(self, session_id: str) -> None:
        """Drop all state of ``session_id`` and wake its subscriber, if any."""
        queue = self.sessions.pop(session_id, None)
        self.session_last_active.pop(session_id, None)
        self._fail_pending(session_id)
        if queue is not None:
            # Wake-up marker: subscribe() sees that its queue is no longer
            # registered and ends the stream instead of blocking forever.
            queue.put_nowait(None)

    async def _cleanup_inactive_sessions(self):
        """Periodically evict sessions idle for longer than ``session_timeout``."""
        while True:
            now = time.time()
            expired = [
                session_id
                for session_id, last_active in self.session_last_active.items()
                if now - last_active > self.session_timeout
            ]
            for session_id in expired:
                self._evict(session_id)
            await asyncio.sleep(self.cleanup_interval)  # check once per interval

    async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any:
        """Queue ``data`` for a session and wait for the client's response.

        Args:
            session_id: Target session; must currently be subscribed.
            data: Payload wrapped in a ``ClientRequest``.
            timeout: Seconds to wait for the response.

        Returns:
            The ``data`` field of the matching successful response.

        Raises:
            SessionDoesNotExist: If the session is not subscribed, or is
                closed or evicted while the request is pending.
            ClientError: If the client answered with ``error`` set.
            asyncio.TimeoutError: If no response arrives within ``timeout``.
        """
        self._ensure_cleanup_task()
        queue = self.sessions.get(session_id)
        if queue is None:
            raise SessionDoesNotExist()
        self.session_last_active[session_id] = time.time()

        request_id = str(uuid.uuid4())
        key = (session_id, request_id)
        future = asyncio.get_running_loop().create_future()
        self.pending_responses[key] = future
        try:
            await queue.put(ClientRequest(request_id=request_id, data=data))
            return await asyncio.wait_for(future, timeout)
        finally:
            # Covers success, timeout and cancellation; the entry may already
            # have been popped by receive_response() or by an eviction.
            self.pending_responses.pop(key, None)

    async def receive_response(self, session_id: str, response: ClientResponse) -> None:
        """Deliver a client's response to the matching pending request.

        Responses without a matching pending request are ignored.

        Args:
            session_id: Session the response was received on.
            response: The client's response envelope.
        """
        # Only refresh activity for registered sessions so responses carrying
        # unknown ids cannot create timestamp entries.
        if session_id in self.sessions:
            self.session_last_active[session_id] = time.time()

        future = self.pending_responses.pop((session_id, response.request_id), None)
        if future is None or future.done():
            return
        if response.error:
            future.set_exception(ClientError(message=response.data))
        else:
            future.set_result(response.data)

    async def subscribe(self, session_id: str) -> AsyncGenerator[ClientRequest, None]:
        """Register ``session_id`` and yield the requests queued for it.

        The stream ends when the session is evicted for inactivity. When the
        generator is closed, the session is unregistered and its pending
        requests fail with ``SessionDoesNotExist``.

        Args:
            session_id: Identifier to register; must not already be in use.

        Yields:
            Each ``ClientRequest`` queued through ``send_request``.

        Raises:
            SessionAlreadyExists: If ``session_id`` is already subscribed.
        """
        if session_id in self.sessions:
            raise SessionAlreadyExists()
        self._ensure_cleanup_task()
        queue = asyncio.Queue()
        self.sessions[session_id] = queue
        self.session_last_active[session_id] = time.time()
        try:
            while True:
                request = await queue.get()
                if self.sessions.get(session_id) is not queue:
                    # Evicted while waiting; whatever was dequeued belongs to
                    # a session that no longer exists.
                    return
                yield request
                if self.sessions.get(session_id) is queue:
                    self.session_last_active[session_id] = time.time()
        finally:
            # Only tear down the registration this generator created. After
            # an eviction the id may already belong to a newer subscriber,
            # whose state must not be touched.
            if self.sessions.get(session_id) is queue:
                del self.sessions[session_id]
                self.session_last_active.pop(session_id, None)
                self._fail_pending(session_id)