import json import os from datetime import datetime from typing import Dict, List, Any class DatabaseManager: """Simple file-based database manager for storing analysis history""" def __init__(self, db_file: str = "analysis_history.json"): """Initialize the database manager Args: db_file: Path to the JSON file to store analysis history """ self.db_file = db_file self.ensure_db_file_exists() def ensure_db_file_exists(self): """Ensure the database file exists""" if not os.path.exists(self.db_file): with open(self.db_file, 'w') as f: json.dump([], f) def save_analysis(self, analysis_record: Dict[str, Any]) -> bool: """Save an analysis record to the database Args: analysis_record: Dictionary containing analysis data Returns: bool: True if successful, False otherwise """ try: # Read existing data existing_data = self.load_all_data() # Add timestamp if not present if 'timestamp' not in analysis_record: analysis_record['timestamp'] = datetime.now().isoformat() # Append new record existing_data.append(analysis_record) # Write back to file with open(self.db_file, 'w') as f: json.dump(existing_data, f, indent=2, default=str) return True except Exception as e: print(f"Error saving analysis: {e}") return False def get_history(self, session_id: str = None, limit: int = 100) -> List[Dict[str, Any]]: """Get analysis history Args: session_id: Optional session ID to filter by limit: Maximum number of records to return Returns: List of analysis records """ try: data = self.load_all_data() # Filter by session_id if provided if session_id: data = [record for record in data if record.get('session_id') == session_id] # Sort by timestamp (newest first) data.sort(key=lambda x: x.get('timestamp', ''), reverse=True) # Apply limit return data[:limit] except Exception as e: print(f"Error getting history: {e}") return [] def clear_history(self, session_id: str = None) -> bool: """Clear analysis history Args: session_id: Optional session ID to clear specific session data Returns: bool: True if successful, False otherwise """ try: if session_id: # Clear only specific session data data = self.load_all_data() filtered_data = [record for record in data if record.get('session_id') != session_id] with open(self.db_file, 'w') as f: json.dump(filtered_data, f, indent=2, default=str) else: # Clear all data with open(self.db_file, 'w') as f: json.dump([], f) return True except Exception as e: print(f"Error clearing history: {e}") return False def load_all_data(self) -> List[Dict[str, Any]]: """Load all data from the database file Returns: List of all records """ try: with open(self.db_file, 'r') as f: data = json.load(f) return data if isinstance(data, list) else [] except (FileNotFoundError, json.JSONDecodeError): return [] def get_analysis_by_type(self, analysis_type: str, session_id: str = None) -> List[Dict[str, Any]]: """Get analyses by type Args: analysis_type: Type of analysis (e.g., 'EDA', 'Single Query Analysis') session_id: Optional session ID to filter by Returns: List of matching analysis records """ try: data = self.load_all_data() # Filter by type filtered_data = [record for record in data if record.get('type') == analysis_type] # Filter by session_id if provided if session_id: filtered_data = [record for record in filtered_data if record.get('session_id') == session_id] # Sort by timestamp (newest first) filtered_data.sort(key=lambda x: x.get('timestamp', ''), reverse=True) return filtered_data except Exception as e: print(f"Error getting analysis by type: {e}") return [] def get_stats(self) -> Dict[str, Any]: """Get database statistics Returns: Dictionary with database statistics """ try: data = self.load_all_data() stats = { 'total_records': len(data), 'unique_sessions': len(set(record.get('session_id', '') for record in data)), 'analysis_types': {}, 'oldest_record': None, 'newest_record': None } # Count analysis types for record in data: analysis_type = record.get('type', 'Unknown') stats['analysis_types'][analysis_type] = stats['analysis_types'].get(analysis_type, 0) + 1 # Find oldest and newest records if data: timestamps = [record.get('timestamp', '') for record in data if record.get('timestamp')] if timestamps: timestamps.sort() stats['oldest_record'] = timestamps[0] stats['newest_record'] = timestamps[-1] return stats except Exception as e: print(f"Error getting stats: {e}") return { 'total_records': 0, 'unique_sessions': 0, 'analysis_types': {}, 'oldest_record': None, 'newest_record': None, 'error': str(e) } def backup_database(self, backup_file: str = None) -> bool: """Create a backup of the database Args: backup_file: Path for backup file. If None, uses timestamp-based name Returns: bool: True if successful, False otherwise """ try: if backup_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = f"analysis_history_backup_{timestamp}.json" data = self.load_all_data() with open(backup_file, 'w') as f: json.dump(data, f, indent=2, default=str) return True except Exception as e: print(f"Error creating backup: {e}") return False def restore_from_backup(self, backup_file: str) -> bool: """Restore database from backup Args: backup_file: Path to backup file Returns: bool: True if successful, False otherwise """ try: if not os.path.exists(backup_file): print(f"Backup file not found: {backup_file}") return False with open(backup_file, 'r') as f: data = json.load(f) # Validate data format if not isinstance(data, list): print("Invalid backup file format") return False # Write to main database file with open(self.db_file, 'w') as f: json.dump(data, f, indent=2, default=str) return True except Exception as e: print(f"Error restoring from backup: {e}") return False def delete_old_records(self, days_old: int = 30) -> int: """Delete records older than specified days Args: days_old: Number of days to keep records Returns: int: Number of records deleted """ try: from datetime import datetime, timedelta cutoff_date = datetime.now() - timedelta(days=days_old) cutoff_str = cutoff_date.isoformat() data = self.load_all_data() original_count = len(data) # Filter out old records filtered_data = [] for record in data: record_time = record.get('timestamp', '') if record_time >= cutoff_str: filtered_data.append(record) # Write filtered data back with open(self.db_file, 'w') as f: json.dump(filtered_data, f, indent=2, default=str) deleted_count = original_count - len(filtered_data) return deleted_count except Exception as e: print(f"Error deleting old records: {e}") return 0