Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from datetime import datetime | |
| import os | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi | |
| import json | |
| import logging | |
# Configure root logging once for the whole process: INFO and above,
# each record rendered as "<timestamp> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
# Pull settings from a local .env file (if any) into the process environment
# before any of the os.environ lookups below run.
load_dotenv()

# API configuration: bind address and port, overridable through the
# API_HOST / API_PORT environment variables.
API_HOST = os.environ.get("API_HOST", "0.0.0.0")
API_PORT = int(os.environ.get("API_PORT", "3002"))

# The ASGI application object.
app = FastAPI()
# Add CORS middleware.
# Entries in ``allow_origins`` are matched against the request's Origin header
# as exact strings (only a lone "*" is treated specially), so a pattern such as
# "https://*.hf.space" can never match a real origin.  Sub-domain matching has
# to go through ``allow_origin_regex`` instead.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",  # Vite dev server
        f"http://localhost:{API_PORT}",  # API port
        "https://huggingface.co",  # HF main domain
    ],
    # HF Spaces domains: any host name ending in ".hf.space" over HTTPS.
    # The character class excludes "/" and ":" so the match cannot spill
    # past the host part of the origin.
    allow_origin_regex=r"https://[A-Za-z0-9.-]+\.hf\.space",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory cache shared by the endpoints below.
#   "data"         -> the parsed leaderboards JSON, or None before the first fetch
#   "last_updated" -> datetime of the last successful fetch, or None
cache = dict.fromkeys(("data", "last_updated"))
# HF API configuration, all read from the environment.
# HF_TOKEN, REPO_ID and FILE_PATH are None when the variable is unset.
HF_TOKEN = os.environ.get("HUGGING_FACE_HUB_TOKEN")
REPO_ID = os.environ.get("HUGGING_FACE_STORAGE_REPO")
FILE_PATH = os.environ.get("HUGGING_FACE_STORAGE_FILE_PATH")

# Refresh interval in minutes (defaults to 15 when UPDATE_INTERVAL_MINUTES is unset).
CACHE_DURATION_MINUTES = int(os.environ.get("UPDATE_INTERVAL_MINUTES", "15"))

# Hub client authenticated with the token above; used to download the data file.
hf_api = HfApi(token=HF_TOKEN)
def fetch_leaderboards():
    """Download the leaderboards JSON from the Hub and refresh the cache.

    The file ``FILE_PATH`` is downloaded from the dataset repo ``REPO_ID``
    (always re-downloaded, never served from the local Hub cache), parsed as
    JSON and stored in ``cache["data"]``.  ``cache["last_updated"]`` is set to
    the current local time on success.

    Raises:
        HTTPException: status 500, only when the fetch fails *and* the cache
            holds no data.  When earlier data is cached, the failure is logged
            and the stale data is kept, so callers see no exception.
    """
    try:
        logging.info(f"Fetching leaderboards from {REPO_ID}/{FILE_PATH}")

        # Download the JSON file directly, bypassing any previously cached copy.
        # NOTE(review): `force_filename` is tied to the legacy cache layout of
        # huggingface_hub -- confirm the pinned library version still accepts it.
        json_path = hf_api.hf_hub_download(
            repo_id=REPO_ID,
            filename=FILE_PATH,
            repo_type="dataset",
            force_download=True,  # Force download to ensure we get the latest version
            force_filename="leaderboards_latest.json",  # Fixed local filename to avoid caching issues
        )
        logging.info(f"File downloaded to: {json_path}")

        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(json_path, 'r', encoding="utf-8") as f:
            new_data = json.load(f)

        old_data = cache["data"]
        cache["data"] = new_data
        cache["last_updated"] = datetime.now()

        # Log the differences (entry counts are only meaningful for list payloads).
        old_len = len(old_data) if old_data and isinstance(old_data, list) else 0
        new_len = len(new_data) if isinstance(new_data, list) else 0
        logging.info(f"Cache updated: Old entries: {old_len}, New entries: {new_len}")
        logging.info(f"Cache update timestamp: {cache['last_updated']}")
    except Exception as e:
        logging.error(f"Error fetching data: {str(e)}", exc_info=True)
        if not cache["data"]:  # Only raise if we don't have any cached data
            # Chain the original error so the root cause stays in the traceback.
            raise HTTPException(status_code=500, detail="Failed to fetch leaderboards data") from e
