# Page-header residue from the hosting site ("Spaces: Sleeping"); not part of the program.
from fastapi import FastAPI, File, UploadFile, HTTPException | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.middleware.cors import CORSMiddleware | |
from pathlib import Path | |
import shutil | |
import zipfile | |
import logging | |
import tempfile | |
from typing import Set | |
# Logging: root logger at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The application object that every deployed site is mounted on.
app = FastAPI()

# CORS: accept any origin, method and header, with credentials allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive policy — confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
    allow_credentials=True,
)
class SiteManager:
    """Track static sites stored on disk and mount them on ``app``.

    Each site lives in ``/app/sites/<unique_id>`` and is served by a
    ``StaticFiles`` mount at ``/<unique_id>``. ``active_sites`` holds the
    identifiers of the sites that are currently registered.
    """

    # Uploads are copied to disk in chunks of this many bytes so a large
    # archive is never held in memory in one piece.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Create the sites directory if needed and register sites found in it."""
        self.sites_dir = Path("/app/sites")
        self.sites_dir.mkdir(parents=True, exist_ok=True)
        self.active_sites: Set[str] = set()
        self._load_existing_sites()

    @staticmethod
    def _is_safe_site_id(unique_id: str) -> bool:
        """Return True when *unique_id* is usable as a single directory name.

        The identifier is joined onto ``sites_dir`` and the result may be
        handed to ``shutil.rmtree``, so values that could resolve to the sites
        directory itself or to a location outside it (empty, ``.``, ``..``, or
        containing a path separator or NUL byte) are rejected.
        """
        if not isinstance(unique_id, str) or unique_id in ("", ".", ".."):
            return False
        return not any(ch in unique_id for ch in ("/", "\\", "\x00"))

    def _unmount_site(self, unique_id: str) -> None:
        """Drop every ``StaticFiles`` mount registered at ``/<unique_id>``."""
        mount_path = f"/{unique_id}"
        # Match on path AND on the mounted app type so that an ordinary API
        # route that happens to share the path is never removed.
        app.router.routes[:] = [
            route
            for route in app.router.routes
            if not (
                getattr(route, "path", None) == mount_path
                and isinstance(getattr(route, "app", None), StaticFiles)
            )
        ]

    def _mount_site(self, unique_id: str, site_path: Path) -> None:
        """Serve *site_path* at ``/<unique_id>``, replacing any earlier mount."""
        # Removing first keeps exactly one mount per site across redeploys.
        self._unmount_site(unique_id)
        app.mount(
            f"/{unique_id}",
            StaticFiles(directory=str(site_path), html=True),
            name=unique_id,
        )

    def _load_existing_sites(self):
        """Register every directory in ``sites_dir`` that contains ``index.html``."""
        for site_dir in self.sites_dir.iterdir():
            if site_dir.is_dir() and (site_dir / "index.html").exists():
                self.active_sites.add(site_dir.name)
                self._mount_site(site_dir.name, site_dir)
                logger.info(f"Loaded site: {site_dir.name}")

    async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict:
        """Deploy a site from an uploaded ZIP archive.

        The archive is written and extracted in a temporary staging directory
        first; an existing site with the same identifier is only replaced once
        extraction has succeeded, so a bad upload does not destroy it.

        Args:
            unique_id: Identifier used as the directory name and URL prefix.
            zip_file: The uploaded ZIP archive.

        Returns:
            A dict with ``status``, ``message`` and ``url`` keys.

        Raises:
            HTTPException: 400 for an unsafe identifier or an invalid/empty
                archive, 500 for any other failure.
        """
        if not self._is_safe_site_id(unique_id):
            raise HTTPException(status_code=400, detail="Invalid site identifier")

        site_path = self.sites_dir / unique_id
        # True once the live site directory may have been modified.
        swap_started = False
        try:
            with tempfile.TemporaryDirectory(prefix="site-deploy-") as staging_root:
                archive_path = Path(staging_root) / "upload.zip"
                extracted_path = Path(staging_root) / "site"

                # Stream the upload to disk chunk by chunk.
                with archive_path.open("wb") as archive:
                    while True:
                        chunk = await zip_file.read(self._UPLOAD_CHUNK_SIZE)
                        if not chunk:
                            break
                        archive.write(chunk)

                with zipfile.ZipFile(str(archive_path)) as zip_ref:
                    zip_ref.extractall(str(extracted_path))

                # extractall() creates nothing for an archive with no members.
                if not extracted_path.is_dir():
                    raise HTTPException(
                        status_code=400, detail="ZIP archive is empty"
                    )

                # Replace the live site only after extraction has succeeded.
                swap_started = True
                if site_path.exists():
                    shutil.rmtree(site_path)
                shutil.move(str(extracted_path), str(site_path))

            self._mount_site(unique_id, site_path)
            self.active_sites.add(unique_id)
            return {
                "status": "success",
                "message": f"Site deployed at /{unique_id}",
                "url": f"/{unique_id}"
            }
        except HTTPException:
            # Already carries the intended status code.
            raise
        except zipfile.BadZipFile as e:
            logger.error(f"Error deploying site {unique_id}: {str(e)}")
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is not a valid ZIP archive",
            ) from e
        except Exception as e:
            logger.error(f"Error deploying site {unique_id}: {str(e)}")
            if swap_started:
                # The live directory may be partial: remove it and deregister
                # the site so disk, routes and active_sites stay in agreement.
                shutil.rmtree(site_path, ignore_errors=True)
                self._unmount_site(unique_id)
                self.active_sites.discard(unique_id)
            raise HTTPException(status_code=500, detail=str(e)) from e

    def remove_site(self, unique_id: str) -> bool:
        """Remove a deployed site from disk, from the router and from the registry.

        Returns:
            True if the site was registered and has been removed, False if no
            site with that identifier is registered.
        """
        if unique_id not in self.active_sites:
            return False
        site_path = self.sites_dir / unique_id
        if site_path.exists():
            shutil.rmtree(site_path)
        # Without this the mount would stay routed at a deleted directory.
        self._unmount_site(unique_id)
        self.active_sites.remove(unique_id)
        return True
