# digiPal/core/state_manager.py
# Retrieved from the Hugging Face file viewer: BladeSzaSza, commit fe24641 ("new design"), 10.8 kB.
import json
import os
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import shutil
from core.game_mechanics import Monster
class StateManager:
    """Manage persistent state for users and their monsters.

    State is kept as JSON files below ``data_dir``::

        users/<user_id>/monster_<name>_<YYYYmmdd_HHMMSS>.json  # one snapshot per save
        users/<user_id>/current_monster.json                   # pointer to the latest snapshot
        users/<user_id>/profile.json                           # statistics and achievements
        cache/export_<user_id>_<timestamp>.zip                 # archives from export_user_data

    Recently used monsters are also held in ``active_sessions`` so that most
    reads and updates do not touch the disk.
    """

    # A cached session younger than this is served straight from memory.
    SESSION_TTL = timedelta(minutes=30)
    # A cached session idle for longer than this is flushed and evicted.
    SESSION_EXPIRY = timedelta(hours=1)
    # Minimum time between two routine (non-critical) saves of one user.
    SAVE_INTERVAL = timedelta(minutes=5)

    def __init__(self, data_dir: Path):
        """Create the storage directories below *data_dir* if they are missing.

        Args:
            data_dir: Root directory for all persisted state.
        """
        self.data_dir = Path(data_dir)
        self.users_dir = self.data_dir / "users"
        self.monsters_dir = self.data_dir / "monsters"
        self.cache_dir = self.data_dir / "cache"

        # Create directories if they don't exist
        for dir_path in (self.users_dir, self.monsters_dir, self.cache_dir):
            dir_path.mkdir(parents=True, exist_ok=True)

        # In-memory cache for active sessions:
        # user_id -> {'monster': <monster>, 'last_access': datetime}
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        # user_id -> time of the last successful save made by update_monster
        self.last_save_time: Dict[str, datetime] = {}

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Return the decoded JSON content of *path*."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write *data* to *path* as indented JSON."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    def get_user_dir(self, user_id: str) -> Path:
        """Get or create the directory that holds one user's files.

        Args:
            user_id: Identifier used as the directory name.

        Returns:
            Path of the (now existing) user directory.

        Raises:
            ValueError: If *user_id* is empty, is ``.``/``..`` or contains a
                path separator. Such values would resolve to the shared users
                directory or to a location outside it.
        """
        if (
            not user_id
            or user_id in ('.', '..')
            or '/' in user_id
            or '\\' in user_id
        ):
            raise ValueError(f"Invalid user id: {user_id!r}")
        user_dir = self.users_dir / user_id
        user_dir.mkdir(exist_ok=True)
        return user_dir

    def save_monster(self, user_id: str, monster: "Monster") -> bool:
        """Save a monster snapshot to persistent storage.

        Writes a timestamped snapshot file, repoints ``current_monster.json``
        at it, refreshes the user profile and caches the monster in memory.

        Args:
            user_id: Owner of the monster.
            monster: Monster to persist; its ``name`` and ``to_dict()`` are used.

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        try:
            user_dir = self.get_user_dir(user_id)
            now = datetime.now()

            # Save monster data
            monster_file = user_dir / f"monster_{monster.name}_{now.strftime('%Y%m%d_%H%M%S')}.json"
            self._write_json(monster_file, monster.to_dict())

            # Update current monster reference
            self._write_json(user_dir / "current_monster.json", {
                'monster_file': str(monster_file.name),
                'monster_name': monster.name,
                'last_updated': now.isoformat()
            })

            # Update user profile
            self._update_user_profile(user_id, monster)

            # Cache in memory
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': datetime.now()
            }
            return True
        except Exception as e:
            print(f"Error saving monster: {e}")
            return False

    def get_current_monster(self, user_id: str) -> Optional["Monster"]:
        """Get the current active monster for a user.

        Args:
            user_id: Owner of the monster.

        Returns:
            The cached monster if the session is fresh, otherwise the monster
            loaded from disk, or None if there is none or loading failed.
        """
        # Check memory cache first
        session = self.active_sessions.get(user_id)
        if session is not None:
            if datetime.now() - session['last_access'] < self.SESSION_TTL:
                session['last_access'] = datetime.now()
                return session['monster']
            # NOTE(review): a stale entry is replaced by the on-disk copy
            # below, so in-memory changes that update_monster has not flushed
            # yet are dropped - confirm this is intended.

        # Load from disk
        try:
            user_dir = self.get_user_dir(user_id)
            current_file = user_dir / "current_monster.json"
            if not current_file.exists():
                return None

            current_data = self._read_json(current_file)
            monster_file = user_dir / current_data['monster_file']
            if not monster_file.exists():
                return None

            monster = Monster.from_dict(self._read_json(monster_file))

            # Update cache
            self.active_sessions[user_id] = {
                'monster': monster,
                'last_access': datetime.now()
            }
            return monster
        except Exception as e:
            print(f"Error loading monster: {e}")
            return None

    def update_monster(self, user_id: str, monster: "Monster") -> bool:
        """Record new monster data, saving to disk only when needed.

        The monster is always cached in memory. It is written to disk when the
        user has no recorded save, when the last save is older than
        ``SAVE_INTERVAL``, or when health/hunger are critically low.

        Args:
            user_id: Owner of the monster.
            monster: Updated monster; ``care_state['health']`` and
                ``care_state['hunger']`` are read.

        Returns:
            True if no save was needed or the save succeeded, False otherwise.
        """
        now = datetime.now()

        # Always cache, even when no session exists yet; otherwise an update
        # that does not trigger a save would be lost without any trace.
        session = self.active_sessions.setdefault(user_id, {})
        session['monster'] = monster
        session['last_access'] = now

        # Save periodically or if important changes
        last_saved = self.last_save_time.get(user_id)
        overdue = last_saved is None or now - last_saved > self.SAVE_INTERVAL

        # Always save on critical states
        care = monster.care_state
        critical = care['health'] < 30 or care['hunger'] < 20

        if not (overdue or critical):
            return True

        saved = self.save_monster(user_id, monster)
        if saved:
            # Only a successful save resets the interval, so a failed one is
            # retried on the next update instead of after SAVE_INTERVAL.
            self.last_save_time[user_id] = now
        return saved

    def get_user_monsters(self, user_id: str) -> List[Dict[str, Any]]:
        """Get summary entries for all monster snapshot files of a user.

        Args:
            user_id: Owner of the monsters.

        Returns:
            One dict per readable ``monster_*.json`` file with the keys
            ``file``, ``name``, ``species``, ``stage`` and ``birth_time``,
            newest birth time first; entries without a birth time come last.
            Unreadable files are skipped. An empty list is returned on error.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            monsters = []
            # "current_monster.json" starts with "current_", so this pattern
            # never matches it and no extra filter is needed.
            for file_path in user_dir.glob("monster_*.json"):
                try:
                    monster_data = self._read_json(file_path)
                except (OSError, ValueError) as e:
                    # One damaged snapshot must not hide all the others.
                    print(f"Skipping unreadable monster file {file_path.name}: {e}")
                    continue
                if not isinstance(monster_data, dict):
                    continue
                monsters.append({
                    'file': file_path.name,
                    'name': monster_data.get('name'),
                    'species': monster_data.get('species'),
                    'stage': monster_data.get('stage'),
                    'birth_time': monster_data.get('birth_time')
                })

            # Sort by birth time (newest first). The leading flag keeps None
            # from ever being compared with a real birth time.
            monsters.sort(
                key=lambda m: (m['birth_time'] is not None, m['birth_time']),
                reverse=True
            )
            return monsters
        except Exception as e:
            print(f"Error getting user monsters: {e}")
            return []

    def _update_user_profile(self, user_id: str, monster: "Monster"):
        """Update the user's profile after *monster* has been saved.

        ``monsters_created`` is only increased the first time a monster name
        is seen for this user; the names already counted are stored under the
        ``monster_names`` profile key. Errors are printed, not raised.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            profile_file = user_dir / "profile.json"

            # Load existing profile or create new
            if profile_file.exists():
                profile = self._read_json(profile_file)
            else:
                profile = {
                    'user_id': user_id,
                    'created': datetime.now().isoformat(),
                    'monsters_created': 0,
                    'total_training_sessions': 0,
                    'achievements': []
                }

            # Profiles written before 'monster_names' existed only remember
            # the current monster; seed the list with it so that its next
            # save is not counted as a new monster.
            known_names = profile.setdefault('monster_names', [])
            previous = profile.get('current_monster')
            if not known_names and previous is not None:
                known_names.append(previous)

            # Update statistics. Repeated saves of a known monster (autosave,
            # session cleanup) must not count as newly created monsters.
            if monster.name not in known_names:
                known_names.append(monster.name)
                profile['monsters_created'] = profile.get('monsters_created', 0) + 1
            profile.setdefault('monsters_created', 0)
            profile['last_active'] = datetime.now().isoformat()
            profile['current_monster'] = monster.name

            # Check for achievements
            new_achievements = self._check_achievements(profile, monster)
            profile.setdefault('achievements', []).extend(new_achievements)

            # Save profile
            self._write_json(profile_file, profile)
        except Exception as e:
            print(f"Error updating user profile: {e}")

    def _check_achievements(self, profile: Dict, monster: "Monster") -> List[Dict[str, Any]]:
        """Return the achievements newly earned by *profile* and *monster*.

        Args:
            profile: User profile; ``monsters_created`` and the ids of the
                entries in ``achievements`` are read.
            monster: Monster whose ``care_state`` is inspected.

        Returns:
            Achievement dicts (``id``, ``name``, ``description``, ``icon``,
            ``unlocked``) that are not yet part of the profile.
        """
        achievements = []
        current_achievements = {a['id'] for a in profile.get('achievements', [])}
        created = profile.get('monsters_created', 0)

        def unlock(achievement_id: str, name: str, description: str, icon: str) -> None:
            # Already-owned achievements are never handed out twice.
            if achievement_id in current_achievements:
                return
            achievements.append({
                'id': achievement_id,
                'name': name,
                'description': description,
                'icon': icon,
                'unlocked': datetime.now().isoformat()
            })

        # Icons are written as code-point escapes so that they survive files
        # being re-encoded: U+1F947 first-place medal, U+1F3C6 trophy,
        # U+1F496 sparkling heart.

        # First monster achievement
        if created >= 1:
            unlock('first_monster', 'Digital Pioneer',
                   'Created your first digital monster', '\U0001F947')

        # Multiple monsters achievement
        if created >= 5:
            unlock('monster_collector', 'Monster Collector',
                   'Created 5 digital monsters', '\U0001F3C6')

        # Perfect care achievement
        if all(monster.care_state[stat] >= 90 for stat in ['hunger', 'happiness', 'health']):
            unlock('perfect_care', 'Perfect Caretaker',
                   'Achieved perfect care status', '\U0001F496')

        return achievements

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get the stored profile of a user.

        Returns:
            The decoded ``profile.json`` content, or None if the user has no
            profile or it could not be read.
        """
        try:
            profile_file = self.get_user_dir(user_id) / "profile.json"
            if profile_file.exists():
                return self._read_json(profile_file)
            return None
        except Exception as e:
            print(f"Error loading user profile: {e}")
            return None

    def cleanup_old_sessions(self):
        """Flush and evict sessions idle for longer than ``SESSION_EXPIRY``."""
        current_time = datetime.now()
        # Collect first: the cache is modified while expired users are handled.
        expired_users = [
            user_id
            for user_id, session in self.active_sessions.items()
            if current_time - session['last_access'] > self.SESSION_EXPIRY
        ]

        for user_id in expired_users:
            session = self.active_sessions[user_id]
            # Save before removing from cache
            if 'monster' in session:
                self.save_monster(user_id, session['monster'])
            self.active_sessions.pop(user_id, None)
            # Drop the save bookkeeping too so it does not grow without bound.
            self.last_save_time.pop(user_id, None)

    def export_user_data(self, user_id: str) -> Optional[str]:
        """Export all files of a user as a zip archive in the cache directory.

        Returns:
            Path of the created ``.zip`` file as a string, or None on failure.
        """
        try:
            user_dir = self.get_user_dir(user_id)
            export_path = self.cache_dir / f"export_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

            # Create zip archive
            shutil.make_archive(str(export_path), 'zip', user_dir)
            return f"{export_path}.zip"
        except Exception as e:
            print(f"Error exporting user data: {e}")
            return None