import os import json import requests import logging import numpy as np import pandas as pd from datetime import datetime from typing import Dict, List, Optional, Any import pickle import math from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from transformers import AutoTokenizer, AutoModel, pipeline import torch import warnings warnings.filterwarnings('ignore') # Parallel processing imports import concurrent.futures import threading # Import centralized configuration from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_email_config from concurrent.futures import ThreadPoolExecutor, as_completed import time # ChromaDB imports import chromadb from chromadb.config import Settings import uuid # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AIRecommendationEngine: def __init__(self): self.device = 'cpu' # Force CPU usage logger.info("🤖 Initializing AI Recommendation Engine with ChromaDB (CPU-based)") # Set Hugging Face cache directories for Hugging Face Spaces self.setup_cache_directories() # Configuration for API endpoints and settings self.config = { 'backend_url': get_backend_url(), 'headers': get_ngrok_headers() } # Email configuration (SendGrid with fallback to mock) # Uses real SendGrid API when network is available, falls back to mock service self.email_config = get_email_config() # API configuration - NO TIMEOUT for complete data fetch self.properties_api = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails" # Initialize ChromaDB collections (will be set in initialize_chromadb) self.properties_collection = None self.user_preferences_collection = None # Initialize ChromaDB self.initialize_chromadb() # Initialize AI models self.initialize_ai_models() def setup_cache_directories(self): """Setup cache directories for Hugging Face Spaces compatibility""" try: # Create cache directories with proper permissions cache_dir = "/tmp/hf_cache" os.makedirs(cache_dir, exist_ok=True) os.chmod(cache_dir, 0o755) # Set environment variables for Hugging Face cache os.environ['HF_HOME'] = cache_dir os.environ['TRANSFORMERS_CACHE'] = f"{cache_dir}/transformers" os.environ['HF_DATASETS_CACHE'] = f"{cache_dir}/datasets" os.environ['HF_HUB_CACHE'] = f"{cache_dir}/hub" # Create subdirectories for subdir in ['transformers', 'datasets', 'hub', 'chroma']: subdir_path = f"{cache_dir}/{subdir}" os.makedirs(subdir_path, exist_ok=True) os.chmod(subdir_path, 0o755) logger.info(f"✅ Cache directories setup complete: {cache_dir}") except Exception as e: logger.warning(f"⚠️ Cache directory setup failed: {e}") # Fallback to default locations pass def initialize_chromadb(self): """Initialize ChromaDB with robust containerized environment support""" try: logger.info("🗄️ Initializing ChromaDB with robust containerized support...") # Multiple database path strategies for different environments import os import tempfile # Strategy 1: Try /tmp for containerized environments db_paths = [ "/tmp/chromadb_properties", # Container-friendly "/tmp/hf_cache/chromadb", # Hugging Face cache os.path.abspath("./property_db"), # Local development tempfile.mkdtemp(prefix="chromadb_") # Fallback temp directory ] chroma_client = None selected_path = None # Try each path until one works for db_path in db_paths: try: logger.info(f"🔄 Trying ChromaDB path: {db_path}") os.makedirs(db_path, exist_ok=True) # Set proper permissions for containerized environments os.chmod(db_path, 0o755) # Try to create client chroma_client = chromadb.PersistentClient(path=db_path) # Test the client by listing collections collections = chroma_client.list_collections() logger.info(f"✅ ChromaDB client test successful at: {db_path}") selected_path = db_path break except Exception as path_error: logger.warning(f"⚠️ Failed to initialize ChromaDB at {db_path}: {path_error}") continue if chroma_client is None: logger.error("❌ All ChromaDB paths failed - falling back to in-memory client") chroma_client = chromadb.Client() selected_path = "in-memory" self.chroma_client = chroma_client logger.info(f"🗄️ ChromaDB initialized successfully at: {selected_path}") # Create robust embedding function embedding_function = self.create_robust_embedding_function() # Initialize collections with retry logic collection_name = "properties_main_collection" max_retries = 3 for attempt in range(max_retries): try: # Try to get existing collection self.properties_collection = self.chroma_client.get_collection(collection_name) count = self.properties_collection.count() logger.info(f"📚 Using existing ChromaDB collection '{collection_name}' with {count} properties") break except Exception as e: logger.info(f"📚 Attempt {attempt + 1}/{max_retries}: Creating new ChromaDB collection '{collection_name}'") try: # Delete existing collection if it exists try: self.chroma_client.delete_collection(collection_name) logger.info(f"🗑️ Deleted existing collection '{collection_name}'") except: pass # Create new collection self.properties_collection = self.chroma_client.create_collection( name=collection_name, metadata={"description": "Real estate properties with robust embeddings"}, embedding_function=embedding_function ) logger.info(f"📚 Successfully created ChromaDB collection '{collection_name}'") break except Exception as create_error: logger.error(f"❌ Attempt {attempt + 1} failed to create collection: {create_error}") if attempt == max_retries - 1: raise create_error import time time.sleep(1) # Wait before retry # Create user preferences collection try: self.user_preferences_collection = self.chroma_client.get_collection("user_preferences_robust") logger.info("📚 Using existing robust user preferences collection") except: self.user_preferences_collection = self.chroma_client.create_collection( name="user_preferences_robust", metadata={"description": "User behavior and preferences with robust embeddings"}, embedding_function=embedding_function ) logger.info("📚 Created new robust user preferences collection") logger.info(f"✅ Robust ChromaDB initialized successfully") logger.info(f"📊 Properties in DB: {self.properties_collection.count()}") # Test the embedding function try: test_text = "Test property for robust embedding function" test_embedding = embedding_function([test_text]) if test_embedding and len(test_embedding) > 0 and len(test_embedding[0]) > 0: logger.info(f"✅ Robust embedding function test successful - embedding shape: {len(test_embedding[0])}") else: logger.warning("⚠️ Robust embedding function returned empty result") except Exception as test_error: logger.warning(f"⚠️ Robust embedding function test failed: {test_error}") except Exception as e: logger.error(f"❌ Failed to initialize robust ChromaDB: {e}") # Set to None so other methods can handle gracefully self.properties_collection = None self.user_preferences_collection = None def get_custom_embedding_function(self): """Get custom embedding function with proper cache directory""" try: from chromadb.utils import embedding_functions # Set environment variables for all cache directories cache_dir = '/tmp/hf_cache' chroma_cache_dir = f'{cache_dir}/chroma' onnx_cache_dir = f'{cache_dir}/onnx' # Create all necessary cache directories with proper permissions for dir_path in [cache_dir, chroma_cache_dir, onnx_cache_dir]: os.makedirs(dir_path, exist_ok=True) os.chmod(dir_path, 0o755) # Set environment variables to control cache locations os.environ['CHROMA_CACHE_DIR'] = chroma_cache_dir os.environ['ONNXRUNTIME_CACHE_DIR'] = onnx_cache_dir os.environ['HF_HOME'] = cache_dir os.environ['TRANSFORMERS_CACHE'] = f'{cache_dir}/transformers' os.environ['SENTENCE_TRANSFORMERS_HOME'] = f'{cache_dir}/sentence_transformers' # Try ONNX embedding function first (faster, smaller) try: logger.info("🔄 Trying ONNX embedding function...") embedding_func = embedding_functions.ONNXMiniLM_L6_V2() logger.info("✅ ONNX embedding function created successfully") return embedding_func except Exception as onnx_error: logger.warning(f"⚠️ ONNX embedding function failed: {onnx_error}") # Fallback to sentence transformers try: logger.info("🔄 Trying SentenceTransformer embedding function...") embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction( model_name='all-MiniLM-L6-v2', cache_folder=chroma_cache_dir ) logger.info("✅ SentenceTransformer embedding function created successfully") return embedding_func except Exception as st_error: logger.warning(f"⚠️ SentenceTransformer embedding function failed: {st_error}") # Final fallback to default try: logger.info("🔄 Trying default embedding function...") embedding_func = embedding_functions.DefaultEmbeddingFunction() logger.info("✅ Default embedding function created successfully") return embedding_func except Exception as default_error: logger.warning(f"⚠️ Default embedding function failed: {default_error}") # Ultimate fallback - create a simple embedding function try: logger.info("🔄 Creating simple fallback embedding function...") embedding_func = self.create_simple_embedding_function() logger.info("✅ Simple fallback embedding function created successfully") return embedding_func except Exception as simple_error: logger.error(f"❌ All embedding functions failed: {simple_error}") return None except Exception as e: logger.error(f"❌ Error in get_custom_embedding_function: {e}") return None def create_robust_embedding_function(self): """Create robust embedding function that works in containerized environments""" class RobustEmbeddingFunction: def __init__(self): self.embedding_dim = 384 # Standard dimension self.cache = {} # Simple cache for performance logger.info(f"🔧 Created robust embedding function with dimension: {self.embedding_dim}") def __call__(self, input): """Generate robust embeddings that work in any environment""" try: # Handle both string and list inputs if isinstance(input, str): texts = [input] elif isinstance(input, list): texts = input else: texts = [str(input)] if not texts: return [[0.0] * self.embedding_dim] embeddings = [] for text in texts: if not text or len(text.strip()) == 0: embeddings.append([0.0] * self.embedding_dim) continue # Check cache first text_key = text.lower().strip() if text_key in self.cache: embeddings.append(self.cache[text_key]) continue # Create robust embedding based on text characteristics embedding = self._generate_robust_embedding(text) # Cache the result self.cache[text_key] = embedding embeddings.append(embedding) return embeddings except Exception as e: logger.warning(f"⚠️ Robust embedding failed, using fallback: {e}") return [[0.0] * self.embedding_dim for _ in texts] def _generate_robust_embedding(self, text): """Generate embedding using multiple strategies for robustness""" import hashlib import struct # Strategy 1: MD5 hash-based hash_obj = hashlib.md5(text.encode('utf-8')) hash_bytes = hash_obj.digest() # Strategy 2: SHA256 for additional entropy sha_obj = hashlib.sha256(text.encode('utf-8')) sha_bytes = sha_obj.digest() # Strategy 3: Text characteristics char_freq = {} for char in text: char_freq[char] = char_freq.get(char, 0) + 1 # Combine strategies for robust embedding embedding = [] for i in range(self.embedding_dim): if i < 16: # Use MD5 bytes embedding.append(float(hash_bytes[i]) / 255.0) elif i < 48: # Use SHA256 bytes embedding.append(float(sha_bytes[i - 16]) / 255.0) else: # Use derived values # Combine character frequency, position, and hash char_idx = i % len(text) if text else 0 char_val = ord(text[char_idx]) if char_idx < len(text) else 0 freq_val = char_freq.get(text[char_idx] if char_idx < len(text) else ' ', 0) combined_val = (char_val + freq_val + hash_bytes[i % 16] + sha_bytes[i % 32]) % 256 embedding.append(float(combined_val) / 255.0) return embedding return RobustEmbeddingFunction() def initialize_ai_models(self): """Initialize AI models for analysis and recommendations""" try: logger.info("🧠 Initializing AI models for enhanced recommendations...") # Initialize small AI models for property recommendations self.initialize_recommendation_models() # Initialize existing models self.initialize_fallback_models() logger.info("✅ AI models initialized successfully") except Exception as e: logger.error(f"❌ Error initializing AI models: {e}") self.initialize_fallback_models() def initialize_recommendation_models(self): """Initialize small AI models specifically for property recommendations""" try: logger.info("🔍 Initializing recommendation AI models...") # Small transformer model for property analysis model_name = "sentence-transformers/all-MiniLM-L6-v2" # Small, fast model logger.info(f"📥 Loading recommendation model: {model_name}") self.recommendation_tokenizer = AutoTokenizer.from_pretrained(model_name) self.recommendation_model = AutoModel.from_pretrained(model_name) # Move to CPU for compatibility self.recommendation_model.to(self.device) self.recommendation_model.eval() # Initialize property classification model self.property_classifier = self.initialize_property_classifier() # Initialize price range predictor self.price_predictor = self.initialize_price_predictor() # Initialize preference analyzer self.preference_analyzer = self.initialize_preference_analyzer() logger.info("✅ Recommendation AI models loaded successfully") except Exception as e: logger.warning(f"⚠️ Could not load recommendation models: {e}") self.recommendation_tokenizer = None self.recommendation_model = None self.property_classifier = None self.price_predictor = None self.preference_analyzer = None def initialize_property_classifier(self): """Initialize a small model for property type classification""" try: # Simple rule-based classifier with ML enhancement classifier = { 'luxury_threshold': 50000000, # 50M 'premium_threshold': 25000000, # 25M 'mid_range_threshold': 10000000, # 10M 'affordable_threshold': 5000000, # 5M 'property_type_weights': { 'Apartment': 1.0, 'Villa': 1.5, 'Penthouse': 2.0, 'Townhouse': 1.2, 'Plot': 1.3, 'Commercial': 1.8 }, 'location_weights': { 'premium': 1.5, 'upcoming': 1.2, 'established': 1.0, 'developing': 0.8 } } return classifier except Exception as e: logger.warning(f"⚠️ Property classifier initialization failed: {e}") return None def initialize_price_predictor(self): """Initialize a simple price range predictor""" try: predictor = { 'price_ranges': { 'ultra_luxury': (100000000, float('inf')), # 100M+ 'luxury': (50000000, 100000000), # 50M-100M 'premium': (25000000, 50000000), # 25M-50M 'mid_range': (10000000, 25000000), # 10M-25M 'affordable': (5000000, 10000000), # 5M-10M 'budget': (0, 5000000) # 0-5M }, 'price_preferences': { 'conservative': 0.7, # Prefer lower prices 'balanced': 1.0, # No preference 'premium': 1.3 # Prefer higher prices } } return predictor except Exception as e: logger.warning(f"⚠️ Price predictor initialization failed: {e}") return None def initialize_preference_analyzer(self): """Initialize preference analysis model""" try: analyzer = { 'engagement_weights': { 'high': 1.5, 'medium': 1.0, 'low': 0.7 }, 'viewing_pattern_weights': { 'morning': 1.1, 'afternoon': 1.0, 'evening': 0.9 }, 'property_type_preferences': { 'diverse': 1.2, 'focused': 1.0, 'exploratory': 1.1 } } return analyzer except Exception as e: logger.warning(f"⚠️ Preference analyzer initialization failed: {e}") return None def initialize_fallback_models(self): """Initialize fallback models if main models fail""" logger.info("🔄 Initializing fallback models...") try: from transformers import pipeline self.sentiment_analyzer = pipeline("sentiment-analysis", device=-1) self.tokenizer = None self.embedding_model = None self.behavior_classifier = None logger.info("✅ Fallback models loaded") except Exception as e: logger.error(f"❌ Fallback models failed: {e}") self.sentiment_analyzer = None self.tokenizer = None self.embedding_model = None self.behavior_classifier = None def get_property_embeddings(self, property_text: str) -> np.ndarray: """Generate embeddings for property text using Hugging Face model""" try: if self.tokenizer and self.embedding_model: inputs = self.tokenizer(property_text, return_tensors='pt', truncation=True, padding=True, max_length=512) with torch.no_grad(): outputs = self.embedding_model(**inputs) embeddings = outputs.last_hidden_state.mean(dim=1) return embeddings.numpy().flatten() else: # Fallback to TF-IDF return self.get_tfidf_embeddings(property_text) except Exception as e: logger.error(f"❌ Error generating embeddings: {e}") return self.get_tfidf_embeddings(property_text) def get_tfidf_embeddings(self, text: str) -> np.ndarray: """Fallback TF-IDF embeddings""" vectorizer = TfidfVectorizer(max_features=100, stop_words='english') try: tfidf = vectorizer.fit_transform([text]) return tfidf.toarray().flatten() except: return np.random.rand(100) # Random fallback def analyze_user_behavior_with_ai(self, analysis_data: Dict) -> Dict: """Use AI to analyze user behavior patterns""" logger.info("🧠 Analyzing user behavior with AI...") try: # Extract behavior patterns lead_qual = analysis_data.get('data', {}).get('lead_qualification', {}) analytics = analysis_data.get('data', {}).get('analytics', {}) properties = analysis_data.get('data', {}).get('properties', []) # Create behavior text for AI analysis behavior_text = self.create_behavior_description(lead_qual, analytics, properties) # Sentiment analysis of user engagement sentiment_result = None if self.sentiment_analyzer: try: sentiment_result = self.sentiment_analyzer(behavior_text[:512]) except Exception as e: logger.warning(f"⚠️ Sentiment analysis failed: {e}") # AI-enhanced insights ai_insights = { 'behavior_summary': behavior_text, 'sentiment_analysis': sentiment_result, 'ai_personality_type': self.determine_personality_type(analytics), 'buying_motivation': self.analyze_buying_motivation(properties), 'recommendation_strategy': self.determine_recommendation_strategy(lead_qual), 'urgency_level': self.calculate_urgency_level(analytics), 'property_preferences': self.extract_ai_preferences(properties) } logger.info("✅ AI behavior analysis completed") return ai_insights except Exception as e: logger.error(f"❌ AI behavior analysis failed: {e}") return {'error': str(e)} def create_behavior_description(self, lead_qual: Dict, analytics: Dict, properties: List) -> str: """Create a text description of user behavior for AI analysis""" lead_status = lead_qual.get('lead_status', 'UNKNOWN') engagement_level = analytics.get('engagement_level', 'Unknown') total_views = len(properties) # Calculate average price interest prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0] avg_price = sum(prices) / len(prices) if prices else 0 # Property types viewed prop_types = list(set(p.get('propertyTypeName', 'Unknown') for p in properties)) behavior_text = f""" Customer shows {engagement_level.lower()} engagement with {lead_status.lower()} lead status. They have viewed {total_views} properties with an average price interest of ₹{avg_price:,.0f}. Property types of interest include: {', '.join(prop_types[:3])}. Their viewing pattern suggests they are a {self.get_buyer_archetype(analytics)} buyer. """ return behavior_text.strip() def determine_personality_type(self, analytics: Dict) -> str: """Determine customer personality type using AI analysis""" engagement = analytics.get('engagement_level', 'Medium') viewing_patterns = analytics.get('viewing_patterns', {}) if engagement == 'Very High': return "Decisive Decision Maker" elif engagement == 'High': return "Research-Oriented Buyer" elif engagement == 'Medium': return "Cautious Evaluator" else: return "Passive Browser" def analyze_buying_motivation(self, properties: List) -> str: """Analyze what motivates the buyer based on property choices""" if not properties: return "Unknown motivation" # Analyze price ranges prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0] if not prices: return "Price-conscious buyer" price_range = max(prices) - min(prices) avg_price = sum(prices) / len(prices) if price_range < avg_price * 0.2: # Consistent price range return "Budget-focused with clear price target" elif avg_price > 20000000: # High-value properties return "Luxury-seeking with investment focus" elif len(set(p.get('propertyTypeName') for p in properties)) > 3: return "Exploring options, needs guidance" else: return "Value-conscious with specific requirements" def determine_recommendation_strategy(self, lead_qual: Dict) -> str: """Determine the best recommendation strategy""" lead_status = lead_qual.get('lead_status', 'COLD') lead_score = lead_qual.get('lead_score', 0) if lead_status == 'HOT' or lead_score > 70: return "aggressive_immediate" # Show premium properties, create urgency elif lead_status == 'WARM' or lead_score > 50: return "nurturing_educational" # Provide comparisons, detailed info elif lead_status == 'LUKEWARM' or lead_score > 30: return "engagement_building" # Variety of options, build interest else: return "awareness_creation" # Basic information, build trust def calculate_urgency_level(self, analytics: Dict) -> str: """Calculate how urgent the customer's need is""" recent_activity = analytics.get('viewing_patterns', {}).get('peak_viewing_time') engagement = analytics.get('engagement_level', 'Low') if engagement in ['Very High', 'High'] and recent_activity: return "High" elif engagement == 'Medium': return "Medium" else: return "Low" def extract_ai_preferences(self, properties: List) -> Dict: """Extract detailed preferences using AI analysis""" if not properties: return {} # Analyze property features that attract the user features = { 'preferred_types': [], 'price_sensitivity': 'medium', 'location_preferences': [], 'feature_importance': {} } # Property types preference type_counts = {} for prop in properties: prop_type = prop.get('propertyTypeName', 'Unknown') type_counts[prop_type] = type_counts.get(prop_type, 0) + prop.get('viewCount', 1) # Return just the property type names as strings, not tuples features['preferred_types'] = [prop_type for prop_type, count in sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:3]] # Price sensitivity analysis prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0] if prices: price_std = np.std(prices) price_mean = np.mean(prices) cv = price_std / price_mean if price_mean > 0 else 0 if cv < 0.2: features['price_sensitivity'] = 'high' # Very specific about price elif cv > 0.5: features['price_sensitivity'] = 'low' # Flexible on price else: features['price_sensitivity'] = 'medium' return features def get_buyer_archetype(self, analytics: Dict) -> str: """Determine buyer archetype""" engagement = analytics.get('engagement_level', 'Medium') opportunity_score = analytics.get('opportunity_score', 50) if engagement == 'Very High' and opportunity_score > 80: return "motivated" elif engagement in ['High', 'Medium'] and opportunity_score > 60: return "serious" else: return "casual" def fetch_and_store_all_properties(self) -> bool: """Fetch ALL properties from API using PARALLEL processing and store in ChromaDB""" try: logger.info("🚀 Starting PARALLEL property fetching for faster performance...") # Use parallel fetching for speed all_properties = self.fetch_all_properties_parallel(max_workers=8) if not all_properties: logger.warning("⚠️ No properties fetched from API, falling back to sequential method...") # Fallback to the original sequential method return self.fetch_all_properties_sequential() logger.info(f"🎉 Successfully fetched {len(all_properties)} total properties using parallel processing!") # Store all properties in ChromaDB if all_properties: success = self.store_properties_in_chromadb(all_properties) return success else: logger.warning("⚠️ No properties to store") return False except Exception as e: logger.error(f"❌ Error in parallel property fetch: {e}") logger.info("🔄 Falling back to sequential fetching...") return self.fetch_all_properties_sequential() def fetch_all_properties_sequential(self) -> bool: """Fallback sequential property fetching method""" try: logger.info("🌐 Fetching properties using sequential method...") all_properties = [] page = 1 page_size = 100 while True: try: params = { 'pageNumber': page, 'pageSize': page_size } logger.info(f"📄 Fetching page {page}...") response = requests.get(self.properties_api, params=params, verify=False, timeout=None) response.raise_for_status() data = response.json() # Handle different response structures if isinstance(data, list): properties_batch = data elif isinstance(data, dict): if 'data' in data: properties_batch = data['data'] elif 'properties' in data: properties_batch = data['properties'] elif 'results' in data: properties_batch = data['results'] else: properties_batch = list(data.values()) if data else [] else: logger.warning(f"Unexpected response type on page {page}: {type(data)}") break if not properties_batch or len(properties_batch) == 0: logger.info(f"📄 Page {page} is empty - reached end") break all_properties.extend(properties_batch) logger.info(f"✅ Page {page}: {len(properties_batch)} properties (Total: {len(all_properties)})") page += 1 if page > 1000: # Safety limit logger.warning("⚠️ Reached page limit (1000) - stopping fetch") break except Exception as page_error: logger.error(f"❌ Error fetching page {page}: {page_error}") page += 1 if page > 5: break continue logger.info(f"🎉 Sequential fetch completed: {len(all_properties)} total properties!") if all_properties: success = self.store_properties_in_chromadb(all_properties) return success else: logger.warning("⚠️ No properties fetched from API") return False except Exception as e: logger.error(f"❌ Error in sequential property fetch: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return False def store_properties_in_chromadb(self, properties: List[Dict]): """Store properties in ChromaDB with embeddings""" try: logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB...") # Ensure ChromaDB is properly initialized if self.properties_collection is None: logger.error("❌ ChromaDB properties collection not initialized, cannot store properties") return False # Clear existing properties to avoid duplicates existing_count = self.properties_collection.count() if existing_count > 0: logger.info(f"🗑️ Clearing {existing_count} existing properties...") # ChromaDB doesn't have a clear method, so we recreate the collection collection_name = self.properties_collection.name self.chroma_client.delete_collection(collection_name) # Get the same embedding function that was used initially embedding_function = self.get_custom_embedding_function() if embedding_function is None: logger.error("❌ No embedding function available - cannot recreate collection") return False self.properties_collection = self.chroma_client.create_collection( name=collection_name, metadata={"description": "Real estate properties with embeddings"}, embedding_function=embedding_function ) # Process properties in batches for efficiency batch_size = 50 total_stored = 0 for i in range(0, len(properties), batch_size): batch = properties[i:i+batch_size] # Prepare data for ChromaDB documents = [] metadatas = [] ids = [] for prop in batch: # Create comprehensive text description for embeddings description = self.create_property_description(prop) documents.append(description) # Store all property metadata metadata = { 'id': str(prop.get('id', prop.get('propertyId', f'prop_{i}'))), 'propertyName': prop.get('propertyName', ''), 'propertyTypeName': prop.get('propertyTypeName', prop.get('typeName', '')), 'price': float(prop.get('price', 0) or prop.get('marketValue', 0) or 0), 'address': prop.get('address', prop.get('location', '')), 'beds': int(prop.get('beds', 0) or prop.get('bedrooms', 0) or 0), 'baths': int(prop.get('baths', 0) or prop.get('bathrooms', 0) or 0), 'sqft': float(prop.get('totalSquareFeet', 0) or prop.get('area', 0) or 0), 'description': prop.get('description', ''), 'features': json.dumps(prop.get('features', [])) if prop.get('features') else '[]' } # Clean metadata to remove None values metadata = self.clean_metadata_for_chromadb(metadata) metadatas.append(metadata) # Use property ID as document ID prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{total_stored + len(ids)}'))) ids.append(prop_id) # Add batch to ChromaDB try: # Validate that all metadatas are clean for idx, metadata in enumerate(metadatas): for key, value in metadata.items(): if value is None: logger.warning(f"⚠️ Found None value in metadata[{idx}][{key}], this should have been cleaned") self.properties_collection.add( documents=documents, metadatas=metadatas, ids=ids ) total_stored += len(batch) logger.info(f"📊 Stored batch {i//batch_size + 1}: {len(batch)} properties (Total: {total_stored})") except Exception as batch_error: logger.error(f"❌ Error storing batch {i//batch_size + 1}: {batch_error}") # Check if it's an embedding function error if "embedding function" in str(batch_error).lower(): logger.error("🔧 This appears to be an embedding function issue. Checking collection setup...") if self.properties_collection is None: logger.error("❌ Properties collection is None - ChromaDB not properly initialized") else: logger.error("❌ Collection exists but embedding function may be missing") # Log the problematic data for debugging logger.error(f"Batch size: {len(batch)}, Documents: {len(documents)}, Metadatas: {len(metadatas)}, IDs: {len(ids)}") if metadatas: logger.error(f"Sample metadata keys: {list(metadatas[0].keys())}") for idx, metadata in enumerate(metadatas[:3]): # Log first 3 for debugging logger.error(f"Metadata {idx}: {metadata}") # Log sample document for debugging if documents: logger.error(f"Sample document: {documents[0][:200]}...") raise # Re-raise to stop processing logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!") return True except Exception as e: logger.error(f"❌ Error storing properties in ChromaDB: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return False def create_property_description(self, prop: Dict) -> str: """Create a comprehensive text description for property embeddings""" parts = [] # Basic info name = prop.get('propertyName', prop.get('name', '')) if name: parts.append(f"Property: {name}") property_type = prop.get('propertyTypeName', prop.get('typeName', '')) if property_type: parts.append(f"Type: {property_type}") # Location location = prop.get('address', prop.get('location', '')) if location: parts.append(f"Location: {location}") # Price price = prop.get('price', 0) or prop.get('marketValue', 0) or 0 if price > 0: parts.append(f"Price: ${price:,.0f}") # Specs beds = prop.get('beds', 0) or prop.get('bedrooms', 0) or 0 baths = prop.get('baths', 0) or prop.get('bathrooms', 0) or 0 sqft = prop.get('totalSquareFeet', 0) or prop.get('area', 0) or 0 if beds > 0: parts.append(f"{beds} bedrooms") if baths > 0: parts.append(f"{baths} bathrooms") if sqft > 0: parts.append(f"{sqft:,.0f} sq ft") # Description description = prop.get('description', '') if description: parts.append(f"Description: {description}") # Features features = prop.get('features', []) if features: if isinstance(features, list): parts.append(f"Features: {', '.join(features)}") elif isinstance(features, str): parts.append(f"Features: {features}") return '. '.join(parts) def clean_metadata_for_chromadb(self, metadata: Dict) -> Dict: """Clean metadata to ensure no None values for ChromaDB storage""" cleaned = {} for key, value in metadata.items(): if value is None: # Set appropriate defaults based on expected type if key in ['price', 'sqft']: cleaned[key] = 0.0 elif key in ['beds', 'baths']: cleaned[key] = 0 else: cleaned[key] = '' elif isinstance(value, str): # Ensure string is not None and handle encoding issues cleaned[key] = str(value).strip() if value else '' elif isinstance(value, (int, float)): # Handle NaN and infinity values if value != value or value == float('inf') or value == float('-inf'): cleaned[key] = 0.0 if key in ['price', 'sqft'] else 0 else: # Ensure proper type conversion if key in ['price', 'sqft']: cleaned[key] = float(value) elif key in ['beds', 'baths']: cleaned[key] = int(value) else: cleaned[key] = value elif isinstance(value, bool): # Convert boolean to string for ChromaDB compatibility cleaned[key] = str(value).lower() else: # Convert any other type to string try: cleaned[key] = str(value) if value is not None else '' except Exception: cleaned[key] = '' return cleaned def find_similar_properties_ai(self, user_analysis: Dict, ai_insights: Dict) -> List[Dict]: """Find similar properties using enhanced AI recommendation system""" try: logger.info("🔍 Finding AI recommendations using enhanced system...") # Use the new enhanced AI recommendation system recommendations = self.get_enhanced_ai_recommendations(user_analysis, ai_insights, count=15) if recommendations: logger.info(f"✅ Found {len(recommendations)} enhanced AI recommendations") return recommendations else: logger.warning("⚠️ No enhanced recommendations found, using fallback") return self.get_fallback_recommendations(10) except Exception as e: logger.error(f"❌ Error in enhanced AI recommendations: {e}") return self.get_fallback_recommendations(10) def create_multiple_user_queries(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> List[str]: """Create multiple search queries for better property coverage""" queries = [] # Query 1: Based on user's actual viewed properties if viewed_properties: primary_query = self.create_user_preference_query(viewed_properties, preferences, personality_type) queries.append(primary_query) # Query 2: Based on preferred property types property_types = preferences.get('preferred_types', []) if property_types: # Handle both strings and tuples in property_types type_strings = [] for prop_type in property_types: if isinstance(prop_type, tuple): # If it's a tuple, take the first element (usually the type name) type_strings.append(str(prop_type[0]) if prop_type else 'Property') elif isinstance(prop_type, str): type_strings.append(prop_type) else: type_strings.append(str(prop_type)) if type_strings: type_query = f"Property types: {', '.join(type_strings)}. Modern amenities and good location." queries.append(type_query) # Query 3: Based on price range preferences price_range = preferences.get('price_range', {}) if price_range: min_price = price_range.get('min', 0) max_price = price_range.get('max', 0) if min_price > 0 or max_price > 0: price_query = f"Properties in price range {min_price} to {max_price}. Good value for money." queries.append(price_query) # Query 4: Generic high-quality properties queries.append("High-quality properties with modern amenities, good location, and excellent facilities.") # Query 5: Diverse property types queries.append("Diverse mix of Villas, Flats, Apartments, and Houses with different price ranges.") return queries[:5] # Limit to 5 queries for performance def enhance_recommendations_with_ai_parallel(self, properties: List[Dict], viewed_properties: List[Dict], preferences: Dict, strategy: str) -> List[Dict]: """Enhanced AI scoring with parallel processing for speed""" logger.info(f"🧠 Enhanced {len(properties)} properties with AI analysis using parallel processing") if not properties: return [] # Process properties in parallel batches batch_size = 20 processed_properties = [] with ThreadPoolExecutor(max_workers=6) as executor: # Split properties into batches batches = [properties[i:i + batch_size] for i in range(0, len(properties), batch_size)] # Submit batch processing tasks future_to_batch = { executor.submit( self.process_property_batch_ai, batch, viewed_properties, preferences, strategy ): batch_idx for batch_idx, batch in enumerate(batches) } for future in as_completed(future_to_batch): batch_idx = future_to_batch[future] try: batch_results = future.result() processed_properties.extend(batch_results) logger.info(f"✅ Processed batch {batch_idx + 1}/{len(batches)} with AI analysis") except Exception as e: logger.error(f"❌ Failed to process batch {batch_idx + 1}: {e}") # Sort by AI score processed_properties.sort(key=lambda x: x.get('ai_score', 0), reverse=True) return processed_properties def process_property_batch_ai(self, properties_batch: List[Dict], viewed_properties: List[Dict], preferences: Dict, strategy: str) -> List[Dict]: """Process a batch of properties with AI analysis""" enhanced_batch = [] for prop in properties_batch: try: # Calculate AI score based on user preferences and behavior ai_score = self.calculate_ai_property_score(prop, viewed_properties, preferences, strategy) # Add AI insights prop['ai_score'] = ai_score prop['ai_match_reasons'] = self.generate_match_reasons(prop, preferences) prop['ai_recommendation_confidence'] = min(ai_score / 100, 1.0) enhanced_batch.append(prop) except Exception as e: logger.error(f"❌ Error processing property {prop.get('id', 'unknown')}: {e}") # Include property without AI enhancement prop['ai_score'] = 50 # Default score enhanced_batch.append(prop) return enhanced_batch def get_guaranteed_additional_recommendations(self, current_recommendations: List[Dict], viewed_properties: List[Dict], needed_count: int) -> List[Dict]: """Get additional recommendations with broader search criteria to guarantee minimum count""" logger.info(f"➕ Getting {needed_count} additional recommendations with broader criteria...") if self.properties_collection is None or needed_count <= 0: return [] try: # Get excluded IDs excluded_ids = set() for prop in current_recommendations + viewed_properties: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id: excluded_ids.add(prop_id) # Use multiple broader queries in parallel broader_queries = [ "Properties with good amenities and location", "Affordable housing options with modern features", "Premium properties with luxury amenities", "Family-friendly properties with good connectivity", "Investment properties with high potential" ] additional_properties = [] with ThreadPoolExecutor(max_workers=3) as executor: future_to_query = { executor.submit( self.search_properties_with_query, query, excluded_ids, needed_count ): query for query in broader_queries } for future in as_completed(future_to_query): try: properties = future.result() additional_properties.extend(properties) if len(additional_properties) >= needed_count: break except Exception as e: logger.error(f"❌ Broader search failed: {e}") # Remove duplicates seen_ids = set() unique_additional = [] for prop in additional_properties: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id and prop_id not in seen_ids and prop_id not in excluded_ids: seen_ids.add(prop_id) unique_additional.append(prop) if len(unique_additional) >= needed_count: break logger.info(f"➕ Added {len(unique_additional)} additional recommendations") return unique_additional[:needed_count] except Exception as e: logger.error(f"❌ Error getting additional recommendations: {e}") return [] def get_diverse_properties(self, current_recommendations: List[Dict], viewed_properties: List[Dict], needed_count: int) -> List[Dict]: """Get diverse properties to ensure variety in recommendations""" logger.info(f"🎲 Getting {needed_count} diverse properties...") if self.properties_collection is None or needed_count <= 0: return [] try: # Get all property IDs to exclude excluded_ids = set() for prop in current_recommendations + viewed_properties: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id: excluded_ids.add(prop_id) # Get random selection from different property types diverse_queries = [ "Villa with luxury amenities", "Apartment with modern facilities", "House with family features", "Flat with good connectivity", "Commercial property with investment potential" ] diverse_properties = [] for query in diverse_queries: try: results = self.properties_collection.query( query_texts=[query], n_results=min(5, needed_count), include=["metadatas", "documents", "distances"] ) if results and results['metadatas']: for metadata in results['metadatas'][0]: prop_id = metadata.get('id', '') if prop_id not in excluded_ids: diverse_properties.append(metadata) excluded_ids.add(prop_id) if len(diverse_properties) >= needed_count: break if len(diverse_properties) >= needed_count: break except Exception as e: logger.error(f"❌ Error in diverse search with query '{query}': {e}") continue logger.info(f"🎲 Found {len(diverse_properties)} diverse properties") return diverse_properties[:needed_count] except Exception as e: logger.error(f"❌ Error getting diverse properties: {e}") return [] def get_emergency_fallback_properties(self, needed_count: int) -> List[Dict]: """Emergency fallback when all else fails - return any available properties""" logger.warning(f"🚨 Emergency fallback: getting {needed_count} any available properties") try: if self.properties_collection is None: return [] # Get any properties from ChromaDB results = self.properties_collection.query( query_texts=["property"], n_results=needed_count, include=["metadatas", "documents"] ) if results and results['metadatas']: properties = results['metadatas'][0] logger.info(f"🚨 Emergency fallback provided {len(properties)} properties") return properties except Exception as e: logger.error(f"❌ Emergency fallback failed: {e}") return [] def create_user_preference_query(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> str: """Create a comprehensive query based on user behavior and preferences""" query_parts = [] # Property types from user behavior property_types = set() for prop in viewed_properties: prop_type = prop.get('propertyTypeName', '') if prop_type: property_types.add(prop_type) if property_types: query_parts.append(f"Property types: {', '.join(property_types)}") # Price range from viewed properties prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] if prices: min_price = min(prices) max_price = max(prices) avg_price = sum(prices) / len(prices) query_parts.append(f"Price range {min_price} to {max_price}, average {avg_price}") # Location preferences locations = set() for prop in viewed_properties: address = prop.get('address', '') if address: # Extract city/area from address parts = address.split(',') if parts: locations.add(parts[0].strip()) if locations: query_parts.append(f"Preferred locations: {', '.join(locations)}") # Specifications preferences beds_list = [p.get('beds', 0) for p in viewed_properties if p.get('beds', 0) > 0] if beds_list: avg_beds = sum(beds_list) / len(beds_list) query_parts.append(f"Preferred {avg_beds:.0f} bedrooms") # Personality-based preferences if personality_type == 'Decisive': query_parts.append("Premium properties, luxury features, move-in ready") elif personality_type == 'Research-Oriented': query_parts.append("Good value, detailed specifications, investment potential") elif personality_type == 'Cautious': query_parts.append("Established neighborhoods, reliable developers, good amenities") else: query_parts.append("Modern amenities, good connectivity, family-friendly") return '. '.join(query_parts) def search_similar_properties_chromadb(self, query: str, viewed_properties: List[Dict], min_results: int = 15) -> List[Dict]: """Search for similar properties in ChromaDB""" try: # Auto-initialize ChromaDB if not available if self.properties_collection is None: logger.warning("⚠️ ChromaDB properties collection not initialized, auto-initializing...") self.initialize_chromadb() # If still not available after init, try to fetch properties if self.properties_collection is None: logger.warning("⚠️ ChromaDB still not available, fetching properties...") self.auto_fetch_and_store_properties() # Final check if self.properties_collection is None: logger.error("❌ ChromaDB properties collection not available after auto-setup") return [] # Check if collection is empty and auto-populate try: collection_count = self.properties_collection.count() logger.info(f"📊 ChromaDB collection has {collection_count} properties") if collection_count == 0: logger.warning("⚠️ ChromaDB collection is empty, auto-fetching properties...") self.auto_fetch_and_store_properties() else: logger.info(f"✅ ChromaDB collection has {collection_count} properties - ready for recommendations") except Exception as count_error: logger.warning(f"⚠️ Collection access error: {count_error}") # Reinitialize the collection logger.info("🔄 Reinitializing ChromaDB collection...") self.initialize_chromadb() # Try to fetch properties anyway self.auto_fetch_and_store_properties() # Get IDs of viewed properties to exclude viewed_ids = set() for prop in viewed_properties: prop_id = str(prop.get('propertyId', prop.get('id', ''))) if prop_id: viewed_ids.add(prop_id) # Perform semantic search with proper result count collection_count = self.properties_collection.count() n_results = max(1, min(min_results * 2, collection_count, 100)) # Ensure n_results is at least 1 logger.info(f"🔍 Querying ChromaDB: '{query}' (requesting {n_results} results from {collection_count} total)") results = self.properties_collection.query( query_texts=[query], n_results=n_results, include=["metadatas", "documents", "distances"] ) # Convert results to property format similar_properties = [] if results['metadatas'] and results['metadatas'][0]: for i, metadata in enumerate(results['metadatas'][0]): # Skip viewed properties if metadata.get('id') in viewed_ids: continue # Safely extract metadata fields with fallbacks try: property_data = { 'id': metadata.get('id', ''), 'propertyId': metadata.get('id', ''), 'propertyName': metadata.get('propertyName', metadata.get('name', 'Unknown Property')), 'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))), 'price': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), 'marketValue': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), 'address': metadata.get('address', metadata.get('location', '')), 'location': metadata.get('address', metadata.get('location', '')), 'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0), 'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0), 'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0), 'description': metadata.get('description', ''), 'features': self.safe_json_parse(metadata.get('features', '[]')), 'similarity_score': 1 - results['distances'][0][i], # Convert distance to similarity 'chromadb_rank': i + 1 } # Add any additional fields that might exist for key, value in metadata.items(): if key not in property_data: property_data[key] = value similar_properties.append(property_data) # Stop when we have enough if len(similar_properties) >= min_results: break except Exception as field_error: logger.warning(f"⚠️ Error processing metadata field: {field_error}") # Try to create a minimal property object try: minimal_property = { 'id': metadata.get('id', f'prop_{i}'), 'propertyId': metadata.get('id', f'prop_{i}'), 'propertyName': 'Property', 'propertyTypeName': 'Property', 'price': 0.0, 'marketValue': 0.0, 'address': '', 'location': '', 'beds': 0, 'baths': 0, 'totalSquareFeet': 0.0, 'description': '', 'features': [], 'similarity_score': 1 - results['distances'][0][i], 'chromadb_rank': i + 1 } similar_properties.append(minimal_property) except Exception as minimal_error: logger.error(f"❌ Failed to create minimal property: {minimal_error}") continue logger.info(f"🎯 ChromaDB found {len(similar_properties)} similar properties") return similar_properties except Exception as e: logger.error(f"❌ Error searching ChromaDB: {e}") return [] def auto_fetch_and_store_properties(self): """Automatically fetch and store properties in ChromaDB""" try: logger.info("🚀 Auto-fetching properties for ChromaDB...") # Fetch properties from backend success = self.fetch_all_properties_parallel( max_workers=5, page_size=100, max_pages=10 ) if success: logger.info("✅ Auto-fetched and stored properties in ChromaDB successfully") return True else: logger.warning("⚠️ Failed to auto-fetch from backend, using fallback properties...") # Create some fallback properties for testing fallback_properties = self.create_fallback_properties() fallback_success = self._store_properties_in_chromadb_parallel(fallback_properties) if fallback_success: logger.info(f"✅ Stored {len(fallback_properties)} fallback properties in ChromaDB") return True else: logger.error("❌ Failed to store fallback properties") return False except Exception as e: logger.error(f"❌ Error in auto-fetch and store: {e}") return False def create_fallback_properties(self) -> List[Dict]: """Create fallback properties for testing when backend is unavailable""" fallback_properties = [] property_types = ['Apartment', 'Villa', 'House', 'Condo', 'Townhouse'] locations = ['Mumbai', 'Delhi', 'Bangalore', 'Chennai', 'Hyderabad', 'Pune', 'Kolkata'] for i in range(50): # Create 50 fallback properties property_data = { 'propertyId': f'fallback_{i+1}', 'property_name': f'Fallback Property {i+1}', 'propertyType': property_types[i % len(property_types)], 'price': 1000000 + (i * 100000), # Varying prices 'location': locations[i % len(locations)], 'beds': (i % 4) + 1, 'baths': (i % 3) + 1, 'totalSquareFeet': 800 + (i * 50), 'description': f'Beautiful {property_types[i % len(property_types)]} in {locations[i % len(locations)]} with modern amenities', 'features': ['Parking', 'Security', 'Garden', 'Gym'][:(i % 4) + 1], 'yearBuilt': 2020 - (i % 10), 'propertyStatus': 'For Sale', 'listingDate': '2024-01-01', 'agent': f'Agent {i % 10 + 1}', 'is_fallback': True } fallback_properties.append(property_data) logger.info(f"✅ Created {len(fallback_properties)} fallback properties") return fallback_properties def _get_properties_with_multiple_queries(self, queries: List[str], viewed_properties: List[Dict], count: int) -> List[Dict]: """Helper function to get properties using multiple queries with fallback""" all_results = [] for query in queries: try: results = self.search_similar_properties_chromadb(query, viewed_properties, count) all_results.extend(results) if len(all_results) >= count: break except Exception as e: logger.warning(f"⚠️ Query '{query}' failed: {e}") continue # Remove duplicates while preserving order seen = set() unique_results = [] for prop in all_results: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id not in seen: seen.add(prop_id) unique_results.append(prop) if len(unique_results) >= count: break # If still not enough, try generic queries if len(unique_results) < count: logger.info(f"🔄 Only {len(unique_results)} properties found, trying generic queries...") generic_queries = [ "modern property good amenities", "residential property parking security", "apartment villa house property", "property with good facilities" ] for query in generic_queries: try: results = self.search_similar_properties_chromadb(query, [], count) for prop in results: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id not in seen and len(unique_results) < count: seen.add(prop_id) unique_results.append(prop) if len(unique_results) >= count: break except Exception as e: continue return unique_results[:count] def get_multi_ai_property_recommendations(self, analysis_data: Dict, ai_insights: Dict, email_type: str, count: int = 15) -> List[Dict]: """Get property recommendations using multiple AI models based on email type""" try: logger.info(f"🤖 Getting {email_type} recommendations using specialized AI model...") # Extract customer preferences and behavior patterns preferences = self.extract_customer_preferences(analysis_data, ai_insights) behavioral_patterns = self.analyze_behavioral_patterns(analysis_data, ai_insights) viewed_properties = analysis_data.get('data', {}).get('properties', []) # Use different AI strategies based on email type if email_type == 'property_based': return self._get_property_type_based_recommendations(preferences, viewed_properties, count) elif email_type == 'price_based': return self._get_price_based_recommendations(preferences, viewed_properties, count) elif email_type == 'location_based': return self._get_location_based_recommendations(preferences, viewed_properties, count) elif email_type == 'similarity_based': return self._get_similarity_based_recommendations(viewed_properties, preferences, count) elif email_type == 'behavioral_based': return self._get_behavioral_based_recommendations(behavioral_patterns, viewed_properties, count) elif email_type == 'premium_properties': return self._get_premium_properties_recommendations(preferences, count) elif email_type == 'budget_friendly': return self._get_budget_friendly_recommendations(preferences, count) elif email_type == 'trending_properties': return self._get_trending_properties_recommendations(preferences, count) elif email_type == 'family_oriented': return self._get_family_oriented_recommendations(preferences, count) elif email_type == 'investment_opportunities': return self._get_investment_opportunities_recommendations(preferences, ai_insights, count) else: # Fallback to general recommendations return self.get_enhanced_ai_recommendations(analysis_data, ai_insights, count) except Exception as e: logger.error(f"❌ Error in multi-AI property recommendations for {email_type}: {e}") return [] def _get_property_type_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: """AI Model 1: Property type preference analysis""" try: preferred_types = preferences.get('preferred_property_types', []) if not preferred_types: # Analyze viewed properties to infer preferences type_counts = {} for prop in viewed_properties: prop_type = prop.get('propertyType', '') if prop_type: type_counts[prop_type] = type_counts.get(prop_type, 0) + 1 preferred_types = [k for k, v in sorted(type_counts.items(), key=lambda x: x[1], reverse=True)] # Build queries based on property type preferences queries = [] if preferred_types: queries.extend([ f"property type {' or '.join(preferred_types[:3])} with modern amenities", f"{preferred_types[0]} property modern facilities" if preferred_types else "modern property", f"residential {preferred_types[0] if preferred_types else 'apartment'} premium amenities" ]) else: queries.extend([ "modern property with good amenities", "residential apartment villa house premium", "property with parking security amenities" ]) return self._get_properties_with_multiple_queries(queries, viewed_properties, count) except Exception as e: logger.error(f"❌ Error in property type based recommendations: {e}") return [] def _get_price_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: """AI Model 2: Price range optimization""" try: price_range = preferences.get('price_range', {}) min_price = price_range.get('min_price', 0) max_price = price_range.get('max_price', 0) # Build price-based queries queries = [] if min_price > 0 and max_price > 0: avg_price = (min_price + max_price) / 2 queries.extend([ f"property price range {min_price} to {max_price} rupees affordable good value", f"affordable property around {avg_price} rupees", f"budget property good value for money under {max_price}" ]) else: # Analyze viewed properties for price patterns prices = [prop.get('price', 0) for prop in viewed_properties if prop.get('price', 0) > 0] if prices: avg_price = sum(prices) / len(prices) queries.extend([ f"property around {avg_price} rupees good value for money", f"affordable property budget range", f"good value property reasonable price" ]) else: queries.extend([ "affordable property good value for money", "budget friendly property reasonable price", "economical property good investment" ]) return self._get_properties_with_multiple_queries(queries, viewed_properties, count) except Exception as e: logger.error(f"❌ Error in price based recommendations: {e}") return [] def _get_location_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: """AI Model 3: Location preference analysis""" try: preferred_locations = preferences.get('preferred_locations', []) if not preferred_locations: # Extract locations from viewed properties locations = [prop.get('location', '') for prop in viewed_properties if prop.get('location')] preferred_locations = list(set(locations)) queries = [] if preferred_locations: queries.extend([ f"property in {' or '.join(preferred_locations[:3])} good location connectivity", f"property near {preferred_locations[0] if preferred_locations else 'city center'} transport", f"residential property {preferred_locations[0] if preferred_locations else 'prime area'} amenities" ]) else: queries.extend([ "property in prime location good connectivity transport", "property near metro station bus transport", "property central location amenities nearby" ]) return self._get_properties_with_multiple_queries(queries, viewed_properties, count) except Exception as e: logger.error(f"❌ Error in location based recommendations: {e}") return [] def _get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]: """AI Model 4: Semantic similarity analysis""" try: if not viewed_properties: queries = ["modern apartment villa house property", "residential property good amenities"] else: # Create queries based on viewed property characteristics most_viewed = viewed_properties[0] if viewed_properties else {} property_type = most_viewed.get('propertyType', 'apartment') location = most_viewed.get('location', '') beds = most_viewed.get('beds', 2) queries = [ f"similar to {property_type} in {location} with {beds} bedrooms modern amenities", f"{property_type} property modern facilities parking", f"residential {property_type} good amenities security" ] return self._get_properties_with_multiple_queries(queries, viewed_properties, count) except Exception as e: logger.error(f"❌ Error in similarity based recommendations: {e}") return [] def _get_behavioral_based_recommendations(self, behavioral_patterns: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: """AI Model 5: Behavioral pattern analysis""" try: engagement_level = behavioral_patterns.get('engagement_level', 'medium') if engagement_level == 'high': queries = ["premium property luxury amenities high-end features", "luxury villa penthouse executive", "high-end property modern amenities"] elif engagement_level == 'low': queries = ["simple property basic amenities affordable", "budget property economical good value", "affordable residential property"] else: queries = ["balanced property good amenities reasonable price", "moderate property facilities parking", "residential property good value"] return self._get_properties_with_multiple_queries(queries, viewed_properties, count) except Exception as e: logger.error(f"❌ Error in behavioral based recommendations: {e}") return [] def _get_premium_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]: """AI Model 6: Premium property analysis""" try: queries = [ "luxury premium property high-end amenities villa penthouse executive", "premium villa luxury amenities swimming pool", "high-end property executive modern facilities" ] return self._get_properties_with_multiple_queries(queries, [], count) except Exception as e: logger.error(f"❌ Error in premium properties recommendations: {e}") return [] def _get_budget_friendly_recommendations(self, preferences: Dict, count: int) -> List[Dict]: """AI Model 7: Budget optimization analysis""" try: queries = [ "affordable budget friendly property good value money economical", "budget property affordable reasonable price", "economical property good investment value" ] return self._get_properties_with_multiple_queries(queries, [], count) except Exception as e: logger.error(f"❌ Error in budget friendly recommendations: {e}") return [] def _get_trending_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]: """AI Model 8: Market trend analysis""" try: queries = [ "trending popular property new development modern design latest", "new property modern design contemporary", "popular property development amenities" ] return self._get_properties_with_multiple_queries(queries, [], count) except Exception as e: logger.error(f"❌ Error in trending properties recommendations: {e}") return [] def _get_family_oriented_recommendations(self, preferences: Dict, count: int) -> List[Dict]: """AI Model 9: Family suitability analysis""" try: queries = [ "family friendly property multiple bedrooms school nearby children playground safety", "family property children school park nearby", "residential property family multiple bedrooms" ] return self._get_properties_with_multiple_queries(queries, [], count) except Exception as e: logger.error(f"❌ Error in family oriented recommendations: {e}") return [] def _get_investment_opportunities_recommendations(self, preferences: Dict, ai_insights: Dict, count: int) -> List[Dict]: """AI Model 10: Investment potential analysis""" try: queries = [ "investment property rental yield capital appreciation commercial residential ROI", "investment opportunity property rental income", "commercial property investment good returns" ] return self._get_properties_with_multiple_queries(queries, [], count) except Exception as e: logger.error(f"❌ Error in investment opportunities recommendations: {e}") return [] def enhance_recommendations_with_ai(self, similar_properties: List[Dict], viewed_properties: List[Dict], preferences: Dict, strategy: str) -> List[Dict]: """Enhance ChromaDB results with additional AI analysis""" try: enhanced_properties = [] for prop in similar_properties: # Calculate additional AI scores ai_score = self.calculate_comprehensive_ai_score(prop, viewed_properties, preferences, strategy) # Combine ChromaDB similarity with AI analysis chromadb_score = prop.get('similarity_score', 0.5) combined_score = (chromadb_score * 0.6) + (ai_score * 0.4) # Weight ChromaDB higher # Add AI-generated recommendation reason recommendation_reason = self.generate_advanced_recommendation_reason(prop, preferences, strategy) # Enhanced property data enhanced_prop = prop.copy() enhanced_prop.update({ 'ai_similarity_score': combined_score, 'chromadb_similarity': chromadb_score, 'ai_analysis_score': ai_score, 'recommendation_reason': recommendation_reason, 'recommendation_confidence': self.calculate_recommendation_confidence(prop, viewed_properties) }) enhanced_properties.append(enhanced_prop) # Sort by combined score enhanced_properties.sort(key=lambda x: x['ai_similarity_score'], reverse=True) logger.info(f"🧠 Enhanced {len(enhanced_properties)} properties with AI analysis") return enhanced_properties except Exception as e: logger.error(f"❌ Error enhancing recommendations: {e}") return similar_properties # Return original if enhancement fails def get_additional_recommendations(self, current_recommendations: List[Dict], viewed_properties: List[Dict], needed_count: int) -> List[Dict]: """Get additional recommendations to reach minimum count""" try: if needed_count <= 0: return [] # Ensure ChromaDB is properly initialized if self.properties_collection is None: logger.error("❌ ChromaDB properties collection not initialized, cannot get additional recommendations") return [] # Get excluded IDs excluded_ids = set() for prop in current_recommendations + viewed_properties: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id: excluded_ids.add(prop_id) # Get additional properties using broader search results = self.properties_collection.query( query_texts=["modern property with good amenities"], # Generic query n_results=needed_count * 3, # Get extra for filtering include=["metadatas", "documents", "distances"] ) additional_properties = [] if results['metadatas'] and results['metadatas'][0]: for i, metadata in enumerate(results['metadatas'][0]): if metadata['id'] in excluded_ids: continue # Convert to property format property_data = { 'id': metadata['id'], 'propertyId': metadata['id'], 'propertyName': metadata['propertyName'], 'propertyTypeName': metadata['propertyTypeName'], 'price': metadata['price'], 'marketValue': metadata['price'], 'address': metadata['address'], 'beds': metadata['beds'], 'baths': metadata['baths'], 'totalSquareFeet': metadata['sqft'], 'description': metadata['description'], 'features': self.safe_json_parse(metadata['features']), 'ai_similarity_score': 0.3, # Lower score for generic recommendations 'recommendation_reason': "Popular choice with good features and location" } additional_properties.append(property_data) excluded_ids.add(metadata['id']) if len(additional_properties) >= needed_count: break logger.info(f"➕ Added {len(additional_properties)} additional recommendations") return additional_properties except Exception as e: logger.error(f"❌ Error getting additional recommendations: {e}") return [] def calculate_comprehensive_ai_score(self, property_item: Dict, viewed_properties: List, preferences: Dict, strategy: str) -> float: """Calculate comprehensive AI score for a property""" try: score = 0.0 # Base similarity from ChromaDB base_score = property_item.get('similarity_score', 0.5) score += base_score * 0.3 # Property type preference prop_type = property_item.get('propertyTypeName', '') viewed_types = [p.get('propertyTypeName', '') for p in viewed_properties] if prop_type in viewed_types: score += 0.2 # Price compatibility prop_price = property_item.get('price', 0) viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] if viewed_prices and prop_price > 0: avg_viewed_price = sum(viewed_prices) / len(viewed_prices) price_diff = abs(prop_price - avg_viewed_price) / avg_viewed_price price_score = max(0, 1 - price_diff) # Higher score for similar prices score += price_score * 0.2 # Feature matching prop_features = property_item.get('features', []) if isinstance(prop_features, str): prop_features = self.safe_json_parse(prop_features) if prop_features else [] feature_match_count = 0 total_viewed_features = set() for viewed_prop in viewed_properties: viewed_features = viewed_prop.get('features', []) if isinstance(viewed_features, list): total_viewed_features.update(viewed_features) if total_viewed_features and prop_features: feature_match_count = len(set(prop_features) & total_viewed_features) feature_score = min(1.0, feature_match_count / len(total_viewed_features)) score += feature_score * 0.15 # Strategy bonus if strategy == 'aggressive_immediate' and prop_price > 15000000: score += 0.1 # Prefer premium properties elif strategy == 'nurturing_educational' and 5000000 <= prop_price <= 20000000: score += 0.1 # Prefer mid-range # Specification matching prop_beds = property_item.get('beds', 0) viewed_beds = [p.get('beds', 0) for p in viewed_properties if p.get('beds', 0) > 0] if viewed_beds and prop_beds > 0: avg_beds = sum(viewed_beds) / len(viewed_beds) beds_diff = abs(prop_beds - avg_beds) if beds_diff <= 1: # Within 1 bedroom score += 0.05 return min(score, 1.0) except Exception as e: logger.error(f"Error calculating AI score: {e}") return 0.5 # Default score def generate_advanced_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str: """Generate advanced AI recommendation reason""" try: reasons = [] # Property type appeal prop_type = property_item.get('propertyTypeName', '') if prop_type: reasons.append(f"Excellent {prop_type.lower()} option") # Price positioning price = property_item.get('price', 0) if price > 20000000: reasons.append("premium location and features") elif price > 10000000: reasons.append("great value in mid-range segment") else: reasons.append("affordable with good potential") # Specification highlights beds = property_item.get('beds', 0) if beds >= 3: reasons.append("spacious family accommodation") elif beds == 2: reasons.append("perfect for couples or small families") # Location appeal address = property_item.get('address', '') if 'Gachibowli' in address or 'Hitech City' in address: reasons.append("prime IT corridor location") elif 'Banjara Hills' in address or 'Jubilee Hills' in address: reasons.append("prestigious neighborhood") else: reasons.append("well-connected area") # Strategy-specific reasons if strategy == 'aggressive_immediate': reasons.append("immediate possession available") elif strategy == 'nurturing_educational': reasons.append("excellent investment opportunity") else: reasons.append("suits your viewing preferences") return "Perfect match with " + ", ".join(reasons[:3]) except Exception as e: logger.error(f"Error generating recommendation reason: {e}") return "Recommended based on your preferences and AI analysis" def calculate_recommendation_confidence(self, property_item: Dict, viewed_properties: List) -> float: """Calculate confidence level for recommendation""" try: confidence = 0.5 # Base confidence # Higher confidence for properties similar to heavily viewed ones prop_type = property_item.get('propertyTypeName', '') type_views = sum(1 for p in viewed_properties if p.get('propertyTypeName') == prop_type) if type_views > 0: confidence += min(0.3, type_views * 0.1) # Price range confidence prop_price = property_item.get('price', 0) viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] if viewed_prices and prop_price > 0: avg_price = sum(viewed_prices) / len(viewed_prices) price_variance = abs(prop_price - avg_price) / avg_price if price_variance < 0.2: # Within 20% of average confidence += 0.2 return min(confidence, 1.0) except Exception as e: logger.error(f"Error calculating confidence: {e}") return 0.5 def score_properties_with_ai(self, all_properties: List, viewed_properties: List, preferences: Dict, strategy: str) -> List[Dict]: """Score properties using AI analysis""" scored_properties = [] # Get viewed property IDs to exclude viewed_ids = set() for p in viewed_properties: if isinstance(p, dict): viewed_ids.add(str(p.get('propertyId', ''))) for i, prop in enumerate(all_properties): try: # Ensure property is a dictionary if not isinstance(prop, dict): logger.warning(f"Property {i} is not a dictionary: {type(prop)}") continue # Skip already viewed properties prop_id = str(prop.get('propertyId', prop.get('id', ''))) if prop_id in viewed_ids: continue # Calculate AI-based similarity score score = self.calculate_ai_similarity_score(prop, viewed_properties, preferences, strategy) if score > 0.3: # Minimum threshold prop_copy = prop.copy() prop_copy['ai_similarity_score'] = score prop_copy['recommendation_reason'] = self.generate_recommendation_reason(prop, preferences, strategy) scored_properties.append(prop_copy) except Exception as e: logger.warning(f"Error processing property {i}: {e}") continue # Sort by AI score scored_properties.sort(key=lambda x: x['ai_similarity_score'], reverse=True) return scored_properties def calculate_ai_similarity_score(self, property_item: Dict, viewed_properties: List, preferences: Dict, strategy: str) -> float: """Calculate similarity score using AI analysis""" score = 0.0 # Property type preference (30% weight) preferred_types = dict(preferences.get('preferred_types', [])) prop_type = property_item.get('propertyTypeName', '') or property_item.get('typeName', '') if prop_type in preferred_types: score += 0.3 * (preferred_types[prop_type] / max(preferred_types.values())) # Price similarity (25% weight) viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] if viewed_prices: avg_viewed_price = sum(viewed_prices) / len(viewed_prices) # Try multiple price field names prop_price = property_item.get('price', 0) or property_item.get('marketValue', 0) or property_item.get('amount', 0) if prop_price > 0: price_ratio = min(prop_price, avg_viewed_price) / max(prop_price, avg_viewed_price) score += 0.25 * price_ratio # Strategy-based scoring (25% weight) strategy_bonus = self.get_strategy_bonus(property_item, strategy) score += 0.25 * strategy_bonus # Feature similarity using AI (20% weight) feature_score = self.calculate_feature_similarity_ai(property_item, viewed_properties) score += 0.2 * feature_score return min(score, 1.0) def get_strategy_bonus(self, property_item: Dict, strategy: str) -> float: """Get bonus score based on recommendation strategy""" price = property_item.get('price', 0) if strategy == 'aggressive_immediate': # Prefer premium properties for hot leads return 1.0 if price > 15000000 else 0.5 elif strategy == 'nurturing_educational': # Prefer mid-range with good value return 1.0 if 5000000 <= price <= 20000000 else 0.7 elif strategy == 'engagement_building': # Prefer variety and attractive options return 0.8 # Neutral bonus else: # awareness_creation # Prefer budget-friendly options return 1.0 if price < 10000000 else 0.3 def calculate_feature_similarity_ai(self, property_item: Dict, viewed_properties: List) -> float: """Calculate feature similarity using AI embeddings""" try: # Create text description of the property prop_text = self.create_property_description(property_item) # Get embeddings for current property prop_embedding = self.get_property_embeddings(prop_text) # Calculate similarity with viewed properties similarities = [] for viewed_prop in viewed_properties: viewed_text = self.create_property_description(viewed_prop) viewed_embedding = self.get_property_embeddings(viewed_text) # Calculate cosine similarity similarity = cosine_similarity([prop_embedding], [viewed_embedding])[0][0] similarities.append(similarity) return max(similarities) if similarities else 0.0 except Exception as e: logger.warning(f"⚠️ Feature similarity calculation failed: {e}") return 0.5 # Default similarity def create_property_description(self, property_item: Dict) -> str: """Create a text description of property for AI analysis""" name = property_item.get('propertyName', 'Property') prop_type = property_item.get('propertyTypeName', 'Residential') price = property_item.get('price', 0) description = f"{name} is a {prop_type} property priced at ₹{price:,.0f}" # Add more features if available if property_item.get('locationName'): description += f" located in {property_item['locationName']}" return description def generate_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str: """Generate AI-powered recommendation reason""" reasons = [] # Property type match preferred_types = dict(preferences.get('preferred_types', [])) prop_type = property_item.get('propertyTypeName', '') if prop_type in preferred_types: reasons.append(f"Matches your preference for {prop_type} properties") # Price consideration price = property_item.get('price', 0) if strategy == 'aggressive_immediate': reasons.append("Premium property perfect for immediate purchase") elif strategy == 'nurturing_educational': reasons.append("Great value proposition with excellent features") elif strategy == 'engagement_building': reasons.append("Interesting option to explore based on your browsing pattern") else: reasons.append("Budget-friendly option worth considering") return '; '.join(reasons) if reasons else "Recommended based on your viewing history" def filter_recommendations(self, scored_properties: List, viewed_properties: List) -> List[Dict]: """Filter and refine recommendations""" # Remove duplicates and ensure variety filtered = [] seen_types = set() price_ranges = [] for prop in scored_properties: prop_type = prop.get('propertyTypeName', 'Unknown') price = prop.get('price', 0) # Ensure variety in property types (max 3 per type) type_count = sum(1 for p in filtered if p.get('propertyTypeName') == prop_type) if type_count >= 3: continue # Ensure price variety price_range = self.get_price_range(price) range_count = sum(1 for p in filtered if self.get_price_range(p.get('price', 0)) == price_range) if range_count >= 4: # Max 4 properties per price range continue filtered.append(prop) if len(filtered) >= 10: # Limit recommendations break return filtered def get_price_range(self, price: float) -> str: """Categorize price into ranges""" if price < 5000000: return "budget" elif price < 15000000: return "mid-range" elif price < 30000000: return "premium" else: return "luxury" def generate_personalized_email(self, user_analysis: Dict, ai_insights: Dict, recommendations: List[Dict], recipient_email: str, email_type: str = None) -> Dict: """Generate personalized email using AI insights""" logger.info(f"📧 Generating personalized email for {recipient_email}") try: # Extract key information customer_id = user_analysis.get('customer_id') lead_status = user_analysis.get('data', {}).get('lead_qualification', {}).get('lead_status', 'UNKNOWN') personality_type = ai_insights.get('ai_personality_type', 'Valued Customer') buying_motivation = ai_insights.get('buying_motivation', 'finding the perfect property') urgency_level = ai_insights.get('urgency_level', 'Medium') # Generate personalized content email_content = self.create_email_content( customer_id, lead_status, personality_type, buying_motivation, urgency_level, recommendations, email_type ) # For preview mode, don't actually send the email if recipient_email == 'preview@example.com': return { 'success': True, 'recipient': recipient_email, 'subject': email_content['subject'], 'html_content': email_content['html_content'], 'text_content': email_content['text_content'], 'recommendations_count': len(recommendations), 'personalization': { 'personality_type': personality_type, 'buying_motivation': buying_motivation, 'urgency_level': urgency_level } } # Send email for real recipients success = self.send_email(recipient_email, email_content) return { 'success': success, 'recipient': recipient_email, 'subject': email_content['subject'], 'html_content': email_content['html_content'], 'text_content': email_content['text_content'], 'recommendations_count': len(recommendations), 'personalization': { 'personality_type': personality_type, 'buying_motivation': buying_motivation, 'urgency_level': urgency_level } } except Exception as e: logger.error(f"❌ Error generating email: {e}") return {'success': False, 'error': str(e)} def create_email_content(self, customer_id: int, lead_status: str, personality_type: str, buying_motivation: str, urgency_level: str, recommendations: List[Dict], email_type: str = None) -> Dict: """Create personalized email content based on AI insights""" # Personalized greeting greeting = self.get_personalized_greeting(personality_type, lead_status) # Subject line based on urgency, personality, and email type subject = self.generate_subject_line(personality_type, urgency_level, len(recommendations), email_type) # Prepare recommendations for email - ENSURE MINIMUM 10 PROPERTIES recommendations_to_show = recommendations[:10] if len(recommendations) >= 10 else recommendations logger.info(f"📧 Including {len(recommendations_to_show)} properties in email (target: 10 minimum)") # If we have less than 10, log a warning but still proceed if len(recommendations_to_show) < 10: logger.warning(f"⚠️ Only {len(recommendations_to_show)} properties available for email (target was 10)") # Main content html_content = f"""

