import os
import io
import sys
import json
import time
import hashlib
import logging
import requests
import subprocess
import pandas as pd
import altair as alt
import streamlit as st
from pathlib import Path
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import contextlib
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add the project root (two levels above this file) to sys.path so that
# project packages can be imported.  Streamlit re-executes this script on
# every rerun within the same process, so only append the entry when it is
# missing; an unconditional append would grow sys.path with duplicates.
_project_root = str(Path(__file__).resolve().parent.parent)
if _project_root not in sys.path:
    sys.path.append(_project_root)

# Try to import the trainer directly for better progress tracking.  When the
# import fails, the names are bound to None and DIRECT_TRAINING_AVAILABLE is
# False so the rest of the module can check availability before using them.
try:
    from model.train import RobustModelTrainer, estimate_training_time
    DIRECT_TRAINING_AVAILABLE = True
except ImportError:
    RobustModelTrainer = None
    estimate_training_time = None
    DIRECT_TRAINING_AVAILABLE = False
    logger.warning("Direct training import failed, using subprocess fallback")
class StreamlitAppManager:
    """Manages Streamlit application state and functionality.

    Constructing an instance builds the configuration and path tables,
    creates an HTTP session, probes the API health endpoint once, and seeds
    the Streamlit session state with default values.

    Attributes set during construction:
        config: Dict of application settings (API URL, limits, timeouts).
        paths: Dict mapping logical names to file locations under /tmp.
        session: ``requests.Session`` used for API calls.
        api_available: Result of the health probe made at construction time.
    """

    def __init__(self):
        self.setup_config()
        self.setup_paths()
        self.setup_api_client()
        self.initialize_session_state()

    def setup_config(self):
        """Setup application configuration."""
        self.config = {
            'api_url': "http://localhost:8000",
            'max_upload_size': 10 * 1024 * 1024,  # 10MB
            'supported_file_types': ['csv', 'txt', 'json'],
            'max_text_length': 10000,
            'prediction_timeout': 30,
            'refresh_interval': 60,
            'max_batch_size': 10,
        }

    def setup_paths(self):
        """Setup file paths."""
        self.paths = {
            'custom_data': Path("/tmp/custom_upload.csv"),
            'metadata': Path("/tmp/metadata.json"),
            'activity_log': Path("/tmp/activity_log.json"),
            'drift_log': Path("/tmp/logs/monitoring_log.json"),
            'prediction_log': Path("/tmp/prediction_log.json"),
            'scheduler_log': Path("/tmp/logs/scheduler_execution.json"),
            'error_log': Path("/tmp/logs/scheduler_errors.json"),
        }

    def setup_api_client(self):
        """Setup API client with error handling."""
        self.session = requests.Session()
        # NOTE(review): requests applies timeouts per request and does not
        # appear to read a ``timeout`` attribute from the Session, so this
        # assignment alone does not bound any call.  It is kept so existing
        # readers of ``session.timeout`` keep working; every request should
        # still pass ``timeout=`` explicitly.
        self.session.timeout = self.config['prediction_timeout']
        # Test API connection
        self.api_available = self.test_api_connection()

    def test_api_connection(self) -> bool:
        """Probe the API ``/health`` endpoint.

        Returns:
            True when the endpoint answers with HTTP 200 within 5 seconds,
            False on any other status or on a requests-level failure.
        """
        try:
            response = self.session.get(
                f"{self.config['api_url']}/health", timeout=5)
        except requests.exceptions.RequestException as exc:
            # Catch only transport/HTTP errors.  A bare ``except:`` would
            # also swallow KeyboardInterrupt, SystemExit and other
            # BaseException-derived control-flow signals.
            logging.getLogger(__name__).warning(
                "API health check failed: %s", exc)
            return False
        return response.status_code == 200

    def initialize_session_state(self):
        """Initialize Streamlit session state.

        Each key is only created when absent, so values already stored in
        the session are left untouched.
        """
        if 'prediction_history' not in st.session_state:
            st.session_state.prediction_history = []
        if 'upload_history' not in st.session_state:
            st.session_state.upload_history = []
        if 'last_refresh' not in st.session_state:
            st.session_state.last_refresh = datetime.now()
        if 'auto_refresh' not in st.session_state:
            st.session_state.auto_refresh = False
