from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from pathlib import Path import shutil import zipfile import logging import tempfile from typing import Set # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class SiteManager: def __init__(self): self.sites_dir = Path("/app/sites") self.sites_dir.mkdir(parents=True, exist_ok=True) self.active_sites: Set[str] = set() self._load_existing_sites() def _load_existing_sites(self): """Load existing sites from disk""" for site_dir in self.sites_dir.iterdir(): if site_dir.is_dir() and (site_dir / 'index.html').exists(): self.active_sites.add(site_dir.name) # Mount the site directory app.mount(f"/{site_dir.name}", StaticFiles(directory=str(site_dir), html=True), name=site_dir.name) logger.info(f"Loaded site: {site_dir.name}") async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict: """Deploy a new site from a ZIP file""" site_path = self.sites_dir / unique_id temp_file = None try: # Create a temporary file temp_file = tempfile.NamedTemporaryFile(delete=False) # Read the uploaded file content content = await zip_file.read() # Write to the temporary file temp_file.write(content) temp_file.close() # Extract ZIP contents with zipfile.ZipFile(temp_file.name) as zip_ref: # Clear existing site if present if site_path.exists(): shutil.rmtree(site_path) # Extract ZIP contents zip_ref.extractall(site_path) # Mount the new site app.mount(f"/{unique_id}", StaticFiles(directory=str(site_path), html=True), name=unique_id) self.active_sites.add(unique_id) return { "status": "success", "message": f"Site deployed at /{unique_id}", "url": f"/{unique_id}" } except Exception as e: logger.error(f"Error deploying site {unique_id}: {str(e)}") if site_path.exists(): shutil.rmtree(site_path) raise HTTPException(status_code=500, detail=str(e)) finally: # Clean up the temporary file if temp_file: try: Path(temp_file.name).unlink() except: pass def remove_site(self, unique_id: str) -> bool: """Remove a deployed site""" if unique_id in self.active_sites: site_path = self.sites_dir / unique_id if site_path.exists(): shutil.rmtree(site_path) self.active_sites.remove(unique_id) return True return False # Initialize site manager site_manager = SiteManager() @app.post("/deploy/{unique_id}") async def deploy_site(unique_id: str, file: UploadFile = File(...)): """Deploy a new site from a ZIP file""" if not file.filename.endswith('.zip'): raise HTTPException(status_code=400, detail="File must be a ZIP archive") return await site_manager.deploy_site(unique_id, file) @app.delete("/site/{unique_id}") async def remove_site(unique_id: str): """Remove a deployed site""" if site_manager.remove_site(unique_id): return {"status": "success", "message": f"Site {unique_id} removed"} raise HTTPException(status_code=404, detail="Site not found") @app.get("/sites") async def list_sites(): """List all deployed sites""" return {"sites": list(site_manager.active_sites)} @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "sites_count": len(site_manager.active_sites)} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)