🏠 Personalized Property Recommendations

AI-Curated Properties Just for You

{greeting}

🤖 AI Analysis Insights

Your Profile: {personality_type}

Buying Motivation: {buying_motivation}

Current Status: {lead_status} Lead

Based on your recent property viewing activity and our AI analysis, we've curated these perfect matches for you:

🎯 {len(recommendations_to_show)} Personalized Property Recommendations

Handpicked by our AI based on your preferences and behavior

🏠 Your Property Matches

""" # Add property recommendations - already defined above for i, prop in enumerate(recommendations_to_show, 1): # Try multiple price field names price = prop.get('price', 0) or prop.get('marketValue', 0) or prop.get('amount', 0) logger.info(f"Property {i} price fields - price: {prop.get('price')}, marketValue: {prop.get('marketValue')}, amount: {prop.get('amount')}") if price == 0: # If still no price, try to extract from other fields price_text = prop.get('priceText', '') or prop.get('priceDisplay', '') if price_text: # Try to extract numeric value from price text import re price_match = re.findall(r'[\d,]+', str(price_text).replace(',', '')) if price_match: try: price = int(price_match[0]) except: price = 0 # Extract comprehensive property details name = prop.get('propertyName', '') or prop.get('name', '') or prop.get('title', '') or f'Premium Property {i}' prop_type = prop.get('propertyTypeName', '') or prop.get('typeName', '') or prop.get('type', '') or 'Residential' description = prop.get('description', '') or prop.get('desc', '') or 'Excellent property opportunity with modern amenities and prime location.' location = prop.get('address', '') or prop.get('location', '') or prop.get('area', '') or 'Prime Location' # Property specifications beds = prop.get('beds', 0) or prop.get('bedrooms', 0) or prop.get('bhk', 0) baths = prop.get('baths', 0) or prop.get('bathrooms', 0) or prop.get('bath', 0) sqft = prop.get('totalSquareFeet', 0) or prop.get('area', 0) or prop.get('sqft', 0) or prop.get('size', 0) # Features features = prop.get('features', []) or prop.get('amenities', []) or [] if isinstance(features, str): features = [f.strip() for f in features.split(',') if f.strip()] elif not features: # Default features based on property type if 'villa' in prop_type.lower(): features = ['Parking', 'Garden', 'Security', 'Swimming Pool'] elif 'flat' in prop_type.lower() or 'apartment' in prop_type.lower(): features = ['Lift', 'Security', 'Parking', 'Gym'] else: features = ['Parking', 'Security', 'CCTV', 'Power Backup'] # Create property link property_id = prop.get('id', '') or prop.get('propertyId', '') or f'prop_{i}' property_link = f"https://your-property-website.com/property/{property_id}" reason = prop.get('recommendation_reason', 'Recommended based on your viewing patterns and preferences') # Format price display if price > 0: price_display = f"₹{price:,.0f}" else: price_display = "Price on Request" # Format specifications specs_html = "" if beds > 0: specs_html += f" {beds} Bed{'s' if beds > 1 else ''}" if baths > 0: specs_html += f" {baths} Bath{'s' if baths > 1 else ''}" if sqft > 0: specs_html += f" {sqft:,.0f} sq ft" # Format features features_html = "" if features: top_features = features[:4] # Show top 4 features features_html = "".join([f"{feature}" for feature in top_features]) if len(features) > 4: features_html += f"+{len(features) - 4} more" html_content += f"""

