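"""File-based JSON storage of analysis history via the DatabaseManager class."""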
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
class DatabaseManager:
    """Simple file-based database manager for storing analysis history."""

    def __init__(self, db_file: str = "analysis_history.json"):
        """Initialize the database manager.

        Args:
            db_file: Path to the JSON file used to store analysis history
        """
        self.db_file = db_file
        self.ensure_db_file_exists()

    def ensure_db_file_exists(self):
        """Ensure the database file exists."""
        if not os.path.exists(self.db_file):
            with open(self.db_file, 'w') as f:
                json.dump([], f)
    def save_analysis(self, analysis_record: Dict[str, Any]) -> bool:
        """Save an analysis record to the database.

        Args:
            analysis_record: Dictionary containing analysis data

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Read existing data
            existing_data = self.load_all_data()

            # Add timestamp if not present
            if 'timestamp' not in analysis_record:
                analysis_record['timestamp'] = datetime.now().isoformat()

            # Append new record
            existing_data.append(analysis_record)

            # Write back to file
            with open(self.db_file, 'w') as f:
                json.dump(existing_data, f, indent=2, default=str)

            return True
        except Exception as e:
            print(f"Error saving analysis: {e}")
            return False
    def get_history(self, session_id: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
        """Get analysis history.

        Args:
            session_id: Optional session ID to filter by
            limit: Maximum number of records to return

        Returns:
            List of analysis records, newest first
        """
        try:
            data = self.load_all_data()

            # Filter by session_id if provided
            if session_id:
                data = [record for record in data if record.get('session_id') == session_id]

            # Sort by timestamp (newest first)
            data.sort(key=lambda x: x.get('timestamp', ''), reverse=True)

            # Apply limit
            return data[:limit]
        except Exception as e:
            print(f"Error getting history: {e}")
            return []
    def clear_history(self, session_id: Optional[str] = None) -> bool:
        """Clear analysis history.

        Args:
            session_id: Optional session ID to clear only that session's data

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            if session_id:
                # Clear only the specified session's data
                data = self.load_all_data()
                filtered_data = [record for record in data if record.get('session_id') != session_id]
                with open(self.db_file, 'w') as f:
                    json.dump(filtered_data, f, indent=2, default=str)
            else:
                # Clear all data
                with open(self.db_file, 'w') as f:
                    json.dump([], f)

            return True
        except Exception as e:
            print(f"Error clearing history: {e}")
            return False
    def load_all_data(self) -> List[Dict[str, Any]]:
        """Load all data from the database file.

        Returns:
            List of all records
        """
        try:
            with open(self.db_file, 'r') as f:
                data = json.load(f)
            return data if isinstance(data, list) else []
        except (FileNotFoundError, json.JSONDecodeError):
            return []
    def get_analysis_by_type(self, analysis_type: str, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get analyses by type.

        Args:
            analysis_type: Type of analysis (e.g., 'EDA', 'Single Query Analysis')
            session_id: Optional session ID to filter by

        Returns:
            List of matching analysis records, newest first
        """
        try:
            data = self.load_all_data()

            # Filter by type
            filtered_data = [record for record in data if record.get('type') == analysis_type]

            # Filter by session_id if provided
            if session_id:
                filtered_data = [record for record in filtered_data if record.get('session_id') == session_id]

            # Sort by timestamp (newest first)
            filtered_data.sort(key=lambda x: x.get('timestamp', ''), reverse=True)

            return filtered_data
        except Exception as e:
            print(f"Error getting analysis by type: {e}")
            return []
    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics.

        Returns:
            Dictionary with database statistics
        """
        try:
            data = self.load_all_data()

            stats = {
                'total_records': len(data),
                'unique_sessions': len(set(record.get('session_id', '') for record in data)),
                'analysis_types': {},
                'oldest_record': None,
                'newest_record': None
            }

            # Count analysis types
            for record in data:
                analysis_type = record.get('type', 'Unknown')
                stats['analysis_types'][analysis_type] = stats['analysis_types'].get(analysis_type, 0) + 1

            # Find oldest and newest records
            if data:
                timestamps = [record.get('timestamp', '') for record in data if record.get('timestamp')]
                if timestamps:
                    timestamps.sort()
                    stats['oldest_record'] = timestamps[0]
                    stats['newest_record'] = timestamps[-1]

            return stats
        except Exception as e:
            print(f"Error getting stats: {e}")
            return {
                'total_records': 0,
                'unique_sessions': 0,
                'analysis_types': {},
                'oldest_record': None,
                'newest_record': None,
                'error': str(e)
            }
    def backup_database(self, backup_file: Optional[str] = None) -> bool:
        """Create a backup of the database.

        Args:
            backup_file: Path for the backup file. If None, a timestamp-based name is used.

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            if backup_file is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_file = f"analysis_history_backup_{timestamp}.json"

            data = self.load_all_data()
            with open(backup_file, 'w') as f:
                json.dump(data, f, indent=2, default=str)

            return True
        except Exception as e:
            print(f"Error creating backup: {e}")
            return False
    def restore_from_backup(self, backup_file: str) -> bool:
        """Restore database from backup.

        Args:
            backup_file: Path to backup file

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            if not os.path.exists(backup_file):
                print(f"Backup file not found: {backup_file}")
                return False

            with open(backup_file, 'r') as f:
                data = json.load(f)

            # Validate data format
            if not isinstance(data, list):
                print("Invalid backup file format")
                return False

            # Write to main database file
            with open(self.db_file, 'w') as f:
                json.dump(data, f, indent=2, default=str)

            return True
        except Exception as e:
            print(f"Error restoring from backup: {e}")
            return False
    def delete_old_records(self, days_old: int = 30) -> int:
        """Delete records older than the specified number of days.

        Args:
            days_old: Number of days of records to keep

        Returns:
            int: Number of records deleted
        """
        try:
            cutoff_date = datetime.now() - timedelta(days=days_old)
            cutoff_str = cutoff_date.isoformat()

            data = self.load_all_data()
            original_count = len(data)

            # Keep only records at or after the cutoff; records without a
            # timestamp compare as older than the cutoff and are removed.
            filtered_data = []
            for record in data:
                record_time = record.get('timestamp', '')
                if record_time >= cutoff_str:
                    filtered_data.append(record)

            # Write filtered data back
            with open(self.db_file, 'w') as f:
                json.dump(filtered_data, f, indent=2, default=str)

            deleted_count = original_count - len(filtered_data)
            return deleted_count
        except Exception as e:
            print(f"Error deleting old records: {e}")
            return 0
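

# Minimal usage sketch. The 'session_id', 'type', and 'result' keys below are
# illustrative; DatabaseManager only requires a dict and fills in 'timestamp'
# itself when it is missing.
if __name__ == "__main__":
    db = DatabaseManager("analysis_history.json")

    # Save a record for a hypothetical session
    db.save_analysis({
        "session_id": "demo-session",
        "type": "EDA",
        "result": "Example analysis output",
    })

    # Read it back, filter by type, and report overall statistics
    print(db.get_history(session_id="demo-session", limit=5))
    print(db.get_analysis_by_type("EDA", session_id="demo-session"))
    print(db.get_stats())

    # Housekeeping: back up the file, then drop records older than 30 days
    db.backup_database()
    print(f"Deleted {db.delete_old_records(days_old=30)} old record(s)")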