# Page configuration.  Streamlit documents set_page_config as having to be
# the first Streamlit command on a page, so it runs before the app manager
# (whose constructor touches st.session_state and performs a network probe).
st.set_page_config(
    page_title="Fake News Detection System",
    # Newspaper emoji.  The literal had been corrupted by a bad encoding
    # round-trip; the escape keeps this source line ASCII-safe.
    page_icon="\U0001F4F0",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Initialize app manager
app_manager = StreamlitAppManager()

# Custom CSS for better styling
st.markdown("""
""", unsafe_allow_html=True)
def load_json_file(file_path: Path, default: Any = None) -> Any:
    """Safely load a JSON file, falling back to a default on any failure.

    Args:
        file_path: Location of the JSON document.
        default: Value returned when the file is missing or cannot be read
            or parsed.  ``None`` (the default) means "return a new empty
            dict".

    Returns:
        The parsed JSON content, or the fallback value.  This function does
        not raise for missing, unreadable, or malformed files.
    """
    # Substitute {} only when no default was supplied.  ``default or {}``
    # would also throw away legitimate falsy defaults such as [] or 0.
    fallback = {} if default is None else default
    try:
        # Open directly instead of checking exists() first: avoids a race
        # between the check and the open.
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # A missing file is an expected condition, so it is not logged.
        return fallback
    except Exception as e:
        # Deliberate best-effort boundary: log and fall back.
        logger.error(f"Failed to load {file_path}: {e}")
        return fallback
def save_prediction_to_history(text: str, prediction: str, confidence: float):
    """Append one prediction record to the session's prediction history.

    The stored text is a preview: inputs longer than 100 characters are cut
    to their first 100 characters followed by "...".  The full length is
    recorded separately.  After appending, the history is trimmed so that
    only the 50 most recent entries remain.
    """
    full_length = len(text)
    if full_length > 100:
        preview = text[:100] + "..."
    else:
        preview = text

    st.session_state.prediction_history.append({
        'timestamp': datetime.now().isoformat(),
        'text': preview,
        'prediction': prediction,
        'confidence': confidence,
        'text_length': full_length,
    })

    # Cap the history at the latest 50 records.
    history = st.session_state.prediction_history
    if len(history) > 50:
        st.session_state.prediction_history = history[-50:]
def make_prediction_request(text: str) -> Dict[str, Any]:
    """Send ``text`` to the API's ``/predict`` endpoint.

    Returns the decoded JSON body on HTTP 200.  In every other case
    (API flagged unavailable, non-200 status, timeout, connection failure,
    or any other exception) a dict with a single ``'error'`` key describing
    the problem is returned instead of raising.
    """
    try:
        if not app_manager.api_available:
            return {'error': 'API is not available'}

        endpoint = f"{app_manager.config['api_url']}/predict"
        payload = {"text": text}
        response = app_manager.session.post(
            endpoint,
            json=payload,
            timeout=app_manager.config['prediction_timeout'],
        )

        if response.status_code != 200:
            return {'error': f'API Error: {response.status_code} - {response.text}'}
        return response.json()
    # Timeout is checked before ConnectionError, as in the original order.
    except requests.exceptions.Timeout:
        return {'error': 'Request timed out. Please try again.'}
    except requests.exceptions.ConnectionError:
        return {'error': 'Cannot connect to prediction service.'}
    except Exception as exc:
        return {'error': f'Unexpected error: {str(exc)}'}
def validate_text_input(text: str) -> tuple[bool, str]:
"""Validate text input"""
if not text or not text.strip():
return False, "Please enter some text to analyze."
if len(text) < 10:
return False, "Text must be at least 10 characters long."
if len(text) > app_manager.config['max_text_length']:
return False, f"Text must be less than {app_manager.config['max_text_length']} characters."
# Check for suspicious content
suspicious_patterns = ['