{name}

{price_display}
{prop_type} {location}
{f'
{specs_html}
' if specs_html else ''}

{description}

{f'
{features_html}
' if features_html else ''}
AI Recommendation: {reason}
""" # Add call-to-action based on urgency cta_text = self.get_cta_text(urgency_level) html_content += f"""
{cta_text}

💡 AI Recommendation

{self.get_ai_recommendation_text(personality_type, urgency_level)}

Our AI system analyzed your viewing patterns, preferences, and behavior to find these properties. Each recommendation is personally curated for your specific needs.

Questions? Reply to this email or call us directly. We're here to help you find your perfect property!

""" return { 'subject': subject, 'html_content': html_content, 'text_content': self.html_to_text(html_content) } def get_personalized_greeting(self, personality_type: str, lead_status: str) -> str: """Generate personalized greeting""" if personality_type == "Decisive Decision Maker": return "Hello, Decisive Property Investor!" elif personality_type == "Research-Oriented Buyer": return "Dear Informed Property Seeker," elif personality_type == "Cautious Evaluator": return "Hello, Thoughtful Property Explorer," else: return "Dear Valued Property Browser," def generate_subject_line(self, personality_type: str, urgency_level: str, count: int, email_type: str = None) -> str: """Generate compelling subject line""" # Use email type specific subjects if available if email_type: type_subjects = { 'property_based': f"🏠 {count} Property-Based Recommendations Just for You", 'price_based': f"💰 {count} Price-Based Property Matches Within Your Budget", 'location_based': f"📍 {count} Location-Based Properties in Your Preferred Areas", 'similarity_based': f"🔍 {count} Properties Similar to Your Viewed Listings", 'behavioral_based': f"🧠 {count} Behavioral-Based Property Recommendations", 'premium_properties': f"⭐ {count} Premium Luxury Properties Just for You", 'budget_friendly': f"💵 {count} Budget-Friendly Value Properties", 'trending_properties': f"📈 {count} Trending Properties in Today's Market", 'family_oriented': f"👨‍👩‍👧‍👦 {count} Family-Oriented Properties with Great Amenities", 'investment_opportunities': f"💼 {count} Investment Properties with High ROI Potential" } if email_type in type_subjects: return type_subjects[email_type] # Fallback to original logic if urgency_level == "High": return f"🔥 {count} Hot Properties - Perfect Match for You!" elif personality_type == "Decisive Decision Maker": return f"⚡ {count} Premium Properties - Ready for Immediate Purchase" elif personality_type == "Research-Oriented Buyer": return f"📊 {count} Carefully Selected Properties + Detailed Analysis" else: return f"🏠 {count} Personalized Property Recommendations Just for You" def get_cta_text(self, urgency_level: str) -> str: """Get call-to-action text based on urgency""" if urgency_level == "High": return "🔥 View Properties Now - Limited Time!" elif urgency_level == "Medium": return "📞 Schedule Viewing Today" else: return "💭 Explore These Options" def get_ai_recommendation_text(self, personality_type: str, urgency_level: str) -> str: """Get AI recommendation text""" if personality_type == "Decisive Decision Maker": return "Our AI suggests focusing on 1-2 properties for immediate action. Your profile indicates you prefer quick decisions with clear value propositions." elif personality_type == "Research-Oriented Buyer": return "Our AI recommends comparing these properties in detail. We can provide additional market analysis and property reports to support your research." else: return "Our AI suggests exploring these options at your own pace. Each property offers unique value that aligns with your viewing patterns." def html_to_text(self, html_content: str) -> str: """Convert HTML to plain text""" # Simple HTML to text conversion import re text = re.sub('<[^<]+?>', '', html_content) text = re.sub(r'\s+', ' ', text) return text.strip() def send_email(self, recipient_email: str, email_content: Dict) -> bool: """Send personalized email with SendGrid support""" try: logger.info(f"📧 Attempting to send real email to {recipient_email}") # Real email sending - always attempt SendGrid import requests # SendGrid API endpoint url = "https://api.sendgrid.com/v3/mail/send" # Prepare email data email_data = { "personalizations": [ { "to": [ { "email": recipient_email } ] } ], "from": { "email": self.email_config['sender_email'] }, "subject": email_content['subject'], "content": [ { "type": "text/plain", "value": email_content['text_content'] }, { "type": "text/html", "value": email_content['html_content'] } ] } # Send email with SendGrid API headers = { "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", "Content-Type": "application/json" } response = requests.post(url, json=email_data, headers=headers) if response.status_code == 202: logger.info(f"✅ SendGrid email sent successfully to {recipient_email}") return True elif (response.status_code == 403 and "messaging limits" in response.text) or \ (response.status_code == 401 and "Maximum credits exceeded" in response.text): logger.warning(f"⚠️ SendGrid limits/credits exceeded - trying SMTP fallback...") # Try SMTP fallback first if self._send_via_smtp(recipient_email, email_content): logger.info(f"✅ SMTP fallback successful for {recipient_email}") return True else: # Save email to file if SMTP also fails logger.info("📁 Both SendGrid and SMTP failed - saving email to file...") self._save_email_to_file(recipient_email, email_content) return True # Return True since email was "processed" (saved to file) else: logger.error(f"❌ SendGrid API error: {response.status_code} - {response.text}") # Save email to file for manual sending logger.info("📁 Saving email to file for manual sending...") # Try SMTP fallback if self._send_via_smtp(recipient_email, email_content): logger.info(f"✅ SMTP fallback successful for {recipient_email}") return True else: # Save to file as last resort self._save_email_to_file(recipient_email, email_content) return False except Exception as e: logger.error(f"❌ Failed to send SendGrid email to {recipient_email}: {e}") # Save email to file for manual sending instead of using mock service logger.info("📁 Saving email to file for manual sending...") try: self._save_email_to_file(recipient_email, email_content) except Exception as save_error: logger.error(f"❌ Failed to save email to file: {save_error}") # Return True anyway since email generation was successful return True return False def _send_mock_email(self, recipient_email: str, email_content: Dict) -> bool: """Mock email service for environments without network access""" try: logger.info(f"📧 Mock email service - Logging email content for {recipient_email}") # Log email details logger.info(f"📧 Email Subject: {email_content['subject']}") logger.info(f"📧 Email From: {self.email_config['sender_email']}") logger.info(f"📧 Email To: {recipient_email}") # Log a preview of the email content text_preview = email_content['text_content'][:500] + "..." if len(email_content['text_content']) > 500 else email_content['text_content'] logger.info(f"📧 Email Text Preview: {text_preview}") # Log the number of properties in the email if 'html_content' in email_content: property_count = email_content['html_content'].count('property-card') logger.info(f"📧 Email contains {property_count} property recommendations") # Save email content to a file for manual sending self._save_email_to_file(recipient_email, email_content) logger.info(f"✅ Mock email logged successfully for {recipient_email}") logger.info(f"📧 Email would be sent with {len(email_content.get('html_content', ''))} characters of HTML content") return True except Exception as e: logger.error(f"❌ Failed to log mock email for {recipient_email}: {e}") return False def _save_email_to_file(self, recipient_email: str, email_content: Dict): """Save email content to a file for manual sending""" try: import os from datetime import datetime # Create emails directory if it doesn't exist - use current directory instead of /tmp emails_dir = "./saved_emails" os.makedirs(emails_dir, exist_ok=True) # Create filename with timestamp and email type info timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Extract email type from subject if possible subject = email_content.get('subject', 'email') email_type = 'unknown' if 'Property-Based' in subject or 'property type' in subject.lower(): email_type = 'property_based' elif 'Price-Based' in subject or 'budget' in subject.lower(): email_type = 'price_based' elif 'Location-Based' in subject or 'location' in subject.lower(): email_type = 'location_based' elif 'Premium' in subject or 'luxury' in subject.lower(): email_type = 'premium' elif 'Budget' in subject or 'affordable' in subject.lower(): email_type = 'budget_friendly' elif 'Family' in subject or 'family' in subject.lower(): email_type = 'family_oriented' elif 'Investment' in subject or 'investment' in subject.lower(): email_type = 'investment' elif 'Trending' in subject or 'trending' in subject.lower(): email_type = 'trending' filename = f"{emails_dir}/{email_type}_{timestamp}.html" # Create complete HTML email with better styling full_html = f""" {email_content['subject']}
📧 EMAIL SAVED DUE TO SENDGRID LIMITS - READY FOR MANUAL REVIEW

📧 Email Details

📧 To: {recipient_email}

📝 Subject: {email_content['subject']}

🕒 Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

📋 Type: {email_type.replace('_', ' ').title()}

📁 File: {filename}

{email_content['html_content']}
""" # Save to file with open(filename, 'w', encoding='utf-8') as f: f.write(full_html) logger.info(f"📁 Email saved to file: {filename}") logger.info(f"💡 Open this file in your browser to view the beautiful email content") logger.info(f"🎯 Email type: {email_type.replace('_', ' ').title()}") except Exception as e: logger.error(f"❌ Failed to save email to file: {e}") def _send_via_smtp(self, recipient_email: str, email_content: Dict) -> bool: """Send email via Gmail SMTP with authentication""" try: import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # Gmail SMTP configuration smtp_server = 'smtp.gmail.com' smtp_port = 587 smtp_username = 'sameermujahid7777@gmail.com' smtp_password = 'wpgn fmut nbkt mdvw' # App password sender_email = 'shaiksameermujahid@gmail.com' # Extract email components subject = email_content.get('subject', 'Property Recommendations') html_content = email_content.get('html_content', '') text_content = email_content.get('text_content', '') # Create message msg = MIMEMultipart() msg['From'] = sender_email msg['To'] = recipient_email msg['Subject'] = subject # Attach HTML content msg.attach(MIMEText(html_content, 'html')) # Attach text content if provided if text_content: msg.attach(MIMEText(text_content, 'plain')) # Send email via Gmail SMTP server = smtplib.SMTP(smtp_server, smtp_port) server.starttls() # Enable TLS server.login(smtp_username, smtp_password) # Send email text = msg.as_string() server.sendmail(sender_email, recipient_email, text) server.quit() logger.info(f"✅ Gmail SMTP email sent successfully to {recipient_email}") return True except Exception as e: logger.error(f"❌ Gmail SMTP error: {e}") return False def process_customer_with_ai(self, customer_id: int, recipient_email: str, analysis_data: Dict) -> Dict: """Complete AI processing pipeline for a customer""" logger.info(f"🤖 Starting AI processing for customer {customer_id}") try: # Step 1: AI behavior analysis ai_insights = self.analyze_user_behavior_with_ai(analysis_data) # Step 2: Find similar properties using AI recommendations = self.find_similar_properties_ai(analysis_data, ai_insights) # Step 3: Generate and send personalized email email_result = self.generate_personalized_email( analysis_data, ai_insights, recommendations, recipient_email ) # Return comprehensive results return { 'success': True, 'customer_id': customer_id, 'ai_insights': ai_insights, 'recommendations': recommendations, 'email_result': email_result, 'processing_time': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ AI processing failed for customer {customer_id}: {e}") return { 'success': False, 'customer_id': customer_id, 'error': str(e) } def fetch_properties_page(self, page: int, max_retries: int = 3) -> List[Dict]: """Fetch a single page of properties with retry logic""" for attempt in range(max_retries): try: # Use the correct API endpoint with proper parameters url = f"{self.properties_api}" params = { 'pageNumber': page, 'pageSize': 100 } response = requests.get(url, params=params, verify=False, timeout=None) if response.status_code == 200: data = response.json() # Handle different response structures if isinstance(data, list): properties = data elif isinstance(data, dict): if 'data' in data: properties = data['data'] elif 'properties' in data: properties = data['properties'] elif 'results' in data: properties = data['results'] else: properties = list(data.values()) if data else [] else: properties = [] if properties: logger.info(f"✅ Page {page}: {len(properties)} properties") return properties else: logger.info(f"📄 Page {page} is empty - reached end") return [] else: logger.warning(f"⚠️ Page {page} returned status {response.status_code}") if attempt < max_retries - 1: time.sleep(2) # Wait before retry continue return [] except Exception as e: logger.error(f"❌ Error fetching page {page}, attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(2) # Wait before retry continue return [] return [] def fetch_all_properties_parallel(self, max_workers: int = 50, page_size: int = 600, max_pages: int = 20) -> bool: """ULTRA-ENHANCED parallel property fetch optimized for maximum speed and 600+ properties""" logger.info(f"🚀 Starting ULTRA-ENHANCED parallel property fetch: {max_workers} workers, {page_size} per page, max {max_pages} pages") # Validate parameters if page_size <= 0: logger.error("❌ Invalid page_size: must be greater than 0") return False if max_workers <= 0: logger.error("❌ Invalid max_workers: must be greater than 0") return False if max_pages <= 0: logger.error("❌ Invalid max_pages: must be greater than 0") return False try: # Enhanced total count detection with multiple fallbacks total_count = self._get_total_property_count() # Calculate optimal pages needed if total_count > 0: estimated_pages = (total_count // page_size) + 1 max_pages = min(estimated_pages, max_pages) logger.info(f"📄 Optimized pages needed: {estimated_pages}, using max: {max_pages}") else: logger.info(f"📄 Using default max_pages: {max_pages}") # Enhanced parallel fetching with better error handling and progress tracking all_properties = [] failed_pages = [] # ULTRA-ENHANCED parallel fetching with maximum concurrency with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all page fetch tasks simultaneously for maximum speed futures = {} for page in range(1, max_pages + 1): future = executor.submit(self._fetch_property_page_enhanced, page, page_size) futures[future] = page # Use future as key, page as value logger.info(f"🚀 Submitted {len(futures)} parallel fetch tasks with {max_workers} workers for ultra-fast processing") # Collect results as they complete with enhanced progress tracking completed_pages = 0 for future in concurrent.futures.as_completed(futures.keys()): page = futures[future] # Get the page number from the future try: properties = future.result() # No timeout - let it take as long as needed completed_pages += 1 if properties: all_properties.extend(properties) progress = (completed_pages / max_pages) * 100 logger.info(f"✅ Page {page}: {len(properties)} properties fetched (Total: {len(all_properties)}, Progress: {progress:.1f}%)") else: progress = (completed_pages / max_pages) * 100 logger.info(f"📄 Page {page}: No properties found (Progress: {progress:.1f}%)") if page > 1: # Don't stop on first page break except Exception as e: completed_pages += 1 progress = (completed_pages / max_pages) * 100 logger.error(f"❌ Error fetching page {page}: {e} (Progress: {progress:.1f}%)") failed_pages.append(page) continue # Retry failed pages with exponential backoff if failed_pages: logger.info(f"🔄 Retrying {len(failed_pages)} failed pages...") retry_properties = self._retry_failed_pages(failed_pages, page_size, max_workers) all_properties.extend(retry_properties) logger.info(f"📦 Total properties fetched: {len(all_properties)} (Failed pages: {len(failed_pages)})") # Enhanced ChromaDB storage with parallel processing if all_properties: success = self._store_properties_in_chromadb_parallel(all_properties) if success: logger.info(f"💾 Successfully stored {len(all_properties)} properties in ChromaDB") # Schedule next update in 24 hours self._schedule_property_update() return True else: logger.error("❌ Failed to store properties in ChromaDB") return False else: logger.warning("⚠️ No properties fetched") return False except Exception as e: logger.error(f"❌ Error in enhanced parallel property fetch: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return False def _get_total_property_count(self) -> int: """Enhanced method to get total property count with multiple fallbacks""" try: # Try multiple endpoints and response structures endpoints = [ f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails", f"{self.config['backend_url']}/api/Property/count", f"{self.config['backend_url']}/api/Property/total" ] for endpoint in endpoints: try: response = requests.get( endpoint, params={'pageNumber': 1, 'pageSize': 1}, timeout=30, headers=self.config.get('headers', {}) ) if response.status_code == 200: data = response.json() # Try multiple possible field names for total count count_fields = ['totalCount', 'total', 'count', 'totalRecords', 'recordCount'] for field in count_fields: if isinstance(data, dict) and field in data: total_count = data.get(field, 0) logger.info(f"📊 Total properties found via {endpoint}: {total_count}") return total_count # If no count field found, estimate from first page if isinstance(data, list) and len(data) > 0: estimated_count = len(data) * 50 # Conservative estimate logger.info(f"📊 Estimated total properties from {endpoint}: {estimated_count}") return estimated_count except Exception as e: logger.debug(f"⚠️ Failed to get count from {endpoint}: {e}") continue logger.warning("⚠️ Could not determine total property count, using default") return 0 except Exception as e: logger.error(f"❌ Error getting total property count: {e}") return 0 def _fetch_property_page_enhanced(self, page: int, page_size: int) -> List[Dict]: """Enhanced property page fetching with better error handling and retry logic""" max_retries = 3 retry_delay = 2 for attempt in range(max_retries): try: logger.info(f"📄 Fetching page {page} with {page_size} properties (attempt {attempt + 1}/{max_retries})...") # Log the exact URL and parameters being sent url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails" params = {'pageNumber': page, 'pageSize': page_size} logger.info(f"🌐 Making request to: {url}") logger.info(f"📋 Parameters: {params}") response = requests.get( url, params=params, headers=self.config.get('headers', {}) ) if response.status_code == 200: data = response.json() # Log the raw response structure logger.info(f"📥 Raw response type: {type(data)}") if isinstance(data, dict): logger.info(f"📥 Response keys: {list(data.keys())}") # Handle different response structures if isinstance(data, list): properties = data logger.info(f"📥 Direct list response with {len(properties)} properties") elif isinstance(data, dict): # Try different possible field names for field in ['data', 'properties', 'items', 'results']: if field in data and isinstance(data[field], list): properties = data[field] logger.info(f"📥 Found properties in '{field}' field: {len(properties)} properties") break else: properties = [] logger.warning(f"⚠️ No properties found in response data") else: properties = [] logger.warning(f"⚠️ Unexpected response type: {type(data)}") # Clean and validate properties cleaned_properties = [] for prop in properties: if isinstance(prop, dict) and (prop.get('id') or prop.get('propertyId')): cleaned_prop = self._clean_property_data(prop) cleaned_properties.append(cleaned_prop) logger.info(f"✅ Page {page}: {len(cleaned_properties)} valid properties fetched (requested: {page_size})") return cleaned_properties elif response.status_code == 404: logger.info(f"📄 Page {page}: No more data (404)") return [] else: logger.warning(f"⚠️ Page {page}: HTTP {response.status_code}") except requests.exceptions.Timeout: logger.warning(f"⏰ Page {page}: Timeout on attempt {attempt + 1}") if attempt < max_retries - 1: time.sleep(retry_delay * (2 ** attempt)) # Exponential backoff continue except Exception as e: logger.error(f"❌ Page {page}: Error on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(retry_delay * (2 ** attempt)) continue logger.error(f"❌ Page {page}: All attempts failed") return [] def _fetch_property_page(self, page: int, page_size: int) -> List[Dict]: """Legacy method - now calls enhanced version""" return self._fetch_property_page_enhanced(page, page_size) def calculate_ai_property_score(self, property_data: Dict, viewed_properties: List[Dict], preferences: Dict, strategy: str) -> float: """Calculate AI-based property score for recommendations""" score = 0.0 try: # Base score score = 50.0 # Property type preference prop_type = property_data.get('propertyTypeName', '').lower() preferred_types_raw = preferences.get('preferred_types', []) # Handle both strings and tuples in preferred_types preferred_types = [] for item in preferred_types_raw: if isinstance(item, tuple): preferred_types.append(str(item[0]).lower() if item else '') elif isinstance(item, str): preferred_types.append(item.lower()) else: preferred_types.append(str(item).lower()) if prop_type in preferred_types: score += 20 # Price range matching prop_price = float(property_data.get('price', 0) or 0) price_range = preferences.get('price_range', {}) min_price = price_range.get('min', 0) max_price = price_range.get('max', float('inf')) if min_price <= prop_price <= max_price: score += 15 elif prop_price < min_price: score -= 10 # Too cheap might be suspicious else: score -= 20 # Too expensive # Location and amenities features = property_data.get('features', []) if isinstance(features, str): import json try: features = json.loads(features) except: features = [] preferred_features = preferences.get('preferred_features', []) feature_matches = sum(1 for f in preferred_features if f.lower() in [feat.lower() for feat in features]) score += feature_matches * 3 # Property size scoring sqft = float(property_data.get('sqft', 0) or 0) beds = int(property_data.get('beds', 0) or 0) baths = int(property_data.get('baths', 0) or 0) # Reasonable size scoring if 500 <= sqft <= 5000: score += 10 if 1 <= beds <= 6: score += 5 if 1 <= baths <= 4: score += 5 # Strategy-based scoring if strategy == 'motivated': score += 10 # Boost high-quality matches elif strategy == 'casual': score += 5 # Moderate boost # Similarity to viewed properties if viewed_properties: similarity_score = self.calculate_property_similarity(property_data, viewed_properties) score += similarity_score * 15 # Ensure score is in reasonable range score = max(0, min(100, score)) except Exception as e: logger.error(f"❌ Error calculating AI score: {e}") score = 50.0 # Default score return score def calculate_property_similarity(self, property_data: Dict, viewed_properties: List[Dict]) -> float: """Calculate similarity between a property and user's viewed properties""" if not viewed_properties: return 0.0 similarities = [] for viewed_prop in viewed_properties: similarity = 0.0 # Type similarity if (property_data.get('propertyTypeName', '').lower() == viewed_prop.get('propertyTypeName', '').lower()): similarity += 0.3 # Price similarity (within 50% range) prop_price = float(property_data.get('price', 0) or 0) viewed_price = float(viewed_prop.get('price', 0) or viewed_prop.get('marketValue', 0) or 0) if prop_price > 0 and viewed_price > 0: price_ratio = min(prop_price, viewed_price) / max(prop_price, viewed_price) if price_ratio > 0.5: # Within 50% price range similarity += 0.4 * price_ratio # Size similarity prop_sqft = float(property_data.get('sqft', 0) or 0) viewed_sqft = float(viewed_prop.get('totalSquareFeet', 0) or 0) if prop_sqft > 0 and viewed_sqft > 0: size_ratio = min(prop_sqft, viewed_sqft) / max(prop_sqft, viewed_sqft) if size_ratio > 0.7: # Within 30% size range similarity += 0.3 * size_ratio similarities.append(similarity) return max(similarities) if similarities else 0.0 def generate_match_reasons(self, property_data: Dict, preferences: Dict) -> List[str]: """Generate AI explanations for why this property matches user preferences""" reasons = [] try: # Property type match prop_type = property_data.get('propertyTypeName', '') preferred_types_raw = preferences.get('preferred_types', []) # Handle both strings and tuples in preferred_types preferred_types = [] for item in preferred_types_raw: if isinstance(item, tuple): preferred_types.append(str(item[0]) if item else '') elif isinstance(item, str): preferred_types.append(item) else: preferred_types.append(str(item)) if prop_type in preferred_types: reasons.append(f"Matches your preferred property type: {prop_type}") # Price range match prop_price = float(property_data.get('price', 0) or 0) price_range = preferences.get('price_range', {}) min_price = price_range.get('min', 0) max_price = price_range.get('max', float('inf')) if min_price <= prop_price <= max_price: reasons.append(f"Within your budget range: ₹{prop_price:,.0f}") # Feature matches features = property_data.get('features', []) if isinstance(features, str): import json try: features = json.loads(features) except: features = [] preferred_features = preferences.get('preferred_features', []) matching_features = [f for f in preferred_features if f.lower() in [feat.lower() for feat in features]] if matching_features: reasons.append(f"Has your preferred amenities: {', '.join(matching_features[:3])}") # Size appropriateness beds = int(property_data.get('beds', 0) or 0) if beds > 0: reasons.append(f"Suitable size with {beds} bedroom{'s' if beds != 1 else ''}") # Default reasons if none found if not reasons: reasons = [ "Good property in desirable location", "Competitive pricing for the area", "Modern amenities and facilities" ] except Exception as e: logger.error(f"❌ Error generating match reasons: {e}") reasons = ["AI-recommended based on your preferences"] return reasons[:3] # Limit to top 3 reasons def search_properties_with_query(self, query: str, excluded_ids: set, max_results: int) -> List[Dict]: """Search properties with a specific query and exclusions""" try: if self.properties_collection is None: return [] results = self.properties_collection.query( query_texts=[query], n_results=max_results * 2, # Get extra for filtering include=["metadatas", "documents", "distances"] ) properties = [] if results and results['metadatas']: for metadata in results['metadatas'][0]: prop_id = metadata.get('id', '') if prop_id and prop_id not in excluded_ids: properties.append(metadata) if len(properties) >= max_results: break return properties except Exception as e: logger.error(f"❌ Error searching with query '{query}': {e}") return [] def test_sendgrid_email(self, recipient_email: str = None) -> Dict: """Test SendGrid email functionality""" if not recipient_email: recipient_email = self.email_config['sender_email'] # Send to self for testing logger.info(f"🧪 Testing SendGrid email to {recipient_email}") try: import requests # First, validate the API key validation_result = self.validate_sendgrid_api_key() if not validation_result['valid']: return { 'success': False, 'error': f"SendGrid API key validation failed: {validation_result['error']}", 'recipient': recipient_email, 'timestamp': datetime.now().isoformat() } # SendGrid API endpoint url = "https://api.sendgrid.com/v3/mail/send" # Simple test email email_data = { "personalizations": [ { "to": [ { "email": recipient_email } ] } ], "from": { "email": self.email_config['sender_email'] }, "subject": "🧪 SendGrid Test Email - AI Lead Analysis System", "content": [ { "type": "text/plain", "value": f"SendGrid Test Email - AI Lead Analysis System\n\nTest successful at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" }, { "type": "text/html", "value": f"""

🧪 SendGrid Test Email

This is a test email to verify SendGrid integration is working correctly.

Test Details:

🎉 SendGrid email system is working!


Sent by AI Lead Analysis System

""" } ] } # Send email with SendGrid API headers = { "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", "Content-Type": "application/json" } response = requests.post(url, json=email_data, headers=headers, timeout=30) if response.status_code == 202: logger.info(f"✅ SendGrid test email sent successfully to {recipient_email}") return { 'success': True, 'recipient': recipient_email, 'subject': email_data['subject'], 'timestamp': datetime.now().isoformat(), 'message': 'SendGrid test email sent successfully' } else: logger.error(f"❌ SendGrid API error: {response.status_code} - {response.text}") return { 'success': False, 'error': f"SendGrid API error: {response.status_code} - {response.text}", 'recipient': recipient_email, 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Failed to send SendGrid test email: {e}") return { 'success': False, 'error': str(e), 'recipient': recipient_email, 'timestamp': datetime.now().isoformat() } def validate_sendgrid_api_key(self) -> Dict: """Validate SendGrid API key""" try: logger.info("🔑 Validating SendGrid API key...") import requests url = "https://api.sendgrid.com/v3/user/profile" headers = { "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", "Content-Type": "application/json" } response = requests.get(url, headers=headers) if response.status_code == 200: profile_data = response.json() logger.info("✅ SendGrid API key is valid") return { 'success': True, 'valid': True, 'account_info': { 'email': profile_data.get('email', 'Unknown'), 'first_name': profile_data.get('first_name', 'Unknown'), 'last_name': profile_data.get('last_name', 'Unknown') } } else: logger.error(f"❌ SendGrid API key validation failed: {response.status_code}") return { 'success': False, 'valid': False, 'error': f"API key validation failed: {response.status_code}" } except Exception as e: logger.error(f"❌ Error validating SendGrid API key: {e}") return { 'success': False, 'valid': False, 'error': str(e) } def analyze_automated_email_triggers(self, customer_analysis: Dict) -> Dict: """Analyze customer behavior and determine automated email triggers based on AI analysis""" logger.info("🤖 Analyzing automated email triggers based on customer behavior...") try: # Extract customer data from the actual analysis structure customer_data = customer_analysis.get('data', {}) analytics = customer_data.get('analytics', {}) lead_qual = customer_data.get('lead_qualification', {}) properties = customer_data.get('properties', []) if not properties: return { 'success': False, 'error': 'No property data available for analysis' } # Extract key metrics from the actual analysis engagement_level = analytics.get('engagement_level', 'Low') lead_score = lead_qual.get('lead_score', 0) lead_status = lead_qual.get('lead_status', 'UNKNOWN') # Get viewing patterns viewing_patterns = analytics.get('viewing_patterns', {}) total_views = viewing_patterns.get('morning_views', 0) + viewing_patterns.get('afternoon_views', 0) + viewing_patterns.get('evening_views', 0) # Get price preferences price_prefs = analytics.get('price_preferences', {}) avg_price = price_prefs.get('avg_price', 0) max_price = price_prefs.get('max_price', 0) min_price = price_prefs.get('min_price', 0) price_range = price_prefs.get('price_range', 0) # Get property type preferences preferred_types = analytics.get('preferred_property_types', []) # Get lead timeline for inactivity analysis lead_timeline = analytics.get('lead_timeline', []) days_since_last_view = 0 if lead_timeline: from datetime import datetime last_view_date = lead_timeline[0].get('date', '') if last_view_date: try: last_view = datetime.fromisoformat(last_view_date.replace('Z', '+00:00')) days_since_last_view = (datetime.now() - last_view).days except: days_since_last_view = 14 # Default fallback # Analyze different trigger conditions based on actual data triggers = [] # 1. Inactivity Alert Trigger (High Priority) if days_since_last_view > 7: triggers.append({ 'trigger_type': 'inactivity_alert', 'condition': f"Customer hasn't viewed properties in {days_since_last_view} days", 'reason': 'Customer may need re-engagement with fresh property listings', 'priority': 'high', 'recommendation_count': 5 }) # 2. Property Type Interest Trigger (Medium Priority) if len(preferred_types) >= 3: triggers.append({ 'trigger_type': 'property_type_interest', 'condition': f"Customer interested in {len(preferred_types)} property types: {', '.join(preferred_types[:3])}", 'reason': 'Customer exploring multiple property categories, showing diverse interests', 'priority': 'medium', 'recommendation_count': 7 }) # 3. Price Range Diversity Trigger (Medium Priority) if price_range > 10000000: # 1 crore range triggers.append({ 'trigger_type': 'price_range_diversity', 'condition': f"Customer exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f}", 'reason': 'Customer showing interest across different price segments', 'priority': 'medium', 'recommendation_count': 6 }) # 4. Lead Score Milestone Trigger (High Priority) if lead_score >= 40: # LUKEWARM or better triggers.append({ 'trigger_type': 'lead_score_milestone', 'condition': f"Customer has {lead_status.lower()} lead status with score {lead_score}/100", 'reason': 'Customer showing qualified interest, ready for targeted recommendations', 'priority': 'high', 'recommendation_count': 8 }) # 5. Viewing Pattern Trigger (Medium Priority) if total_views >= 10: triggers.append({ 'trigger_type': 'viewing_pattern', 'condition': f"Customer has viewed {total_views} properties with peak time: {viewing_patterns.get('peak_viewing_time', 'unknown')}", 'reason': 'Customer actively researching properties, ready for curated recommendations', 'priority': 'medium', 'recommendation_count': 6 }) # 6. Engagement Level Trigger (Based on actual engagement score) if engagement_level == 'Low' and lead_score < 50: triggers.append({ 'trigger_type': 'engagement_boost', 'condition': f"Customer has {engagement_level.lower()} engagement ({lead_score}/100 score)", 'reason': 'Customer needs engagement boost with compelling property options', 'priority': 'high', 'recommendation_count': 5 }) # 7. Conversion Probability Trigger (High Priority) conversion_prob = analytics.get('conversion_probability', {}) final_prob = conversion_prob.get('final_probability', 0) if final_prob >= 50: triggers.append({ 'trigger_type': 'high_conversion_potential', 'condition': f"Customer has {final_prob:.1f}% conversion probability", 'reason': 'High-value prospect needing premium property recommendations', 'priority': 'high', 'recommendation_count': 10 }) # Generate AI insights based on actual analysis ai_insights = { 'personality_type': 'Explorer' if len(preferred_types) >= 3 else 'Focused', 'decision_making_style': 'Analytical' if total_views >= 10 else 'Casual', 'urgency_level': 'High' if days_since_last_view > 7 else 'Medium', 'budget_confidence': 'Flexible' if price_range > 10000000 else 'Specific', 'location_focus': 'Diverse' if len(preferred_types) >= 3 else 'Specific', 'engagement_trend': 'Declining' if days_since_last_view > 7 else 'Stable', 'conversion_readiness': 'High' if final_prob >= 50 else 'Medium' } return { 'success': True, 'triggers': triggers, 'total_triggers': len(triggers), 'ai_insights': ai_insights, 'analysis_summary': { 'engagement_level': engagement_level, 'lead_score': lead_score, 'lead_status': lead_status, 'total_properties_viewed': len(properties), 'total_views': total_views, 'average_price_range': f"₹{avg_price:,.0f}", 'price_range_diversity': f"₹{min_price:,.0f} - ₹{max_price:,.0f}", 'preferred_locations': len(preferred_types), 'days_since_last_view': days_since_last_view, 'conversion_probability': f"{final_prob:.1f}%", 'peak_viewing_time': viewing_patterns.get('peak_viewing_time', 'unknown') } } except Exception as e: logger.error(f"❌ Error analyzing automated email triggers: {e}") return { 'success': False, 'error': str(e) } def generate_automated_email_content(self, trigger: Dict, customer_analysis: Dict, ai_insights: Dict) -> Dict: """Generate email content based on trigger type and actual customer analysis""" logger.info(f"📝 Generating automated email content for trigger: {trigger['trigger_type']}") try: # Get customer data from actual analysis structure customer_data = customer_analysis.get('data', {}) analytics = customer_data.get('analytics', {}) lead_qual = customer_data.get('lead_qualification', {}) properties = customer_data.get('properties', []) # Generate personalized greeting customer_name = f"Customer {customer_analysis.get('customer_id', 'Unknown')}" greeting = f"Hello {customer_name}," # Generate content based on trigger type and actual data trigger_type = trigger['trigger_type'] if trigger_type == 'inactivity_alert': days = analytics.get('lead_timeline', [{}])[0].get('date', '') if days: from datetime import datetime try: last_view = datetime.fromisoformat(days.replace('Z', '+00:00')) days_ago = (datetime.now() - last_view).days except: days_ago = 13 else: days_ago = 13 content = f"""

We noticed you haven't been browsing properties for {days_ago} days. The real estate market is constantly evolving with exciting new opportunities!

Based on your previous interest in {len(analytics.get('preferred_property_types', []))} different property types, we've curated some fresh listings that might reignite your search for the perfect property.

""" subject = "🆕 Fresh Properties to Rekindle Your Search" elif trigger_type == 'property_type_interest': preferred_types = analytics.get('preferred_property_types', []) content = f"""

Your interest in {', '.join(preferred_types[:3])} properties shows you have excellent taste and diverse preferences!

We've found some amazing properties across these categories that match your sophisticated requirements and might be perfect for your next investment.

""" subject = "🏠 Diverse Property Options for You" elif trigger_type == 'price_range_diversity': price_prefs = analytics.get('price_preferences', {}) min_price = price_prefs.get('min_price', 0) max_price = price_prefs.get('max_price', 0) content = f"""

We see you're exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} - that's an impressive range showing you're considering various investment options!

Here are some outstanding properties across your preferred price segments that offer exceptional value and investment potential.

""" subject = "💰 Properties Across Your Investment Range" elif trigger_type == 'lead_score_milestone': lead_score = lead_qual.get('lead_score', 0) lead_status = lead_qual.get('lead_status', 'UNKNOWN') content = f"""

Congratulations! Your lead score of {lead_score}/100 puts you in our {lead_status.lower()} category of prospects.

As a qualified customer, we're excited to share these exclusive properties that match your refined preferences and investment criteria.

""" subject = "⭐ Exclusive Properties for Our Qualified Customer" elif trigger_type == 'viewing_pattern': viewing_patterns = analytics.get('viewing_patterns', {}) total_views = viewing_patterns.get('morning_views', 0) + viewing_patterns.get('afternoon_views', 0) + viewing_patterns.get('evening_views', 0) peak_time = viewing_patterns.get('peak_viewing_time', 'your preferred time') content = f"""

You've viewed {total_views} properties with peak activity during {peak_time} - that's impressive research! You clearly know what you're looking for.

Based on your extensive browsing patterns, here are some properties that align perfectly with your refined preferences and viewing behavior.

""" subject = "🎯 Targeted Recommendations Based on Your Research" elif trigger_type == 'engagement_boost': engagement_level = analytics.get('engagement_level', 'Low') lead_score = lead_qual.get('lead_score', 0) content = f"""

We noticed your current engagement level is {engagement_level.lower()} with a lead score of {lead_score}/100.

To help you find your perfect property faster, we've curated some compelling options that might boost your interest and engagement with our platform.

""" subject = "🚀 Boost Your Property Search with These Gems" elif trigger_type == 'high_conversion_potential': conversion_prob = analytics.get('conversion_probability', {}) final_prob = conversion_prob.get('final_probability', 0) content = f"""

Excellent news! Our AI analysis shows you have a {final_prob:.1f}% conversion probability - you're a high-value prospect!

As a premium customer with strong buying potential, we're excited to share these exclusive properties that match your sophisticated requirements.

""" subject = "💎 Premium Properties for High-Value Prospect" else: content = """

We've analyzed your property viewing behavior and found some excellent recommendations for you.

These properties are carefully selected based on your preferences, viewing patterns, and market trends.

""" subject = "🏠 Personalized Property Recommendations" # Get AI recommendations using the stored properties in ChromaDB ai_engine = self.get_ai_engine() if hasattr(self, 'get_ai_engine') else None if ai_engine and hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection: # Use ChromaDB to find similar properties recommendations = self._get_ai_recommendations_from_chromadb( properties, analytics, trigger.get('recommendation_count', 5), trigger_type ) else: # Fallback to existing properties recommendations = properties[:trigger.get('recommendation_count', 5)] # Create email HTML email_content = self.create_automated_email_html( greeting, content, recommendations, trigger, customer_analysis ) return { 'success': True, 'subject': subject, 'html_content': email_content['html'], 'text_content': email_content['text'], 'recommendations_count': len(recommendations), 'trigger_type': trigger_type, 'priority': trigger['priority'] } except Exception as e: logger.error(f"❌ Error generating automated email content: {e}") return { 'success': False, 'error': str(e) } def _get_ai_recommendations_from_chromadb(self, viewed_properties: List[Dict], analytics: Dict, count: int, trigger_type: str = None) -> List[Dict]: """Get AI recommendations from ChromaDB based on customer analysis and trigger type""" try: if not hasattr(self, 'properties_collection') or not self.properties_collection: return viewed_properties[:count] # Create different search queries based on trigger type preferred_types = analytics.get('preferred_property_types', []) price_prefs = analytics.get('price_preferences', {}) avg_price = price_prefs.get('avg_price', 0) min_price = price_prefs.get('min_price', 0) max_price = price_prefs.get('max_price', 0) # Different search strategies based on trigger type if trigger_type == 'inactivity_alert': # For inactive customers, show fresh/new properties query_text = f"New and fresh properties in {', '.join(preferred_types[:2])} categories around ₹{avg_price:,.0f}" count = min(count, 5) # Fewer properties for re-engagement elif trigger_type == 'property_type_interest': # For diverse interests, show variety across types query_text = f"Diverse properties across {', '.join(preferred_types)} types with good investment potential" count = min(count, 7) # More properties to show variety elif trigger_type == 'price_range_diversity': # For price diversity, show properties across different price ranges query_text = f"Properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} with excellent value" count = min(count, 6) elif trigger_type == 'lead_score_milestone': # For qualified leads, show premium properties query_text = f"Premium {', '.join(preferred_types[:2])} properties for qualified customers around ₹{avg_price:,.0f}" count = min(count, 8) elif trigger_type == 'viewing_pattern': # For active researchers, show highly relevant properties query_text = f"Highly relevant {', '.join(preferred_types)} properties matching viewing patterns" count = min(count, 6) elif trigger_type == 'engagement_boost': # For low engagement, show compelling properties query_text = f"Compelling and attractive properties in {', '.join(preferred_types[:2])} categories" count = min(count, 5) elif trigger_type == 'high_conversion_potential': # For high conversion potential, show exclusive properties query_text = f"Exclusive and premium properties for high-value prospects in {', '.join(preferred_types)} categories" count = min(count, 10) else: # Default search query_text = f"Properties similar to {', '.join(preferred_types)} types around ₹{avg_price:,.0f} price range" logger.info(f"🔍 Searching ChromaDB with query: {query_text} for trigger: {trigger_type}") # Search in ChromaDB results = self.properties_collection.query( query_texts=[query_text], n_results=count * 2, # Get more to filter include=["metadatas", "documents", "distances"] ) if results and results['metadatas'] and results['metadatas'][0]: recommendations = [] for i, metadata in enumerate(results['metadatas'][0]): if len(recommendations) >= count: break # Safely extract metadata fields with fallbacks try: property_data = { 'id': metadata.get('id', f'prop_{i}'), 'propertyId': metadata.get('id', f'prop_{i}'), 'propertyName': metadata.get('propertyName', metadata.get('name', 'Property')), 'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))), 'price': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), 'marketValue': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), 'address': metadata.get('address', metadata.get('location', '')), 'location': metadata.get('address', metadata.get('location', '')), 'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0), 'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0), 'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0), 'description': metadata.get('description', ''), 'features': self.safe_json_parse(metadata.get('features', '[]')), 'similarity_score': 1 - results['distances'][0][i] if results.get('distances') and results['distances'][0] else 0.5, 'chromadb_rank': i + 1 } # Add any additional fields that might exist for key, value in metadata.items(): if key not in property_data: property_data[key] = value recommendations.append(property_data) except Exception as field_error: logger.warning(f"⚠️ Error processing metadata field in _get_ai_recommendations_from_chromadb: {field_error}") # Try to create a minimal property object try: minimal_property = { 'id': metadata.get('id', f'prop_{i}'), 'propertyId': metadata.get('id', f'prop_{i}'), 'propertyName': 'Property', 'propertyTypeName': 'Property', 'price': 0.0, 'marketValue': 0.0, 'address': '', 'location': '', 'beds': 0, 'baths': 0, 'totalSquareFeet': 0.0, 'description': '', 'features': [], 'similarity_score': 0.5, 'chromadb_rank': i + 1 } recommendations.append(minimal_property) except Exception as minimal_error: logger.error(f"❌ Failed to create minimal property in _get_ai_recommendations_from_chromadb: {minimal_error}") continue logger.info(f"✅ Found {len(recommendations)} recommendations for trigger: {trigger_type}") return recommendations else: logger.warning(f"⚠️ No results from ChromaDB for trigger: {trigger_type}") return viewed_properties[:count] except Exception as e: logger.error(f"❌ Error in _get_ai_recommendations_from_chromadb: {e}") return viewed_properties[:count] def create_automated_email_html(self, greeting: str, content: str, recommendations: List[Dict], trigger: Dict, customer_analysis: Dict) -> Dict: """Create HTML and text versions of automated email content""" try: logger.info("🎨 Creating automated email HTML content...") # Create property cards property_cards = "" for i, prop in enumerate(recommendations[:5]): # Limit to 5 properties property_cards += f"""

{prop.get('propertyName', f'Property {i+1}')}

📍 {prop.get('address', 'Address not available')}

💰 ₹{prop.get('price', 0):,.0f}

🏠 {prop.get('propertyTypeName', 'Property Type')}

⭐ AI Match Score: {prop.get('ai_score', 0):.1f}%

""" # Create HTML content using string concatenation to avoid f-string backslash issues html_content = """ Automated Property Recommendations

🤖 AI-Powered Property Recommendations

Automated Email Trigger: """ + trigger['trigger_type'].replace('_', ' ').title() + """

""" + greeting + """

""" + content + """

📊 Why You Received This Email:

Condition: """ + trigger['condition'] + """

Reason: """ + trigger['reason'] + """

Priority: """ + trigger['priority'].title() + """

🏠 Recommended Properties

""" + property_cards + """
View All Properties
""" # Create text version using string concatenation text_content = """ AI-Powered Property Recommendations """ + greeting + """ """ + content.replace('

', '').replace('

', '\n') + """ Why you received this email: Condition: """ + trigger['condition'] + """ Reason: """ + trigger['reason'] + """ Priority: """ + trigger['priority'].title() + """ Recommended Properties: """ for i, prop in enumerate(recommendations[:5]): # Limit to 5 in text version text_content += """ """ + str(i+1) + """. """ + prop.get('propertyName', f'Property {i+1}') + """ Address: """ + prop.get('address', 'Address not available') + """ Price: ₹""" + f"{prop.get('price', 0):,.0f}" + """ Type: """ + prop.get('propertyTypeName', 'Property Type') + """ AI Match Score: """ + f"{prop.get('ai_score', 0):.1f}" + """% """ text_content += """ Generated at: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """ """ return { 'html': html_content, 'text': text_content } except Exception as e: logger.error(f"❌ Error creating automated email HTML: {e}") return { 'html': f"

Error generating email content: {e}

", 'text': f"Error generating email content: {e}" } def _store_properties_in_chromadb(self, properties: List[Dict]) -> bool: """Store properties in ChromaDB for AI analysis""" try: if not self.properties_collection: logger.error("❌ Properties collection not initialized") return False logger.info(f"💾 Storing {len(properties)} properties in ChromaDB...") # Process properties in batches batch_size = 100 total_stored = 0 for i in range(0, len(properties), batch_size): batch = properties[i:i + batch_size] # Prepare batch data for ChromaDB documents = [] metadatas = [] ids = [] for prop in batch: try: # Create property description description = self.create_property_description(prop) # Clean metadata metadata = self.clean_metadata_for_chromadb(prop) # Generate unique ID prop_id = str(uuid.uuid4()) documents.append(description) metadatas.append(metadata) ids.append(prop_id) except Exception as e: logger.warning(f"⚠️ Skipping property due to error: {e}") continue if documents: try: # Add batch to ChromaDB self.properties_collection.add( documents=documents, metadatas=metadatas, ids=ids ) total_stored += len(documents) logger.info(f"✅ Stored batch {i//batch_size + 1}: {len(documents)} properties") except Exception as e: logger.error(f"❌ Error storing batch {i//batch_size + 1}: {e}") continue logger.info(f"🎉 Successfully stored {total_stored} properties in ChromaDB") return total_stored > 0 except Exception as e: logger.error(f"❌ Error storing properties in ChromaDB: {e}") return False def get_enhanced_ai_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int = 10) -> List[Dict]: """Get enhanced AI recommendations using small AI models""" try: logger.info("🤖 Generating enhanced AI recommendations...") # Extract customer preferences and behavior patterns preferences = self.extract_customer_preferences(customer_analysis, ai_insights) # Get viewed properties for similarity analysis viewed_properties = customer_analysis.get('data', {}).get('properties', []) # Generate multiple recommendation strategies recommendations = [] # Strategy 1: Similarity-based recommendations similarity_recs = self.get_similarity_based_recommendations(viewed_properties, preferences, count // 3) recommendations.extend(similarity_recs) # Strategy 2: Preference-based recommendations preference_recs = self.get_preference_based_recommendations(preferences, viewed_properties, count // 3) recommendations.extend(preference_recs) # Strategy 3: Behavioral recommendations behavioral_recs = self.get_behavioral_recommendations(customer_analysis, ai_insights, count // 3) recommendations.extend(behavioral_recs) # Remove duplicates and limit to requested count unique_recommendations = self.remove_duplicate_recommendations(recommendations) final_recommendations = unique_recommendations[:count] # Enhance with AI explanations enhanced_recommendations = self.add_ai_explanations(final_recommendations, preferences, ai_insights) logger.info(f"✅ Generated {len(enhanced_recommendations)} enhanced AI recommendations") return enhanced_recommendations except Exception as e: logger.error(f"❌ Error generating enhanced AI recommendations: {e}") return self.get_fallback_recommendations(count) def extract_customer_preferences(self, customer_analysis: Dict, ai_insights: Dict) -> Dict: """Extract detailed customer preferences using AI models""" try: preferences = { 'price_range': self.analyze_price_preferences(customer_analysis), 'property_types': self.analyze_property_type_preferences(customer_analysis), 'locations': self.analyze_location_preferences(customer_analysis), 'features': self.analyze_feature_preferences(customer_analysis), 'behavioral_patterns': self.analyze_behavioral_patterns(customer_analysis, ai_insights), 'engagement_level': self.analyze_engagement_level(customer_analysis), 'urgency_level': ai_insights.get('urgency_level', 'Medium'), 'personality_type': ai_insights.get('personality_type', 'Balanced') } # Use AI models to enhance preferences if self.property_classifier: preferences['property_classification'] = self.classify_property_preferences(preferences) if self.price_predictor: preferences['price_prediction'] = self.predict_price_preferences(preferences) if self.preference_analyzer: preferences['preference_weights'] = self.calculate_preference_weights(preferences) return preferences except Exception as e: logger.error(f"❌ Error extracting customer preferences: {e}") return {} def analyze_price_preferences(self, customer_analysis: Dict) -> Dict: """Analyze price preferences using AI models""" try: properties = customer_analysis.get('data', {}).get('properties', []) if not properties: return {'preferred_range': 'mid_range', 'confidence': 0.5} prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0] if not prices: return {'preferred_range': 'mid_range', 'confidence': 0.5} avg_price = sum(prices) / len(prices) min_price = min(prices) max_price = max(prices) price_variance = np.var(prices) if len(prices) > 1 else 0 # Use price predictor to classify if self.price_predictor: for range_name, (min_val, max_val) in self.price_predictor['price_ranges'].items(): if min_val <= avg_price <= max_val: confidence = 1.0 - (price_variance / (avg_price ** 2)) if avg_price > 0 else 0.5 return { 'preferred_range': range_name, 'avg_price': avg_price, 'min_price': min_price, 'max_price': max_price, 'confidence': min(confidence, 1.0), 'price_consistency': 1.0 - (price_variance / (avg_price ** 2)) if avg_price > 0 else 0.5 } # Fallback classification if avg_price > 50000000: range_name = 'luxury' elif avg_price > 25000000: range_name = 'premium' elif avg_price > 10000000: range_name = 'mid_range' else: range_name = 'affordable' return { 'preferred_range': range_name, 'avg_price': avg_price, 'min_price': min_price, 'max_price': max_price, 'confidence': 0.7 } except Exception as e: logger.error(f"❌ Error analyzing price preferences: {e}") return {'preferred_range': 'mid_range', 'confidence': 0.5} def analyze_property_type_preferences(self, customer_analysis: Dict) -> Dict: """Analyze property type preferences using AI models""" try: properties = customer_analysis.get('data', {}).get('properties', []) if not properties: return {'preferred_types': [], 'confidence': 0.5} type_counts = {} type_engagement = {} for prop in properties: prop_type = prop.get('propertyTypeName', 'Unknown') engagement = prop.get('viewCount', 0) * prop.get('totalDuration', 0) if prop_type not in type_counts: type_counts[prop_type] = 0 type_engagement[prop_type] = 0 type_counts[prop_type] += 1 type_engagement[prop_type] += engagement # Calculate weighted preferences weighted_preferences = [] for prop_type, count in type_counts.items(): avg_engagement = type_engagement[prop_type] / count weight = self.property_classifier.get('property_type_weights', {}).get(prop_type, 1.0) if self.property_classifier else 1.0 weighted_score = count * avg_engagement * weight weighted_preferences.append((prop_type, weighted_score)) # Sort by weighted score weighted_preferences.sort(key=lambda x: x[1], reverse=True) preferred_types = [pt[0] for pt in weighted_preferences[:3]] confidence = min(len(preferred_types) / 3, 1.0) return { 'preferred_types': preferred_types, 'type_counts': type_counts, 'type_engagement': type_engagement, 'confidence': confidence, 'diversity_score': len(type_counts) / max(len(properties), 1) } except Exception as e: logger.error(f"❌ Error analyzing property type preferences: {e}") return {'preferred_types': [], 'confidence': 0.5} def analyze_location_preferences(self, customer_analysis: Dict) -> Dict: """Analyze location preferences using AI models""" try: properties = customer_analysis.get('data', {}).get('properties', []) if not properties: return {'preferred_locations': [], 'confidence': 0.5} location_counts = {} location_engagement = {} for prop in properties: location = prop.get('location', 'Unknown') engagement = prop.get('viewCount', 0) * prop.get('totalDuration', 0) if location not in location_counts: location_counts[location] = 0 location_engagement[location] = 0 location_counts[location] += 1 location_engagement[location] += engagement # Calculate location preferences location_scores = [] for location, count in location_counts.items(): avg_engagement = location_engagement[location] / count score = count * avg_engagement location_scores.append((location, score)) location_scores.sort(key=lambda x: x[1], reverse=True) preferred_locations = [loc[0] for loc in location_scores[:5]] return { 'preferred_locations': preferred_locations, 'location_counts': location_counts, 'confidence': min(len(preferred_locations) / 5, 1.0) } except Exception as e: logger.error(f"❌ Error analyzing location preferences: {e}") return {'preferred_locations': [], 'confidence': 0.5} def analyze_feature_preferences(self, customer_analysis: Dict) -> Dict: """Analyze feature preferences using AI models""" try: properties = customer_analysis.get('data', {}).get('properties', []) if not properties: return {'preferred_features': [], 'confidence': 0.5} # Extract features from property descriptions feature_counts = {} for prop in properties: description = prop.get('description', '') + ' ' + prop.get('propertyName', '') features = self.extract_features_from_text(description) for feature in features: if feature not in feature_counts: feature_counts[feature] = 0 feature_counts[feature] += 1 # Get top features sorted_features = sorted(feature_counts.items(), key=lambda x: x[1], reverse=True) preferred_features = [f[0] for f in sorted_features[:10]] return { 'preferred_features': preferred_features, 'feature_counts': feature_counts, 'confidence': min(len(preferred_features) / 10, 1.0) } except Exception as e: logger.error(f"❌ Error analyzing feature preferences: {e}") return {'preferred_features': [], 'confidence': 0.5} def extract_features_from_text(self, text: str) -> List[str]: """Extract property features from text using simple NLP""" try: features = [] text_lower = text.lower() # Common property features feature_keywords = { 'balcony': ['balcony', 'terrace', 'veranda'], 'parking': ['parking', 'garage', 'car space'], 'garden': ['garden', 'lawn', 'backyard'], 'swimming_pool': ['pool', 'swimming'], 'gym': ['gym', 'fitness', 'exercise'], 'security': ['security', 'guard', 'cctv'], 'elevator': ['elevator', 'lift'], 'modern': ['modern', 'contemporary', 'new'], 'luxury': ['luxury', 'premium', 'exclusive'], 'spacious': ['spacious', 'large', 'big'], 'corner': ['corner', 'end unit'], 'view': ['view', 'panoramic', 'city view'] } for feature, keywords in feature_keywords.items(): if any(keyword in text_lower for keyword in keywords): features.append(feature) return features except Exception as e: logger.error(f"❌ Error extracting features: {e}") return [] def analyze_behavioral_patterns(self, customer_analysis: Dict, ai_insights: Dict) -> Dict: """Analyze behavioral patterns using AI models""" try: properties = customer_analysis.get('data', {}).get('properties', []) analytics = customer_analysis.get('data', {}).get('analytics', {}) patterns = { 'viewing_frequency': self.analyze_viewing_frequency(properties), 'engagement_timing': self.analyze_engagement_timing(properties), 'property_exploration': self.analyze_property_exploration(properties), 'decision_making_style': ai_insights.get('decision_making_style', 'Balanced'), 'urgency_level': ai_insights.get('urgency_level', 'Medium') } # Use preference analyzer to enhance patterns if self.preference_analyzer: patterns['engagement_weight'] = self.preference_analyzer['engagement_weights'].get( analytics.get('engagement_level', 'medium').lower(), 1.0 ) return patterns except Exception as e: logger.error(f"❌ Error analyzing behavioral patterns: {e}") return {} def analyze_viewing_frequency(self, properties: List[Dict]) -> str: """Analyze viewing frequency patterns""" try: if not properties: return 'low' total_views = sum(p.get('viewCount', 0) for p in properties) avg_views = total_views / len(properties) if avg_views > 5: return 'high' elif avg_views > 2: return 'medium' else: return 'low' except Exception as e: logger.error(f"❌ Error analyzing viewing frequency: {e}") return 'medium' def analyze_engagement_timing(self, properties: List[Dict]) -> str: """Analyze engagement timing patterns""" try: if not properties: return 'afternoon' morning_views = 0 afternoon_views = 0 evening_views = 0 for prop in properties: if prop.get('lastViewedAt'): try: view_time = datetime.fromisoformat(prop['lastViewedAt'].replace('Z', '+00:00')) hour = view_time.hour if 6 <= hour < 12: morning_views += prop.get('viewCount', 0) elif 12 <= hour < 18: afternoon_views += prop.get('viewCount', 0) else: evening_views += prop.get('viewCount', 0) except: pass if morning_views > max(afternoon_views, evening_views): return 'morning' elif afternoon_views > evening_views: return 'afternoon' else: return 'evening' except Exception as e: logger.error(f"❌ Error analyzing engagement timing: {e}") return 'afternoon' def analyze_property_exploration(self, properties: List[Dict]) -> str: """Analyze property exploration patterns""" try: if not properties: return 'focused' property_types = set(p.get('propertyTypeName', 'Unknown') for p in properties) price_range = max(p.get('price', 0) for p in properties) - min(p.get('price', 0) for p in properties) avg_price = sum(p.get('price', 0) for p in properties) / len(properties) if len(property_types) > 3: return 'diverse' elif price_range > avg_price * 0.5: return 'exploratory' else: return 'focused' except Exception as e: logger.error(f"❌ Error analyzing property exploration: {e}") return 'focused' def analyze_engagement_level(self, customer_analysis: Dict) -> str: """Analyze overall engagement level""" try: analytics = customer_analysis.get('data', {}).get('analytics', {}) engagement_score = analytics.get('engagement_level', 'Medium') if isinstance(engagement_score, str): return engagement_score.lower() else: return 'medium' except Exception as e: logger.error(f"❌ Error analyzing engagement level: {e}") return 'medium' def get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]: """Get recommendations based on similarity to viewed properties""" try: if not viewed_properties: return [] # Create similarity queries based on viewed properties queries = [] for prop in viewed_properties[:3]: # Use top 3 most engaged properties query = self.create_similarity_query(prop, preferences) queries.append(query) # Search for similar properties similar_properties = [] for query in queries: results = self.search_similar_properties_chromadb(query, viewed_properties, count // len(queries)) similar_properties.extend(results) # Score and rank properties scored_properties = [] for prop in similar_properties: score = self.calculate_similarity_score(prop, viewed_properties, preferences) prop['ai_similarity_score'] = score scored_properties.append(prop) # Sort by score and return top results scored_properties.sort(key=lambda x: x.get('ai_similarity_score', 0), reverse=True) return scored_properties[:count] except Exception as e: logger.error(f"❌ Error getting similarity-based recommendations: {e}") return [] def get_preference_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: """Get recommendations based on customer preferences""" try: # Create preference-based queries queries = self.create_preference_queries(preferences) # Search for properties matching preferences preference_properties = [] for query in queries: results = self.search_similar_properties_chromadb(query, viewed_properties, count // len(queries)) preference_properties.extend(results) # Score based on preference match scored_properties = [] for prop in preference_properties: score = self.calculate_preference_match_score(prop, preferences) prop['ai_preference_score'] = score scored_properties.append(prop) # Sort by score and return top results scored_properties.sort(key=lambda x: x.get('ai_preference_score', 0), reverse=True) return scored_properties[:count] except Exception as e: logger.error(f"❌ Error getting preference-based recommendations: {e}") return [] def get_behavioral_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int) -> List[Dict]: """Get recommendations based on behavioral patterns""" try: behavioral_patterns = customer_analysis.get('data', {}).get('analytics', {}) personality_type = ai_insights.get('personality_type', 'Balanced') urgency_level = ai_insights.get('urgency_level', 'Medium') # Create behavioral queries queries = self.create_behavioral_queries(behavioral_patterns, personality_type, urgency_level) # Search for properties matching behavioral patterns behavioral_properties = [] for query in queries: results = self.search_similar_properties_chromadb(query, [], count // len(queries)) behavioral_properties.extend(results) # Score based on behavioral match scored_properties = [] for prop in behavioral_properties: score = self.calculate_behavioral_match_score(prop, behavioral_patterns, personality_type, urgency_level) prop['ai_behavioral_score'] = score scored_properties.append(prop) # Sort by score and return top results scored_properties.sort(key=lambda x: x.get('ai_behavioral_score', 0), reverse=True) return scored_properties[:count] except Exception as e: logger.error(f"❌ Error getting behavioral recommendations: {e}") return [] def create_similarity_query(self, property_data: Dict, preferences: Dict) -> str: """Create a query for finding similar properties""" try: prop_type = property_data.get('propertyTypeName', '') location = property_data.get('location', '') price_range = self.get_price_range(property_data.get('price', 0)) # Extract key features description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') features = self.extract_features_from_text(description) # Build query query_parts = [prop_type, location, price_range] query_parts.extend(features[:3]) # Top 3 features query = f"{' '.join(query_parts)} property similar to {property_data.get('propertyName', 'this property')}" return query except Exception as e: logger.error(f"❌ Error creating similarity query: {e}") return f"{property_data.get('propertyTypeName', 'Property')} in {property_data.get('location', '')}" def create_preference_queries(self, preferences: Dict) -> List[str]: """Create queries based on customer preferences""" try: queries = [] # Price range query price_range = preferences.get('price_range', {}).get('preferred_range', 'mid_range') queries.append(f"{price_range} price range properties") # Property type queries preferred_types = preferences.get('property_types', {}).get('preferred_types', []) for prop_type in preferred_types[:2]: # Top 2 types queries.append(f"{prop_type} properties") # Location queries preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) for location in preferred_locations[:2]: # Top 2 locations queries.append(f"properties in {location}") # Feature queries preferred_features = preferences.get('features', {}).get('preferred_features', []) for feature in preferred_features[:2]: # Top 2 features queries.append(f"{feature} properties") return queries except Exception as e: logger.error(f"❌ Error creating preference queries: {e}") return ["luxury properties", "premium apartments"] def create_behavioral_queries(self, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> List[str]: """Create queries based on behavioral patterns""" try: queries = [] # Personality-based queries if personality_type == 'Analytical': queries.extend(["detailed property information", "comprehensive property features"]) elif personality_type == 'Impulsive': queries.extend(["exclusive properties", "limited time offers"]) elif personality_type == 'Conservative': queries.extend(["established properties", "proven locations"]) # Urgency-based queries if urgency_level == 'High': queries.extend(["immediate availability", "ready to move properties"]) elif urgency_level == 'Low': queries.extend(["upcoming projects", "pre-launch properties"]) # Engagement-based queries viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') if viewing_frequency == 'high': queries.extend(["high engagement properties", "popular properties"]) elif viewing_frequency == 'low': queries.extend(["unique properties", "exclusive listings"]) return queries except Exception as e: logger.error(f"❌ Error creating behavioral queries: {e}") return ["premium properties", "luxury apartments"] def calculate_similarity_score(self, property_data: Dict, viewed_properties: List[Dict], preferences: Dict) -> float: """Calculate similarity score for a property""" try: score = 0.0 # Property type similarity prop_type = property_data.get('propertyTypeName', '') preferred_types = preferences.get('property_types', {}).get('preferred_types', []) if prop_type in preferred_types: score += 0.3 # Price range similarity price = property_data.get('price', 0) price_range = preferences.get('price_range', {}) if self.is_price_in_range(price, price_range): score += 0.3 # Location similarity location = property_data.get('location', '') preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) if location in preferred_locations: score += 0.2 # Feature similarity description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') features = self.extract_features_from_text(description) preferred_features = preferences.get('features', {}).get('preferred_features', []) feature_match = len(set(features) & set(preferred_features)) score += min(feature_match * 0.1, 0.2) return min(score, 1.0) except Exception as e: logger.error(f"❌ Error calculating similarity score: {e}") return 0.5 def calculate_preference_match_score(self, property_data: Dict, preferences: Dict) -> float: """Calculate preference match score for a property""" try: score = 0.0 # Price preference match price = property_data.get('price', 0) price_range = preferences.get('price_range', {}) if self.is_price_in_range(price, price_range): score += 0.4 # Property type preference match prop_type = property_data.get('propertyTypeName', '') preferred_types = preferences.get('property_types', {}).get('preferred_types', []) if prop_type in preferred_types: score += 0.3 # Location preference match location = property_data.get('location', '') preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) if location in preferred_locations: score += 0.2 # Feature preference match description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') features = self.extract_features_from_text(description) preferred_features = preferences.get('features', {}).get('preferred_features', []) feature_match = len(set(features) & set(preferred_features)) score += min(feature_match * 0.1, 0.1) return min(score, 1.0) except Exception as e: logger.error(f"❌ Error calculating preference match score: {e}") return 0.5 def calculate_behavioral_match_score(self, property_data: Dict, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> float: """Calculate behavioral match score for a property""" try: score = 0.0 # Personality-based scoring if personality_type == 'Analytical': # Prefer properties with detailed information description = property_data.get('description', '') if len(description) > 100: score += 0.3 elif personality_type == 'Impulsive': # Prefer exclusive or unique properties if 'exclusive' in property_data.get('propertyName', '').lower(): score += 0.3 elif personality_type == 'Conservative': # Prefer established properties if 'established' in property_data.get('location', '').lower(): score += 0.3 # Urgency-based scoring if urgency_level == 'High': # Prefer ready properties if 'ready' in property_data.get('description', '').lower(): score += 0.2 elif urgency_level == 'Low': # Prefer upcoming projects if 'upcoming' in property_data.get('description', '').lower(): score += 0.2 # Engagement-based scoring viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') if viewing_frequency == 'high': # Prefer popular properties if property_data.get('viewCount', 0) > 10: score += 0.2 elif viewing_frequency == 'low': # Prefer unique properties if property_data.get('viewCount', 0) < 5: score += 0.2 return min(score, 1.0) except Exception as e: logger.error(f"❌ Error calculating behavioral match score: {e}") return 0.5 def is_price_in_range(self, price: float, price_range: Dict) -> bool: """Check if price is in the preferred range""" try: preferred_range = price_range.get('preferred_range', 'mid_range') avg_price = price_range.get('avg_price', 0) if preferred_range == 'luxury': return price > 50000000 elif preferred_range == 'premium': return 25000000 <= price <= 50000000 elif preferred_range == 'mid_range': return 10000000 <= price <= 25000000 elif preferred_range == 'affordable': return 5000000 <= price <= 10000000 else: # Use average price with tolerance tolerance = 0.3 min_price = avg_price * (1 - tolerance) max_price = avg_price * (1 + tolerance) return min_price <= price <= max_price except Exception as e: logger.error(f"❌ Error checking price range: {e}") return True def remove_duplicate_recommendations(self, recommendations: List[Dict]) -> List[Dict]: """Remove duplicate recommendations based on property ID""" try: seen_ids = set() unique_recommendations = [] for rec in recommendations: prop_id = rec.get('id') or rec.get('propertyId') if prop_id and prop_id not in seen_ids: seen_ids.add(prop_id) unique_recommendations.append(rec) return unique_recommendations except Exception as e: logger.error(f"❌ Error removing duplicates: {e}") return recommendations def add_ai_explanations(self, recommendations: List[Dict], preferences: Dict, ai_insights: Dict) -> List[Dict]: """Add AI explanations to recommendations""" try: for rec in recommendations: explanation = self.generate_ai_explanation(rec, preferences, ai_insights) rec['ai_explanation'] = explanation rec['ai_confidence'] = self.calculate_ai_confidence(rec, preferences) return recommendations except Exception as e: logger.error(f"❌ Error adding AI explanations: {e}") return recommendations def generate_ai_explanation(self, property_data: Dict, preferences: Dict, ai_insights: Dict) -> str: """Generate AI explanation for why a property is recommended""" try: explanations = [] # Price explanation price = property_data.get('price', 0) price_range = preferences.get('price_range', {}) if self.is_price_in_range(price, price_range): explanations.append("Matches your preferred price range") # Property type explanation prop_type = property_data.get('propertyTypeName', '') preferred_types = preferences.get('property_types', {}).get('preferred_types', []) if prop_type in preferred_types: explanations.append(f"Matches your interest in {prop_type} properties") # Location explanation location = property_data.get('location', '') preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) if location in preferred_locations: explanations.append(f"Located in your preferred area: {location}") # Feature explanation description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') features = self.extract_features_from_text(description) preferred_features = preferences.get('features', {}).get('preferred_features', []) matching_features = set(features) & set(preferred_features) if matching_features: feature_list = ', '.join(list(matching_features)[:3]) explanations.append(f"Features you prefer: {feature_list}") # Behavioral explanation personality_type = ai_insights.get('personality_type', 'Balanced') if personality_type == 'Analytical': explanations.append("Suitable for your analytical decision-making style") elif personality_type == 'Impulsive': explanations.append("Perfect for your quick decision-making preference") if not explanations: explanations.append("Based on your overall property preferences and behavior patterns") return " | ".join(explanations) except Exception as e: logger.error(f"❌ Error generating AI explanation: {e}") return "Recommended based on your preferences and behavior analysis" def calculate_ai_confidence(self, property_data: Dict, preferences: Dict) -> float: """Calculate AI confidence score for a recommendation""" try: confidence = 0.5 # Base confidence # Price confidence price = property_data.get('price', 0) price_range = preferences.get('price_range', {}) if self.is_price_in_range(price, price_range): confidence += 0.2 # Type confidence prop_type = property_data.get('propertyTypeName', '') preferred_types = preferences.get('property_types', {}).get('preferred_types', []) if prop_type in preferred_types: confidence += 0.2 # Location confidence location = property_data.get('location', '') preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) if location in preferred_locations: confidence += 0.1 return min(confidence, 1.0) except Exception as e: logger.error(f"❌ Error calculating AI confidence: {e}") return 0.5 def get_fallback_recommendations(self, count: int) -> List[Dict]: """Get fallback recommendations when AI models fail""" try: # First, try to debug the ChromaDB structure debug_info = self.debug_chromadb_metadata() if 'error' not in debug_info: logger.info(f"🔍 ChromaDB structure: {debug_info}") # Simple fallback based on ChromaDB search query = "luxury properties premium apartments" results = self.search_similar_properties_chromadb(query, [], count) for rec in results: rec['ai_explanation'] = "Recommended based on general property preferences" rec['ai_confidence'] = 0.6 return results except Exception as e: logger.error(f"❌ Error getting fallback recommendations: {e}") return [] def classify_property_preferences(self, preferences: Dict) -> Dict: """Classify property preferences using AI models""" try: if not self.property_classifier: return {} classification = { 'property_category': 'balanced', 'location_preference': 'established', 'price_sensitivity': 'moderate' } # Analyze property type diversity property_types = preferences.get('property_types', {}) diversity_score = property_types.get('diversity_score', 0) if diversity_score > 0.7: classification['property_category'] = 'diverse' elif diversity_score < 0.3: classification['property_category'] = 'focused' else: classification['property_category'] = 'balanced' # Analyze price sensitivity price_range = preferences.get('price_range', {}) price_consistency = price_range.get('price_consistency', 0.5) if price_consistency > 0.8: classification['price_sensitivity'] = 'low' elif price_consistency < 0.3: classification['price_sensitivity'] = 'high' else: classification['price_sensitivity'] = 'moderate' return classification except Exception as e: logger.error(f"❌ Error classifying property preferences: {e}") return {} def predict_price_preferences(self, preferences: Dict) -> Dict: """Predict price preferences using AI models""" try: if not self.price_predictor: return {} price_range = preferences.get('price_range', {}) avg_price = price_range.get('avg_price', 0) confidence = price_range.get('confidence', 0.5) prediction = { 'predicted_range': price_range.get('preferred_range', 'mid_range'), 'confidence': confidence, 'price_tolerance': 0.2, 'upsell_potential': 'medium' } # Predict upsell potential if avg_price > 50000000: prediction['upsell_potential'] = 'high' elif avg_price < 10000000: prediction['upsell_potential'] = 'low' else: prediction['upsell_potential'] = 'medium' # Adjust price tolerance based on confidence if confidence > 0.8: prediction['price_tolerance'] = 0.1 elif confidence < 0.3: prediction['price_tolerance'] = 0.4 return prediction except Exception as e: logger.error(f"❌ Error predicting price preferences: {e}") return {} def calculate_preference_weights(self, preferences: Dict) -> Dict: """Calculate preference weights using AI models""" try: if not self.preference_analyzer: return {} weights = { 'price_weight': 1.0, 'type_weight': 1.0, 'location_weight': 1.0, 'feature_weight': 1.0, 'engagement_weight': 1.0 } # Adjust weights based on engagement level engagement_level = preferences.get('engagement_level', 'medium') engagement_weight = self.preference_analyzer['engagement_weights'].get(engagement_level, 1.0) weights['engagement_weight'] = engagement_weight # Adjust weights based on behavioral patterns behavioral_patterns = preferences.get('behavioral_patterns', {}) viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') if viewing_frequency == 'high': weights['feature_weight'] = 1.2 # High engagement users care more about features elif viewing_frequency == 'low': weights['price_weight'] = 1.3 # Low engagement users care more about price # Adjust weights based on property exploration property_exploration = behavioral_patterns.get('property_exploration', 'focused') if property_exploration == 'diverse': weights['type_weight'] = 1.2 elif property_exploration == 'focused': weights['location_weight'] = 1.2 return weights except Exception as e: logger.error(f"❌ Error calculating preference weights: {e}") return {} def debug_chromadb_metadata(self, sample_count: int = 3) -> Dict: """Debug method to inspect ChromaDB metadata structure""" try: if not self.properties_collection: return {'error': 'ChromaDB collection not initialized'} total_count = self.properties_collection.count() if total_count == 0: return {'error': 'No properties in ChromaDB'} # Get sample properties results = self.properties_collection.query( query_texts=["property"], n_results=sample_count, include=["metadatas", "documents"] ) debug_info = { 'total_properties': total_count, 'sample_metadata_keys': [], 'sample_metadata': [], 'field_mapping_issues': [] } if results and results['metadatas'] and results['metadatas'][0]: for i, metadata in enumerate(results['metadatas'][0]): # Get all keys keys = list(metadata.keys()) debug_info['sample_metadata_keys'].append(keys) # Check for missing expected fields expected_fields = ['propertyTypeName', 'propertyName', 'price', 'address', 'beds', 'baths'] missing_fields = [field for field in expected_fields if field not in keys] if missing_fields: debug_info['field_mapping_issues'].append({ 'sample_index': i, 'missing_fields': missing_fields, 'available_fields': keys }) # Store sample metadata debug_info['sample_metadata'].append(metadata) logger.info(f"🔍 ChromaDB Debug Info: {debug_info}") return debug_info except Exception as e: logger.error(f"❌ Error debugging ChromaDB metadata: {e}") return {'error': str(e)} def _clean_property_data(self, prop: Dict) -> Dict: """Clean and standardize property data to fix JSON parsing issues""" try: cleaned = {} # Standardize ID field cleaned['id'] = str(prop.get('id', prop.get('propertyId', ''))) cleaned['propertyId'] = cleaned['id'] # Standardize basic fields cleaned['propertyName'] = str(prop.get('propertyName', prop.get('name', ''))) cleaned['propertyTypeName'] = str(prop.get('propertyTypeName', prop.get('typeName', prop.get('type', '')))) cleaned['price'] = float(prop.get('price', prop.get('marketValue', 0)) or 0) cleaned['address'] = str(prop.get('address', prop.get('location', ''))) cleaned['beds'] = int(prop.get('beds', prop.get('bedrooms', 0)) or 0) cleaned['baths'] = int(prop.get('baths', prop.get('bathrooms', 0)) or 0) cleaned['sqft'] = float(prop.get('totalSquareFeet', prop.get('area', prop.get('sqft', 0))) or 0) cleaned['description'] = str(prop.get('description', '')) # Enhanced features handling to fix JSON parsing errors features = prop.get('features', []) if isinstance(features, str): # Try to parse as JSON first try: parsed_features = json.loads(features) if isinstance(parsed_features, list): cleaned['features'] = parsed_features else: cleaned['features'] = [str(parsed_features)] except (json.JSONDecodeError, ValueError): # If JSON parsing fails, split by common delimiters if ',' in features: cleaned['features'] = [f.strip() for f in features.split(',') if f.strip()] elif ';' in features: cleaned['features'] = [f.strip() for f in features.split(';') if f.strip()] else: cleaned['features'] = [features.strip()] if features.strip() else [] elif isinstance(features, list): cleaned['features'] = [str(f) for f in features if f] else: cleaned['features'] = [] # Additional fields cleaned['viewCount'] = int(prop.get('viewCount', 0) or 0) cleaned['totalDuration'] = int(prop.get('totalDuration', 0) or 0) cleaned['lastViewedAt'] = str(prop.get('lastViewedAt', '')) return cleaned except Exception as e: logger.error(f"❌ Error cleaning property data: {e}") # Return minimal valid data return { 'id': str(prop.get('id', prop.get('propertyId', 'unknown'))), 'propertyId': str(prop.get('id', prop.get('propertyId', 'unknown'))), 'propertyName': 'Unknown Property', 'propertyTypeName': 'Property', 'price': 0.0, 'address': '', 'beds': 0, 'baths': 0, 'sqft': 0.0, 'description': '', 'features': [], 'viewCount': 0, 'totalDuration': 0, 'lastViewedAt': '' } def _retry_failed_pages(self, failed_pages: List[int], page_size: int, max_workers: int) -> List[Dict]: """Retry failed pages with exponential backoff and reduced concurrency""" logger.info(f"🔄 Retrying {len(failed_pages)} failed pages with reduced concurrency...") retry_properties = [] retry_workers = max(1, max_workers // 2) # Reduce workers for retry with ThreadPoolExecutor(max_workers=retry_workers) as executor: futures = [] for page in failed_pages: future = executor.submit(self._fetch_property_page_enhanced, page, page_size) futures.append((page, future)) for page, future in futures: try: properties = future.result() # No timeout - let it take as long as needed if properties: retry_properties.extend(properties) logger.info(f"✅ Retry successful for page {page}: {len(properties)} properties") else: logger.warning(f"⚠️ Retry failed for page {page}: No properties returned") except Exception as e: logger.error(f"❌ Retry failed for page {page}: {e}") logger.info(f"🔄 Retry completed: {len(retry_properties)} properties recovered") return retry_properties def _store_properties_in_chromadb_parallel(self, properties: List[Dict]) -> bool: """Memory-efficient parallel ChromaDB storage with simple embeddings""" try: logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB with memory-efficient processing...") if self.properties_collection is None: logger.error("❌ ChromaDB properties collection not initialized") return False # Check existing properties but don't delete collection try: existing_count = self.properties_collection.count() logger.info(f"📊 Collection currently has {existing_count} properties") # Only clear if we have new properties and want to replace if existing_count > 0 and len(properties) > existing_count * 0.8: # Only clear if new data is substantial logger.info(f"🗑️ Clearing {existing_count} existing properties for fresh data...") # Clear items without deleting collection try: all_ids = self.properties_collection.get()['ids'] if all_ids: # Delete in batches to avoid memory issues batch_size = 100 for i in range(0, len(all_ids), batch_size): batch_ids = all_ids[i:i + batch_size] self.properties_collection.delete(ids=batch_ids) logger.info(f"✅ Cleared {len(all_ids)} existing properties") except Exception as clear_error: logger.warning(f"⚠️ Error clearing collection: {clear_error}") elif existing_count > 0: logger.info(f"📦 Keeping existing {existing_count} properties, adding {len(properties)} new ones") except Exception as count_error: logger.warning(f"⚠️ Could not check existing count: {count_error}") # Collection might be corrupted, reinitialize self.initialize_chromadb() # Process properties in smaller batches to avoid memory issues batch_size = 20 # Smaller batches to prevent memory overflow total_stored = 0 failed_batches = [] # Use fewer workers to reduce memory pressure with ThreadPoolExecutor(max_workers=5) as executor: # Reduced workers for memory efficiency futures = {} for i in range(0, len(properties), batch_size): batch = properties[i:i+batch_size] batch_num = i//batch_size + 1 future = executor.submit(self._process_property_batch_memory_efficient, batch, batch_num) futures[future] = batch_num logger.info(f"🚀 Submitted {len(futures)} memory-efficient storage tasks with 5 workers") completed_batches = 0 for future in concurrent.futures.as_completed(futures.keys()): batch_num = futures[future] try: batch_result = future.result() # No timeout completed_batches += 1 progress = (completed_batches / len(futures)) * 100 if batch_result['success']: total_stored += batch_result['count'] logger.info(f"📊 Batch {batch_num}: {batch_result['count']} properties stored (Total: {total_stored}, Progress: {progress:.1f}%)") else: failed_batches.append(batch_num) logger.error(f"❌ Batch {batch_num}: Failed to store properties (Progress: {progress:.1f}%)") except Exception as e: completed_batches += 1 progress = (completed_batches / len(futures)) * 100 failed_batches.append(batch_num) logger.error(f"❌ Batch {batch_num}: Error - {e} (Progress: {progress:.1f}%)") if failed_batches: logger.warning(f"⚠️ {len(failed_batches)} batches failed: {failed_batches}") logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!") return total_stored > 0 except Exception as e: logger.error(f"❌ Error in parallel ChromaDB storage: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return False def _process_property_batch(self, batch: List[Dict], batch_num: int) -> Dict: """Process a batch of properties for ChromaDB storage with memory-efficient approach""" try: documents = [] metadatas = [] ids = [] for prop in batch: # Create simple text description without heavy AI processing description = self._create_simple_property_description(prop) documents.append(description) # Prepare metadata with enhanced JSON handling metadata = { 'id': str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))), 'propertyName': str(prop.get('propertyName', '')), 'propertyTypeName': str(prop.get('propertyTypeName', '')), 'price': float(prop.get('price', 0) or 0), 'address': str(prop.get('address', '')), 'beds': int(prop.get('beds', 0) or 0), 'baths': int(prop.get('baths', 0) or 0), 'sqft': float(prop.get('sqft', 0) or 0), 'description': str(prop.get('description', '')), 'features': self._safe_json_stringify(prop.get('features', [])) } # Clean metadata metadata = self.clean_metadata_for_chromadb(metadata) metadatas.append(metadata) prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))) ids.append(prop_id) # Add batch to ChromaDB using simple embedding function try: self.properties_collection.add( documents=documents, metadatas=metadatas, ids=ids ) return {'success': True, 'count': len(batch), 'error': None} except Exception as e: logger.error(f"❌ ChromaDB add error: {e}") return {'success': False, 'count': 0, 'error': str(e)} except Exception as e: logger.error(f"❌ Error processing batch {batch_num}: {e}") return {'success': False, 'count': 0, 'error': str(e)} def _create_simple_property_description(self, prop: Dict) -> str: """Create simple property description without heavy AI processing""" try: parts = [] # Basic property info if prop.get('propertyName'): parts.append(f"Property: {prop.get('propertyName')}") if prop.get('propertyTypeName'): parts.append(f"Type: {prop.get('propertyTypeName')}") if prop.get('price'): parts.append(f"Price: ${prop.get('price'):,.0f}") if prop.get('address'): parts.append(f"Address: {prop.get('address')}") if prop.get('beds') or prop.get('baths'): beds = prop.get('beds', 0) baths = prop.get('baths', 0) parts.append(f"Bedrooms: {beds}, Bathrooms: {baths}") if prop.get('sqft'): parts.append(f"Square Feet: {prop.get('sqft'):,.0f}") if prop.get('description'): desc = str(prop.get('description'))[:200] # Limit description length parts.append(f"Description: {desc}") # Features features = prop.get('features', []) if features: if isinstance(features, str): try: features = json.loads(features) except: features = [features] if isinstance(features, list) and features: feature_list = [str(f) for f in features[:10]] # Limit to 10 features parts.append(f"Features: {', '.join(feature_list)}") return " | ".join(parts) if parts else "Property" except Exception as e: logger.warning(f"⚠️ Error creating simple property description: {e}") return "Property" def _process_property_batch_memory_efficient(self, batch: List[Dict], batch_num: int) -> Dict: """Memory-efficient batch processing with minimal AI usage""" try: documents = [] metadatas = [] ids = [] for prop in batch: # Create ultra-simple text description without any AI processing description = self._create_ultra_simple_description(prop) documents.append(description) # Prepare minimal metadata to reduce memory usage metadata = { 'id': str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))), 'propertyName': str(prop.get('propertyName', ''))[:100], # Limit string length 'propertyTypeName': str(prop.get('propertyTypeName', ''))[:50], 'price': float(prop.get('price', 0) or 0), 'address': str(prop.get('address', ''))[:200], 'beds': int(prop.get('beds', 0) or 0), 'baths': int(prop.get('baths', 0) or 0), 'sqft': float(prop.get('sqft', 0) or 0), 'description': str(prop.get('description', ''))[:300], # Limit description 'features': self._safe_json_stringify(prop.get('features', []))[:500] # Limit features } metadatas.append(metadata) prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))) ids.append(prop_id) # Add batch to ChromaDB with error handling try: self.properties_collection.add( documents=documents, metadatas=metadatas, ids=ids ) return {'success': True, 'count': len(batch), 'error': None} except Exception as e: logger.error(f"❌ ChromaDB add error in batch {batch_num}: {e}") return {'success': False, 'count': 0, 'error': str(e)} except Exception as e: logger.error(f"❌ Error processing batch {batch_num}: {e}") return {'success': False, 'count': 0, 'error': str(e)} def _create_ultra_simple_description(self, prop: Dict) -> str: """Create ultra-simple property description without any AI processing""" try: parts = [] # Only essential information to minimize memory usage if prop.get('propertyName'): parts.append(prop.get('propertyName')) if prop.get('propertyTypeName'): parts.append(prop.get('propertyTypeName')) if prop.get('price'): parts.append(f"${prop.get('price'):,.0f}") if prop.get('address'): parts.append(prop.get('address')) if prop.get('beds') or prop.get('baths'): beds = prop.get('beds', 0) baths = prop.get('baths', 0) parts.append(f"{beds}bed {baths}bath") if prop.get('sqft'): parts.append(f"{prop.get('sqft'):,.0f}sqft") # Minimal features (only first 3) features = prop.get('features', []) if features: if isinstance(features, str): try: features = json.loads(features) except: features = [features] if isinstance(features, list) and features: feature_list = [str(f) for f in features[:3]] # Only 3 features parts.append(f"Features: {', '.join(feature_list)}") return " | ".join(parts) if parts else "Property" except Exception as e: logger.warning(f"⚠️ Error creating ultra-simple description: {e}") return "Property" def _safe_json_stringify(self, obj) -> str: """Safely convert object to JSON string without parsing errors""" try: if isinstance(obj, list): # Clean list items cleaned_list = [] for item in obj: if item is not None: cleaned_list.append(str(item).strip()) return json.dumps(cleaned_list) elif isinstance(obj, str): # If it's already a string, try to parse and re-stringify try: parsed = json.loads(obj) return json.dumps(parsed) except (json.JSONDecodeError, ValueError): # If parsing fails, return as simple string return json.dumps([obj]) else: return json.dumps([]) except Exception as e: logger.warning(f"⚠️ Error in safe JSON stringify: {e}") return json.dumps([]) def _schedule_property_update(self): """Schedule next property update in 24 hours""" try: import schedule import threading def update_properties(): logger.info("🔄 Starting scheduled property update...") self.fetch_all_properties_parallel() # Schedule update for 24 hours from now schedule.every(24).hours.do(update_properties) # Start scheduler in background thread def run_scheduler(): while True: schedule.run_pending() time.sleep(3600) # Check every hour scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) scheduler_thread.start() logger.info("⏰ Property update scheduled for every 24 hours") except Exception as e: logger.warning(f"⚠️ Could not schedule property update: {e}") def safe_json_parse(self, json_str): """Enhanced JSON parsing with better error handling""" try: if not json_str or json_str == '': return [] if isinstance(json_str, list): return json_str if isinstance(json_str, dict): return [json_str] # Try to parse JSON parsed = json.loads(json_str) if isinstance(parsed, list): return parsed elif isinstance(parsed, dict): return [parsed] else: return [] except (ValueError, TypeError, json.JSONDecodeError) as e: logger.warning(f"⚠️ Failed to parse JSON: {json_str[:50]}... Error: {e}") # Try to extract features from malformed JSON if isinstance(json_str, str): # Look for array-like patterns if '[' in json_str and ']' in json_str: start = json_str.find('[') end = json_str.rfind(']') + 1 try: extracted = json_str[start:end] parsed = json.loads(extracted) if isinstance(parsed, list): return parsed except: pass # Fallback: split by common delimiters if ',' in json_str: return [item.strip().strip('"\'') for item in json_str.split(',') if item.strip()] elif ';' in json_str: return [item.strip().strip('"\'') for item in json_str.split(';') if item.strip()] return [] # Global instance ai_engine = AIRecommendationEngine()