import json
import time
import joblib
import logging
import hashlib
import uvicorn
import asyncio
import aiofiles
import traceback
import numpy as np
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
from contextlib import asynccontextmanager
from fastapi.responses import JSONResponse
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field, validator
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi import FastAPI, HTTPException, Depends, Request, BackgroundTasks, status

# Import the path manager
try:
    from path_config import path_manager
except ImportError:
    # Fallback for development environments
    import sys
    import os
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
    from path_config import path_manager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(path_manager.get_logs_path('fastapi_server.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Log environment info at startup
path_manager.log_environment_info()

# Security
security = HTTPBearer(auto_error=False)

# Rate limiting storage
rate_limit_storage = defaultdict(list)


class ModelManager:
    """Manages model loading and health checks with dynamic paths"""

    def __init__(self):
        self.model = None
        self.vectorizer = None
        self.pipeline = None
        self.model_metadata = {}
        self.last_health_check = None
        self.health_status = "unknown"
        self.load_model()

    def load_model(self):
        """Load model with comprehensive error handling and dynamic paths"""
        try:
            logger.info("Loading ML model...")

            # Try to load the pipeline first (preferred)
            pipeline_path = path_manager.get_pipeline_path()
            if pipeline_path.exists():
                self.pipeline = joblib.load(pipeline_path)
                self.model = self.pipeline.named_steps.get('model')
                self.vectorizer = (self.pipeline.named_steps.get('vectorize')
                                   or self.pipeline.named_steps.get('vectorizer'))
                logger.info("Loaded model pipeline successfully")
            else:
                # Fall back to individual components
                model_path = path_manager.get_model_file_path()
                vectorizer_path = path_manager.get_vectorizer_path()

                if model_path.exists() and vectorizer_path.exists():
                    self.model = joblib.load(model_path)
                    self.vectorizer = joblib.load(vectorizer_path)
                    logger.info("Loaded model components successfully")
                else:
                    raise FileNotFoundError(
                        f"No model files found. Checked:\n"
                        f"- {pipeline_path}\n- {model_path}\n- {vectorizer_path}"
                    )

            # Load metadata
            metadata_path = path_manager.get_metadata_path()
            if metadata_path.exists():
                with open(metadata_path, 'r') as f:
                    self.model_metadata = json.load(f)
                logger.info(
                    f"Loaded model metadata: {self.model_metadata.get('model_version', 'Unknown')}")

            self.health_status = "healthy"
            self.last_health_check = datetime.now()

        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            self.health_status = "unhealthy"
            self.model = None
            self.vectorizer = None
            self.pipeline = None

    def predict(self, text: str) -> tuple[str, float]:
        """Make prediction with error handling"""
        try:
            if self.pipeline:
                # Use pipeline for prediction
                prediction = self.pipeline.predict([text])[0]
                probabilities = self.pipeline.predict_proba([text])[0]
            elif self.model and self.vectorizer:
                # Use individual components
                X = self.vectorizer.transform([text])
                prediction = self.model.predict(X)[0]
                probabilities = self.model.predict_proba(X)[0]
            else:
                raise ValueError("No model available for prediction")

            # Get confidence score (assumes integer class labels 0/1 that
            # match the column order of predict_proba)
            confidence = float(probabilities[prediction])

            # Convert prediction to readable format
            label = "Fake" if prediction == 1 else "Real"

            return label, confidence

        except Exception as e:
            logger.error(f"Prediction failed: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"Prediction failed: {str(e)}"
            )

    def health_check(self) -> Dict[str, Any]:
        """Perform health check"""
        try:
            # Test prediction with sample text
            test_text = "This is a test article for health check purposes."
            label, confidence = self.predict(test_text)

            self.health_status = "healthy"
            self.last_health_check = datetime.now()

            return {
                "status": "healthy",
                "last_check": self.last_health_check.isoformat(),
                "model_available": self.model is not None,
                "vectorizer_available": self.vectorizer is not None,
                "pipeline_available": self.pipeline is not None,
                "test_prediction": {"label": label, "confidence": confidence},
                "environment": path_manager.environment,
                "model_path": str(path_manager.get_model_file_path()),
                "data_path": str(path_manager.get_data_path())
            }

        except Exception as e:
            self.health_status = "unhealthy"
            self.last_health_check = datetime.now()
            return {
                "status": "unhealthy",
                "last_check": self.last_health_check.isoformat(),
                "error": str(e),
                "model_available": self.model is not None,
                "vectorizer_available": self.vectorizer is not None,
                "pipeline_available": self.pipeline is not None,
                "environment": path_manager.environment,
                "model_path": str(path_manager.get_model_file_path()),
                "data_path": str(path_manager.get_data_path())
            }


# Global model manager
model_manager = ModelManager()
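
# A minimal smoke-test sketch for ModelManager (hypothetical helper, not part
# of the request path; assumes the model artifacts exist at the configured
# paths, and is only meant to be invoked manually):
def _smoke_test_model_manager() -> None:
    label, confidence = model_manager.predict(
        "This is a short sample article used only to exercise the model.")
    logger.info(f"Smoke test prediction: {label} (confidence={confidence:.2f})")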

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan"""
    logger.info("Starting FastAPI application...")

    # Startup tasks
    model_manager.load_model()

    # Schedule periodic health checks; keep a reference so the task is not
    # garbage-collected mid-flight, and cancel it cleanly on shutdown
    health_check_task = asyncio.create_task(periodic_health_check())

    yield

    # Shutdown tasks
    logger.info("Shutting down FastAPI application...")
    health_check_task.cancel()


# Create FastAPI app
app = FastAPI(
    title="Fake News Detection API",
    description="Production-ready API for fake news detection with comprehensive monitoring and security features",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"]  # Configure appropriately for production
)
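
# For production, the wide-open CORS and trusted-host settings above should be
# narrowed. A hedged sketch (the origin and host below are placeholders, not
# values from this project):
#
# app.add_middleware(
#     CORSMiddleware,
#     allow_origins=["https://app.example.com"],  # placeholder origin
#     allow_credentials=True,
#     allow_methods=["GET", "POST"],
#     allow_headers=["Authorization", "Content-Type"],
# )
# app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.example.com"])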

# Request/Response models (unchanged)
class PredictionRequest(BaseModel):
    text: str = Field(..., min_length=1, max_length=10000,
                      description="Text to analyze for fake news detection")

    @validator('text')
    def validate_text(cls, v):
        if not v or not v.strip():
            raise ValueError('Text cannot be empty')

        # Basic content validation
        if len(v.strip()) < 10:
            raise ValueError('Text must be at least 10 characters long')

        # Check for suspicious patterns
        # NOTE: the original pattern list (and the rest of this check) was
        # truncated in this copy of the source; the empty list is a
        # placeholder so the validator stays runnable.
        suspicious_patterns = []
        for pattern in suspicious_patterns:
            if pattern in v.lower():
                raise ValueError('Text contains a disallowed pattern')

        return v


# The response models below are reconstructed from how they are used in the
# route handlers; their original definitions were truncated in this copy of
# the source.
class PredictionResponse(BaseModel):
    prediction: str
    confidence: float
    model_version: str
    timestamp: str
    processing_time: float


class BatchPredictionRequest(BaseModel):
    texts: List[str] = Field(..., description="List of texts to analyze")


class BatchPredictionResponse(BaseModel):
    predictions: List[PredictionResponse]
    total_count: int
    processing_time: float


class HealthResponse(BaseModel):
    status: str
    timestamp: str
    model_health: Dict[str, Any]
    system_health: Dict[str, Any]
    api_health: Dict[str, Any]
    environment_info: Dict[str, Any]


# Rate limiting dependency
# NOTE: the head of this function was truncated in the source; the client-IP
# lookup and the one-hour pruning window are reconstructed from the surviving
# "100 requests per hour" error message.
async def rate_limit_check(request: Request):
    """Simple in-memory rate limit: 100 requests per hour per client IP"""
    client_ip = request.client.host
    current_time = time.time()

    # Drop request timestamps older than one hour
    rate_limit_storage[client_ip] = [
        t for t in rate_limit_storage[client_ip] if current_time - t < 3600
    ]

    if len(rate_limit_storage[client_ip]) >= 100:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Maximum 100 requests per hour."
        )

    # Add current request
    rate_limit_storage[client_ip].append(current_time)


# Logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log all requests"""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    log_data = {
        "method": request.method,
        "url": str(request.url),
        "client_ip": request.client.host,
        "status_code": response.status_code,
        "process_time": process_time,
        "timestamp": datetime.now().isoformat()
    }
    logger.info(f"Request: {json.dumps(log_data)}")

    return response


# Error handlers (unchanged)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Handle HTTP exceptions"""
    error_data = {
        "error": True,
        "message": exc.detail,
        "status_code": exc.status_code,
        "timestamp": datetime.now().isoformat(),
        "path": request.url.path
    }
    logger.error(f"HTTP Exception: {json.dumps(error_data)}")
    return JSONResponse(
        status_code=exc.status_code,
        content=error_data
    )


@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Handle general exceptions"""
    error_data = {
        "error": True,
        "message": "Internal server error",
        "timestamp": datetime.now().isoformat(),
        "path": request.url.path
    }
    logger.error(f"General Exception: {str(exc)}\n{traceback.format_exc()}")
    return JSONResponse(
        status_code=500,
        content=error_data
    )


# Background tasks
async def periodic_health_check():
    """Periodic health check"""
    while True:
        try:
            await asyncio.sleep(300)  # Check every 5 minutes
            health_status = model_manager.health_check()

            if health_status["status"] == "unhealthy":
                logger.warning(
                    "Model health check failed, attempting to reload...")
                model_manager.load_model()

        except Exception as e:
            logger.error(f"Periodic health check failed: {e}")


# API Routes
@app.get("/", response_model=Dict[str, str])
async def root():
    """Root endpoint"""
    return {
        "message": "Fake News Detection API",
        "version": "2.0.0",
        "environment": path_manager.environment,
        "documentation": "/docs",
        "health_check": "/health"
    }


@app.post("/predict", response_model=PredictionResponse)
async def predict(
    request: PredictionRequest,
    background_tasks: BackgroundTasks,
    http_request: Request,
    _: None = Depends(rate_limit_check)
):
    """
    Predict whether a news article is fake or real

    - **text**: The news article text to analyze
    - **returns**: Prediction result with confidence score
    """
    start_time = time.time()

    try:
        # Check model health
        if model_manager.health_status != "healthy":
            raise HTTPException(
                status_code=503,
                detail="Model is not available. Please try again later."
            )

        # Make prediction
        label, confidence = model_manager.predict(request.text)

        # Calculate processing time
        processing_time = time.time() - start_time

        # Create response
        response = PredictionResponse(
            prediction=label,
            confidence=confidence,
            model_version=model_manager.model_metadata.get(
                'model_version', 'unknown'),
            timestamp=datetime.now().isoformat(),
            processing_time=processing_time
        )

        # Log prediction (background task)
        background_tasks.add_task(
            log_prediction,
            request.text,
            label,
            confidence,
            http_request.client.host,
            processing_time
        )

        return response

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Prediction failed: {str(e)}"
        )
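
# Example request against the /predict endpoint (assuming a local server on
# port 8000; the exact host/port depend on how uvicorn is launched):
#
#   curl -X POST http://localhost:8000/predict \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Some news article text of at least ten characters."}'
#
# Expected response shape (values illustrative):
#   {"prediction": "Real", "confidence": 0.87, "model_version": "...",
#    "timestamp": "...", "processing_time": 0.01}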

@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(
    request: BatchPredictionRequest,
    background_tasks: BackgroundTasks,
    http_request: Request,
    _: None = Depends(rate_limit_check)
):
    """
    Predict multiple news articles in batch

    - **texts**: List of news article texts to analyze
    - **returns**: List of prediction results
    """
    start_time = time.time()

    try:
        # Check model health
        if model_manager.health_status != "healthy":
            raise HTTPException(
                status_code=503,
                detail="Model is not available. Please try again later."
            )

        predictions = []
        for text in request.texts:
            try:
                label, confidence = model_manager.predict(text)
                prediction = PredictionResponse(
                    prediction=label,
                    confidence=confidence,
                    model_version=model_manager.model_metadata.get(
                        'model_version', 'unknown'),
                    timestamp=datetime.now().isoformat(),
                    processing_time=0.0  # Will be updated with total time
                )
                predictions.append(prediction)
            except Exception as e:
                logger.error(f"Batch prediction failed for text: {e}")
                # Continue with other texts
                continue

        # Calculate total processing time
        total_processing_time = time.time() - start_time

        # Spread the total time across predictions (guard against an empty
        # list when every text failed, which would otherwise divide by zero)
        if predictions:
            per_item_time = total_processing_time / len(predictions)
            for prediction in predictions:
                prediction.processing_time = per_item_time

        response = BatchPredictionResponse(
            predictions=predictions,
            total_count=len(predictions),
            processing_time=total_processing_time
        )

        # Log batch prediction (background task)
        background_tasks.add_task(
            log_batch_prediction,
            len(request.texts),
            len(predictions),
            http_request.client.host,
            total_processing_time
        )

        return response

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Batch prediction failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Batch prediction failed: {str(e)}"
        )


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    Comprehensive health check endpoint

    - **returns**: Detailed health status of the API and model
    """
    try:
        # Model health
        model_health = model_manager.health_check()

        # System health (psutil is imported lazily; if it is missing, the
        # except branch below reports the endpoint as unhealthy instead of
        # failing at module import time)
        import psutil
        system_health = {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
            "uptime": time.time() - psutil.boot_time()
        }

        # API health
        api_health = {
            "rate_limit_active": len(rate_limit_storage) > 0,
            "active_connections": len(rate_limit_storage)
        }

        # Environment info
        environment_info = path_manager.get_environment_info()

        # Overall status
        overall_status = "healthy" if model_health["status"] == "healthy" else "unhealthy"

        return HealthResponse(
            status=overall_status,
            timestamp=datetime.now().isoformat(),
            model_health=model_health,
            system_health=system_health,
            api_health=api_health,
            environment_info=environment_info
        )

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return HealthResponse(
            status="unhealthy",
            timestamp=datetime.now().isoformat(),
            model_health={"status": "unhealthy", "error": str(e)},
            system_health={"error": str(e)},
            api_health={"error": str(e)},
            environment_info={"error": str(e)}
        )
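
# The health endpoint can be probed with a plain GET (same local-server
# assumption as above):
#
#   curl http://localhost:8000/health
#
# It returns the HealthResponse structure defined earlier, combining the
# model self-test, psutil system stats, and rate-limiter state.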

@app.get("/metrics")
async def get_metrics():
    """
    Get API metrics

    - **returns**: Usage statistics and performance metrics