# Single shared manager instance used by the request handlers in this module.
site_manager: SiteManager = SiteManager()
async def deploy_site(unique_id: str, file: UploadFile = File(...)):
    """Deploy a new site from an uploaded ZIP file.

    Args:
        unique_id: Identifier of the site to create or replace.
        file: The uploaded archive; its filename must end in ``.zip``
            (compared case-insensitively).

    Returns:
        The result dict produced by ``site_manager.deploy_site``.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end in ``.zip``.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view — confirm it is registered on the app.
    # An upload can arrive without a filename; treat that as "not a ZIP"
    # instead of failing with AttributeError on None.
    filename = file.filename or ""
    if not filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="File must be a ZIP archive")
    return await site_manager.deploy_site(unique_id, file)
async def remove_site(unique_id: str):
    """Delete the site identified by *unique_id*.

    Returns a success payload when the manager reports the site was removed;
    raises a 404 ``HTTPException`` otherwise.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view — confirm it is registered on the app.
    was_removed = site_manager.remove_site(unique_id)
    if not was_removed:
        raise HTTPException(status_code=404, detail="Site not found")
    return {"status": "success", "message": f"Site {unique_id} removed"}
async def list_sites():
    """Return the identifiers of all currently registered sites."""
    # NOTE(review): no route decorator is visible on this function in this
    # view — confirm it is registered on the app.
    site_ids = [*site_manager.active_sites]
    return {"sites": site_ids}
async def health_check():
    """Report service health together with the number of registered sites."""
    # NOTE(review): no route decorator is visible on this function in this
    # view — confirm it is registered on the app.
    active_count = len(site_manager.active_sites)
    return {"status": "healthy", "sites_count": active_count}
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)