Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| Atlas Unified Bridge Server | |
| This server connects all Atlas platform components together | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import logging | |
| import requests | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| from pydantic import BaseModel | |
| from typing import Dict, List, Any, Optional | |
# Setup logging
# Console logging is always enabled. File logging is best-effort: opening the
# log file can fail (read-only or unwritable working directory), and that must
# not prevent the server module from being imported.
_log_handlers = [logging.StreamHandler()]
_log_file_error = None
try:
    _log_handlers.append(logging.FileHandler("bridge_server.log"))
except OSError as exc:
    # Remember the failure so it can be reported once logging is configured.
    _log_file_error = exc

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger("atlas_bridge")
if _log_file_error is not None:
    logger.warning(
        "File logging disabled, cannot open bridge_server.log: %s",
        _log_file_error,
    )
# Component endpoints
# Base URLs of the Atlas components the bridge talks to. Each one can be
# overridden with an environment variable of the same name; the defaults are
# the local development addresses.
OPENMANUS_ENDPOINT = os.environ.get("OPENMANUS_ENDPOINT", "http://localhost:50505")
CASIBASE_ENDPOINT = os.environ.get("CASIBASE_ENDPOINT", "http://localhost:7777")
CYPHER_ENDPOINT = os.environ.get("CYPHER_ENDPOINT", "http://localhost:5000")
QUANTUMVISION_ENDPOINT = os.environ.get("QUANTUMVISION_ENDPOINT", "http://localhost:8000")
CONVERSATION_ENDPOINT = os.environ.get("CONVERSATION_ENDPOINT", "http://localhost:8001")
# Initialize FastAPI app
app = FastAPI(title="Atlas Unified Bridge")

# Add CORS middleware
# Allowed origins come from ATLAS_CORS_ORIGINS (comma-separated). When it is
# unset or empty, any origin is allowed, as before.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("ATLAS_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed requests must not be combined with a wildcard origin, since
    # that would let any website make authenticated calls. Credentials are
    # therefore enabled only when explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Health check endpoint
@app.get("/health")
async def health_check():
    """Report the status of the bridge and of every downstream component.

    Returns:
        A dict with three keys: "bridge" (always "operational" when this
        handler runs), "components" (component name mapped to the result of
        check_component_health for its endpoint) and "timestamp" (time.time()
        taken after all probes have finished).
    """
    # Local import keeps this block self-contained.
    import asyncio

    endpoints = {
        "openmanus": OPENMANUS_ENDPOINT,
        "casibase": CASIBASE_ENDPOINT,
        "cypher": CYPHER_ENDPOINT,
        "quantumvision": QUANTUMVISION_ENDPOINT,
        "conversation": CONVERSATION_ENDPOINT,
    }

    # check_component_health does blocking HTTP I/O. Running the probes in
    # worker threads keeps the event loop responsive and lets them overlap
    # instead of waiting on one timeout after another.
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(
        *(
            loop.run_in_executor(None, check_component_health, url)
            for url in endpoints.values()
        )
    )

    return {
        "bridge": "operational",
        # gather() preserves input order, so results line up with the names.
        "components": dict(zip(endpoints, results)),
        "timestamp": time.time(),
    }
def check_component_health(endpoint: str) -> str:
    """Check if a component is operational.

    Sends GET ``{endpoint}/health`` with a 3-second timeout.

    Args:
        endpoint: Base URL of the component; "/health" is appended as-is, so
            it should not end with a slash.

    Returns:
        "operational" if the component answers with HTTP 200, otherwise
        "unavailable" (non-200 status, connection error, timeout, bad URL).
    """
    url = f"{endpoint}/health"
    try:
        response = requests.get(url, timeout=3)
    except requests.RequestException as exc:
        # Only request failures are treated as "component down"; anything else
        # (KeyboardInterrupt, programming errors) is allowed to propagate.
        logger.warning("Health check failed for %s: %s", url, exc)
        return "unavailable"

    if response.status_code == 200:
        return "operational"

    logger.warning(
        "Health check for %s returned HTTP %s", url, response.status_code
    )
    return "unavailable"
# Main entry point
if __name__ == "__main__":
    # Bind address and port can be overridden through the environment; the
    # defaults are the previously hard-coded values.
    host = os.environ.get("ATLAS_BRIDGE_HOST", "0.0.0.0")
    port = int(os.environ.get("ATLAS_BRIDGE_PORT", "8080"))

    logger.info("Starting Atlas Unified Bridge Server")
    # The app is given as an import string, so this module must be importable
    # as "bridge_server" (i.e. the file is bridge_server.py).
    # NOTE(review): reload=True is kept from the original; confirm it is
    # wanted outside of development.
    uvicorn.run("bridge_server:app", host=host, port=port, reload=True)