neural-data-anlayst / database_manager.py
Dhruv Pawar
Initial commit: Neural Data Analyst v1.0
1273036
import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
class DatabaseManager:
    """Simple file-based database manager for storing analysis history.

    Records are plain dictionaries kept together as one JSON array in
    ``db_file``. Every operation re-reads the whole file, and every
    modification rewrites the whole file.
    """

    def __init__(self, db_file: str = "analysis_history.json"):
        """Initialize the database manager.

        Args:
            db_file: Path to the JSON file to store analysis history.
        """
        self.db_file = db_file
        self.ensure_db_file_exists()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _write_records(self, records: List[Any], path: Optional[str] = None) -> None:
        """Serialize ``records`` and write them to ``path`` (default: the database file).

        The JSON text is produced *before* the target is opened for writing.
        Opening with ``'w'`` truncates the file, so serializing first means a
        failure to encode a record (for example a non-string dictionary key,
        which ``default=str`` does not cover) leaves the previous contents
        untouched instead of leaving a truncated file behind.
        """
        target = self.db_file if path is None else path
        payload = json.dumps(records, indent=2, default=str)
        with open(target, 'w', encoding='utf-8') as f:
            f.write(payload)

    @staticmethod
    def _timestamp_key(record: Dict[str, Any]) -> str:
        """Return the record's timestamp as a string usable for ordering.

        A missing or ``None`` timestamp becomes ``''`` so that it can be
        compared with ISO-formatted strings without raising ``TypeError``.
        """
        value = record.get('timestamp')
        return '' if value is None else str(value)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def ensure_db_file_exists(self):
        """Create the database file holding an empty list if it does not exist."""
        if not os.path.exists(self.db_file):
            self._write_records([])

    def save_analysis(self, analysis_record: Dict[str, Any]) -> bool:
        """Append an analysis record to the database.

        If the record has no ``'timestamp'`` key, the current local time (ISO
        format) is stored under that key in the dictionary that was passed in.

        Args:
            analysis_record: Dictionary containing analysis data.

        Returns:
            bool: True if successful, False otherwise. On failure the
            database file keeps its previous contents.
        """
        try:
            records = self.load_all_data()
            if 'timestamp' not in analysis_record:
                analysis_record['timestamp'] = datetime.now().isoformat()
            records.append(analysis_record)
            self._write_records(records)
            return True
        except Exception as e:
            print(f"Error saving analysis: {e}")
            return False

    def get_history(self, session_id: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
        """Get analysis history, newest first.

        Args:
            session_id: Optional session ID to filter by. A falsy value
                (``None`` or ``''``) disables the filter.
            limit: Maximum number of records to return.

        Returns:
            List of analysis records; an empty list if an error occurs.
        """
        try:
            records = self.load_all_data()
            if session_id:
                records = [r for r in records if r.get('session_id') == session_id]
            # Sort on the normalized key so a null timestamp cannot abort the sort.
            records.sort(key=self._timestamp_key, reverse=True)
            return records[:limit]
        except Exception as e:
            print(f"Error getting history: {e}")
            return []

    def clear_history(self, session_id: Optional[str] = None) -> bool:
        """Clear analysis history.

        Args:
            session_id: Optional session ID whose records should be removed.
                A falsy value (``None`` or ``''``) clears every record.

        Returns:
            bool: True if successful, False otherwise.
        """
        try:
            if session_id:
                remaining = [
                    r for r in self.load_all_data()
                    if r.get('session_id') != session_id
                ]
            else:
                remaining = []
            self._write_records(remaining)
            return True
        except Exception as e:
            print(f"Error clearing history: {e}")
            return False

    def load_all_data(self) -> List[Dict[str, Any]]:
        """Load all data from the database file.

        Returns:
            List of all records. An empty list is returned when the file is
            missing, is not valid JSON, or does not contain a JSON array.
        """
        try:
            with open(self.db_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return []
        return data if isinstance(data, list) else []

    def get_analysis_by_type(self, analysis_type: str, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get analyses of one type, newest first.

        Args:
            analysis_type: Type of analysis (e.g., 'EDA', 'Single Query Analysis').
            session_id: Optional session ID to filter by. A falsy value
                disables the filter.

        Returns:
            List of matching analysis records; an empty list if an error occurs.
        """
        try:
            matches = [
                r for r in self.load_all_data()
                if r.get('type') == analysis_type
            ]
            if session_id:
                matches = [r for r in matches if r.get('session_id') == session_id]
            matches.sort(key=self._timestamp_key, reverse=True)
            return matches
        except Exception as e:
            print(f"Error getting analysis by type: {e}")
            return []

    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics.

        Returns:
            Dictionary with ``total_records``, ``unique_sessions``,
            ``analysis_types`` (count per ``'type'`` value, ``'Unknown'`` when
            absent), ``oldest_record`` and ``newest_record`` (smallest and
            largest timestamp strings, or ``None`` when no record has one).
            Records without a ``'session_id'`` are counted together as one
            session. If an error occurs, zeroed statistics plus an
            ``'error'`` message are returned.
        """
        try:
            data = self.load_all_data()
            stats = {
                'total_records': len(data),
                'unique_sessions': len({record.get('session_id', '') for record in data}),
                'analysis_types': {},
                'oldest_record': None,
                'newest_record': None
            }

            type_counts = stats['analysis_types']
            for record in data:
                analysis_type = record.get('type', 'Unknown')
                type_counts[analysis_type] = type_counts.get(analysis_type, 0) + 1

            # Only records that actually carry a timestamp take part.
            timestamps = sorted(
                self._timestamp_key(record)
                for record in data
                if record.get('timestamp')
            )
            if timestamps:
                stats['oldest_record'] = timestamps[0]
                stats['newest_record'] = timestamps[-1]
            return stats
        except Exception as e:
            print(f"Error getting stats: {e}")
            return {
                'total_records': 0,
                'unique_sessions': 0,
                'analysis_types': {},
                'oldest_record': None,
                'newest_record': None,
                'error': str(e)
            }

    def backup_database(self, backup_file: Optional[str] = None) -> bool:
        """Create a backup of the database.

        Args:
            backup_file: Path for backup file. If None, a name of the form
                ``analysis_history_backup_<YYYYmmdd_HHMMSS>.json`` is used.

        Returns:
            bool: True if successful, False otherwise.
        """
        try:
            if backup_file is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_file = f"analysis_history_backup_{timestamp}.json"
            self._write_records(self.load_all_data(), backup_file)
            return True
        except Exception as e:
            print(f"Error creating backup: {e}")
            return False

    def restore_from_backup(self, backup_file: str) -> bool:
        """Restore database from backup, replacing the current contents.

        Args:
            backup_file: Path to backup file. It must contain a JSON array.

        Returns:
            bool: True if successful, False otherwise (missing file,
            unreadable JSON, or JSON that is not a list).
        """
        try:
            if not os.path.exists(backup_file):
                print(f"Backup file not found: {backup_file}")
                return False
            with open(backup_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                print("Invalid backup file format")
                return False
            self._write_records(data)
            return True
        except Exception as e:
            print(f"Error restoring from backup: {e}")
            return False

    def delete_old_records(self, days_old: int = 30) -> int:
        """Delete records older than specified days.

        Timestamps are compared as strings against the ISO-formatted cutoff
        (current local time minus ``days_old`` days). Records whose timestamp
        is missing or ``None`` compare as ``''`` and are therefore removed.

        Args:
            days_old: Number of days to keep records.

        Returns:
            int: Number of records deleted (0 if an error occurs).
        """
        try:
            cutoff_str = (datetime.now() - timedelta(days=days_old)).isoformat()
            data = self.load_all_data()
            kept = [
                record for record in data
                if self._timestamp_key(record) >= cutoff_str
            ]
            self._write_records(kept)
            return len(data) - len(kept)
        except Exception as e:
            print(f"Error deleting old records: {e}")
            return 0