Spaces:
Sleeping
Sleeping
from typing import Dict, Any, Optional, List | |
import json | |
import os | |
from datetime import datetime, timedelta | |
import logging | |
from pydantic import BaseModel, Field, validator | |
from enum import Enum | |
import time | |
import re | |
from config.config import settings | |
import threading | |
import shutil | |
from pathlib import Path | |
import hashlib | |
logger = logging.getLogger(__name__)

# Global lock for profile operations.
# It is re-entrant (RLock) so that a profile operation which already holds the
# lock can call another locked profile operation on the same thread without
# deadlocking; a plain threading.Lock would block forever in that situation.
_profile_lock = threading.RLock()
class TravelStyle(str, Enum):
    """Closed set of travel styles a user can choose from.

    The enum mixes in ``str``, so each member compares equal to its plain
    string value (for example ``TravelStyle.BUDGET == "budget"``).
    """

    # Cost-conscious travel.
    BUDGET = "budget"
    # High-end travel.
    LUXURY = "luxury"
    # Neither of the two extremes.
    BALANCED = "balanced"
class UserPreferences(BaseModel):
    """User preferences model with validation.

    Every field has a default, so ``UserPreferences()`` yields a complete set
    of default preferences. The ``validate_*`` methods are registered as
    pydantic validators and reject values outside the allowed sets when a
    value is supplied explicitly.
    """

    travel_style: str = Field(
        default="balanced",
        description="Preferred travel style (budget, luxury, balanced)",
    )
    preferred_destinations: list = Field(
        default_factory=list,
        description="List of preferred travel destinations",
    )
    dietary_restrictions: list = Field(
        default_factory=list,
        description="List of dietary restrictions",
    )
    accessibility_needs: list = Field(
        default_factory=list,
        description="List of accessibility requirements",
    )
    preferred_activities: list = Field(
        default_factory=list,
        description="List of preferred activities",
    )
    # NOTE: the default upper bound is float("inf"); json.dump writes it as the
    # non-standard token ``Infinity``, which Python's json module reads back
    # but strict JSON parsers may reject.
    budget_range: Dict[str, float] = Field(
        default_factory=lambda: {"min": 0, "max": float("inf")},
        description="Budget range for travel",
    )
    preferred_accommodation: str = Field(
        default="hotel",
        description="Preferred type of accommodation",
    )
    preferred_transportation: str = Field(
        default="flexible",
        description="Preferred mode of transportation",
    )
    travel_frequency: str = Field(
        default="occasional",
        description="How often the user travels",
    )
    preferred_seasons: list = Field(
        default_factory=list,
        description="Preferred travel seasons",
    )
    special_requirements: list = Field(
        default_factory=list,
        description="Any special travel requirements",
    )

    @validator("travel_style")
    def validate_travel_style(cls, v):
        """Reject travel styles that are not members of ``TravelStyle``."""
        # Derived from the enum so the two definitions cannot drift apart.
        allowed_styles = [style.value for style in TravelStyle]
        if v not in allowed_styles:
            raise ValueError(f"Travel style must be one of {allowed_styles}")
        return v

    @validator("preferred_accommodation")
    def validate_accommodation(cls, v):
        """Reject accommodation types outside the supported list."""
        allowed_types = [
            "hotel",
            "hostel",
            "apartment",
            "resort",
            "camping",
            "flexible",
        ]
        if v not in allowed_types:
            raise ValueError(f"Accommodation type must be one of {allowed_types}")
        return v

    @validator("preferred_transportation")
    def validate_transportation(cls, v):
        """Reject transportation modes outside the supported list."""
        allowed_types = [
            "car",
            "train",
            "bus",
            "plane",
            "flexible",
        ]
        if v not in allowed_types:
            raise ValueError(f"Transportation type must be one of {allowed_types}")
        return v

    @validator("travel_frequency")
    def validate_frequency(cls, v):
        """Reject travel frequencies outside the supported list."""
        allowed_frequencies = [
            "rarely",
            "occasional",
            "frequent",
            "very_frequent",
        ]
        if v not in allowed_frequencies:
            raise ValueError(f"Travel frequency must be one of {allowed_frequencies}")
        return v

    @validator("budget_range")
    def validate_budget(cls, v):
        """Require ``min`` and ``max`` keys with ``0 <= min <= max``."""
        # Report missing keys as a validation error instead of letting a
        # KeyError escape from the validator.
        missing = [key for key in ("min", "max") if key not in v]
        if missing:
            raise ValueError(f"Budget range is missing required keys: {missing}")
        if v["min"] < 0:
            raise ValueError("Minimum budget cannot be negative")
        if v["max"] < v["min"]:
            raise ValueError("Maximum budget must be greater than minimum budget")
        return v
