Silicon Valley - Admin commited on
Commit
362d092
1 Parent(s): 26b4d36

Enhance session management and Dockerfile for improved performance and security

Browse files

- Added session timeout management in broker.py to automatically clean up inactive sessions.
- Updated Dockerfile to create a non-root user for enhanced security and adjusted file permissions.
- Improved logging in server.py to handle exceptions and connection closures more effectively.
- Enhanced hypercorn.toml with security and performance configurations, including timeouts and worker class settings.

Files changed (4) hide show
  1. Dockerfile +13 -7
  2. broker.py +34 -2
  3. hypercorn.toml +9 -0
  4. server.py +37 -18
Dockerfile CHANGED
@@ -8,27 +8,33 @@ RUN apt-get update && apt-get install -y \
8
  build-essential \
9
  && rm -rf /var/lib/apt/lists/*
10
 
 
 
 
 
 
11
  # Copiar solo requirements.txt primero para aprovechar la cach茅 de Docker
12
- COPY requirements.txt .
13
 
14
  # Instalar dependencias de Python con pip limpio
15
  RUN pip install --no-cache-dir --upgrade pip && \
16
  pip install --no-cache-dir -r requirements.txt
17
 
18
- # Crear estructura de directorios
19
- RUN mkdir -p /code/static
20
-
21
  # Copiar archivos est谩ticos primero
22
- COPY static/swagger.html /code/static/
23
- COPY openapi.yaml /code/
24
 
25
  # Copiar el resto de los archivos
26
- COPY . .
27
 
28
  # Variables de entorno para Hugging Face Spaces
29
  ENV PYTHONUNBUFFERED=1
30
  ENV PYTHONPATH=/code
31
  ENV PORT=7860
 
 
 
 
32
 
33
  # Exponer el puerto que Hugging Face Spaces espera
34
  EXPOSE 7860
 
8
  build-essential \
9
  && rm -rf /var/lib/apt/lists/*
10
 
11
+ # Crear usuario no root
12
+ RUN adduser --disabled-password --gecos '' appuser && \
13
+ mkdir -p /code/static && \
14
+ chown -R appuser:appuser /code
15
+
16
  # Copiar solo requirements.txt primero para aprovechar la cach茅 de Docker
17
+ COPY --chown=appuser:appuser requirements.txt .
18
 
19
  # Instalar dependencias de Python con pip limpio
20
  RUN pip install --no-cache-dir --upgrade pip && \
21
  pip install --no-cache-dir -r requirements.txt
22
 
 
 
 
23
  # Copiar archivos est谩ticos primero
24
+ COPY --chown=appuser:appuser static/swagger.html /code/static/
25
+ COPY --chown=appuser:appuser openapi.yaml /code/
26
 
27
  # Copiar el resto de los archivos
28
+ COPY --chown=appuser:appuser . .
29
 
30
  # Variables de entorno para Hugging Face Spaces
31
  ENV PYTHONUNBUFFERED=1
32
  ENV PYTHONPATH=/code
33
  ENV PORT=7860
34
+ ENV PYTHONDONTWRITEBYTECODE=1
35
+
36
+ # Cambiar al usuario no root
37
+ USER appuser
38
 
39
  # Exponer el puerto que Hugging Face Spaces espera
40
  EXPOSE 7860
broker.py CHANGED
@@ -1,5 +1,6 @@
1
  import asyncio
2
  import uuid
 
3
  from typing import AsyncGenerator, Dict, Tuple, Any
4
  from dataclasses import dataclass
5
 
@@ -26,14 +27,37 @@ class ClientResponse:
26
  data: Any
27
 
28
  class SessionBroker:
29
- def __init__(self):
30
  self.sessions: Dict[str, asyncio.Queue] = {}
31
  self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
 
33
  async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any:
34
  if session_id not in self.sessions:
35
  raise SessionDoesNotExist()
36
 
 
37
  request_id = str(uuid.uuid4())
38
  future = asyncio.get_event_loop().create_future()
39
  self.pending_responses[(session_id, request_id)] = future
@@ -49,6 +73,7 @@ class SessionBroker:
49
  del self.pending_responses[(session_id, request_id)]
50
 
51
  async def receive_response(self, session_id: str, response: ClientResponse) -> None:
 
52
  if (session_id, response.request_id) in self.pending_responses:
53
  future = self.pending_responses.pop((session_id, response.request_id))
54
  if not future.done():
@@ -63,11 +88,18 @@ class SessionBroker:
63
 
64
  queue = asyncio.Queue()
65
  self.sessions[session_id] = queue
 
66
 
67
  try:
68
  while True:
69
  yield await queue.get()
 
70
  finally:
71
  if session_id in self.sessions:
72
  del self.sessions[session_id]
73
- self.pending_responses = {k: v for k, v in self.pending_responses.items() if k[0] != session_id}
 
 
 
 
 
 
1
  import asyncio
2
  import uuid
3
+ import time
4
  from typing import AsyncGenerator, Dict, Tuple, Any
5
  from dataclasses import dataclass
6
 
 
27
  data: Any
28
 
29
  class SessionBroker:
30
+ def __init__(self, session_timeout: int = 3600): # 1 hora por defecto
31
  self.sessions: Dict[str, asyncio.Queue] = {}
32
  self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {}
33
+ self.session_last_active: Dict[str, float] = {}
34
+ self.session_timeout = session_timeout
35
+ asyncio.create_task(self._cleanup_inactive_sessions())
36
+
37
+ async def _cleanup_inactive_sessions(self):
38
+ while True:
39
+ current_time = time.time()
40
+ inactive_sessions = [
41
+ session_id for session_id, last_active in self.session_last_active.items()
42
+ if current_time - last_active > self.session_timeout
43
+ ]
44
+
45
+ for session_id in inactive_sessions:
46
+ if session_id in self.sessions:
47
+ del self.sessions[session_id]
48
+ del self.session_last_active[session_id]
49
+ self.pending_responses = {
50
+ k: v for k, v in self.pending_responses.items()
51
+ if k[0] != session_id
52
+ }
53
+
54
+ await asyncio.sleep(60) # Verificar cada minuto
55
 
56
  async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any:
57
  if session_id not in self.sessions:
58
  raise SessionDoesNotExist()
59
 
60
+ self.session_last_active[session_id] = time.time()
61
  request_id = str(uuid.uuid4())
62
  future = asyncio.get_event_loop().create_future()
63
  self.pending_responses[(session_id, request_id)] = future
 
73
  del self.pending_responses[(session_id, request_id)]
74
 
75
  async def receive_response(self, session_id: str, response: ClientResponse) -> None:
76
+ self.session_last_active[session_id] = time.time()
77
  if (session_id, response.request_id) in self.pending_responses:
78
  future = self.pending_responses.pop((session_id, response.request_id))
79
  if not future.done():
 
88
 
89
  queue = asyncio.Queue()
90
  self.sessions[session_id] = queue
91
+ self.session_last_active[session_id] = time.time()
92
 
93
  try:
94
  while True:
95
  yield await queue.get()
96
+ self.session_last_active[session_id] = time.time()
97
  finally:
98
  if session_id in self.sessions:
99
  del self.sessions[session_id]
100
+ if session_id in self.session_last_active:
101
+ del self.session_last_active[session_id]
102
+ self.pending_responses = {
103
+ k: v for k, v in self.pending_responses.items()
104
+ if k[0] != session_id
105
+ }
hypercorn.toml CHANGED
@@ -12,3 +12,12 @@ websocket_max_message_size = 16777216
12
  accesslog = "-"
13
  errorlog = "-"
14
  loglevel = "INFO"
 
 
 
 
 
 
 
 
 
 
12
  accesslog = "-"
13
  errorlog = "-"
14
  loglevel = "INFO"
15
+
16
+ # Configuraciones de seguridad
17
+ h11_max_incomplete_size = 16384
18
+ keep_alive_timeout = 5
19
+ graceful_timeout = 10
20
+
21
+ # Configuraciones de rendimiento
22
+ worker_class = "asyncio"
23
+ backlog = 2048
server.py CHANGED
@@ -36,7 +36,8 @@ with open(OPENAPI_PATH) as f:
36
  openapi_spec = yaml.safe_load(f)
37
 
38
  # Configurar Quart Schema con la especificaci贸n OpenAPI
39
- QuartSchema(app, openapi_spec)
 
40
 
41
  app.asgi_app = ProxyHeadersMiddleware(app.asgi_app, trusted_hosts=TRUSTED_HOSTS)
42
  app.logger.setLevel(LOG_LEVEL)
@@ -103,26 +104,44 @@ async def status() -> Status:
103
 
104
  @app.websocket('/session')
105
  async def session_handler():
106
- session_id = secrets.token_hex()
107
- app.logger.info(f"{websocket.remote_addr} - NEW SESSION - {session_id}")
108
- await websocket.send_as(Session(session_id=session_id), Session)
109
-
110
- task = None
111
  try:
112
- task = asyncio.ensure_future(_receive(session_id))
113
- async for request in broker.subscribe(session_id):
114
- app.logger.info(f"{websocket.remote_addr} - REQUEST - {session_id} - {json.dumps(asdict(request))}")
115
- await websocket.send_as(request, ClientRequest)
116
- finally:
117
- if task is not None:
118
- task.cancel()
119
- await task
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
 
121
  async def _receive(session_id: str) -> None:
122
- while True:
123
- response = await websocket.receive_as(ClientResponse)
124
- app.logger.info(f"{websocket.remote_addr} - RESPONSE - {session_id} - {json.dumps(asdict(response))}")
125
- await broker.receive_response(session_id, response)
 
 
 
 
 
 
126
 
127
  @app.post('/command')
128
  @validate_request(Command)
 
36
  openapi_spec = yaml.safe_load(f)
37
 
38
  # Configurar Quart Schema con la especificaci贸n OpenAPI
39
+ schema = QuartSchema(app)
40
+ schema.update_openapi(openapi_spec)
41
 
42
  app.asgi_app = ProxyHeadersMiddleware(app.asgi_app, trusted_hosts=TRUSTED_HOSTS)
43
  app.logger.setLevel(LOG_LEVEL)
 
104
 
105
  @app.websocket('/session')
106
  async def session_handler():
 
 
 
 
 
107
  try:
108
+ session_id = secrets.token_hex()
109
+ app.logger.info(f"{websocket.remote_addr} - NEW SESSION - {session_id}")
110
+ await websocket.send_as(Session(session_id=session_id), Session)
111
+
112
+ task = None
113
+ try:
114
+ task = asyncio.ensure_future(_receive(session_id))
115
+ async for request in broker.subscribe(session_id):
116
+ app.logger.info(f"{websocket.remote_addr} - REQUEST - {session_id} - {json.dumps(asdict(request))}")
117
+ await websocket.send_as(request, ClientRequest)
118
+ except websockets.exceptions.ConnectionClosed:
119
+ app.logger.warning(f"{websocket.remote_addr} - CONNECTION CLOSED - {session_id}")
120
+ except Exception as e:
121
+ app.logger.error(f"{websocket.remote_addr} - ERROR - {session_id} - {str(e)}")
122
+ raise
123
+ finally:
124
+ if task is not None:
125
+ task.cancel()
126
+ try:
127
+ await task
128
+ except asyncio.CancelledError:
129
+ pass
130
+ except Exception as e:
131
+ app.logger.error(f"Error in session handler: {str(e)}")
132
+ raise
133
 
134
  async def _receive(session_id: str) -> None:
135
+ try:
136
+ while True:
137
+ response = await websocket.receive_as(ClientResponse)
138
+ app.logger.info(f"{websocket.remote_addr} - RESPONSE - {session_id} - {json.dumps(asdict(response))}")
139
+ await broker.receive_response(session_id, response)
140
+ except websockets.exceptions.ConnectionClosed:
141
+ app.logger.warning(f"{websocket.remote_addr} - RECEIVE CONNECTION CLOSED - {session_id}")
142
+ except Exception as e:
143
+ app.logger.error(f"{websocket.remote_addr} - RECEIVE ERROR - {session_id} - {str(e)}")
144
+ raise
145
 
146
  @app.post('/command')
147
  @validate_request(Command)