pvanand commited on
Commit
79b10c5
·
verified ·
1 Parent(s): 3e8d00a

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +25 -138
main.py CHANGED
@@ -1,28 +1,19 @@
1
- import os
 
 
 
2
  import shutil
3
  import zipfile
4
  import logging
5
- import tempfile
6
- import magic
7
- from pathlib import Path
8
- from typing import Set, Optional
9
- from fastapi import FastAPI, File, UploadFile, HTTPException, Request
10
- from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
11
- from fastapi.staticfiles import StaticFiles
12
- from fastapi.middleware.cors import CORSMiddleware
13
- from fastapi.middleware.trustedhost import TrustedHostMiddleware
14
 
15
  # Configure logging
16
- logging.basicConfig(
17
- level=logging.INFO,
18
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
19
- )
20
  logger = logging.getLogger(__name__)
21
 
22
- # Initialize FastAPI app
23
- app = FastAPI(title="Static Site Server")
24
 
25
- # Add security middlewares
26
  app.add_middleware(
27
  CORSMiddleware,
28
  allow_origins=["*"],
@@ -31,105 +22,40 @@ app.add_middleware(
31
  allow_headers=["*"],
32
  )
33
 
34
- # Constants
35
- MAX_UPLOAD_SIZE = 100 * 1024 * 1024 # 100MB
36
- ALLOWED_EXTENSIONS = {'.html', '.css', '.js', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot'}
37
-
38
  class SiteManager:
39
  def __init__(self):
40
  self.sites_dir = Path("/app/sites")
41
- self.temp_dir = Path("/app/temp")
42
- self.active_sites: Set[str] = set()
43
-
44
- # Ensure directories exist
45
  self.sites_dir.mkdir(parents=True, exist_ok=True)
46
- self.temp_dir.mkdir(parents=True, exist_ok=True)
47
-
48
- # Load existing sites
49
  self._load_existing_sites()
50
-
51
  def _load_existing_sites(self):
52
  """Load existing sites from disk"""
53
- logger.info("Loading existing sites...")
54
  for site_dir in self.sites_dir.iterdir():
55
  if site_dir.is_dir() and (site_dir / 'index.html').exists():
56
  self.active_sites.add(site_dir.name)
 
 
57
  logger.info(f"Loaded site: {site_dir.name}")
58
 
59
- def _validate_file_types(self, zip_path: Path) -> bool:
60
- """Validate file types in ZIP archive"""
61
- mime = magic.Magic(mime=True)
62
- with zipfile.ZipFile(zip_path) as zip_ref:
63
- for file_info in zip_ref.filelist:
64
- if file_info.filename.endswith('/'): # Skip directories
65
- continue
66
-
67
- suffix = Path(file_info.filename).suffix.lower()
68
- if suffix not in ALLOWED_EXTENSIONS:
69
- return False
70
-
71
- # Extract file to check MIME type
72
- with tempfile.NamedTemporaryFile() as tmp:
73
- with zip_ref.open(file_info) as source:
74
- shutil.copyfileobj(source, tmp)
75
- tmp.flush()
76
- mime_type = mime.from_file(tmp.name)
77
- if mime_type.startswith('application/x-'):
78
- return False
79
- return True
80
-
81
  async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict:
82
  """Deploy a new site from a ZIP file"""
83
- if await zip_file.read(1) == b'':
84
- raise HTTPException(status_code=400, detail="Empty file")
85
- await zip_file.seek(0)
86
-
87
- # Create temporary file
88
- temp_file = self.temp_dir / f"{unique_id}.zip"
89
  try:
90
- # Save uploaded file
91
- content = await zip_file.read()
92
- if len(content) > MAX_UPLOAD_SIZE:
93
- raise HTTPException(status_code=400, detail="File too large")
94
-
95
- temp_file.write_bytes(content)
96
-
97
- # Validate ZIP file
98
- if not zipfile.is_zipfile(temp_file):
99
- raise HTTPException(status_code=400, detail="Invalid ZIP file")
100
-
101
- # Validate file types
102
- if not self._validate_file_types(temp_file):
103
- raise HTTPException(status_code=400, detail="Invalid file types in ZIP")
104
-
105
- # Process the ZIP file
106
- site_path = self.sites_dir / unique_id
107
- with zipfile.ZipFile(temp_file) as zip_ref:
108
- # Verify index.html exists
109
- if not any(name.endswith('/index.html') or name == 'index.html'
110
- for name in zip_ref.namelist()):
111
- raise HTTPException(
112
- status_code=400,
113
- detail="ZIP file must contain index.html in root directory"
114
- )
115
-
116
  # Clear existing site if present
117
  if site_path.exists():
118
  shutil.rmtree(site_path)
119
 
120
- # Extract files
121
- zip_ref.extractall(self.temp_dir / unique_id)
122
-
123
- # Move to final location
124
- extraction_path = self.temp_dir / unique_id
125
- root_dir = next(
126
- (p for p in extraction_path.iterdir() if p.is_dir()
127
- and (p / 'index.html').exists()),
128
- extraction_path
129
- )
130
- shutil.move(str(root_dir), str(site_path))
131
-
132
  self.active_sites.add(unique_id)
 
133
  return {
134
  "status": "success",
135
  "message": f"Site deployed at /{unique_id}",
@@ -138,14 +64,9 @@ class SiteManager:
138
 
139
  except Exception as e:
140
  logger.error(f"Error deploying site {unique_id}: {str(e)}")
 
 
141
  raise HTTPException(status_code=500, detail=str(e))
142
- finally:
143
- # Cleanup
144
- if temp_file.exists():
145
- temp_file.unlink()
146
- cleanup_path = self.temp_dir / unique_id
147
- if cleanup_path.exists():
148
- shutil.rmtree(cleanup_path)
149
 
150
  def remove_site(self, unique_id: str) -> bool:
151
  """Remove a deployed site"""
@@ -157,13 +78,6 @@ class SiteManager:
157
  return True
158
  return False
159
 
160
- def get_site_path(self, site_id: str) -> Optional[Path]:
161
- """Get the path for a site if it exists"""
162
- site_path = self.sites_dir / site_id
163
- if site_path.is_dir() and (site_path / 'index.html').exists():
164
- return site_path
165
- return None
166
-
167
  # Initialize site manager
168
  site_manager = SiteManager()
169
 
@@ -173,8 +87,7 @@ async def deploy_site(unique_id: str, file: UploadFile = File(...)):
173
  if not file.filename.endswith('.zip'):
174
  raise HTTPException(status_code=400, detail="File must be a ZIP archive")
175
 
176
- result = await site_manager.deploy_site(unique_id, file)
177
- return JSONResponse(content=result)
178
 
179
  @app.delete("/site/{unique_id}")
180
  async def remove_site(unique_id: str):
@@ -193,32 +106,6 @@ async def health_check():
193
  """Health check endpoint"""
194
  return {"status": "healthy", "sites_count": len(site_manager.active_sites)}
195
 
196
- @app.get("/{site_id}/{file_path:path}")
197
- async def serve_site(site_id: str, file_path: str = ""):
198
- """Serve files from the site directory"""
199
- site_path = site_manager.get_site_path(site_id)
200
- if not site_path:
201
- raise HTTPException(status_code=404, detail="Site not found")
202
-
203
- # Default to index.html if no file specified
204
- if not file_path:
205
- file_path = "index.html"
206
-
207
- file_full_path = site_path / file_path
208
-
209
- # Prevent directory traversal
210
- try:
211
- file_full_path = file_full_path.resolve()
212
- if not str(file_full_path).startswith(str(site_path)):
213
- raise HTTPException(status_code=403, detail="Access denied")
214
- except (RuntimeError, ValueError):
215
- raise HTTPException(status_code=400, detail="Invalid path")
216
-
217
- if not file_full_path.is_file():
218
- raise HTTPException(status_code=404, detail="File not found")
219
-
220
- return FileResponse(file_full_path)
221
-
222
  if __name__ == "__main__":
223
  import uvicorn
224
  uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ from fastapi import FastAPI, File, UploadFile, HTTPException
2
+ from fastapi.staticfiles import StaticFiles
3
+ from fastapi.middleware.cors import CORSMiddleware
4
+ from pathlib import Path
5
  import shutil
6
  import zipfile
7
  import logging
8
+ from typing import Set
 
 
 
 
 
 
 
 
9
 
10
  # Configure logging
11
+ logging.basicConfig(level=logging.INFO)
 
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
+ app = FastAPI()
 
15
 
16
+ # Add CORS middleware
17
  app.add_middleware(
18
  CORSMiddleware,
19
  allow_origins=["*"],
 
22
  allow_headers=["*"],
23
  )
24
 
 
 
 
 
25
  class SiteManager:
26
  def __init__(self):
27
  self.sites_dir = Path("/app/sites")
 
 
 
 
28
  self.sites_dir.mkdir(parents=True, exist_ok=True)
29
+ self.active_sites: Set[str] = set()
 
 
30
  self._load_existing_sites()
31
+
32
  def _load_existing_sites(self):
33
  """Load existing sites from disk"""
 
34
  for site_dir in self.sites_dir.iterdir():
35
  if site_dir.is_dir() and (site_dir / 'index.html').exists():
36
  self.active_sites.add(site_dir.name)
37
+ # Mount the site directory
38
+ app.mount(f"/{site_dir.name}", StaticFiles(directory=str(site_dir), html=True), name=site_dir.name)
39
  logger.info(f"Loaded site: {site_dir.name}")
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict:
42
  """Deploy a new site from a ZIP file"""
43
+ site_path = self.sites_dir / unique_id
44
+
 
 
 
 
45
  try:
46
+ # Save and extract ZIP
47
+ with zipfile.ZipFile(zip_file.file) as zip_ref:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  # Clear existing site if present
49
  if site_path.exists():
50
  shutil.rmtree(site_path)
51
 
52
+ # Extract ZIP contents
53
+ zip_ref.extractall(site_path)
54
+
55
+ # Mount the new site
56
+ app.mount(f"/{unique_id}", StaticFiles(directory=str(site_path), html=True), name=unique_id)
 
 
 
 
 
 
 
57
  self.active_sites.add(unique_id)
58
+
59
  return {
60
  "status": "success",
61
  "message": f"Site deployed at /{unique_id}",
 
64
 
65
  except Exception as e:
66
  logger.error(f"Error deploying site {unique_id}: {str(e)}")
67
+ if site_path.exists():
68
+ shutil.rmtree(site_path)
69
  raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
70
 
71
  def remove_site(self, unique_id: str) -> bool:
72
  """Remove a deployed site"""
 
78
  return True
79
  return False
80
 
 
 
 
 
 
 
 
81
  # Initialize site manager
82
  site_manager = SiteManager()
83
 
 
87
  if not file.filename.endswith('.zip'):
88
  raise HTTPException(status_code=400, detail="File must be a ZIP archive")
89
 
90
+ return await site_manager.deploy_site(unique_id, file)
 
91
 
92
  @app.delete("/site/{unique_id}")
93
  async def remove_site(unique_id: str):
 
106
  """Health check endpoint"""
107
  return {"status": "healthy", "sites_count": len(site_manager.active_sites)}
108
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
  if __name__ == "__main__":
110
  import uvicorn
111
  uvicorn.run(app, host="0.0.0.0", port=8000)