from typing import Dict, Any, Optional, List import json import os from datetime import datetime, timedelta import logging from pydantic import BaseModel, Field, validator from enum import Enum import time import re from config.config import settings import threading import shutil from pathlib import Path import hashlib logger = logging.getLogger(__name__) # Global lock for profile operations _profile_lock = threading.Lock() class TravelStyle(str, Enum): BUDGET = "budget" LUXURY = "luxury" BALANCED = "balanced" class UserPreferences(BaseModel): """User preferences model with validation.""" travel_style: str = Field( default="balanced", description="Preferred travel style (budget, luxury, balanced)", ) preferred_destinations: list = Field( default_factory=list, description="List of preferred travel destinations", ) dietary_restrictions: list = Field( default_factory=list, description="List of dietary restrictions", ) accessibility_needs: list = Field( default_factory=list, description="List of accessibility requirements", ) preferred_activities: list = Field( default_factory=list, description="List of preferred activities", ) budget_range: Dict[str, float] = Field( default_factory=lambda: {"min": 0, "max": float("inf")}, description="Budget range for travel", ) preferred_accommodation: str = Field( default="hotel", description="Preferred type of accommodation", ) preferred_transportation: str = Field( default="flexible", description="Preferred mode of transportation", ) travel_frequency: str = Field( default="occasional", description="How often the user travels", ) preferred_seasons: list = Field( default_factory=list, description="Preferred travel seasons", ) special_requirements: list = Field( default_factory=list, description="Any special travel requirements", ) @validator("travel_style") def validate_travel_style(cls, v): allowed_styles = ["budget", "luxury", "balanced"] if v not in allowed_styles: raise ValueError(f"Travel style must be one of {allowed_styles}") return v @validator("preferred_accommodation") def validate_accommodation(cls, v): allowed_types = [ "hotel", "hostel", "apartment", "resort", "camping", "flexible", ] if v not in allowed_types: raise ValueError(f"Accommodation type must be one of {allowed_types}") return v @validator("preferred_transportation") def validate_transportation(cls, v): allowed_types = [ "car", "train", "bus", "plane", "flexible", ] if v not in allowed_types: raise ValueError(f"Transportation type must be one of {allowed_types}") return v @validator("travel_frequency") def validate_frequency(cls, v): allowed_frequencies = [ "rarely", "occasional", "frequent", "very_frequent", ] if v not in allowed_frequencies: raise ValueError(f"Travel frequency must be one of {allowed_frequencies}") return v @validator("budget_range") def validate_budget(cls, v): if v["min"] < 0: raise ValueError("Minimum budget cannot be negative") if v["max"] < v["min"]: raise ValueError("Maximum budget must be greater than minimum budget") return v class UserProfile: def __init__(self): """Initialize the user profile manager.""" self.profiles_dir = os.path.join("data", "user_profiles") self.backup_dir = os.path.join("data", "user_profiles_backup") self._ensure_directories() self.rate_limit_window = 3600 # 1 hour self.max_updates_per_window = 10 self.update_history: Dict[str, list] = {} self.max_profile_size = 1024 * 1024 # 1MB def _ensure_directories(self): """Ensure necessary directories exist.""" os.makedirs(self.profiles_dir, exist_ok=True) os.makedirs(self.backup_dir, exist_ok=True) def _validate_user_id(self, user_id: str) -> bool: """Validate user ID format.""" if not user_id or not isinstance(user_id, str): return False # Allow alphanumeric characters, hyphens, and underscores return bool(re.match(r"^[a-zA-Z0-9-_]+$", user_id)) def _check_rate_limit(self, user_id: str) -> bool: """Check if user has exceeded rate limit.""" current_time = time.time() if user_id not in self.update_history: self.update_history[user_id] = [] # Remove old entries self.update_history[user_id] = [ t for t in self.update_history[user_id] if current_time - t < self.rate_limit_window ] # Check if limit exceeded if len(self.update_history[user_id]) >= self.max_updates_per_window: return False # Add new entry self.update_history[user_id].append(current_time) return True def _create_backup(self, user_id: str) -> None: """Create a backup of the user profile.""" try: profile_path = os.path.join(self.profiles_dir, f"{user_id}.json") if not os.path.exists(profile_path): return # Create backup with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = os.path.join(self.backup_dir, f"{user_id}_{timestamp}.json") shutil.copy2(profile_path, backup_path) # Keep only the last 5 backups backups = sorted( Path(self.backup_dir).glob(f"{user_id}_*.json"), key=lambda x: x.stat().st_mtime, reverse=True, ) for old_backup in backups[5:]: old_backup.unlink() except Exception as e: logger.error( f"Error creating backup for {user_id}: {str(e)}", exc_info=True ) def _cleanup_old_profiles(self): """Clean up profiles older than 30 days.""" try: current_time = time.time() for filename in os.listdir(self.profiles_dir): if not filename.endswith(".json"): continue file_path = os.path.join(self.profiles_dir, filename) file_time = os.path.getmtime(file_path) if current_time - file_time > 30 * 24 * 3600: # 30 days try: # Create final backup before deletion user_id = filename[:-5] # Remove .json extension self._create_backup(user_id) os.remove(file_path) logger.info(f"Removed old profile: {filename}") except Exception as e: logger.warning( f"Error removing old profile {filename}: {str(e)}" ) except Exception as e: logger.error(f"Error cleaning up old profiles: {str(e)}", exc_info=True) def get_profile(self, user_id: str) -> Dict[str, Any]: """Get user profile with validation.""" try: if not self._validate_user_id(user_id): raise ValueError("Invalid user ID format") profile_path = os.path.join(self.profiles_dir, f"{user_id}.json") with _profile_lock: if not os.path.exists(profile_path): return self._create_default_profile(user_id) # Check file size if os.path.getsize(profile_path) > self.max_profile_size: raise ValueError("Profile file size exceeds limit") with open(profile_path, "r", encoding="utf-8") as f: profile = json.load(f) # Validate profile structure if not isinstance(profile, dict): raise ValueError("Invalid profile format") # Ensure all required fields exist required_fields = ["user_id", "preferences", "created_at", "updated_at"] if not all(field in profile for field in required_fields): raise ValueError("Missing required profile fields") return profile except Exception as e: logger.error( f"Error getting profile for {user_id}: {str(e)}", exc_info=True ) raise def update_profile( self, user_id: str, preferences: Dict[str, Any] ) -> Dict[str, Any]: """Update user profile with validation and rate limiting.""" try: if not self._validate_user_id(user_id): raise ValueError("Invalid user ID format") if not self._check_rate_limit(user_id): raise ValueError("Rate limit exceeded") # Validate preferences try: validated_preferences = UserPreferences(**preferences) except Exception as e: raise ValueError(f"Invalid preferences: {str(e)}") profile_path = os.path.join(self.profiles_dir, f"{user_id}.json") with _profile_lock: # Create backup before update self._create_backup(user_id) current_profile = self.get_profile(user_id) # Update profile current_profile["preferences"] = validated_preferences.dict() current_profile["updated_at"] = datetime.utcnow().isoformat() # Save updated profile with open(profile_path, "w", encoding="utf-8") as f: json.dump(current_profile, f, indent=2) logger.info(f"Updated profile for user {user_id}") return current_profile except Exception as e: logger.error( f"Error updating profile for {user_id}: {str(e)}", exc_info=True ) raise def _create_default_profile(self, user_id: str) -> Dict[str, Any]: """Create a default profile with validation.""" try: if not self._validate_user_id(user_id): raise ValueError("Invalid user ID format") default_preferences = UserPreferences().dict() profile = { "user_id": user_id, "preferences": default_preferences, "created_at": datetime.utcnow().isoformat(), "updated_at": datetime.utcnow().isoformat(), } profile_path = os.path.join(self.profiles_dir, f"{user_id}.json") with _profile_lock: with open(profile_path, "w", encoding="utf-8") as f: json.dump(profile, f, indent=2) logger.info(f"Created default profile for user {user_id}") return profile except Exception as e: logger.error( f"Error creating default profile for {user_id}: {str(e)}", exc_info=True ) raise def delete_profile(self, user_id: str) -> None: """Delete user profile with validation.""" try: if not self._validate_user_id(user_id): raise ValueError("Invalid user ID format") profile_path = os.path.join(self.profiles_dir, f"{user_id}.json") with _profile_lock: if os.path.exists(profile_path): # Create final backup before deletion self._create_backup(user_id) os.remove(profile_path) logger.info(f"Deleted profile for user {user_id}") else: logger.warning(f"Profile not found for user {user_id}") except Exception as e: logger.error( f"Error deleting profile for {user_id}: {str(e)}", exc_info=True ) raise def get_recommendations(self, user_id: str) -> Dict[str, Any]: """Get personalized recommendations based on user profile with validation.""" try: profile = self.get_profile(user_id) if not profile or "preferences" not in profile: return {} preferences = UserPreferences(**profile["preferences"]) recommendations = { "destinations": self._get_destination_recommendations(preferences), "activities": self._get_activity_recommendations(preferences), "tips": self._get_personalized_tips(preferences), "generated_at": datetime.now().isoformat(), } return recommendations except Exception as e: logger.error(f"Error getting recommendations: {str(e)}", exc_info=True) return {} def _get_destination_recommendations(self, profile: UserPreferences) -> List[str]: """Get destination recommendations based on preferences.""" try: recommendations = [] # Add recommendations based on favorite destinations if profile.preferred_destinations: recommendations.extend(profile.preferred_destinations[:3]) # Add recommendations based on interests if "beach" in profile.preferred_activities: recommendations.append("Bali, Indonesia") if "culture" in profile.preferred_activities: recommendations.append("Kyoto, Japan") if "food" in profile.preferred_activities: recommendations.append("Bangkok, Thailand") # Add recommendations based on travel style if profile.travel_style == TravelStyle.LUXURY: recommendations.append("Dubai, UAE") elif profile.travel_style == TravelStyle.BUDGET: recommendations.append("Bangkok, Thailand") return list(set(recommendations))[:5] # Return top 5 unique recommendations except Exception as e: logger.error( f"Error getting destination recommendations: {str(e)}", exc_info=True ) return [] def _get_activity_recommendations(self, profile: UserPreferences) -> List[str]: """Get activity recommendations based on preferences.""" try: activities = [] # Add activities based on interests if "culture" in profile.preferred_activities: activities.append("Visit local museums and historical sites") if "food" in profile.preferred_activities: activities.append("Try local cuisine and food tours") if "nature" in profile.preferred_activities: activities.append("Explore national parks and hiking trails") if "adventure" in profile.preferred_activities: activities.append("Try adventure sports and activities") # Add activities based on travel style if profile.travel_style == TravelStyle.LUXURY: activities.append("Book private guided tours") elif profile.travel_style == TravelStyle.BUDGET: activities.append("Explore local markets and street food") return list(set(activities))[:5] # Return top 5 unique activities except Exception as e: logger.error( f"Error getting activity recommendations: {str(e)}", exc_info=True ) return [] def _get_personalized_tips(self, profile: UserPreferences) -> List[str]: """Get personalized travel tips based on preferences.""" try: tips = [] # Add tips based on travel style if profile.travel_style == TravelStyle.BUDGET: tips.append( "Look for local markets and street food for affordable meals" ) tips.append("Consider staying in hostels or guesthouses") elif profile.travel_style == TravelStyle.LUXURY: tips.append("Book premium experiences and private tours in advance") tips.append("Consider luxury resorts and boutique hotels") # Add tips based on dietary restrictions if profile.dietary_restrictions: tips.append( f"Research restaurants that accommodate {', '.join(profile.dietary_restrictions)}" ) # Add tips based on accessibility needs if profile.accessibility_needs: tips.append( f"Research accessibility features for {', '.join(profile.accessibility_needs)}" ) return list(set(tips))[:5] # Return top 5 unique tips except Exception as e: logger.error(f"Error getting personalized tips: {str(e)}", exc_info=True) return []