📰 Fake News Detection System
--
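This diff replaces the Streamlit dashboard code that previously lived in `app/fastapi_server.py` with a FastAPI inference server: a `ModelManager` that loads the trained pipeline (or individual model/vectorizer components) via `path_manager`, health checks, async prediction logging, monitoring hooks, and blue-green deployment wiring.

A quick smoke test against the new server might look like the sketch below. It assumes the server is reachable on localhost:8000 and that `/predict` accepts `{"text": ...}` and returns `prediction` and `confidence` fields, as the old Streamlit client expected; adjust to the actual route definitions further down in the file.

```python
import requests

# Hypothetical smoke test; the endpoint shape is inferred from the old Streamlit client.
resp = requests.post(
    "http://localhost:8000/predict",
    json={"text": "Breaking: miracle cure discovered, doctors hate it."},
    timeout=30,
)
resp.raise_for_status()
result = resp.json()
print(result["prediction"], result["confidence"])  # e.g. "Fake" 0.93
```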
diff --git "a/app/fastapi_server.py" "b/app/fastapi_server.py"
--- "a/app/fastapi_server.py"
+++ "b/app/fastapi_server.py"
@@ -1,2045 +1,2282 @@
import os
-import io
import sys
import json
import time
-import hashlib
+import joblib
import logging
-import requests
-import subprocess
-import pandas as pd
+import hashlib
+import uvicorn
+import asyncio
+import aiofiles
+import traceback
import numpy as np
-import altair as alt
-import streamlit as st
from pathlib import Path
-import plotly.express as px
-import plotly.graph_objects as go
-from plotly.subplots import make_subplots
+from dataclasses import asdict
+from collections import defaultdict
from datetime import datetime, timedelta
-from typing import Dict, List, Optional, Any
+from contextlib import asynccontextmanager
+from typing import List, Dict, Optional, Any
+from fastapi.responses import JSONResponse
+from fastapi.openapi.utils import get_openapi
+from pydantic import BaseModel, Field, validator
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.openapi.docs import get_swagger_ui_html
+from fastapi.middleware.trustedhost import TrustedHostMiddleware
+from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
+from fastapi import FastAPI, HTTPException, Depends, Request, BackgroundTasks, status
+
+from data.data_validator import (
+ DataValidationPipeline, validate_text, validate_articles_list,
+ get_validation_stats, generate_quality_report
+)
+
+from model.retrain import AutomatedRetrainingManager
+from monitor.metrics_collector import MetricsCollector
+from monitor.prediction_monitor import PredictionMonitor
+from monitor.alert_system import AlertSystem, console_notification_handler
+
+from deployment.traffic_router import TrafficRouter
+from deployment.model_registry import ModelRegistry
+from deployment.blue_green_manager import BlueGreenDeploymentManager
+
# Import the new path manager
try:
from path_config import path_manager
except ImportError:
+ # Fallback for development environments
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append('/app')
from path_config import path_manager
-# Configure logging
-def setup_streamlit_logging():
- """Setup logging with fallback for restricted file access"""
+try:
+ from data.data_validator import DataValidator
+ from data.validation_schemas import TextQualityLevel
+ VALIDATION_AVAILABLE = True
+except ImportError as e:
+ logger.warning(f"Data validation not available: {e}")
+ VALIDATION_AVAILABLE = False
+
+# Configure logging with fallback for permission issues
+def setup_logging():
+ """Setup logging with fallback for environments with restricted file access"""
+ handlers = [logging.StreamHandler()] # Always include console output
+
try:
- log_file_path = path_manager.get_logs_path('streamlit_app.log')
+ # Try to create log file in the logs directory
+ log_file_path = path_manager.get_logs_path('fastapi_server.log')
log_file_path.parent.mkdir(parents=True, exist_ok=True)
- with open(log_file_path, 'a') as test_file:
- test_file.write('')
+ # Test if we can write to the file
+ test_handler = logging.FileHandler(log_file_path)
+ test_handler.close()
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler(log_file_path),
- logging.StreamHandler()
- ]
- )
- return True
- except (PermissionError, OSError):
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[logging.StreamHandler()]
- )
- return False
+ # If successful, add file handler
+ handlers.append(logging.FileHandler(log_file_path))
+ print(f"Logging to file: {log_file_path}") # Use print instead of logger
+
+ except (PermissionError, OSError) as e:
+ # If file logging fails, just use console logging
+ print(f"Cannot create log file, using console only: {e}")
+
+ # Try alternative locations for file logging
+ try:
+ import tempfile
+ temp_log = tempfile.NamedTemporaryFile(mode='w', suffix='.log', delete=False, prefix='fastapi_')
+ temp_log.close()
+ handlers.append(logging.FileHandler(temp_log.name))
+ print(f"Using temporary log file: {temp_log.name}")
+ except Exception as temp_e:
+ print(f"Temporary file logging also failed: {temp_e}")
+
+ return handlers
-file_logging_enabled = setup_streamlit_logging()
+# Setup logging with error handling
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=setup_logging()
+)
logger = logging.getLogger(__name__)
-if not file_logging_enabled:
- logger.warning("File logging disabled due to permission restrictions")
+# Now that logger is defined, log the environment info
+try:
+ path_manager.log_environment_info()
+except Exception as e:
+ logger.warning(f"Could not log environment info: {e}")
-logger.info(f"Streamlit starting in {path_manager.environment} environment")
+# Security
+security = HTTPBearer(auto_error=False)
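+# auto_error=False returns None for missing credentials instead of raising 403,
+# so endpoints can treat authentication as optional.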
+# Rate limiting storage
+rate_limit_storage = defaultdict(list)
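+# Per-key request-timestamp history for in-memory rate limiting; resets on
+# restart and is not shared across worker processes.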
-# Page configuration - MUST be first Streamlit command
-st.set_page_config(
- page_title="Fake News Detection System | MLOps Dashboard",
-    page_icon="📰",
- layout="wide",
- initial_sidebar_state="expanded",
- menu_items={
- 'Get Help': 'https://github.com/your-repo/issues',
- 'Report a bug': 'https://github.com/your-repo/issues',
- 'About': """
- # Advanced Fake News Detection System
-
- A production-grade MLOps pipeline with statistical rigor and CPU optimization.
-
- ## Key Features:
- - Bootstrap confidence intervals
- - Statistical significance testing
- - Cross-validation with uncertainty quantification
- - CPU-optimized for HuggingFace Spaces
-
- Built with FastAPI, Streamlit, and scikit-learn.
- """
- }
-)
-class EnhancedStreamlitApp:
- """Enhanced Streamlit application with professional UI/UX"""
+class ModelManager:
+ """Manages model loading and health checks with dynamic paths"""
def __init__(self):
- self.setup_config()
- self.setup_api_client()
- self.initialize_session_state()
- self.setup_custom_css()
-
- def setup_config(self):
- """Setup application configuration"""
- self.config = {
- 'api_url': "http://localhost:8000",
- 'max_upload_size': 1000 * 1024 * 1024,
- 'supported_file_types': ['csv', 'txt', 'json'],
- 'max_text_length': 10000,
- 'prediction_timeout': 30,
- 'refresh_interval': 60,
- 'max_batch_size': 100,
- 'chart_theme': 'plotly_white'
- }
-
- def setup_api_client(self):
- """Setup API client with comprehensive error handling"""
- self.session = requests.Session()
- self.session.timeout = self.config['prediction_timeout']
-
- # Add retry logic with compatibility handling
- try:
- from requests.adapters import HTTPAdapter
- from urllib3.util.retry import Retry
-
- # Try new parameter name first, fall back to old one
- try:
- retry_strategy = Retry(
- total=3,
- status_forcelist=[429, 500, 502, 503, 504],
- allowed_methods=["HEAD", "GET", "OPTIONS"]
- )
- except TypeError:
- # Fallback for older urllib3 versions
- retry_strategy = Retry(
- total=3,
- status_forcelist=[429, 500, 502, 503, 504],
- method_whitelist=["HEAD", "GET", "OPTIONS"]
- )
-
- adapter = HTTPAdapter(max_retries=retry_strategy)
- self.session.mount("http://", adapter)
- self.session.mount("https://", adapter)
-
- except Exception as e:
- logger.warning(f"Could not setup retry strategy: {e}")
-
- self.api_available = self.test_api_connection()
-
- def test_api_connection(self) -> bool:
- """Test API connection with detailed status"""
+ self.model = None
+ self.vectorizer = None
+ self.pipeline = None
+ self.model_metadata = {}
+ self.last_health_check = None
+ self.health_status = "unknown"
+ self.load_model()
+
+ def load_model(self):
+ """Load model with comprehensive error handling and dynamic paths"""
try:
- response = self.session.get(f"{self.config['api_url']}/health", timeout=5)
- return response.status_code == 200
- except Exception as e:
- logger.warning(f"API connection failed: {e}")
- return False
-
- def initialize_session_state(self):
- """Initialize comprehensive session state"""
- default_states = {
- 'prediction_history': [],
- 'upload_history': [],
- 'last_refresh': datetime.now(),
- 'auto_refresh': False,
- 'selected_model_version': 'current',
- 'dashboard_theme': 'professional',
- 'expanded_sections': set(),
- 'user_preferences': {
- 'show_confidence_intervals': True,
- 'show_statistical_tests': True,
- 'chart_style': 'modern',
- 'auto_scroll': True
- }
- }
-
- for key, value in default_states.items():
- if key not in st.session_state:
- st.session_state[key] = value
+ logger.info("Loading ML model...")
- def setup_custom_css(self):
- """Setup advanced custom CSS styling with dark grey and gold theme"""
- st.markdown("""
-
- """, unsafe_allow_html=True)
+ # Initialize all to None first
+ self.model = None
+ self.vectorizer = None
+ self.pipeline = None
- # API Methods (enhanced versions of existing methods)
- def get_cv_results_from_api(self) -> Optional[Dict]:
- """Get cross-validation results with enhanced error handling"""
- try:
- if not self.api_available:
- return None
-
- response = self.session.get(f"{self.config['api_url']}/cv/results", timeout=10)
+ # Try to load pipeline first (preferred)
+ pipeline_path = path_manager.get_pipeline_path()
+ logger.info(f"Checking for pipeline at: {pipeline_path}")
- if response.status_code == 200:
- data = response.json()
- # Cache the results
- st.session_state['cached_cv_results'] = {
- 'data': data,
- 'timestamp': datetime.now(),
- 'ttl': 300 # 5 minutes
- }
- return data
- elif response.status_code == 404:
- return {'error': 'No CV results available'}
+ if pipeline_path.exists():
+ try:
+ self.pipeline = joblib.load(pipeline_path)
+ # Extract components from pipeline
+ if hasattr(self.pipeline, 'named_steps'):
+ self.model = self.pipeline.named_steps.get('model')
+ self.vectorizer = (self.pipeline.named_steps.get('vectorizer') or
+ self.pipeline.named_steps.get('vectorize'))
+ logger.info("Loaded model pipeline successfully")
+ logger.info(f"Pipeline steps: {list(self.pipeline.named_steps.keys()) if hasattr(self.pipeline, 'named_steps') else 'No named_steps'}")
+ except Exception as e:
+ logger.warning(f"Failed to load pipeline: {e}, falling back to individual components")
+ self.pipeline = None
else:
- return {'error': f'API Error: {response.status_code}'}
+ logger.info(f"Pipeline file not found at {pipeline_path}")
+
+ # If pipeline loading failed or doesn't exist, load individual components
+ if self.pipeline is None:
+ model_path = path_manager.get_model_file_path()
+ vectorizer_path = path_manager.get_vectorizer_path()
+ logger.info(f"Checking for model at: {model_path}")
+ logger.info(f"Checking for vectorizer at: {vectorizer_path}")
+
+ if model_path.exists() and vectorizer_path.exists():
+ try:
+ self.model = joblib.load(model_path)
+ self.vectorizer = joblib.load(vectorizer_path)
+ logger.info("Loaded model components successfully")
+ except Exception as e:
+ logger.error(f"Failed to load individual components: {e}")
+ raise e
+ else:
+ raise FileNotFoundError(f"No model files found. Checked:\n- {pipeline_path}\n- {model_path}\n- {vectorizer_path}")
+
+ # Verify we have what we need for predictions
+ if self.pipeline is None and (self.model is None or self.vectorizer is None):
+ raise ValueError("Neither complete pipeline nor individual model components are available")
+
+ # Load metadata
+ metadata_path = path_manager.get_metadata_path()
+ if metadata_path.exists():
+ with open(metadata_path, 'r') as f:
+ self.model_metadata = json.load(f)
+ logger.info(f"Loaded model metadata: {self.model_metadata.get('model_version', 'Unknown')}")
+ else:
+ logger.warning(f"Metadata file not found at: {metadata_path}")
+ self.model_metadata = {"model_version": "unknown"}
+
+ self.health_status = "healthy"
+ self.last_health_check = datetime.now()
+
+ # Log what was successfully loaded
+ logger.info(f"Model loading summary:")
+ logger.info(f" Pipeline available: {self.pipeline is not None}")
+ logger.info(f" Model available: {self.model is not None}")
+ logger.info(f" Vectorizer available: {self.vectorizer is not None}")
+
except Exception as e:
- logger.warning(f"Could not fetch CV results: {e}")
- # Return cached results if available
- cached = st.session_state.get('cached_cv_results')
- if cached and (datetime.now() - cached['timestamp']).seconds < cached['ttl']:
- return cached['data']
- return None
-
- def get_monitoring_metrics_from_api(self) -> Optional[Dict]:
- """Get real-time monitoring metrics"""
+ logger.error(f"Failed to load model: {e}")
+ logger.error(f"Traceback: {traceback.format_exc()}")
+ self.health_status = "unhealthy"
+ self.model = None
+ self.vectorizer = None
+ self.pipeline = None
+
+ def predict(self, text: str) -> tuple[str, float]:
+ """Make prediction with error handling"""
try:
- if not self.api_available:
- return None
-
- response = self.session.get(f"{self.config['api_url']}/monitor/metrics/current", timeout=10)
- return response.json() if response.status_code == 200 else None
-
- except Exception as e:
- logger.warning(f"Could not fetch monitoring metrics: {e}")
- return None
+ if self.pipeline:
+ # Use pipeline for prediction
+ prediction = self.pipeline.predict([text])[0]
+ probabilities = self.pipeline.predict_proba([text])[0]
+ logger.debug("Used pipeline for prediction")
+ elif self.model and self.vectorizer:
+ # Use individual components
+ X = self.vectorizer.transform([text])
+ prediction = self.model.predict(X)[0]
+ probabilities = self.model.predict_proba(X)[0]
+ logger.debug("Used individual components for prediction")
+ else:
+ raise ValueError("No model available for prediction")
- def make_prediction_request(self, text: str) -> Dict[str, Any]:
- """Enhanced prediction request with better error handling"""
- try:
- if not self.api_available:
- return {'error': 'API is not available'}
-
- # Show loading state
- with st.spinner('Analyzing text with statistical validation...'):
- response = self.session.post(
- f"{self.config['api_url']}/predict",
- json={"text": text},
- timeout=self.config['prediction_timeout']
- )
+ # Get confidence score
+ confidence = float(max(probabilities))
- if response.status_code == 200:
- result = response.json()
- # Add to prediction history
- self.add_to_prediction_history(text, result)
- return result
- else:
- return {'error': f'API Error: {response.status_code} - {response.text}'}
+ # Convert prediction to readable format
+ label = "Fake" if prediction == 1 else "Real"
+
+ return label, confidence
- except requests.exceptions.Timeout:
- return {'error': 'Request timed out. Please try again.'}
- except requests.exceptions.ConnectionError:
- return {'error': 'Cannot connect to prediction service.'}
except Exception as e:
- return {'error': f'Unexpected error: {str(e)}'}
+ logger.error(f"Prediction failed: {e}")
+ logger.error(f"Traceback: {traceback.format_exc()}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Prediction failed: {str(e)}"
+ )
- def add_to_prediction_history(self, text: str, result: Dict):
- """Add prediction to session history with enhanced metadata"""
- prediction_entry = {
- 'timestamp': datetime.now().isoformat(),
- 'text': text[:200] + "..." if len(text) > 200 else text,
- 'prediction': result.get('prediction', 'Unknown'),
- 'confidence': result.get('confidence', 0.0),
- 'text_length': len(text),
- 'word_count': len(text.split()),
- 'processing_time': result.get('processing_time', 0.0),
- 'model_version': result.get('model_version', 'unknown'),
- 'session_id': st.session_state.get('session_id', 'default')
- }
+ def health_check(self) -> Dict[str, Any]:
+ """Perform health check"""
+ try:
+ # Test prediction with sample text
+ test_text = "This is a test article for health check purposes."
+ label, confidence = self.predict(test_text)
+
+ self.health_status = "healthy"
+ self.last_health_check = datetime.now()
+
+ return {
+ "status": "healthy",
+ "last_check": self.last_health_check.isoformat(),
+ "model_available": self.model is not None,
+ "vectorizer_available": self.vectorizer is not None,
+ "pipeline_available": self.pipeline is not None,
+ "test_prediction": {"label": label, "confidence": confidence},
+ "environment": path_manager.environment,
+ "model_path": str(path_manager.get_model_file_path()),
+ "vectorizer_path": str(path_manager.get_vectorizer_path()),
+ "pipeline_path": str(path_manager.get_pipeline_path()),
+ "data_path": str(path_manager.get_data_path()),
+ "file_exists": {
+ "model": path_manager.get_model_file_path().exists(),
+ "vectorizer": path_manager.get_vectorizer_path().exists(),
+ "pipeline": path_manager.get_pipeline_path().exists(),
+ "metadata": path_manager.get_metadata_path().exists()
+ }
+ }
- st.session_state.prediction_history.append(prediction_entry)
-
- # Keep only last 100 predictions
- if len(st.session_state.prediction_history) > 100:
- st.session_state.prediction_history = st.session_state.prediction_history[-100:]
-
- # Enhanced Visualization Methods
- def create_advanced_confidence_gauge(self, confidence: float, prediction: str):
- """Create advanced confidence gauge with statistical context"""
- # Color scheme based on prediction
- color = "#e74c3c" if prediction == "Fake" else "#27ae60"
-
- fig = go.Figure(go.Indicator(
- mode="gauge+number+delta",
- value=confidence * 100,
- domain={'x': [0, 1], 'y': [0, 1]},
- title={'text': f"Confidence: {prediction}
Statistical Validation"},
- delta={'reference': 75, 'position': "top"},
- gauge={
- 'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
- 'bar': {'color': color},
- 'bgcolor': "white",
- 'borderwidth': 2,
- 'bordercolor': "gray",
- 'steps': [
- {'range': [0, 50], 'color': '#f8f9fa'},
- {'range': [50, 75], 'color': '#e9ecef'},
- {'range': [75, 90], 'color': '#dee2e6'},
- {'range': [90, 100], 'color': '#ced4da'}
- ],
- 'threshold': {
- 'line': {'color': "red", 'width': 4},
- 'thickness': 0.75,
- 'value': 90
+ except Exception as e:
+ self.health_status = "unhealthy"
+ self.last_health_check = datetime.now()
+
+ return {
+ "status": "unhealthy",
+ "last_check": self.last_health_check.isoformat(),
+ "error": str(e),
+ "model_available": self.model is not None,
+ "vectorizer_available": self.vectorizer is not None,
+ "pipeline_available": self.pipeline is not None,
+ "environment": path_manager.environment,
+ "model_path": str(path_manager.get_model_file_path()),
+ "vectorizer_path": str(path_manager.get_vectorizer_path()),
+ "pipeline_path": str(path_manager.get_pipeline_path()),
+ "data_path": str(path_manager.get_data_path()),
+ "file_exists": {
+ "model": path_manager.get_model_file_path().exists(),
+ "vectorizer": path_manager.get_vectorizer_path().exists(),
+ "pipeline": path_manager.get_pipeline_path().exists(),
+ "metadata": path_manager.get_metadata_path().exists()
}
}
- ))
- fig.update_layout(
- paper_bgcolor="rgba(255,255,255,0)",
- plot_bgcolor="rgba(255,255,255,0)",
- font={'color': "darkblue", 'family': "Inter"},
- height=350
- )
- return fig
+# Background task functions
+async def log_prediction(text: str, prediction: str, confidence: float, client_ip: str, processing_time: float):
+ """Log prediction details with error handling for file access"""
+ try:
+ log_entry = {
+ "timestamp": datetime.now().isoformat(),
+ "client_ip": client_ip,
+ "text_length": len(text),
+ "prediction": prediction,
+ "confidence": confidence,
+ "processing_time": processing_time,
+ "text_hash": hashlib.md5(text.encode()).hexdigest()
+ }
- def create_cv_performance_visualization(self, cv_results: Dict):
- """Create comprehensive CV performance visualization"""
- if not cv_results or 'cross_validation' not in cv_results:
- return None
+ # Try to save to log file
+ try:
+ log_file = path_manager.get_logs_path("prediction_log.json")
+
+ # Load existing logs
+ logs = []
+ if log_file.exists():
+ try:
+ async with aiofiles.open(log_file, 'r') as f:
+ content = await f.read()
+ logs = json.loads(content)
+                except (json.JSONDecodeError, OSError):  # corrupt or unreadable log file
+ logs = []
+
+ # Add new log
+ logs.append(log_entry)
+
+ # Keep only last 1000 entries
+ if len(logs) > 1000:
+ logs = logs[-1000:]
+
+ # Save logs
+ async with aiofiles.open(log_file, 'w') as f:
+ await f.write(json.dumps(logs, indent=2))
+
+ except (PermissionError, OSError) as e:
+ # If file logging fails, just log to console
+ logger.warning(f"Cannot write prediction log to file: {e}")
+ logger.info(f"Prediction logged: {json.dumps(log_entry)}")
- cv_data = cv_results['cross_validation']
- fold_results = cv_data.get('individual_fold_results', [])
-
- if not fold_results:
- return None
+ except Exception as e:
+ logger.error(f"Failed to log prediction: {e}")
- # Create subplot figure
- fig = make_subplots(
- rows=2, cols=2,
- subplot_titles=('F1 Score Distribution', 'Accuracy Distribution',
- 'Performance by Fold', 'Train vs Test Scores'),
- specs=[[{"type": "histogram"}, {"type": "histogram"}],
- [{"type": "scatter"}, {"type": "bar"}]]
- )
- # Extract data for visualization
- f1_scores = [fold['test_scores'].get('f1', 0) for fold in fold_results if 'test_scores' in fold]
- accuracy_scores = [fold['test_scores'].get('accuracy', 0) for fold in fold_results if 'test_scores' in fold]
- fold_numbers = [fold.get('fold', i) for i, fold in enumerate(fold_results)]
+async def log_batch_prediction(total_texts: int, successful_predictions: int, client_ip: str, processing_time: float):
+ """Log batch prediction details"""
+ try:
+ log_entry = {
+ "timestamp": datetime.now().isoformat(),
+ "type": "batch_prediction",
+ "client_ip": client_ip,
+ "total_texts": total_texts,
+ "successful_predictions": successful_predictions,
+ "processing_time": processing_time,
+ "success_rate": successful_predictions / total_texts if total_texts > 0 else 0
+ }
- # F1 Distribution
- fig.add_trace(
- go.Histogram(x=f1_scores, nbinsx=10, name="F1 Distribution",
- marker_color='rgba(55, 128, 191, 0.7)'),
- row=1, col=1
- )
+ logger.info(f"Batch prediction logged: {json.dumps(log_entry)}")
- # Accuracy Distribution
- fig.add_trace(
- go.Histogram(x=accuracy_scores, nbinsx=10, name="Accuracy Distribution",
- marker_color='rgba(219, 64, 82, 0.7)'),
- row=1, col=2
- )
+ except Exception as e:
+ logger.error(f"Failed to log batch prediction: {e}")
- # Performance by Fold
- fig.add_trace(
- go.Scatter(x=fold_numbers, y=f1_scores, mode='lines+markers',
- name='F1 Score', line=dict(color='blue', width=3)),
- row=2, col=1
- )
-
- fig.add_trace(
- go.Scatter(x=fold_numbers, y=accuracy_scores, mode='lines+markers',
- name='Accuracy', line=dict(color='red', width=3)),
- row=2, col=1
- )
- # Train vs Test comparison (if available)
- if fold_results and 'train_scores' in fold_results[0]:
- test_f1 = [fold['test_scores'].get('f1', 0) for fold in fold_results]
- train_f1 = [fold['train_scores'].get('f1', 0) for fold in fold_results]
-
- fig.add_trace(
- go.Bar(x=fold_numbers, y=test_f1, name='Test F1',
- marker_color='rgba(55, 128, 191, 0.7)'),
- row=2, col=2
- )
-
- fig.add_trace(
- go.Bar(x=fold_numbers, y=train_f1, name='Train F1',
- marker_color='rgba(219, 64, 82, 0.7)'),
- row=2, col=2
- )
+# Global variables
+model_manager = ModelManager()
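+# Created eagerly at import time so the model is ready before the first request;
+# the lifespan hook below calls load_model() again as a defensive reload.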
- fig.update_layout(
- height=800,
- showlegend=True,
- title_text="Cross-Validation Performance Analysis",
- template="plotly_white",
- font=dict(family="Inter", size=12)
- )
+# Initialize automation manager
+automation_manager = None
- return fig
+# Initialize deployment components
+deployment_manager = None
+traffic_router = None
+model_registry = None
- def create_prediction_history_chart(self):
- """Create enhanced prediction history visualization"""
- if not st.session_state.prediction_history:
- return None
- df = pd.DataFrame(st.session_state.prediction_history)
- df['timestamp'] = pd.to_datetime(df['timestamp'])
- df['confidence_percent'] = df['confidence'] * 100
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Manage application lifespan with deployment system"""
+ global deployment_manager, traffic_router, model_registry
+
+ logger.info("Starting FastAPI application...")
+
+ # Startup tasks
+ model_manager.load_model()
+
+ # Initialize deployment components
+ try:
+ deployment_manager = BlueGreenDeploymentManager()
+ traffic_router = TrafficRouter()
+ model_registry = ModelRegistry()
+ logger.info("Deployment system initialized")
+ except Exception as e:
+ logger.error(f"Failed to initialize deployment system: {e}")
+
+    # Initialize monitoring and automation...
+
+    # Schedule periodic health checks
+    asyncio.create_task(periodic_health_check())
+
+ yield
+
+ # Shutdown tasks
+ logger.info("Shutting down FastAPI application...")
- # Create subplot figure
- fig = make_subplots(
- rows=2, cols=2,
- subplot_titles=('Confidence Over Time', 'Prediction Distribution',
- 'Processing Time Trend', 'Text Length vs Confidence'),
- specs=[[{"secondary_y": True}, {"type": "pie"}],
- [{"type": "scatter"}, {"type": "scatter"}]]
- )
+# Initialize monitoring components
+prediction_monitor = PredictionMonitor(base_dir=Path("/tmp"))
+metrics_collector = MetricsCollector(base_dir=Path("/tmp"))
+alert_system = AlertSystem(base_dir=Path("/tmp"))
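+# NOTE: Path("/tmp") is ephemeral storage; monitoring state is lost on restart
+# (relevant on container platforms such as HuggingFace Spaces).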
- # Confidence over time
- fig.add_trace(
- go.Scatter(x=df['timestamp'], y=df['confidence_percent'],
- mode='lines+markers', name='Confidence %',
- line=dict(color='blue', width=2),
- marker=dict(color=df['prediction'].map({'Fake': 'red', 'Real': 'green'}),
- size=8)),
- row=1, col=1
- )
+# Start monitoring
+prediction_monitor.start_monitoring()
- # Prediction distribution
- pred_counts = df['prediction'].value_counts()
- fig.add_trace(
- go.Pie(labels=pred_counts.index, values=pred_counts.values,
- name="Predictions", hole=0.3),
- row=1, col=2
- )
+alert_system.add_notification_handler("console", console_notification_handler)
- # Processing time trend
- fig.add_trace(
- go.Scatter(x=df['timestamp'], y=df['processing_time'],
- mode='lines+markers', name='Processing Time (s)',
- line=dict(color='purple', width=2)),
- row=2, col=1
- )
- # Text length vs confidence
- fig.add_trace(
- go.Scatter(x=df['text_length'], y=df['confidence_percent'],
- mode='markers', name='Length vs Confidence',
- marker=dict(
- color=df['prediction'].map({'Fake': 'red', 'Real': 'green'}),
- size=10,
- opacity=0.7
- )),
- row=2, col=2
- )
-    fig.update_layout(
-        height=700,
-        showlegend=True,
-        title_text="Prediction Analytics Dashboard",
-        template="plotly_white",
-        font=dict(family="Inter", size=12)
-    )
-
-    return fig
-
-    def render_system_health_dashboard(self):
-        """Render comprehensive system health dashboard"""
-        st.markdown("## 🏥 System Health Dashboard")
- # Get health data
+
+# Background tasks
+async def periodic_health_check():
+ """Periodic health check"""
+ while True:
try:
- if self.api_available:
- health_response = self.session.get(f"{self.config['api_url']}/health", timeout=10)
- if health_response.status_code == 200:
- health_data = health_response.json()
-
- # Create health metrics grid
- col1, col2, col3, col4 = st.columns(4)
-
- with col1:
- status = health_data.get('status', 'unknown')
- if status == 'healthy':
-                            st.markdown(...)  # elided HTML status badges: "Healthy" /
-                            # "Issues Detected", model status, CPU %, memory %,
-                            # "Statistical Analysis Complete"