# Populate the cache once at import time, before any request is handled.
fetch_leaderboards()
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered on `app`.
async def get_leaderboards():
    """Return the cached leaderboards, fetching them first if the cache is empty.

    The response carries the cached payload under "data" and the ISO-8601
    timestamp of the last refresh (or None) under "last_updated".
    """
    # Lazy fill: an empty cache triggers a synchronous fetch.
    if not cache["data"]:
        fetch_leaderboards()

    payload = cache["data"]
    refreshed_at = cache["last_updated"]
    if refreshed_at:
        timestamp = refreshed_at.isoformat()
    else:
        timestamp = None

    return {"data": payload, "last_updated": timestamp}
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered on `app`.
async def health_check():
    """Report service health together with the state of the cache.

    "cache_status" is "initialized" when the cache holds data and "empty"
    otherwise; "last_updated" is the ISO-8601 refresh timestamp or None.
    """
    refreshed_at = cache["last_updated"]
    cache_state = "empty" if not cache["data"] else "initialized"

    report = {"status": "healthy"}
    report["cache_status"] = cache_state
    report["last_updated"] = None if not refreshed_at else refreshed_at.isoformat()
    return report
# NOTE(review): no route decorator is visible on this function in this view --
# confirm how it is registered on `app`.
async def handle_webhook(request: Request):
    """Handle webhook notifications from Hugging Face Hub.

    A payload whose ``event`` has ``action == "update"`` and
    ``scope == "repo.content"`` triggers a cache refresh; every other event
    is acknowledged and ignored.

    Args:
        request: Incoming request whose body is the webhook's JSON payload.

    Returns:
        A dict with "status" ("success", "error" or "ignored") and a
        human-readable "message".

    Raises:
        HTTPException: status 500 when the payload cannot be read or processed.
    """
    try:
        body = await request.json()
        logging.info(f"Received webhook with payload: {body}")

        # Get the event details
        event = body.get("event", {})

        # Verify if it's a relevant update (repo content update)
        if event.get("action") == "update" and event.get("scope") == "repo.content":
            try:
                logging.info(f"Dataset update detected for repo {REPO_ID}, file {FILE_PATH}")

                # fetch_leaderboards() swallows errors whenever older data is
                # still cached, so a merely non-empty timestamp does not prove
                # that *this* refresh worked.  A successful fetch stores a
                # brand-new datetime object; compare identity against the one
                # that was there before.
                previous_update = cache["last_updated"]
                fetch_leaderboards()
                refreshed_at = cache["last_updated"]

                if refreshed_at is not None and refreshed_at is not previous_update:
                    logging.info(f"Cache successfully updated at {refreshed_at}")
                    return {"status": "success", "message": "Cache updated"}

                logging.error("Cache update failed: last_updated was not refreshed")
                return {"status": "error", "message": "Cache update failed"}
            except Exception as fetch_error:
                logging.error(f"Error during fetch_leaderboards: {str(fetch_error)}", exc_info=True)
                return {"status": "error", "message": f"Failed to update cache: {str(fetch_error)}"}

        logging.info(f"Ignoring webhook event: action={event.get('action')}, scope={event.get('scope')}")
        return {"status": "ignored", "message": "Event type not relevant"}
    except Exception as e:
        logging.error(f"Error processing webhook: {str(e)}", exc_info=True)
        # Chain the original error so the root cause stays in the traceback.
        raise HTTPException(status_code=500, detail=f"Failed to process webhook: {str(e)}") from e
# Serve the React client's static files from the "static" directory at the
# root path.
_static_client = StaticFiles(directory="static", html=True)
app.mount("/", _static_client, name="static")


# Development entry point: start uvicorn with auto-reload when run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("server:app", host=API_HOST, port=API_PORT, reload=True)