class UserProfile:
    """File-backed manager for per-user travel profiles.

    Each profile is a JSON document stored at ``<profiles_dir>/<user_id>.json``
    containing the keys ``user_id``, ``preferences``, ``created_at`` and
    ``updated_at``. A timestamped copy is written to ``backup_dir`` before a
    profile is overwritten or deleted.

    Locking convention: the methods that take ``_profile_lock`` acquire it
    exactly once and, while holding it, call only the ``*_unlocked`` helpers
    (which never take the lock). This keeps the code free of nested
    acquisitions regardless of whether the lock is re-entrant.
    """

    # Keys every stored profile document must contain.
    _REQUIRED_FIELDS = ("user_id", "preferences", "created_at", "updated_at")
    # Number of most recent backups kept per user.
    _MAX_BACKUPS = 5
    # Age (seconds since last modification) after which a profile is purged.
    _PROFILE_MAX_AGE_SECONDS = 30 * 24 * 3600  # 30 days
    # Maximum number of items returned by each recommendation list.
    _MAX_RECOMMENDATIONS = 5

    def __init__(self):
        """Initialize the user profile manager."""
        self.profiles_dir = os.path.join("data", "user_profiles")
        self.backup_dir = os.path.join("data", "user_profiles_backup")
        self._ensure_directories()
        self.rate_limit_window = 3600  # 1 hour
        self.max_updates_per_window = 10
        # user_id -> timestamps (time.time()) of recent update attempts.
        self.update_history: Dict[str, list] = {}
        self.max_profile_size = 1024 * 1024  # 1MB

    def _ensure_directories(self):
        """Ensure necessary directories exist."""
        os.makedirs(self.profiles_dir, exist_ok=True)
        os.makedirs(self.backup_dir, exist_ok=True)

    def _profile_path(self, user_id: str) -> str:
        """Return the path of the JSON file that stores ``user_id``'s profile."""
        return os.path.join(self.profiles_dir, f"{user_id}.json")

    def _validate_user_id(self, user_id: str) -> bool:
        """Validate user ID format.

        Returns True only for a non-empty ``str`` made up entirely of ASCII
        letters, digits, hyphens and underscores.
        """
        if not user_id or not isinstance(user_id, str):
            return False
        # fullmatch, not match + "$": "$" also matches just before a trailing
        # newline, which would let "name\n" through and into a file name.
        return re.fullmatch(r"[a-zA-Z0-9_-]+", user_id) is not None

    def _check_rate_limit(self, user_id: str) -> bool:
        """Check the rate limit and record the attempt if it is allowed.

        Returns False when ``user_id`` already has ``max_updates_per_window``
        recorded attempts within the last ``rate_limit_window`` seconds;
        otherwise records the current time and returns True.
        """
        now = time.time()
        # Keep only the attempts that are still inside the window.
        recent = [
            stamp
            for stamp in self.update_history.get(user_id, [])
            if now - stamp < self.rate_limit_window
        ]
        allowed = len(recent) < self.max_updates_per_window
        if allowed:
            recent.append(now)
        self.update_history[user_id] = recent
        return allowed

    def _create_backup(self, user_id: str) -> None:
        """Create a backup of the user profile and prune old backups.

        Best effort: does nothing when the profile file does not exist, and
        any failure is logged rather than raised so that the calling
        operation can continue.
        """
        try:
            profile_path = self._profile_path(user_id)
            if not os.path.exists(profile_path):
                return
            # Create backup with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = os.path.join(self.backup_dir, f"{user_id}_{timestamp}.json")
            shutil.copy2(profile_path, backup_path)
            # Match this user's backups exactly. A "<user_id>_*" glob would
            # also match backups of any other user whose ID starts with
            # "<user_id>_" (IDs may contain underscores) and delete them.
            own_backup = re.compile(rf"{re.escape(user_id)}_\d{{8}}_\d{{6}}\.json")
            # The embedded timestamp sorts lexicographically, so ordering by
            # name gives newest-first without relying on file mtimes.
            backups = sorted(
                (
                    entry
                    for entry in Path(self.backup_dir).iterdir()
                    if own_backup.fullmatch(entry.name)
                ),
                key=lambda entry: entry.name,
                reverse=True,
            )
            for old_backup in backups[self._MAX_BACKUPS:]:
                old_backup.unlink()
        except Exception as e:
            logger.error(
                f"Error creating backup for {user_id}: {str(e)}", exc_info=True
            )

    def _cleanup_old_profiles(self):
        """Clean up profiles older than 30 days.

        A profile's age is measured from its file modification time. Each
        profile is backed up before removal. Failures are logged, never
        raised, and a failure on one file does not stop the sweep.
        """
        try:
            current_time = time.time()
            for filename in os.listdir(self.profiles_dir):
                if not filename.endswith(".json"):
                    continue
                file_path = os.path.join(self.profiles_dir, filename)
                try:
                    file_time = os.path.getmtime(file_path)
                    if current_time - file_time <= self._PROFILE_MAX_AGE_SECONDS:
                        continue
                    # Create final backup before deletion
                    user_id = os.path.splitext(filename)[0]
                    self._create_backup(user_id)
                    os.remove(file_path)
                    logger.info(f"Removed old profile: {filename}")
                except Exception as e:
                    logger.warning(
                        f"Error removing old profile {filename}: {str(e)}"
                    )
        except Exception as e:
            logger.error(f"Error cleaning up old profiles: {str(e)}", exc_info=True)

    def _write_profile_unlocked(self, user_id: str, profile: Dict[str, Any]) -> None:
        """Write ``profile`` to ``user_id``'s file; caller holds the lock.

        The document is serialised to a temporary sibling file and then moved
        into place with ``os.replace``, so a serialisation or I/O failure
        leaves the previously stored profile untouched.
        """
        profile_path = self._profile_path(user_id)
        # The ".tmp" suffix keeps the scratch file out of ".json" listings.
        tmp_path = f"{profile_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(profile, f, indent=2)
            os.replace(tmp_path, profile_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def _create_default_unlocked(self, user_id: str) -> Dict[str, Any]:
        """Build, store and return a default profile; caller holds the lock."""
        # One clock read so created_at and updated_at are identical.
        now = datetime.utcnow().isoformat()
        profile = {
            "user_id": user_id,
            "preferences": UserPreferences().dict(),
            "created_at": now,
            "updated_at": now,
        }
        self._write_profile_unlocked(user_id, profile)
        logger.info(f"Created default profile for user {user_id}")
        return profile

    def _load_or_create_unlocked(self, user_id: str) -> Dict[str, Any]:
        """Load ``user_id``'s profile, creating a default one if absent.

        Caller holds the lock.

        Raises:
            ValueError: if the file exceeds ``max_profile_size``, is not a
                JSON object, or lacks a required key.
        """
        profile_path = self._profile_path(user_id)
        if not os.path.exists(profile_path):
            return self._create_default_unlocked(user_id)
        # Check file size
        if os.path.getsize(profile_path) > self.max_profile_size:
            raise ValueError("Profile file size exceeds limit")
        with open(profile_path, "r", encoding="utf-8") as f:
            profile = json.load(f)
        # Validate profile structure
        if not isinstance(profile, dict):
            raise ValueError("Invalid profile format")
        # Ensure all required fields exist
        if not all(field in profile for field in self._REQUIRED_FIELDS):
            raise ValueError("Missing required profile fields")
        return profile

    def get_profile(self, user_id: str) -> Dict[str, Any]:
        """Get user profile with validation.

        If no profile is stored for ``user_id`` a default one is created,
        written to disk and returned.

        Raises:
            ValueError: for an invalid user ID or an invalid stored profile.
            Exception: any other error is logged and re-raised unchanged.
        """
        try:
            if not self._validate_user_id(user_id):
                raise ValueError("Invalid user ID format")
            with _profile_lock:
                return self._load_or_create_unlocked(user_id)
        except Exception as e:
            logger.error(
                f"Error getting profile for {user_id}: {str(e)}", exc_info=True
            )
            raise

    def update_profile(
        self, user_id: str, preferences: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Update user profile with validation and rate limiting.

        Replaces the stored preferences with the validated ``preferences``
        and refreshes ``updated_at``. The previous profile file, if any, is
        backed up first.

        Returns:
            The updated profile document.

        Raises:
            ValueError: for an invalid user ID, an exceeded rate limit,
                invalid preferences or an invalid stored profile.
        """
        try:
            if not self._validate_user_id(user_id):
                raise ValueError("Invalid user ID format")
            if not self._check_rate_limit(user_id):
                raise ValueError("Rate limit exceeded")
            # Validate preferences
            try:
                validated_preferences = UserPreferences(**preferences)
            except Exception as e:
                raise ValueError(f"Invalid preferences: {str(e)}") from e
            with _profile_lock:
                # Create backup before update
                self._create_backup(user_id)
                # Use the unlocked loader: the lock is already held here.
                current_profile = self._load_or_create_unlocked(user_id)
                # Update profile
                current_profile["preferences"] = validated_preferences.dict()
                current_profile["updated_at"] = datetime.utcnow().isoformat()
                # Save updated profile
                self._write_profile_unlocked(user_id, current_profile)
            logger.info(f"Updated profile for user {user_id}")
            return current_profile
        except Exception as e:
            logger.error(
                f"Error updating profile for {user_id}: {str(e)}", exc_info=True
            )
            raise

    def _create_default_profile(self, user_id: str) -> Dict[str, Any]:
        """Create a default profile with validation.

        Validates the ID, takes the lock and stores a profile holding the
        default ``UserPreferences``. Must not be called while the lock is
        already held; locked code paths use ``_create_default_unlocked``.

        Raises:
            ValueError: if ``user_id`` is not a valid ID.
        """
        try:
            if not self._validate_user_id(user_id):
                raise ValueError("Invalid user ID format")
            with _profile_lock:
                return self._create_default_unlocked(user_id)
        except Exception as e:
            logger.error(
                f"Error creating default profile for {user_id}: {str(e)}", exc_info=True
            )
            raise

    def delete_profile(self, user_id: str) -> None:
        """Delete user profile with validation.

        A final backup is taken before the file is removed. A missing
        profile is logged as a warning and is not an error.

        Raises:
            ValueError: if ``user_id`` is not a valid ID.
        """
        try:
            if not self._validate_user_id(user_id):
                raise ValueError("Invalid user ID format")
            profile_path = self._profile_path(user_id)
            with _profile_lock:
                if os.path.exists(profile_path):
                    # Create final backup before deletion
                    self._create_backup(user_id)
                    os.remove(profile_path)
                    logger.info(f"Deleted profile for user {user_id}")
                else:
                    logger.warning(f"Profile not found for user {user_id}")
        except Exception as e:
            logger.error(
                f"Error deleting profile for {user_id}: {str(e)}", exc_info=True
            )
            raise

    def get_recommendations(self, user_id: str) -> Dict[str, Any]:
        """Get personalized recommendations based on user profile with validation.

        Returns a dict with ``destinations``, ``activities``, ``tips`` and
        ``generated_at``. Any failure (including an invalid user ID) is
        logged and results in an empty dict instead of an exception.
        """
        try:
            profile = self.get_profile(user_id)
            if not profile or "preferences" not in profile:
                return {}
            preferences = UserPreferences(**profile["preferences"])
            return {
                "destinations": self._get_destination_recommendations(preferences),
                "activities": self._get_activity_recommendations(preferences),
                "tips": self._get_personalized_tips(preferences),
                "generated_at": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error(f"Error getting recommendations: {str(e)}", exc_info=True)
            return {}

    @staticmethod
    def _first_unique(items: List[Any], limit: int = 5) -> List[Any]:
        """Return the first ``limit`` distinct items, preserving input order.

        Uses equality rather than hashing, so unhashable items are accepted.
        """
        unique: List[Any] = []
        if limit <= 0:
            return unique
        for item in items:
            if item not in unique:
                unique.append(item)
                if len(unique) == limit:
                    break
        return unique

    def _get_destination_recommendations(self, profile: "UserPreferences") -> List[str]:
        """Get destination recommendations based on preferences.

        The user's own preferred destinations (at most three) come first,
        followed by suggestions keyed on preferred activities and travel
        style. Returns an empty list if anything goes wrong.
        """
        try:
            # Add recommendations based on favorite destinations
            recommendations = list(profile.preferred_destinations[:3])
            # Add recommendations based on interests
            by_activity = (
                ("beach", "Bali, Indonesia"),
                ("culture", "Kyoto, Japan"),
                ("food", "Bangkok, Thailand"),
            )
            recommendations.extend(
                destination
                for activity, destination in by_activity
                if activity in profile.preferred_activities
            )
            # Add recommendations based on travel style
            if profile.travel_style == TravelStyle.LUXURY:
                recommendations.append("Dubai, UAE")
            elif profile.travel_style == TravelStyle.BUDGET:
                recommendations.append("Bangkok, Thailand")
            # Order-preserving de-duplication: a set would return an
            # arbitrary subset in an arbitrary order.
            return self._first_unique(recommendations, self._MAX_RECOMMENDATIONS)
        except Exception as e:
            logger.error(
                f"Error getting destination recommendations: {str(e)}", exc_info=True
            )
            return []

    def _get_activity_recommendations(self, profile: "UserPreferences") -> List[str]:
        """Get activity recommendations based on preferences.

        Suggestions are keyed on preferred activities and travel style.
        Returns an empty list if anything goes wrong.
        """
        try:
            # Add activities based on interests
            by_interest = (
                ("culture", "Visit local museums and historical sites"),
                ("food", "Try local cuisine and food tours"),
                ("nature", "Explore national parks and hiking trails"),
                ("adventure", "Try adventure sports and activities"),
            )
            activities = [
                suggestion
                for interest, suggestion in by_interest
                if interest in profile.preferred_activities
            ]
            # Add activities based on travel style
            if profile.travel_style == TravelStyle.LUXURY:
                activities.append("Book private guided tours")
            elif profile.travel_style == TravelStyle.BUDGET:
                activities.append("Explore local markets and street food")
            return self._first_unique(activities, self._MAX_RECOMMENDATIONS)
        except Exception as e:
            logger.error(
                f"Error getting activity recommendations: {str(e)}", exc_info=True
            )
            return []

    def _get_personalized_tips(self, profile: "UserPreferences") -> List[str]:
        """Get personalized travel tips based on preferences.

        Tips are keyed on travel style, dietary restrictions and
        accessibility needs. Returns an empty list if anything goes wrong.
        """
        try:
            tips = []
            # Add tips based on travel style
            if profile.travel_style == TravelStyle.BUDGET:
                tips.append(
                    "Look for local markets and street food for affordable meals"
                )
                tips.append("Consider staying in hostels or guesthouses")
            elif profile.travel_style == TravelStyle.LUXURY:
                tips.append("Book premium experiences and private tours in advance")
                tips.append("Consider luxury resorts and boutique hotels")
            # Add tips based on dietary restrictions
            if profile.dietary_restrictions:
                # The list fields are untyped, so coerce items before joining.
                restrictions = ", ".join(map(str, profile.dietary_restrictions))
                tips.append(f"Research restaurants that accommodate {restrictions}")
            # Add tips based on accessibility needs
            if profile.accessibility_needs:
                needs = ", ".join(map(str, profile.accessibility_needs))
                tips.append(f"Research accessibility features for {needs}")
            return self._first_unique(tips, self._MAX_RECOMMENDATIONS)
        except Exception as e:
            logger.error(f"Error getting personalized tips: {str(e)}", exc_info=True)
            return []