import os
import json
import requests
import logging
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Dict, List, Optional, Any
import pickle
import math
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel, pipeline
import torch
import warnings
warnings.filterwarnings('ignore')
# Parallel processing imports
import concurrent.futures
import threading
# Import centralized configuration
from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_email_config
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# ChromaDB imports
import chromadb
from chromadb.config import Settings
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AIRecommendationEngine:
def __init__(self):
    """Prepare caches, backend settings, ChromaDB collections and AI models.

    The steps run in a fixed order: cache directories first, then the
    configuration pulled from the shared ``config`` helpers, then the
    vector store, and finally model loading.
    """
    # All inference in this engine is pinned to the CPU.
    self.device = 'cpu'
    logger.info("🤖 Initializing AI Recommendation Engine with ChromaDB (CPU-based)")

    # Cache locations are exported before any model is requested.
    self.setup_cache_directories()

    # Backend endpoint and request headers come from the central config module.
    backend_url = get_backend_url()
    request_headers = get_ngrok_headers()
    self.config = {'backend_url': backend_url, 'headers': request_headers}

    # Email settings are likewise provided by the central config module.
    self.email_config = get_email_config()

    # Endpoint that returns every property with its full details.
    self.properties_api = f"{backend_url}/api/Property/allPropertieswithfulldetails"

    # Placeholders; initialize_chromadb() fills these in (or leaves them None).
    self.properties_collection = None
    self.user_preferences_collection = None
    self.initialize_chromadb()

    self.initialize_ai_models()
def setup_cache_directories(self):
    """Create the cache tree under /tmp/hf_cache and export its locations.

    The root directory is created first, then the Hugging Face related
    environment variables are set, then the sub-directories are created.
    Any error is logged as a warning and otherwise ignored, so the
    libraries simply keep their default cache locations.
    """
    cache_root = "/tmp/hf_cache"
    exported_locations = {
        'HF_HOME': cache_root,
        'TRANSFORMERS_CACHE': f"{cache_root}/transformers",
        'HF_DATASETS_CACHE': f"{cache_root}/datasets",
        'HF_HUB_CACHE': f"{cache_root}/hub",
    }
    try:
        os.makedirs(cache_root, exist_ok=True)
        os.chmod(cache_root, 0o755)

        # NOTE(review): these variables are set at call time; libraries that
        # read them at import time may not pick them up — confirm.
        for variable, location in exported_locations.items():
            os.environ[variable] = location

        for child in ('transformers', 'datasets', 'hub', 'chroma'):
            child_path = f"{cache_root}/{child}"
            os.makedirs(child_path, exist_ok=True)
            os.chmod(child_path, 0o755)

        logger.info(f"✅ Cache directories setup complete: {cache_root}")
    except Exception as e:
        # Best effort only: fall back to whatever defaults the libraries use.
        logger.warning(f"⚠️ Cache directory setup failed: {e}")
def initialize_chromadb(self):
    """Open a ChromaDB client and prepare the collections used by the engine.

    Sets ``self.chroma_client``, ``self.properties_collection`` and
    ``self.user_preferences_collection``. The client is chosen by
    ``_open_chroma_client``. The properties collection is reopened when it
    exists and otherwise (re)created, with up to three attempts.

    Nothing is raised: any failure is logged and both collection attributes
    are reset to ``None`` so other methods can detect the missing store.
    """
    try:
        logger.info("🗄️ Initializing ChromaDB with robust containerized support...")

        chroma_client, selected_path = self._open_chroma_client()
        self.chroma_client = chroma_client
        logger.info(f"🗄️ ChromaDB initialized successfully at: {selected_path}")

        # Dependency-free embedding function used for newly created collections.
        embedding_function = self.create_robust_embedding_function()

        collection_name = "properties_main_collection"
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # NOTE(review): the existing collection is reopened without an
                # embedding_function argument — confirm which function
                # ChromaDB then applies to queries on it.
                self.properties_collection = self.chroma_client.get_collection(collection_name)
                count = self.properties_collection.count()
                logger.info(f"📚 Using existing ChromaDB collection '{collection_name}' with {count} properties")
                break
            except Exception:
                logger.info(f"📚 Attempt {attempt + 1}/{max_retries}: Creating new ChromaDB collection '{collection_name}'")

            try:
                # Remove any half-usable collection of the same name first.
                try:
                    self.chroma_client.delete_collection(collection_name)
                    logger.info(f"🗑️ Deleted existing collection '{collection_name}'")
                except Exception as delete_error:
                    # Usually just means there was nothing to delete.
                    logger.debug(f"Collection '{collection_name}' not deleted: {delete_error}")

                self.properties_collection = self.chroma_client.create_collection(
                    name=collection_name,
                    metadata={"description": "Real estate properties with robust embeddings"},
                    embedding_function=embedding_function
                )
                logger.info(f"📚 Successfully created ChromaDB collection '{collection_name}'")
                break
            except Exception as create_error:
                logger.error(f"❌ Attempt {attempt + 1} failed to create collection: {create_error}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(1)  # Wait before retry

        # User preferences collection: reuse if present, otherwise create.
        try:
            self.user_preferences_collection = self.chroma_client.get_collection("user_preferences_robust")
            logger.info("📚 Using existing robust user preferences collection")
        except Exception:
            self.user_preferences_collection = self.chroma_client.create_collection(
                name="user_preferences_robust",
                metadata={"description": "User behavior and preferences with robust embeddings"},
                embedding_function=embedding_function
            )
            logger.info("📚 Created new robust user preferences collection")

        logger.info(f"✅ Robust ChromaDB initialized successfully")
        logger.info(f"📊 Properties in DB: {self.properties_collection.count()}")

        # Smoke-test the embedding function; a failure here is only logged.
        try:
            test_text = "Test property for robust embedding function"
            test_embedding = embedding_function([test_text])
            if test_embedding and len(test_embedding) > 0 and len(test_embedding[0]) > 0:
                logger.info(f"✅ Robust embedding function test successful - embedding shape: {len(test_embedding[0])}")
            else:
                logger.warning("⚠️ Robust embedding function returned empty result")
        except Exception as test_error:
            logger.warning(f"⚠️ Robust embedding function test failed: {test_error}")

    except Exception as e:
        logger.error(f"❌ Failed to initialize robust ChromaDB: {e}")
        # Set to None so other methods can handle gracefully
        self.properties_collection = None
        self.user_preferences_collection = None

def _open_chroma_client(self):
    """Return ``(client, location)`` for the first usable ChromaDB storage.

    Fixed directories are tried in order. A fresh temporary directory is
    created only after all of them have failed, so a successful start-up no
    longer leaves an unused ``chromadb_*`` directory behind. If no
    persistent location works, an in-memory client is returned with the
    location ``"in-memory"``.
    """
    import tempfile

    fixed_paths = [
        "/tmp/chromadb_properties",        # Container-friendly
        "/tmp/hf_cache/chromadb",          # Hugging Face cache
        os.path.abspath("./property_db"),  # Local development
    ]
    # ``None`` stands for "create a temporary directory now".
    for candidate in fixed_paths + [None]:
        db_path = candidate
        try:
            if db_path is None:
                db_path = tempfile.mkdtemp(prefix="chromadb_")
            logger.info(f"🔄 Trying ChromaDB path: {db_path}")
            os.makedirs(db_path, exist_ok=True)
            os.chmod(db_path, 0o755)

            client = chromadb.PersistentClient(path=db_path)
            # Listing collections proves the store is actually usable.
            client.list_collections()
            logger.info(f"✅ ChromaDB client test successful at: {db_path}")
            return client, db_path
        except Exception as path_error:
            shown_path = db_path or "<temporary directory>"
            logger.warning(f"⚠️ Failed to initialize ChromaDB at {shown_path}: {path_error}")

    logger.error("❌ All ChromaDB paths failed - falling back to in-memory client")
    return chromadb.Client(), "in-memory"
def get_custom_embedding_function(self):
    """Return the first embedding function that can be constructed.

    Cache directories are created and exported first. The candidates are
    then tried in order: ChromaDB's ONNX MiniLM function, its
    SentenceTransformer function, its default function, and finally
    ``self.create_simple_embedding_function()``. Returns ``None`` when every
    candidate fails or when the preparation itself raises.
    """
    try:
        from chromadb.utils import embedding_functions

        cache_root = '/tmp/hf_cache'
        chroma_cache = f'{cache_root}/chroma'
        onnx_cache = f'{cache_root}/onnx'

        for location in (cache_root, chroma_cache, onnx_cache):
            os.makedirs(location, exist_ok=True)
            os.chmod(location, 0o755)

        # Point the various libraries at the writable cache tree.
        os.environ.update({
            'CHROMA_CACHE_DIR': chroma_cache,
            'ONNXRUNTIME_CACHE_DIR': onnx_cache,
            'HF_HOME': cache_root,
            'TRANSFORMERS_CACHE': f'{cache_root}/transformers',
            'SENTENCE_TRANSFORMERS_HOME': f'{cache_root}/sentence_transformers',
        })

        # (start message, success message, failure prefix, factory), best first.
        library_candidates = (
            (
                "🔄 Trying ONNX embedding function...",
                "✅ ONNX embedding function created successfully",
                "⚠️ ONNX embedding function failed: ",
                lambda: embedding_functions.ONNXMiniLM_L6_V2(),
            ),
            (
                "🔄 Trying SentenceTransformer embedding function...",
                "✅ SentenceTransformer embedding function created successfully",
                "⚠️ SentenceTransformer embedding function failed: ",
                lambda: embedding_functions.SentenceTransformerEmbeddingFunction(
                    model_name='all-MiniLM-L6-v2',
                    cache_folder=chroma_cache
                ),
            ),
            (
                "🔄 Trying default embedding function...",
                "✅ Default embedding function created successfully",
                "⚠️ Default embedding function failed: ",
                lambda: embedding_functions.DefaultEmbeddingFunction(),
            ),
        )
        for start_message, success_message, failure_prefix, build in library_candidates:
            try:
                logger.info(start_message)
                built = build()
                logger.info(success_message)
                return built
            except Exception as build_error:
                logger.warning(f"{failure_prefix}{build_error}")

        # Last resort: the engine's own simple embedding function.
        try:
            logger.info("🔄 Creating simple fallback embedding function...")
            built = self.create_simple_embedding_function()
            logger.info("✅ Simple fallback embedding function created successfully")
            return built
        except Exception as simple_error:
            logger.error(f"❌ All embedding functions failed: {simple_error}")
            return None

    except Exception as e:
        logger.error(f"❌ Error in get_custom_embedding_function: {e}")
        return None
def create_robust_embedding_function(self):
    """Build a dependency-free embedding callable for ChromaDB.

    The returned object maps each text to a 384-value list in ``[0, 1]``
    derived deterministically from the text's MD5 and SHA-256 digests and
    simple per-character statistics. The vectors are stable for a given
    text but are hash-based, not learned.

    Results are cached per exact input text, so the embedding returned for
    a text never depends on which differently cased or padded variant was
    embedded earlier.
    """
    import hashlib
    from collections import Counter

    class RobustEmbeddingFunction:
        def __init__(self):
            self.embedding_dim = 384  # Standard dimension
            self.cache = {}  # exact text -> embedding
            logger.info(f"🔧 Created robust embedding function with dimension: {self.embedding_dim}")

        # The parameter is called ``input`` because that is the keyword the
        # original interface exposes; renaming it would change the signature.
        def __call__(self, input):
            """Return one embedding per text; zero vectors for blank texts or on error."""
            texts = []
            try:
                if isinstance(input, str):
                    texts = [input]
                elif isinstance(input, list):
                    texts = input
                else:
                    texts = [str(input)]

                if not texts:
                    return [[0.0] * self.embedding_dim]

                embeddings = []
                for text in texts:
                    if not text or len(text.strip()) == 0:
                        embeddings.append([0.0] * self.embedding_dim)
                        continue

                    # Key on the exact text the embedding is computed from.
                    embedding = self.cache.get(text)
                    if embedding is None:
                        embedding = self._generate_robust_embedding(text)
                        self.cache[text] = embedding
                    embeddings.append(embedding)

                return embeddings

            except Exception as e:
                logger.warning(f"⚠️ Robust embedding failed, using fallback: {e}")
                # ``texts`` is always bound here; fall back to a single zero
                # vector if the input could not even be normalised.
                return [[0.0] * self.embedding_dim for _ in texts] or [[0.0] * self.embedding_dim]

        def _generate_robust_embedding(self, text):
            """Derive the vector: 16 MD5 bytes, 32 SHA-256 bytes, then mixed values."""
            encoded = text.encode('utf-8')
            md5_bytes = hashlib.md5(encoded).digest()
            sha_bytes = hashlib.sha256(encoded).digest()
            char_freq = Counter(text)

            embedding = [byte / 255.0 for byte in md5_bytes]
            embedding.extend(byte / 255.0 for byte in sha_bytes)

            for i in range(len(embedding), self.embedding_dim):
                if text:
                    char = text[i % len(text)]
                    char_val = ord(char)
                    freq_val = char_freq[char]
                else:
                    char_val = freq_val = 0
                # Mix character code, its frequency and both digests.
                combined_val = (char_val + freq_val + md5_bytes[i % 16] + sha_bytes[i % 32]) % 256
                embedding.append(combined_val / 255.0)

            return embedding[:self.embedding_dim]

    return RobustEmbeddingFunction()
def initialize_ai_models(self):
"""Initialize AI models for analysis and recommendations"""
try:
logger.info("🧠 Initializing AI models for enhanced recommendations...")
# Initialize small AI models for property recommendations
self.initialize_recommendation_models()
# Initialize existing models
self.initialize_fallback_models()
logger.info("✅ AI models initialized successfully")
except Exception as e:
logger.error(f"❌ Error initializing AI models: {e}")
self.initialize_fallback_models()
def initialize_recommendation_models(self):
"""Initialize small AI models specifically for property recommendations"""
try:
logger.info("🔍 Initializing recommendation AI models...")
# Small transformer model for property analysis
model_name = "sentence-transformers/all-MiniLM-L6-v2" # Small, fast model
logger.info(f"📥 Loading recommendation model: {model_name}")
self.recommendation_tokenizer = AutoTokenizer.from_pretrained(model_name)
self.recommendation_model = AutoModel.from_pretrained(model_name)
# Move to CPU for compatibility
self.recommendation_model.to(self.device)
self.recommendation_model.eval()
# Initialize property classification model
self.property_classifier = self.initialize_property_classifier()
# Initialize price range predictor
self.price_predictor = self.initialize_price_predictor()
# Initialize preference analyzer
self.preference_analyzer = self.initialize_preference_analyzer()
logger.info("✅ Recommendation AI models loaded successfully")
except Exception as e:
logger.warning(f"⚠️ Could not load recommendation models: {e}")
self.recommendation_tokenizer = None
self.recommendation_model = None
self.property_classifier = None
self.price_predictor = None
self.preference_analyzer = None
def initialize_property_classifier(self):
    """Return the static thresholds and weights used to tier properties.

    The result holds four price thresholds plus per-type and per-location
    weight tables. ``None`` is returned (after a warning) if building the
    table fails.
    """
    try:
        type_weights = dict(
            Apartment=1.0,
            Villa=1.5,
            Penthouse=2.0,
            Townhouse=1.2,
            Plot=1.3,
            Commercial=1.8,
        )
        location_weights = dict(
            premium=1.5,
            upcoming=1.2,
            established=1.0,
            developing=0.8,
        )
        return {
            'luxury_threshold': 50_000_000,      # 50M
            'premium_threshold': 25_000_000,     # 25M
            'mid_range_threshold': 10_000_000,   # 10M
            'affordable_threshold': 5_000_000,   # 5M
            'property_type_weights': type_weights,
            'location_weights': location_weights,
        }
    except Exception as e:
        logger.warning(f"⚠️ Property classifier initialization failed: {e}")
        return None
def initialize_price_predictor(self):
    """Return the static price bands and price-preference multipliers.

    Each band is a ``(lower, upper)`` tuple; the top band is open-ended.
    ``None`` is returned (after a warning) if building the table fails.
    """
    try:
        bands = {}
        bands['ultra_luxury'] = (100_000_000, float('inf'))  # 100M+
        bands['luxury'] = (50_000_000, 100_000_000)          # 50M-100M
        bands['premium'] = (25_000_000, 50_000_000)          # 25M-50M
        bands['mid_range'] = (10_000_000, 25_000_000)        # 10M-25M
        bands['affordable'] = (5_000_000, 10_000_000)        # 5M-10M
        bands['budget'] = (0, 5_000_000)                     # 0-5M

        multipliers = {
            'conservative': 0.7,  # Prefer lower prices
            'balanced': 1.0,      # No preference
            'premium': 1.3,       # Prefer higher prices
        }
        return {'price_ranges': bands, 'price_preferences': multipliers}
    except Exception as e:
        logger.warning(f"⚠️ Price predictor initialization failed: {e}")
        return None
def initialize_preference_analyzer(self):
    """Return the static weight tables used when analysing user preferences.

    Three tables are provided: engagement level, time-of-day viewing
    pattern, and breadth of property-type interest. ``None`` is returned
    (after a warning) if building the table fails.
    """
    try:
        engagement = dict(high=1.5, medium=1.0, low=0.7)
        viewing_pattern = dict(morning=1.1, afternoon=1.0, evening=0.9)
        type_breadth = dict(diverse=1.2, focused=1.0, exploratory=1.1)
        return {
            'engagement_weights': engagement,
            'viewing_pattern_weights': viewing_pattern,
            'property_type_preferences': type_breadth,
        }
    except Exception as e:
        logger.warning(f"⚠️ Preference analyzer initialization failed: {e}")
        return None
def initialize_fallback_models(self):
"""Initialize fallback models if main models fail"""
logger.info("🔄 Initializing fallback models...")
try:
from transformers import pipeline
self.sentiment_analyzer = pipeline("sentiment-analysis", device=-1)
self.tokenizer = None
self.embedding_model = None
self.behavior_classifier = None
logger.info("✅ Fallback models loaded")
except Exception as e:
logger.error(f"❌ Fallback models failed: {e}")
self.sentiment_analyzer = None
self.tokenizer = None
self.embedding_model = None
self.behavior_classifier = None
def get_property_embeddings(self, property_text: str) -> np.ndarray:
"""Generate embeddings for property text using Hugging Face model"""
try:
if self.tokenizer and self.embedding_model:
inputs = self.tokenizer(property_text, return_tensors='pt',
truncation=True, padding=True, max_length=512)
with torch.no_grad():
outputs = self.embedding_model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1)
return embeddings.numpy().flatten()
else:
# Fallback to TF-IDF
return self.get_tfidf_embeddings(property_text)
except Exception as e:
logger.error(f"❌ Error generating embeddings: {e}")
return self.get_tfidf_embeddings(property_text)
def get_tfidf_embeddings(self, text: str) -> np.ndarray:
    """Return a fixed-length (100) TF-IDF vector for ``text``.

    The vectorizer is fitted on this single text, so positions correspond
    to this text's own vocabulary (at most 100 terms, English stop words
    removed). The weights are zero-padded to 100 entries so every call
    returns the same shape.

    If vectorization fails (for example the text has no usable terms), a
    zero vector is returned and a warning is logged. The previous random
    fallback made results change from call to call.
    """
    n_features = 100
    vector = np.zeros(n_features)
    try:
        vectorizer = TfidfVectorizer(max_features=n_features, stop_words='english')
        weights = vectorizer.fit_transform([text]).toarray().flatten()
    except Exception as e:
        logger.warning(f"⚠️ TF-IDF embedding failed, returning zero vector: {e}")
        return vector

    # max_features caps the vocabulary, so the weights always fit.
    vector[:weights.size] = weights
    return vector
def analyze_user_behavior_with_ai(self, analysis_data: Dict) -> Dict:
"""Use AI to analyze user behavior patterns"""
logger.info("🧠 Analyzing user behavior with AI...")
try:
# Extract behavior patterns
lead_qual = analysis_data.get('data', {}).get('lead_qualification', {})
analytics = analysis_data.get('data', {}).get('analytics', {})
properties = analysis_data.get('data', {}).get('properties', [])
# Create behavior text for AI analysis
behavior_text = self.create_behavior_description(lead_qual, analytics, properties)
# Sentiment analysis of user engagement
sentiment_result = None
if self.sentiment_analyzer:
try:
sentiment_result = self.sentiment_analyzer(behavior_text[:512])
except Exception as e:
logger.warning(f"⚠️ Sentiment analysis failed: {e}")
# AI-enhanced insights
ai_insights = {
'behavior_summary': behavior_text,
'sentiment_analysis': sentiment_result,
'ai_personality_type': self.determine_personality_type(analytics),
'buying_motivation': self.analyze_buying_motivation(properties),
'recommendation_strategy': self.determine_recommendation_strategy(lead_qual),
'urgency_level': self.calculate_urgency_level(analytics),
'property_preferences': self.extract_ai_preferences(properties)
}
logger.info("✅ AI behavior analysis completed")
return ai_insights
except Exception as e:
logger.error(f"❌ AI behavior analysis failed: {e}")
return {'error': str(e)}
def create_behavior_description(self, lead_qual: Dict, analytics: Dict, properties: List) -> str:
"""Create a text description of user behavior for AI analysis"""
lead_status = lead_qual.get('lead_status', 'UNKNOWN')
engagement_level = analytics.get('engagement_level', 'Unknown')
total_views = len(properties)
# Calculate average price interest
prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0]
avg_price = sum(prices) / len(prices) if prices else 0
# Property types viewed
prop_types = list(set(p.get('propertyTypeName', 'Unknown') for p in properties))
behavior_text = f"""
Customer shows {engagement_level.lower()} engagement with {lead_status.lower()} lead status.
They have viewed {total_views} properties with an average price interest of ₹{avg_price:,.0f}.
Property types of interest include: {', '.join(prop_types[:3])}.
Their viewing pattern suggests they are a {self.get_buyer_archetype(analytics)} buyer.
"""
return behavior_text.strip()
def determine_personality_type(self, analytics: Dict) -> str:
    """Map the engagement level in ``analytics`` to a personality label.

    A missing level counts as ``'Medium'``; any unrecognised level yields
    ``"Passive Browser"``.
    """
    engagement = analytics.get('engagement_level', 'Medium')

    labels_by_engagement = (
        ('Very High', "Decisive Decision Maker"),
        ('High', "Research-Oriented Buyer"),
        ('Medium', "Cautious Evaluator"),
    )
    for level, label in labels_by_engagement:
        if engagement == level:
            return label
    return "Passive Browser"
def analyze_buying_motivation(self, properties: List) -> str:
    """Infer a buying-motivation label from the viewed properties' prices.

    Returns "Unknown motivation" for no properties and "Price-conscious
    buyer" when none has a usable positive price. Otherwise the label
    depends, in this order, on how tight the price spread is (under 20% of
    the average), whether the average exceeds 20,000,000, and whether more
    than three distinct property types were viewed.

    Properties whose price is ``None`` or non-numeric are ignored rather
    than raising a ``TypeError``.
    """
    if not properties:
        return "Unknown motivation"

    prices = []
    for prop in properties:
        price = prop.get('price') or 0
        if isinstance(price, (int, float)) and price > 0:
            prices.append(price)
    if not prices:
        return "Price-conscious buyer"

    price_range = max(prices) - min(prices)
    avg_price = sum(prices) / len(prices)

    if price_range < avg_price * 0.2:  # Consistent price range
        return "Budget-focused with clear price target"
    if avg_price > 20000000:  # High-value properties
        return "Luxury-seeking with investment focus"

    distinct_types = {prop.get('propertyTypeName') for prop in properties}
    if len(distinct_types) > 3:
        return "Exploring options, needs guidance"
    return "Value-conscious with specific requirements"
def determine_recommendation_strategy(self, lead_qual: Dict) -> str:
    """Pick a recommendation strategy from the lead status and lead score.

    Tiers are checked from hottest to coldest; a tier matches when either
    the status equals the tier's status or the score exceeds its threshold.
    Defaults are status ``'COLD'`` and score ``0``.
    """
    lead_status = lead_qual.get('lead_status', 'COLD')
    lead_score = lead_qual.get('lead_score', 0)

    tiers = (
        ('HOT', 70, "aggressive_immediate"),        # Show premium properties, create urgency
        ('WARM', 50, "nurturing_educational"),      # Provide comparisons, detailed info
        ('LUKEWARM', 30, "engagement_building"),    # Variety of options, build interest
    )
    for status, score_floor, strategy in tiers:
        if lead_status == status or lead_score > score_floor:
            return strategy
    return "awareness_creation"  # Basic information, build trust
def calculate_urgency_level(self, analytics: Dict) -> str:
    """Rate urgency as "High", "Medium" or "Low" from engagement data.

    "High" needs a high or very high engagement level together with a
    truthy ``viewing_patterns['peak_viewing_time']``; "Medium" is returned
    for medium engagement; everything else is "Low".
    """
    peak_time = analytics.get('viewing_patterns', {}).get('peak_viewing_time')
    engagement = analytics.get('engagement_level', 'Low')

    if engagement == 'Medium':
        return "Medium"

    strongly_engaged = engagement in ('Very High', 'High')
    return "High" if strongly_engaged and peak_time else "Low"
def extract_ai_preferences(self, properties: List) -> Dict:
    """Derive preferred property types and price sensitivity from views.

    Returns ``{}`` for no properties. Otherwise the result contains:

    * ``preferred_types`` — up to three type names ranked by summed
      ``viewCount`` (a property without a usable count counts as one view);
    * ``price_sensitivity`` — ``'high'`` when the coefficient of variation
      of the positive prices is below 0.2, ``'low'`` above 0.5, else
      ``'medium'``;
    * ``location_preferences`` and ``feature_importance`` — left empty here.

    ``None`` or non-numeric ``viewCount`` / ``price`` values are tolerated
    instead of raising a ``TypeError``.
    """
    if not properties:
        return {}

    features = {
        'preferred_types': [],
        'price_sensitivity': 'medium',
        'location_preferences': [],
        'feature_importance': {}
    }

    # Rank property types by how often they were viewed.
    type_counts = {}
    for prop in properties:
        prop_type = prop.get('propertyTypeName') or 'Unknown'
        views = prop.get('viewCount', 1)
        if isinstance(views, bool) or not isinstance(views, (int, float)):
            views = 1  # missing / null / malformed count -> one view
        type_counts[prop_type] = type_counts.get(prop_type, 0) + views

    # sorted() is stable, so equally viewed types keep first-seen order.
    ranked_types = sorted(type_counts, key=type_counts.get, reverse=True)
    features['preferred_types'] = ranked_types[:3]

    # Price sensitivity from the spread of usable, positive prices.
    prices = []
    for prop in properties:
        price = prop.get('price') or 0
        if isinstance(price, (int, float)) and price > 0:
            prices.append(price)

    if prices:
        price_mean = np.mean(prices)
        cv = np.std(prices) / price_mean if price_mean > 0 else 0
        if cv < 0.2:
            features['price_sensitivity'] = 'high'  # Very specific about price
        elif cv > 0.5:
            features['price_sensitivity'] = 'low'  # Flexible on price
        else:
            features['price_sensitivity'] = 'medium'

    return features
def get_buyer_archetype(self, analytics: Dict) -> str:
    """Classify the buyer as "motivated", "serious" or "casual".

    "motivated" needs very high engagement with an opportunity score above
    80; "serious" needs high or medium engagement with a score above 60;
    everything else is "casual". Defaults are ``'Medium'`` and ``50``.
    """
    engagement = analytics.get('engagement_level', 'Medium')
    score = analytics.get('opportunity_score', 50)

    if engagement == 'Very High':
        return "motivated" if score > 80 else "casual"
    if engagement in ('High', 'Medium') and score > 60:
        return "serious"
    return "casual"
def fetch_and_store_all_properties(self) -> bool:
    """Fetch every property in parallel and store the result in ChromaDB.

    Returns the outcome of ``store_properties_in_chromadb``. When the
    parallel fetch returns nothing, or anything raises, the sequential
    fetcher is used instead and its result is returned.
    """
    try:
        logger.info("🚀 Starting PARALLEL property fetching for faster performance...")

        fetched = self.fetch_all_properties_parallel(max_workers=8)

        if not fetched:
            logger.warning("⚠️ No properties fetched from API, falling back to sequential method...")
            return self.fetch_all_properties_sequential()

        logger.info(f"🎉 Successfully fetched {len(fetched)} total properties using parallel processing!")

        if not fetched:
            logger.warning("⚠️ No properties to store")
            return False

        stored = self.store_properties_in_chromadb(fetched)
        return stored

    except Exception as e:
        logger.error(f"❌ Error in parallel property fetch: {e}")
        logger.info("🔄 Falling back to sequential fetching...")
        return self.fetch_all_properties_sequential()
def fetch_all_properties_sequential(self) -> bool:
    """Fetch all properties page by page and store them in ChromaDB.

    Pages of 100 are requested from ``self.properties_api`` until an empty
    page, an unexpected payload type, or the 1000-page safety limit is
    reached. A failing page is retried (with a one-second pause) instead of
    being skipped; after five consecutive failures the fetch stops and
    whatever was collected so far is stored.

    Returns:
        The result of ``store_properties_in_chromadb`` when at least one
        property was fetched, otherwise ``False``. Errors are logged, never
        raised.
    """
    try:
        logger.info("🌐 Fetching properties using sequential method...")

        all_properties = []
        page = 1
        page_size = 100
        max_pages = 1000               # Safety limit
        max_consecutive_errors = 5
        consecutive_errors = 0

        while page <= max_pages:
            try:
                logger.info(f"📄 Fetching page {page}...")
                # NOTE(review): TLS verification is disabled and there is no
                # timeout (both carried over unchanged) — confirm this is
                # intended. self.config['headers'] is also not sent here.
                response = requests.get(
                    self.properties_api,
                    params={'pageNumber': page, 'pageSize': page_size},
                    verify=False,
                    timeout=None,
                )
                response.raise_for_status()
                properties_batch = self._extract_properties_batch(response.json(), page)
            except Exception as page_error:
                consecutive_errors += 1
                logger.error(f"❌ Error fetching page {page}: {page_error}")
                if consecutive_errors >= max_consecutive_errors:
                    logger.error(f"❌ Giving up after {consecutive_errors} consecutive errors on page {page}")
                    break
                time.sleep(1)  # brief pause, then retry the same page
                continue

            consecutive_errors = 0

            if properties_batch is None:
                # Unexpected payload type; already logged by the helper.
                break
            if not properties_batch:
                logger.info(f"📄 Page {page} is empty - reached end")
                break

            all_properties.extend(properties_batch)
            logger.info(f"✅ Page {page}: {len(properties_batch)} properties (Total: {len(all_properties)})")
            page += 1
        else:
            logger.warning("⚠️ Reached page limit (1000) - stopping fetch")

        logger.info(f"🎉 Sequential fetch completed: {len(all_properties)} total properties!")

        if not all_properties:
            logger.warning("⚠️ No properties fetched from API")
            return False
        return self.store_properties_in_chromadb(all_properties)

    except Exception as e:
        logger.error(f"❌ Error in sequential property fetch: {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return False

def _extract_properties_batch(self, data, page):
    """Return the list of properties contained in one decoded API response.

    A list is returned as is. For a dict, the first of the keys ``data``,
    ``properties`` or ``results`` that is present is used, falling back to
    the dict's values. Any other payload type is logged and yields ``None``.
    """
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for key in ('data', 'properties', 'results'):
            if key in data:
                return data[key]
        return list(data.values()) if data else []
    logger.warning(f"Unexpected response type on page {page}: {type(data)}")
    return None
def store_properties_in_chromadb(self, properties: List[Dict]):
"""Store properties in ChromaDB with embeddings"""
try:
logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB...")
# Ensure ChromaDB is properly initialized
if self.properties_collection is None:
logger.error("❌ ChromaDB properties collection not initialized, cannot store properties")
return False
# Clear existing properties to avoid duplicates
existing_count = self.properties_collection.count()
if existing_count > 0:
logger.info(f"🗑️ Clearing {existing_count} existing properties...")
# ChromaDB doesn't have a clear method, so we recreate the collection
collection_name = self.properties_collection.name
self.chroma_client.delete_collection(collection_name)
# Get the same embedding function that was used initially
embedding_function = self.get_custom_embedding_function()
if embedding_function is None:
logger.error("❌ No embedding function available - cannot recreate collection")
return False
self.properties_collection = self.chroma_client.create_collection(
name=collection_name,
metadata={"description": "Real estate properties with embeddings"},
embedding_function=embedding_function
)
# Process properties in batches for efficiency
batch_size = 50
total_stored = 0
for i in range(0, len(properties), batch_size):
batch = properties[i:i+batch_size]
# Prepare data for ChromaDB
documents = []
metadatas = []
ids = []
for prop in batch:
# Create comprehensive text description for embeddings
description = self.create_property_description(prop)
documents.append(description)
# Store all property metadata
metadata = {
'id': str(prop.get('id', prop.get('propertyId', f'prop_{i}'))),
'propertyName': prop.get('propertyName', ''),
'propertyTypeName': prop.get('propertyTypeName', prop.get('typeName', '')),
'price': float(prop.get('price', 0) or prop.get('marketValue', 0) or 0),
'address': prop.get('address', prop.get('location', '')),
'beds': int(prop.get('beds', 0) or prop.get('bedrooms', 0) or 0),
'baths': int(prop.get('baths', 0) or prop.get('bathrooms', 0) or 0),
'sqft': float(prop.get('totalSquareFeet', 0) or prop.get('area', 0) or 0),
'description': prop.get('description', ''),
'features': json.dumps(prop.get('features', [])) if prop.get('features') else '[]'
}
# Clean metadata to remove None values
metadata = self.clean_metadata_for_chromadb(metadata)
metadatas.append(metadata)
# Use property ID as document ID
prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{total_stored + len(ids)}')))
ids.append(prop_id)
# Add batch to ChromaDB
try:
# Validate that all metadatas are clean
for idx, metadata in enumerate(metadatas):
for key, value in metadata.items():
if value is None:
logger.warning(f"⚠️ Found None value in metadata[{idx}][{key}], this should have been cleaned")
self.properties_collection.add(
documents=documents,
metadatas=metadatas,
ids=ids
)
total_stored += len(batch)
logger.info(f"📊 Stored batch {i//batch_size + 1}: {len(batch)} properties (Total: {total_stored})")
except Exception as batch_error:
logger.error(f"❌ Error storing batch {i//batch_size + 1}: {batch_error}")
# Check if it's an embedding function error
if "embedding function" in str(batch_error).lower():
logger.error("🔧 This appears to be an embedding function issue. Checking collection setup...")
if self.properties_collection is None:
logger.error("❌ Properties collection is None - ChromaDB not properly initialized")
else:
logger.error("❌ Collection exists but embedding function may be missing")
# Log the problematic data for debugging
logger.error(f"Batch size: {len(batch)}, Documents: {len(documents)}, Metadatas: {len(metadatas)}, IDs: {len(ids)}")
if metadatas:
logger.error(f"Sample metadata keys: {list(metadatas[0].keys())}")
for idx, metadata in enumerate(metadatas[:3]): # Log first 3 for debugging
logger.error(f"Metadata {idx}: {metadata}")
# Log sample document for debugging
if documents:
logger.error(f"Sample document: {documents[0][:200]}...")
raise # Re-raise to stop processing
logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!")
return True
except Exception as e:
logger.error(f"❌ Error storing properties in ChromaDB: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return False
def create_property_description(self, prop: Dict) -> str:
    """Build the text that is embedded for one property.

    Present fields are rendered in a fixed order (name, type, location,
    price, bedrooms, bathrooms, area, description, features) and joined
    with ``". "``. Alternative keys are honoured, e.g. ``name`` for
    ``propertyName`` or ``marketValue`` for ``price``.

    Numeric fields may arrive as numbers or numeric strings; anything that
    cannot be read as a number is treated as absent. Feature lists may
    contain non-string items, which are rendered with ``str()``.
    """
    def as_number(value):
        # Numbers pass through untouched so their formatting is unchanged;
        # numeric strings are converted, everything else counts as 0.
        if isinstance(value, (int, float)):
            return value
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0
        return int(number) if number.is_integer() else number

    def first_number(*keys):
        # First truthy numeric value among the given keys, else 0.
        for key in keys:
            number = as_number(prop.get(key))
            if number:
                return number
        return 0

    parts = []

    name = prop.get('propertyName', prop.get('name', ''))
    if name:
        parts.append(f"Property: {name}")

    property_type = prop.get('propertyTypeName', prop.get('typeName', ''))
    if property_type:
        parts.append(f"Type: {property_type}")

    location = prop.get('address', prop.get('location', ''))
    if location:
        parts.append(f"Location: {location}")

    price = first_number('price', 'marketValue')
    if price > 0:
        parts.append(f"Price: ${price:,.0f}")

    beds = first_number('beds', 'bedrooms')
    baths = first_number('baths', 'bathrooms')
    sqft = first_number('totalSquareFeet', 'area')
    if beds > 0:
        parts.append(f"{beds} bedrooms")
    if baths > 0:
        parts.append(f"{baths} bathrooms")
    if sqft > 0:
        parts.append(f"{sqft:,.0f} sq ft")

    description = prop.get('description', '')
    if description:
        parts.append(f"Description: {description}")

    features = prop.get('features', [])
    if features:
        if isinstance(features, list):
            parts.append(f"Features: {', '.join(str(item) for item in features)}")
        elif isinstance(features, str):
            parts.append(f"Features: {features}")

    return '. '.join(parts)
def clean_metadata_for_chromadb(self, metadata: Dict) -> Dict:
    """Return a copy of ``metadata`` that contains only plain scalar values.

    * ``None`` becomes ``0.0`` for ``price``/``sqft``, ``0`` for
      ``beds``/``baths`` and ``''`` for every other key.
    * Booleans become the strings ``'true'`` / ``'false'``. They are
      checked before numbers because ``bool`` is a subclass of ``int``;
      otherwise the boolean rule could never apply.
    * Strings are stripped of surrounding whitespace.
    * NaN and infinite floats become ``0.0`` (``price``/``sqft``) or ``0``;
      other numbers are cast to ``float`` for ``price``/``sqft`` and to
      ``int`` for ``beds``/``baths``.
    * Anything else is converted with ``str()`` (``''`` if that fails).
    """
    float_keys = ('price', 'sqft')
    int_keys = ('beds', 'baths')

    cleaned = {}
    for key, value in metadata.items():
        if value is None:
            if key in float_keys:
                cleaned[key] = 0.0
            elif key in int_keys:
                cleaned[key] = 0
            else:
                cleaned[key] = ''
        elif isinstance(value, bool):
            cleaned[key] = str(value).lower()
        elif isinstance(value, str):
            cleaned[key] = value.strip()
        elif isinstance(value, (int, float)):
            if isinstance(value, float) and not math.isfinite(value):
                cleaned[key] = 0.0 if key in float_keys else 0
            elif key in float_keys:
                cleaned[key] = float(value)
            elif key in int_keys:
                cleaned[key] = int(value)
            else:
                cleaned[key] = value
        else:
            try:
                cleaned[key] = str(value)
            except Exception:
                cleaned[key] = ''

    return cleaned
def find_similar_properties_ai(self, user_analysis: Dict, ai_insights: Dict) -> List[Dict]:
    """Return enhanced AI recommendations, or fallback ones when none are produced.

    Requests 15 recommendations from ``get_enhanced_ai_recommendations``.
    When that call yields nothing, or raises, 10 items from
    ``get_fallback_recommendations`` are returned instead.
    """
    fallback_count = 10
    try:
        logger.info("🔍 Finding AI recommendations using enhanced system...")
        enhanced = self.get_enhanced_ai_recommendations(user_analysis, ai_insights, count=15)
        if not enhanced:
            logger.warning("⚠️ No enhanced recommendations found, using fallback")
            return self.get_fallback_recommendations(fallback_count)
        logger.info(f"✅ Found {len(enhanced)} enhanced AI recommendations")
        return enhanced
    except Exception as e:
        logger.error(f"❌ Error in enhanced AI recommendations: {e}")
        return self.get_fallback_recommendations(fallback_count)
def create_multiple_user_queries(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> List[str]:
    """Build up to five search queries that cover the user's interests.

    Queries are produced in this order, each only when its input is present:

    1. A behaviour-based query from ``create_user_preference_query`` (when
       ``viewed_properties`` is non-empty).
    2. A property-type query from ``preferences['preferred_types']``; entries
       may be strings, tuples (first element is used) or other objects.
    3. A price query from ``preferences['price_range']``. Both the
       ``min``/``max`` and ``min_price``/``max_price`` spellings are accepted
       because other methods in this class read the latter. Missing, ``None``
       or non-numeric bounds count as 0.
    4. A generic "high-quality" query (always).
    5. A generic "diverse mix" query (always).

    Returns:
        The list of query strings, capped at five entries.
    """
    queries = []

    # Query 1: based on the properties the user actually viewed.
    if viewed_properties:
        queries.append(
            self.create_user_preference_query(viewed_properties, preferences, personality_type)
        )

    # Query 2: based on preferred property types.
    type_strings = []
    for prop_type in preferences.get('preferred_types') or []:
        if isinstance(prop_type, tuple):
            # Tuples carry the type name first; an empty tuple gets a placeholder.
            type_strings.append(str(prop_type[0]) if prop_type else 'Property')
        else:
            type_strings.append(str(prop_type))
    if type_strings:
        queries.append(
            f"Property types: {', '.join(type_strings)}. Modern amenities and good location."
        )

    # Query 3: based on the preferred price range.
    price_range = preferences.get('price_range') or {}

    def _bound(*keys):
        # First positive numeric value found under any of the given keys.
        for key in keys:
            candidate = price_range.get(key)
            if isinstance(candidate, (int, float)) and candidate > 0:
                return candidate
        return 0

    min_price = _bound('min', 'min_price')
    max_price = _bound('max', 'max_price')
    if min_price > 0 or max_price > 0:
        queries.append(
            f"Properties in price range {min_price} to {max_price}. Good value for money."
        )

    # Query 4: generic high-quality properties.
    queries.append("High-quality properties with modern amenities, good location, and excellent facilities.")
    # Query 5: diverse property types.
    queries.append("Diverse mix of Villas, Flats, Apartments, and Houses with different price ranges.")
    return queries[:5]  # Limit to 5 queries for performance
def enhance_recommendations_with_ai_parallel(self, properties: List[Dict], viewed_properties: List[Dict],
                                             preferences: Dict, strategy: str) -> List[Dict]:
    """Score ``properties`` in batches of 20 on a six-worker thread pool.

    Each batch is handed to ``process_property_batch_ai``. Batches that raise
    are logged and left out of the result. The merged list is returned sorted
    by ``ai_score`` in descending order (missing scores count as 0).
    """
    logger.info(f"🧠 Enhanced {len(properties)} properties with AI analysis using parallel processing")
    if not properties:
        return []

    batch_size = 20
    scored = []
    with ThreadPoolExecutor(max_workers=6) as pool:
        # Slice the input into fixed-size chunks.
        chunks = [
            properties[start:start + batch_size]
            for start in range(0, len(properties), batch_size)
        ]
        total = len(chunks)

        # Remember which chunk every future belongs to, for the log lines.
        pending = {}
        for position, chunk in enumerate(chunks):
            task = pool.submit(
                self.process_property_batch_ai,
                chunk, viewed_properties, preferences, strategy
            )
            pending[task] = position

        for finished in as_completed(pending):
            position = pending[finished]
            try:
                scored.extend(finished.result())
                logger.info(f"✅ Processed batch {position + 1}/{total} with AI analysis")
            except Exception as e:
                logger.error(f"❌ Failed to process batch {position + 1}: {e}")

    # Highest AI score first.
    scored.sort(key=lambda item: item.get('ai_score', 0), reverse=True)
    return scored
def process_property_batch_ai(self, properties_batch: List[Dict], viewed_properties: List[Dict],
                              preferences: Dict, strategy: str) -> List[Dict]:
    """Attach AI scoring fields to every property in ``properties_batch``.

    Each property dict is mutated in place and receives three keys:

    * ``ai_score``: result of ``calculate_ai_property_score``.
    * ``ai_match_reasons``: result of ``generate_match_reasons``.
    * ``ai_recommendation_confidence``: ``ai_score / 100`` clamped to
      ``[0.0, 1.0]``.

    When scoring a property raises, the error is logged and the property is
    still returned with neutral defaults (score 50, no reasons, confidence
    0.5) so every returned item carries the same keys.

    Returns:
        The same property dicts, in input order.
    """
    enhanced_batch = []
    for prop in properties_batch:
        try:
            ai_score = self.calculate_ai_property_score(prop, viewed_properties, preferences, strategy)
            match_reasons = self.generate_match_reasons(prop, preferences)
            # Clamp at both ends so a negative score cannot yield a negative confidence.
            confidence = max(0.0, min(ai_score / 100, 1.0))
        except Exception as e:
            logger.error(f"❌ Error processing property {prop.get('id', 'unknown')}: {e}")
            # Keep the property, but with neutral values for every AI field.
            # NOTE(review): assumes generate_match_reasons returns a list - confirm.
            ai_score = 50
            match_reasons = []
            confidence = 0.5
        prop['ai_score'] = ai_score
        prop['ai_match_reasons'] = match_reasons
        prop['ai_recommendation_confidence'] = confidence
        enhanced_batch.append(prop)
    return enhanced_batch
def get_guaranteed_additional_recommendations(self, current_recommendations: List[Dict],
                                              viewed_properties: List[Dict], needed_count: int) -> List[Dict]:
    """Collect up to ``needed_count`` extra properties using broad queries.

    Five generic queries are run through ``search_properties_with_query`` on
    a three-worker thread pool. Results are de-duplicated as they arrive and
    anything already present in ``current_recommendations`` or
    ``viewed_properties`` is skipped. Collection stops once enough *unique*
    properties have been gathered; stopping on the raw count could leave the
    final list short after duplicates were removed.

    Returns:
        At most ``needed_count`` property dicts; ``[]`` when the collection
        is not initialised, ``needed_count`` is not positive, or an
        unexpected error occurs.
    """
    logger.info(f"➕ Getting {needed_count} additional recommendations with broader criteria...")
    if self.properties_collection is None or needed_count <= 0:
        return []
    try:
        # IDs that must not be recommended again.
        excluded_ids = set()
        for prop in current_recommendations + viewed_properties:
            prop_id = str(prop.get('id', prop.get('propertyId', '')))
            if prop_id:
                excluded_ids.add(prop_id)

        broader_queries = [
            "Properties with good amenities and location",
            "Affordable housing options with modern features",
            "Premium properties with luxury amenities",
            "Family-friendly properties with good connectivity",
            "Investment properties with high potential"
        ]

        seen_ids = set()
        unique_additional = []
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(
                    self.search_properties_with_query,
                    query, excluded_ids, needed_count
                )
                for query in broader_queries
            ]
            for future in as_completed(futures):
                try:
                    properties = future.result()
                except Exception as e:
                    logger.error(f"❌ Broader search failed: {e}")
                    continue

                for prop in properties or []:
                    if len(unique_additional) >= needed_count:
                        break
                    prop_id = str(prop.get('id', prop.get('propertyId', '')))
                    if prop_id and prop_id not in seen_ids and prop_id not in excluded_ids:
                        seen_ids.add(prop_id)
                        unique_additional.append(prop)

                if len(unique_additional) >= needed_count:
                    # Enough unique hits: drop queries that have not started yet.
                    for pending in futures:
                        pending.cancel()
                    break

        logger.info(f"➕ Added {len(unique_additional)} additional recommendations")
        return unique_additional[:needed_count]
    except Exception as e:
        logger.error(f"❌ Error getting additional recommendations: {e}")
        return []
def get_diverse_properties(self, current_recommendations: List[Dict],
                           viewed_properties: List[Dict], needed_count: int) -> List[Dict]:
    """Return up to ``needed_count`` properties drawn from varied type queries.

    Five fixed queries (villa, apartment, house, flat, commercial) are run
    against ``self.properties_collection``, each asking for at most
    ``min(5, needed_count)`` hits. Properties already present in
    ``current_recommendations`` or ``viewed_properties``, or already picked
    by an earlier query, are skipped. IDs are compared as strings on both
    sides so numeric metadata IDs are excluded correctly.

    Returns:
        Raw ChromaDB metadata dicts; ``[]`` when the collection is missing,
        ``needed_count`` is not positive, or an unexpected error occurs.
    """
    logger.info(f"🎲 Getting {needed_count} diverse properties...")
    if self.properties_collection is None or needed_count <= 0:
        return []
    try:
        # All property IDs to exclude, normalised to strings.
        excluded_ids = set()
        for prop in current_recommendations + viewed_properties:
            prop_id = str(prop.get('id', prop.get('propertyId', '')))
            if prop_id:
                excluded_ids.add(prop_id)

        diverse_queries = [
            "Villa with luxury amenities",
            "Apartment with modern facilities",
            "House with family features",
            "Flat with good connectivity",
            "Commercial property with investment potential"
        ]
        diverse_properties = []
        for query in diverse_queries:
            if len(diverse_properties) >= needed_count:
                break
            try:
                results = self.properties_collection.query(
                    query_texts=[query],
                    n_results=min(5, needed_count),
                    include=["metadatas", "documents", "distances"]
                )
                if not (results and results['metadatas']):
                    continue
                for metadata in results['metadatas'][0]:
                    # str() keeps the comparison consistent with excluded_ids.
                    prop_id = str(metadata.get('id', ''))
                    if prop_id in excluded_ids:
                        continue
                    diverse_properties.append(metadata)
                    excluded_ids.add(prop_id)
                    if len(diverse_properties) >= needed_count:
                        break
            except Exception as e:
                logger.error(f"❌ Error in diverse search with query '{query}': {e}")
                continue

        logger.info(f"🎲 Found {len(diverse_properties)} diverse properties")
        return diverse_properties[:needed_count]
    except Exception as e:
        logger.error(f"❌ Error getting diverse properties: {e}")
        return []
def get_emergency_fallback_properties(self, needed_count: int) -> List[Dict]:
    """Last-resort lookup: fetch any ``needed_count`` properties from ChromaDB.

    Runs the single generic query ``"property"`` and returns the metadata of
    whatever comes back. Returns ``[]`` when the collection is missing, the
    query yields nothing, or the query raises.
    """
    logger.warning(f"🚨 Emergency fallback: getting {needed_count} any available properties")
    try:
        collection = self.properties_collection
        if collection is None:
            return []
        # Any property will do at this point, hence the one-word query.
        hits = collection.query(
            query_texts=["property"],
            n_results=needed_count,
            include=["metadatas", "documents"]
        )
        if hits and hits['metadatas']:
            found = hits['metadatas'][0]
            logger.info(f"🚨 Emergency fallback provided {len(found)} properties")
            return found
    except Exception as e:
        logger.error(f"❌ Emergency fallback failed: {e}")
    return []
def create_user_preference_query(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> str:
    """Build one descriptive search query from the user's viewing history.

    The query is a ``'. '``-joined sequence of the parts that apply:

    * distinct ``propertyTypeName`` values, in first-seen order;
    * min / max / average of the positive numeric ``price`` values;
    * the text before the first comma of each ``address``, de-duplicated in
      first-seen order;
    * the average of the positive numeric ``beds`` values;
    * a fixed phrase chosen by ``personality_type`` (always present).

    ``None`` or non-numeric prices and bed counts are ignored instead of
    raising. First-seen ordering keeps the output deterministic across runs.

    Args:
        viewed_properties: Property dicts the user has looked at.
        preferences: Accepted for interface compatibility; not read here.
        personality_type: ``'Decisive'``, ``'Research-Oriented'``,
            ``'Cautious'`` or anything else for the default phrase.

    Returns:
        The assembled query string.
    """
    def _positive_values(key):
        # Positive numeric values stored under ``key``; None/strings skipped.
        values = (prop.get(key) for prop in viewed_properties)
        return [v for v in values if isinstance(v, (int, float)) and v > 0]

    query_parts = []

    # Property types from user behaviour (dict keys preserve insertion order).
    property_types = {}
    for prop in viewed_properties:
        prop_type = prop.get('propertyTypeName', '')
        if prop_type:
            property_types[str(prop_type)] = None
    if property_types:
        query_parts.append(f"Property types: {', '.join(property_types)}")

    # Price range from viewed properties.
    prices = _positive_values('price')
    if prices:
        min_price = min(prices)
        max_price = max(prices)
        avg_price = sum(prices) / len(prices)
        query_parts.append(f"Price range {min_price} to {max_price}, average {avg_price}")

    # Location preferences: first comma-separated segment of each address.
    locations = {}
    for prop in viewed_properties:
        address = prop.get('address', '')
        if isinstance(address, str) and address:
            area = address.split(',')[0].strip()
            if area:
                locations[area] = None
    if locations:
        query_parts.append(f"Preferred locations: {', '.join(locations)}")

    # Specification preferences.
    beds_list = _positive_values('beds')
    if beds_list:
        avg_beds = sum(beds_list) / len(beds_list)
        query_parts.append(f"Preferred {avg_beds:.0f} bedrooms")

    # Personality-based preferences.
    if personality_type == 'Decisive':
        query_parts.append("Premium properties, luxury features, move-in ready")
    elif personality_type == 'Research-Oriented':
        query_parts.append("Good value, detailed specifications, investment potential")
    elif personality_type == 'Cautious':
        query_parts.append("Established neighborhoods, reliable developers, good amenities")
    else:
        query_parts.append("Modern amenities, good connectivity, family-friendly")

    return '. '.join(query_parts)
def search_similar_properties_chromadb(self, query: str, viewed_properties: List[Dict], min_results: int = 15) -> List[Dict]:
    """Run a semantic search in ChromaDB and return normalised property dicts.

    Steps:

    1. Make sure ``self.properties_collection`` exists, calling
       ``initialize_chromadb`` and then ``auto_fetch_and_store_properties``
       when it does not.
    2. Auto-populate the collection when it is empty; re-initialise it when
       counting fails.
    3. Query for ``min(min_results * 2, collection size, 100)`` hits (at
       least 1).
    4. Convert each hit's metadata into a property dict, skipping properties
       listed in ``viewed_properties``. IDs are compared as strings on both
       sides so numeric metadata IDs are excluded correctly. A hit whose
       fields cannot be converted is replaced by a minimal placeholder dict.

    Args:
        query: Free-text search query.
        viewed_properties: Properties to exclude, identified by
            ``propertyId`` or ``id``.
        min_results: Maximum number of properties to return.

    Returns:
        Up to ``min_results`` property dicts, each including
        ``similarity_score`` (``1 - distance``) and ``chromadb_rank``;
        ``[]`` when the collection is unavailable or an error occurs.
    """
    try:
        # Auto-initialize ChromaDB if not available.
        if self.properties_collection is None:
            logger.warning("⚠️ ChromaDB properties collection not initialized, auto-initializing...")
            self.initialize_chromadb()
            # If still not available after init, try to fetch properties.
            if self.properties_collection is None:
                logger.warning("⚠️ ChromaDB still not available, fetching properties...")
                self.auto_fetch_and_store_properties()
            # Final check.
            if self.properties_collection is None:
                logger.error("❌ ChromaDB properties collection not available after auto-setup")
                return []

        # Check whether the collection is empty and auto-populate it.
        try:
            collection_count = self.properties_collection.count()
            logger.info(f"📊 ChromaDB collection has {collection_count} properties")
            if collection_count == 0:
                logger.warning("⚠️ ChromaDB collection is empty, auto-fetching properties...")
                self.auto_fetch_and_store_properties()
            else:
                logger.info(f"✅ ChromaDB collection has {collection_count} properties - ready for recommendations")
        except Exception as count_error:
            logger.warning(f"⚠️ Collection access error: {count_error}")
            # Reinitialize the collection and try to fetch properties anyway.
            logger.info("🔄 Reinitializing ChromaDB collection...")
            self.initialize_chromadb()
            self.auto_fetch_and_store_properties()

        # IDs of viewed properties to exclude, normalised to strings.
        viewed_ids = set()
        for prop in viewed_properties:
            prop_id = str(prop.get('propertyId', prop.get('id', '')))
            if prop_id:
                viewed_ids.add(prop_id)

        # Perform the semantic search with a bounded result count.
        collection_count = self.properties_collection.count()
        n_results = max(1, min(min_results * 2, collection_count, 100))  # Ensure n_results is at least 1
        logger.info(f"🔍 Querying ChromaDB: '{query}' (requesting {n_results} results from {collection_count} total)")
        results = self.properties_collection.query(
            query_texts=[query],
            n_results=n_results,
            include=["metadatas", "documents", "distances"]
        )

        # Convert the results to the property format.
        similar_properties = []
        metadatas = results['metadatas'][0] if results['metadatas'] else []
        for i, metadata in enumerate(metadatas or []):
            # Stop when we have enough (also applies to placeholder entries).
            if len(similar_properties) >= min_results:
                break
            # Skip viewed properties; compare as str to match viewed_ids.
            if str(metadata.get('id', '')) in viewed_ids:
                continue

            # Safely extract metadata fields with fallbacks.
            try:
                price = float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0)
                address = metadata.get('address', metadata.get('location', ''))
                property_data = {
                    'id': metadata.get('id', ''),
                    'propertyId': metadata.get('id', ''),
                    'propertyName': metadata.get('propertyName', metadata.get('name', 'Unknown Property')),
                    'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))),
                    'price': price,
                    'marketValue': price,
                    'address': address,
                    'location': address,
                    'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0),
                    'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0),
                    'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0),
                    'description': metadata.get('description', ''),
                    'features': self.safe_json_parse(metadata.get('features', '[]')),
                    'similarity_score': 1 - results['distances'][0][i],  # Convert distance to similarity
                    'chromadb_rank': i + 1
                }
                # Carry over any additional metadata fields untouched.
                for key, value in metadata.items():
                    if key not in property_data:
                        property_data[key] = value
                similar_properties.append(property_data)
            except Exception as field_error:
                logger.warning(f"⚠️ Error processing metadata field: {field_error}")
                # Fall back to a minimal property object.
                try:
                    minimal_property = {
                        'id': metadata.get('id', f'prop_{i}'),
                        'propertyId': metadata.get('id', f'prop_{i}'),
                        'propertyName': 'Property',
                        'propertyTypeName': 'Property',
                        'price': 0.0,
                        'marketValue': 0.0,
                        'address': '',
                        'location': '',
                        'beds': 0,
                        'baths': 0,
                        'totalSquareFeet': 0.0,
                        'description': '',
                        'features': [],
                        'similarity_score': 1 - results['distances'][0][i],
                        'chromadb_rank': i + 1
                    }
                    similar_properties.append(minimal_property)
                except Exception as minimal_error:
                    logger.error(f"❌ Failed to create minimal property: {minimal_error}")
                    continue

        logger.info(f"🎯 ChromaDB found {len(similar_properties)} similar properties")
        return similar_properties
    except Exception as e:
        logger.error(f"❌ Error searching ChromaDB: {e}")
        return []
def auto_fetch_and_store_properties(self):
    """Populate ChromaDB from the backend, or with generated placeholders.

    Calls ``fetch_all_properties_parallel`` (5 workers, page size 100, at
    most 10 pages). When that reports failure, the output of
    ``create_fallback_properties`` is stored through
    ``_store_properties_in_chromadb_parallel`` instead.

    Returns:
        ``True`` when either path succeeded, ``False`` otherwise (including
        when an exception was raised).
    """
    try:
        logger.info("🚀 Auto-fetching properties for ChromaDB...")
        fetched = self.fetch_all_properties_parallel(
            max_workers=5,
            page_size=100,
            max_pages=10
        )
        if fetched:
            logger.info("✅ Auto-fetched and stored properties in ChromaDB successfully")
            return True

        # Backend unavailable: fall back to generated placeholder data.
        logger.warning("⚠️ Failed to auto-fetch from backend, using fallback properties...")
        placeholders = self.create_fallback_properties()
        if not self._store_properties_in_chromadb_parallel(placeholders):
            logger.error("❌ Failed to store fallback properties")
            return False
        logger.info(f"✅ Stored {len(placeholders)} fallback properties in ChromaDB")
        return True
    except Exception as e:
        logger.error(f"❌ Error in auto-fetch and store: {e}")
        return False
def create_fallback_properties(self) -> List[Dict]:
    """Generate 50 synthetic property dicts for use when the backend is down.

    Types, locations, bedroom/bathroom counts and feature lists cycle with
    the index; price and square footage grow linearly with it. Every entry
    is flagged with ``is_fallback: True``.
    """
    kinds = ['Apartment', 'Villa', 'House', 'Condo', 'Townhouse']
    cities = ['Mumbai', 'Delhi', 'Bangalore', 'Chennai', 'Hyderabad', 'Pune', 'Kolkata']
    amenities = ['Parking', 'Security', 'Garden', 'Gym']

    generated = []
    for index in range(50):  # Create 50 fallback properties
        number = index + 1
        kind = kinds[index % len(kinds)]
        city = cities[index % len(cities)]
        generated.append({
            'propertyId': f'fallback_{number}',
            'property_name': f'Fallback Property {number}',
            'propertyType': kind,
            'price': 1000000 + (index * 100000),  # Varying prices
            'location': city,
            'beds': (index % 4) + 1,
            'baths': (index % 3) + 1,
            'totalSquareFeet': 800 + (index * 50),
            'description': f'Beautiful {kind} in {city} with modern amenities',
            'features': amenities[:(index % 4) + 1],
            'yearBuilt': 2020 - (index % 10),
            'propertyStatus': 'For Sale',
            'listingDate': '2024-01-01',
            'agent': f'Agent {index % 10 + 1}',
            'is_fallback': True
        })
    logger.info(f"✅ Created {len(generated)} fallback properties")
    return generated
def _get_properties_with_multiple_queries(self, queries: List[str], viewed_properties: List[Dict], count: int) -> List[Dict]:
    """Run ``queries`` in order and return up to ``count`` unique properties.

    Each query goes through ``search_similar_properties_chromadb``. Results
    are de-duplicated by ID as they arrive, so the loop stops on the number
    of *unique* properties rather than the raw hit count. When the supplied
    queries do not yield enough, four generic queries are tried as well.
    Failing queries are logged and skipped.

    Args:
        queries: Search queries, most specific first.
        viewed_properties: Properties excluded from the supplied queries.
        count: Maximum number of properties to return.

    Returns:
        Up to ``count`` property dicts in discovery order.
    """
    seen = set()
    unique_results = []

    def _absorb(found):
        # Append unseen properties until ``count`` is reached.
        for prop in found or []:
            if len(unique_results) >= count:
                return
            prop_id = str(prop.get('id', prop.get('propertyId', '')))
            if prop_id not in seen:
                seen.add(prop_id)
                unique_results.append(prop)

    for query in queries:
        if len(unique_results) >= count:
            break
        try:
            _absorb(self.search_similar_properties_chromadb(query, viewed_properties, count))
        except Exception as e:
            logger.warning(f"⚠️ Query '{query}' failed: {e}")
            continue

    # If still not enough, try generic queries.
    if len(unique_results) < count:
        logger.info(f"🔄 Only {len(unique_results)} properties found, trying generic queries...")
        generic_queries = [
            "modern property good amenities",
            "residential property parking security",
            "apartment villa house property",
            "property with good facilities"
        ]
        for query in generic_queries:
            if len(unique_results) >= count:
                break
            try:
                # NOTE(review): the generic pass does not exclude viewed
                # properties (empty list passed) - confirm this is intended.
                _absorb(self.search_similar_properties_chromadb(query, [], count))
            except Exception as e:
                # Previously swallowed silently; log so failures are visible.
                logger.warning(f"⚠️ Generic query '{query}' failed: {e}")
                continue

    return unique_results[:count]
def get_multi_ai_property_recommendations(self, analysis_data: Dict, ai_insights: Dict, email_type: str, count: int = 15) -> List[Dict]:
    """Dispatch to the recommendation strategy that matches ``email_type``.

    Preferences and behavioural patterns are extracted first, then the
    strategy registered for ``email_type`` is invoked. Unknown types fall
    back to ``get_enhanced_ai_recommendations``. Any exception is logged and
    results in ``[]``.
    """
    try:
        logger.info(f"🤖 Getting {email_type} recommendations using specialized AI model...")
        # Extract customer preferences and behavior patterns.
        preferences = self.extract_customer_preferences(analysis_data, ai_insights)
        behavioral_patterns = self.analyze_behavioral_patterns(analysis_data, ai_insights)
        viewed_properties = analysis_data.get('data', {}).get('properties', [])

        # Strategy table: (email type, deferred call). Lambdas keep every
        # strategy lazy so only the selected one runs.
        routes = (
            ('property_based',
             lambda: self._get_property_type_based_recommendations(preferences, viewed_properties, count)),
            ('price_based',
             lambda: self._get_price_based_recommendations(preferences, viewed_properties, count)),
            ('location_based',
             lambda: self._get_location_based_recommendations(preferences, viewed_properties, count)),
            ('similarity_based',
             lambda: self._get_similarity_based_recommendations(viewed_properties, preferences, count)),
            ('behavioral_based',
             lambda: self._get_behavioral_based_recommendations(behavioral_patterns, viewed_properties, count)),
            ('premium_properties',
             lambda: self._get_premium_properties_recommendations(preferences, count)),
            ('budget_friendly',
             lambda: self._get_budget_friendly_recommendations(preferences, count)),
            ('trending_properties',
             lambda: self._get_trending_properties_recommendations(preferences, count)),
            ('family_oriented',
             lambda: self._get_family_oriented_recommendations(preferences, count)),
            ('investment_opportunities',
             lambda: self._get_investment_opportunities_recommendations(preferences, ai_insights, count)),
        )
        for route_name, run_strategy in routes:
            if email_type == route_name:
                return run_strategy()

        # Fallback to general recommendations.
        return self.get_enhanced_ai_recommendations(analysis_data, ai_insights, count)
    except Exception as e:
        logger.error(f"❌ Error in multi-AI property recommendations for {email_type}: {e}")
        return []
def _get_property_type_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]:
    """AI Model 1: recommend by preferred property type.

    Preferred types come from ``preferences['preferred_property_types']``.
    When that is empty they are inferred from ``viewed_properties``, most
    frequently viewed type first. Type entries may be strings or
    tuples/lists whose first element is the type name; they are normalised
    to strings before the queries are built, so tuple entries no longer
    break the ``join``. Viewed properties are read via ``propertyType`` with
    ``propertyTypeName`` as fallback, since both spellings are used for
    property dicts in this class.

    Returns:
        Up to ``count`` properties from
        ``_get_properties_with_multiple_queries``; ``[]`` on error.
    """
    try:
        raw_types = preferences.get('preferred_property_types') or []
        if not raw_types:
            # Infer preferences from viewed properties, most viewed first.
            type_counts = {}
            for prop in viewed_properties:
                prop_type = prop.get('propertyType') or prop.get('propertyTypeName')
                if prop_type:
                    type_counts[prop_type] = type_counts.get(prop_type, 0) + 1
            raw_types = sorted(type_counts, key=type_counts.get, reverse=True)

        # Normalise every entry to a non-empty string.
        preferred_types = []
        for entry in raw_types:
            if isinstance(entry, (tuple, list)):
                entry = entry[0] if entry else None
            label = str(entry).strip() if entry is not None else ''
            if label:
                preferred_types.append(label)

        # Build queries based on property type preferences.
        if preferred_types:
            top_type = preferred_types[0]
            queries = [
                f"property type {' or '.join(preferred_types[:3])} with modern amenities",
                f"{top_type} property modern facilities",
                f"residential {top_type} premium amenities"
            ]
        else:
            queries = [
                "modern property with good amenities",
                "residential apartment villa house premium",
                "property with parking security amenities"
            ]
        return self._get_properties_with_multiple_queries(queries, viewed_properties, count)
    except Exception as e:
        logger.error(f"❌ Error in property type based recommendations: {e}")
        return []
def _get_price_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]:
    """AI Model 2: recommend by price range.

    The range is read from ``preferences['price_range']`` using the
    ``min_price``/``max_price`` keys, with ``min``/``max`` accepted as
    fallback spellings because ``create_multiple_user_queries`` reads those.
    When both bounds are positive, range-specific queries are used.
    Otherwise the average positive ``price`` of ``viewed_properties`` drives
    the first query, and generic budget queries are used when no prices are
    available. ``None`` or non-numeric values are ignored instead of
    raising.

    Returns:
        Up to ``count`` properties from
        ``_get_properties_with_multiple_queries``; ``[]`` on error.
    """
    def _positive(value):
        # Return ``value`` when it is a positive number, otherwise 0.
        return value if isinstance(value, (int, float)) and value > 0 else 0

    try:
        price_range = preferences.get('price_range') or {}
        min_price = _positive(price_range.get('min_price')) or _positive(price_range.get('min'))
        max_price = _positive(price_range.get('max_price')) or _positive(price_range.get('max'))

        # Build price-based queries.
        if min_price > 0 and max_price > 0:
            avg_price = (min_price + max_price) / 2
            queries = [
                f"property price range {min_price} to {max_price} rupees affordable good value",
                f"affordable property around {avg_price} rupees",
                f"budget property good value for money under {max_price}"
            ]
        else:
            # Analyze viewed properties for price patterns.
            prices = [
                price
                for price in (prop.get('price') for prop in viewed_properties)
                if _positive(price)
            ]
            if prices:
                avg_price = sum(prices) / len(prices)
                queries = [
                    f"property around {avg_price} rupees good value for money",
                    "affordable property budget range",
                    "good value property reasonable price"
                ]
            else:
                queries = [
                    "affordable property good value for money",
                    "budget friendly property reasonable price",
                    "economical property good investment"
                ]
        return self._get_properties_with_multiple_queries(queries, viewed_properties, count)
    except Exception as e:
        logger.error(f"❌ Error in price based recommendations: {e}")
        return []
def _get_location_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]:
    """AI Model 3: recommend by preferred location.

    Locations come from ``preferences['preferred_locations']``, or from the
    ``location`` field of ``viewed_properties`` when that is empty. Entries
    are converted to strings and de-duplicated in first-seen order, so the
    primary location used in the queries is stable between runs (a plain
    ``set`` made it depend on hash ordering).

    Returns:
        Up to ``count`` properties from
        ``_get_properties_with_multiple_queries``; ``[]`` on error.
    """
    try:
        raw_locations = preferences.get('preferred_locations') or []
        if not raw_locations:
            # Extract locations from viewed properties.
            raw_locations = [prop.get('location') for prop in viewed_properties]

        # Order-preserving de-duplication of non-empty location names.
        ordered = {}
        for entry in raw_locations:
            if isinstance(entry, (tuple, list)):
                entry = entry[0] if entry else None
            name = str(entry).strip() if entry else ''
            if name:
                ordered[name] = None
        preferred_locations = list(ordered)

        if preferred_locations:
            primary = preferred_locations[0]
            queries = [
                f"property in {' or '.join(preferred_locations[:3])} good location connectivity",
                f"property near {primary} transport",
                f"residential property {primary} amenities"
            ]
        else:
            queries = [
                "property in prime location good connectivity transport",
                "property near metro station bus transport",
                "property central location amenities nearby"
            ]
        return self._get_properties_with_multiple_queries(queries, viewed_properties, count)
    except Exception as e:
        logger.error(f"❌ Error in location based recommendations: {e}")
        return []
def _get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]:
    """AI Model 4: recommend properties similar to the first viewed one.

    Without viewing history two generic queries are used. Otherwise the
    queries are built from the type, location and bedroom count of
    ``viewed_properties[0]``. Missing, ``None`` or empty values fall back to
    ``'apartment'`` and ``2`` bedrooms, and the location clause is omitted
    when no location is known, so the query never contains the text
    ``None`` or a dangling ``in``. The type is read from ``propertyType``
    with ``propertyTypeName`` as fallback.

    Returns:
        Up to ``count`` properties from
        ``_get_properties_with_multiple_queries``; ``[]`` on error.
    """
    try:
        if not viewed_properties:
            queries = ["modern apartment villa house property", "residential property good amenities"]
        else:
            # Create queries based on the first viewed property.
            reference = viewed_properties[0]
            property_type = (
                reference.get('propertyType')
                or reference.get('propertyTypeName')
                or 'apartment'
            )
            location = reference.get('location') or ''
            beds = reference.get('beds') or 2
            location_clause = f" in {location}" if location else ""
            queries = [
                f"similar to {property_type}{location_clause} with {beds} bedrooms modern amenities",
                f"{property_type} property modern facilities parking",
                f"residential {property_type} good amenities security"
            ]
        return self._get_properties_with_multiple_queries(queries, viewed_properties, count)
    except Exception as e:
        logger.error(f"❌ Error in similarity based recommendations: {e}")
        return []
def _get_behavioral_based_recommendations(self, behavioral_patterns: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]:
    """AI Model 5: choose query wording from the user's engagement level.

    ``behavioral_patterns['engagement_level']`` (default ``'medium'``)
    selects luxury-oriented queries for ``'high'``, budget-oriented queries
    for ``'low'`` and balanced queries for anything else. Errors are logged
    and produce ``[]``.
    """
    try:
        level = behavioral_patterns.get('engagement_level', 'medium')

        premium_queries = [
            "premium property luxury amenities high-end features",
            "luxury villa penthouse executive",
            "high-end property modern amenities",
        ]
        budget_queries = [
            "simple property basic amenities affordable",
            "budget property economical good value",
            "affordable residential property",
        ]
        balanced_queries = [
            "balanced property good amenities reasonable price",
            "moderate property facilities parking",
            "residential property good value",
        ]

        queries = (
            premium_queries if level == 'high'
            else budget_queries if level == 'low'
            else balanced_queries
        )
        return self._get_properties_with_multiple_queries(queries, viewed_properties, count)
    except Exception as e:
        logger.error(f"❌ Error in behavioral based recommendations: {e}")
        return []
def _get_premium_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]:
    """AI Model 6: search with fixed luxury / high-end queries.

    ``preferences`` is not read. No viewed properties are excluded (an empty
    list is passed on). Errors are logged and produce ``[]``.
    """
    search_phrases = [
        "luxury premium property high-end amenities villa penthouse executive",
        "premium villa luxury amenities swimming pool",
        "high-end property executive modern facilities",
    ]
    try:
        return self._get_properties_with_multiple_queries(search_phrases, [], count)
    except Exception as e:
        logger.error(f"❌ Error in premium properties recommendations: {e}")
        return []
def _get_budget_friendly_recommendations(self, preferences: Dict, count: int) -> List[Dict]:
    """AI Model 7: search with fixed affordability-oriented queries.

    ``preferences`` is not read. No viewed properties are excluded (an empty
    list is passed on). Errors are logged and produce ``[]``.
    """
    search_phrases = [
        "affordable budget friendly property good value money economical",
        "budget property affordable reasonable price",
        "economical property good investment value",
    ]
    try:
        return self._get_properties_with_multiple_queries(search_phrases, [], count)
    except Exception as e:
        logger.error(f"❌ Error in budget friendly recommendations: {e}")
        return []
def _get_trending_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]:
    """AI Model 8: search with fixed "trending / new development" queries.

    ``preferences`` is not read. No viewed properties are excluded (an empty
    list is passed on). Errors are logged and produce ``[]``.
    """
    search_phrases = [
        "trending popular property new development modern design latest",
        "new property modern design contemporary",
        "popular property development amenities",
    ]
    try:
        return self._get_properties_with_multiple_queries(search_phrases, [], count)
    except Exception as e:
        logger.error(f"❌ Error in trending properties recommendations: {e}")
        return []
def _get_family_oriented_recommendations(self, preferences: Dict, count: int) -> List[Dict]:
    """AI Model 9: search with fixed family-suitability queries.

    ``preferences`` is not read. No viewed properties are excluded (an empty
    list is passed on). Errors are logged and produce ``[]``.
    """
    search_phrases = [
        "family friendly property multiple bedrooms school nearby children playground safety",
        "family property children school park nearby",
        "residential property family multiple bedrooms",
    ]
    try:
        return self._get_properties_with_multiple_queries(search_phrases, [], count)
    except Exception as e:
        logger.error(f"❌ Error in family oriented recommendations: {e}")
        return []
def _get_investment_opportunities_recommendations(self, preferences: Dict, ai_insights: Dict, count: int) -> List[Dict]:
    """AI Model 10: search with fixed investment-oriented queries.

    Neither ``preferences`` nor ``ai_insights`` is read. No viewed
    properties are excluded (an empty list is passed on). Errors are logged
    and produce ``[]``.
    """
    search_phrases = [
        "investment property rental yield capital appreciation commercial residential ROI",
        "investment opportunity property rental income",
        "commercial property investment good returns",
    ]
    try:
        return self._get_properties_with_multiple_queries(search_phrases, [], count)
    except Exception as e:
        logger.error(f"❌ Error in investment opportunities recommendations: {e}")
        return []
def enhance_recommendations_with_ai(self, similar_properties: List[Dict], viewed_properties: List[Dict],
                                    preferences: Dict, strategy: str) -> List[Dict]:
    """Blend ChromaDB similarity with the AI analysis score and rank the result.

    For every property a copy is returned, extended with
    ``ai_similarity_score`` (60 % ChromaDB similarity + 40 % AI score),
    ``chromadb_similarity``, ``ai_analysis_score``, ``recommendation_reason``
    and ``recommendation_confidence``. The copies are sorted by
    ``ai_similarity_score``, highest first. When anything fails, the
    original ``similar_properties`` list is returned unchanged.
    """
    try:
        ranked = []
        for candidate in similar_properties:
            # Score from the additional AI analysis.
            analysis_score = self.calculate_comprehensive_ai_score(
                candidate, viewed_properties, preferences, strategy
            )
            # ChromaDB similarity carries the larger weight.
            vector_score = candidate.get('similarity_score', 0.5)
            blended_score = (vector_score * 0.6) + (analysis_score * 0.4)
            reason = self.generate_advanced_recommendation_reason(candidate, preferences, strategy)

            enriched = candidate.copy()
            enriched['ai_similarity_score'] = blended_score
            enriched['chromadb_similarity'] = vector_score
            enriched['ai_analysis_score'] = analysis_score
            enriched['recommendation_reason'] = reason
            enriched['recommendation_confidence'] = self.calculate_recommendation_confidence(
                candidate, viewed_properties
            )
            ranked.append(enriched)

        # Best combined score first.
        ranked.sort(key=lambda item: item['ai_similarity_score'], reverse=True)
        logger.info(f"🧠 Enhanced {len(ranked)} properties with AI analysis")
        return ranked
    except Exception as e:
        logger.error(f"❌ Error enhancing recommendations: {e}")
        return similar_properties  # Return original if enhancement fails
def get_additional_recommendations(self, current_recommendations: List[Dict],
                                   viewed_properties: List[Dict], needed_count: int) -> List[Dict]:
    """Top up a recommendation list with generic results from ChromaDB.

    Runs one generic query for ``needed_count * 3`` hits and converts the
    hits that are not already in ``current_recommendations`` or
    ``viewed_properties`` into property dicts. IDs are compared as strings
    on both sides. Metadata fields are read with defaults, so one incomplete
    record no longer aborts the whole call.

    Returns:
        Up to ``needed_count`` property dicts, each with a fixed
        ``ai_similarity_score`` of 0.3 and a generic
        ``recommendation_reason``; ``[]`` when ``needed_count`` is not
        positive, the collection is missing, or an error occurs.
    """
    try:
        if needed_count <= 0:
            return []
        # Ensure ChromaDB is properly initialized.
        if self.properties_collection is None:
            logger.error("❌ ChromaDB properties collection not initialized, cannot get additional recommendations")
            return []

        # IDs to exclude, normalised to strings.
        excluded_ids = set()
        for prop in current_recommendations + viewed_properties:
            prop_id = str(prop.get('id', prop.get('propertyId', '')))
            if prop_id:
                excluded_ids.add(prop_id)

        # Get additional properties using a broader search.
        results = self.properties_collection.query(
            query_texts=["modern property with good amenities"],  # Generic query
            n_results=needed_count * 3,  # Get extra for filtering
            include=["metadatas", "documents", "distances"]
        )

        additional_properties = []
        metadatas = results['metadatas'][0] if results['metadatas'] else []
        for metadata in metadatas or []:
            raw_id = metadata.get('id', '')
            id_key = str(raw_id)
            if id_key in excluded_ids:
                continue

            # Convert to property format, tolerating missing fields.
            price = metadata.get('price', 0) or 0
            property_data = {
                'id': raw_id,
                'propertyId': raw_id,
                'propertyName': metadata.get('propertyName', 'Unknown Property'),
                'propertyTypeName': metadata.get('propertyTypeName', 'Property'),
                'price': price,
                'marketValue': price,
                'address': metadata.get('address', ''),
                'beds': metadata.get('beds', 0),
                'baths': metadata.get('baths', 0),
                'totalSquareFeet': metadata.get('sqft', 0),
                'description': metadata.get('description', ''),
                'features': self.safe_json_parse(metadata.get('features', '[]')),
                'ai_similarity_score': 0.3,  # Lower score for generic recommendations
                'recommendation_reason': "Popular choice with good features and location"
            }
            additional_properties.append(property_data)
            excluded_ids.add(id_key)
            if len(additional_properties) >= needed_count:
                break

        logger.info(f"➕ Added {len(additional_properties)} additional recommendations")
        return additional_properties
    except Exception as e:
        logger.error(f"❌ Error getting additional recommendations: {e}")
        return []
def calculate_comprehensive_ai_score(self, property_item: Dict, viewed_properties: List,
                                     preferences: Dict, strategy: str) -> float:
    """Blend several heuristics into a single score capped at 1.0.

    Components, in the order they are added: ChromaDB similarity (x0.3),
    already-viewed property type (+0.2), closeness to the average viewed
    price (x0.2), overlap with viewed features (x0.15), a strategy-specific
    price bonus (+0.1) and bedroom count within one of the viewed average
    (+0.05). ``preferences`` is accepted but not read here.

    Returns:
        The capped score, or 0.5 if anything raises (the error is logged).
    """
    try:
        # Base similarity from ChromaDB
        total = 0.0
        total += property_item.get('similarity_score', 0.5) * 0.3

        # Property type preference
        candidate_type = property_item.get('propertyTypeName', '')
        seen_types = [seen.get('propertyTypeName', '') for seen in viewed_properties]
        if candidate_type in seen_types:
            total += 0.2

        # Price compatibility: full credit at the viewed average, fading to
        # zero once the candidate is 100% away from it.
        candidate_price = property_item.get('price', 0)
        seen_prices = [seen.get('price', 0) for seen in viewed_properties if seen.get('price', 0) > 0]
        if seen_prices and candidate_price > 0:
            mean_price = sum(seen_prices) / len(seen_prices)
            relative_gap = abs(candidate_price - mean_price) / mean_price
            total += max(0, 1 - relative_gap) * 0.2

        # Feature matching
        candidate_features = property_item.get('features', [])
        if isinstance(candidate_features, str):
            candidate_features = self.safe_json_parse(candidate_features) if candidate_features else []
        seen_features = set()
        for seen in viewed_properties:
            listed = seen.get('features', [])
            if isinstance(listed, list):
                seen_features.update(listed)
        if seen_features and candidate_features:
            shared = len(set(candidate_features) & seen_features)
            total += min(1.0, shared / len(seen_features)) * 0.15

        # Strategy bonus
        if strategy == 'aggressive_immediate':
            if candidate_price > 15000000:
                total += 0.1  # Prefer premium properties
        elif strategy == 'nurturing_educational':
            if 5000000 <= candidate_price <= 20000000:
                total += 0.1  # Prefer mid-range

        # Specification matching
        candidate_beds = property_item.get('beds', 0)
        seen_beds = [seen.get('beds', 0) for seen in viewed_properties if seen.get('beds', 0) > 0]
        if seen_beds and candidate_beds > 0:
            mean_beds = sum(seen_beds) / len(seen_beds)
            if abs(candidate_beds - mean_beds) <= 1:  # Within 1 bedroom
                total += 0.05

        return min(total, 1.0)
    except Exception as e:
        logger.error(f"Error calculating AI score: {e}")
        return 0.5  # Default score
def generate_advanced_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str:
    """Compose a short human-readable reason for recommending a property.

    Collects phrases about type, price tier, bedrooms, address and strategy
    (in that order) and joins the first three after the prefix
    "Perfect match with ". ``preferences`` is accepted but not read here.

    Returns:
        The composed sentence, or a generic fallback string if anything
        raises (the error is logged).
    """
    try:
        highlights = []

        # Property type appeal
        type_name = property_item.get('propertyTypeName', '')
        if type_name:
            highlights.append(f"Excellent {type_name.lower()} option")

        # Price positioning
        amount = property_item.get('price', 0)
        if amount > 20000000:
            price_note = "premium location and features"
        elif amount > 10000000:
            price_note = "great value in mid-range segment"
        else:
            price_note = "affordable with good potential"
        highlights.append(price_note)

        # Specification highlights
        bedrooms = property_item.get('beds', 0)
        if bedrooms >= 3:
            highlights.append("spacious family accommodation")
        elif bedrooms == 2:
            highlights.append("perfect for couples or small families")

        # Location appeal
        where = property_item.get('address', '')
        if any(area in where for area in ('Gachibowli', 'Hitech City')):
            highlights.append("prime IT corridor location")
        elif any(area in where for area in ('Banjara Hills', 'Jubilee Hills')):
            highlights.append("prestigious neighborhood")
        else:
            highlights.append("well-connected area")

        # Strategy-specific reasons
        strategy_notes = {
            'aggressive_immediate': "immediate possession available",
            'nurturing_educational': "excellent investment opportunity",
        }
        highlights.append(strategy_notes.get(strategy, "suits your viewing preferences"))

        return "Perfect match with " + ", ".join(highlights[:3])
    except Exception as e:
        logger.error(f"Error generating recommendation reason: {e}")
        return "Recommended based on your preferences and AI analysis"
def calculate_recommendation_confidence(self, property_item: Dict, viewed_properties: List) -> float:
    """Estimate how confident a recommendation is, capped at 1.0.

    Starts at 0.5, adds 0.1 per viewed property of the same type (at most
    0.3) and adds 0.2 when the price is within 20% of the average viewed
    price.

    Returns:
        The confidence value, or 0.5 if anything raises (the error is logged).
    """
    try:
        level = 0.5  # Base confidence

        # Higher confidence for properties similar to heavily viewed ones
        candidate_type = property_item.get('propertyTypeName', '')
        same_type_views = len(
            [seen for seen in viewed_properties if seen.get('propertyTypeName') == candidate_type]
        )
        if same_type_views > 0:
            level += min(0.3, same_type_views * 0.1)

        # Price range confidence
        candidate_price = property_item.get('price', 0)
        seen_prices = [seen.get('price', 0) for seen in viewed_properties if seen.get('price', 0) > 0]
        if seen_prices and candidate_price > 0:
            mean_price = sum(seen_prices) / len(seen_prices)
            if abs(candidate_price - mean_price) / mean_price < 0.2:  # Within 20% of average
                level += 0.2

        return min(level, 1.0)
    except Exception as e:
        logger.error(f"Error calculating confidence: {e}")
        return 0.5
def score_properties_with_ai(self, all_properties: List, viewed_properties: List,
                             preferences: Dict, strategy: str) -> List[Dict]:
    """Score candidate properties and return those above the 0.3 threshold.

    Already-viewed properties are skipped. Every kept property is a shallow
    copy of the input dict with ``ai_similarity_score`` and
    ``recommendation_reason`` added.

    Args:
        all_properties: Candidate property dicts; non-dict entries are
            logged and skipped.
        viewed_properties: Properties the user has already viewed.
        preferences: Passed through to the scoring and reason helpers.
        strategy: Passed through to the scoring and reason helpers.

    Returns:
        The kept properties sorted by ``ai_similarity_score``, highest first.
    """
    scored_properties = []

    # Viewed ids are resolved with the same propertyId -> id fallback used
    # for candidates below, so a viewed item keyed only by ``id`` is still
    # excluded. Empty ids are ignored: otherwise '' would land in the set
    # and silently drop every candidate that has no id at all.
    viewed_ids = set()
    for viewed in viewed_properties:
        if isinstance(viewed, dict):
            viewed_id = str(viewed.get('propertyId', viewed.get('id', '')))
            if viewed_id:
                viewed_ids.add(viewed_id)

    for i, prop in enumerate(all_properties):
        try:
            # Ensure property is a dictionary
            if not isinstance(prop, dict):
                logger.warning(f"Property {i} is not a dictionary: {type(prop)}")
                continue
            # Skip already viewed properties
            prop_id = str(prop.get('propertyId', prop.get('id', '')))
            if prop_id in viewed_ids:
                continue
            # Calculate AI-based similarity score
            score = self.calculate_ai_similarity_score(prop, viewed_properties, preferences, strategy)
            if score > 0.3:  # Minimum threshold
                prop_copy = prop.copy()
                prop_copy['ai_similarity_score'] = score
                prop_copy['recommendation_reason'] = self.generate_recommendation_reason(prop, preferences, strategy)
                scored_properties.append(prop_copy)
        except Exception as e:
            # One bad record must not abort scoring of the rest.
            logger.warning(f"Error processing property {i}: {e}")
            continue

    # Sort by AI score
    scored_properties.sort(key=lambda x: x['ai_similarity_score'], reverse=True)
    return scored_properties
def calculate_ai_similarity_score(self, property_item: Dict, viewed_properties: List,
                                  preferences: Dict, strategy: str) -> float:
    """Combine type, price, strategy and feature signals into one score.

    Weights: preferred property type 30%, price similarity 25%, strategy
    bonus 25%, embedding-based feature similarity 20%. The result is capped
    at 1.0.
    """
    total = 0.0

    # Property type preference (30% weight), scaled by the most preferred type
    type_weights = dict(preferences.get('preferred_types', []))
    candidate_type = property_item.get('propertyTypeName', '') or property_item.get('typeName', '')
    if candidate_type in type_weights:
        total += 0.3 * (type_weights[candidate_type] / max(type_weights.values()))

    # Price similarity (25% weight)
    seen_prices = [seen.get('price', 0) for seen in viewed_properties if seen.get('price', 0) > 0]
    if seen_prices:
        mean_price = sum(seen_prices) / len(seen_prices)
        # Try multiple price field names
        candidate_price = (
            property_item.get('price', 0)
            or property_item.get('marketValue', 0)
            or property_item.get('amount', 0)
        )
        if candidate_price > 0:
            smaller = min(candidate_price, mean_price)
            larger = max(candidate_price, mean_price)
            total += 0.25 * (smaller / larger)

    # Strategy-based scoring (25% weight)
    total += 0.25 * self.get_strategy_bonus(property_item, strategy)

    # Feature similarity using AI (20% weight)
    total += 0.2 * self.calculate_feature_similarity_ai(property_item, viewed_properties)

    return min(total, 1.0)
def get_strategy_bonus(self, property_item: Dict, strategy: str) -> float:
    """Return a 0..1 bonus reflecting how well the price fits the strategy.

    Only the ``price`` key is consulted (missing means 0). Any strategy other
    than the three named ones is treated as awareness creation.
    """
    amount = property_item.get('price', 0)

    if strategy == 'engagement_building':
        # Prefer variety and attractive options: neutral bonus
        return 0.8

    if strategy == 'aggressive_immediate':
        # Prefer premium properties for hot leads
        matches, hit, miss = amount > 15000000, 1.0, 0.5
    elif strategy == 'nurturing_educational':
        # Prefer mid-range with good value
        matches, hit, miss = 5000000 <= amount <= 20000000, 1.0, 0.7
    else:  # awareness_creation
        # Prefer budget-friendly options
        matches, hit, miss = amount < 10000000, 1.0, 0.3

    return hit if matches else miss
def calculate_feature_similarity_ai(self, property_item: Dict, viewed_properties: List) -> float:
    """Return the best cosine similarity between a property and the viewed ones.

    Each property is turned into text with ``create_property_description``
    and embedded with ``get_property_embeddings``.

    Returns:
        The highest similarity found, 0.0 when there are no viewed
        properties, or 0.5 if anything raises (a warning is logged).
    """
    try:
        # Embed the candidate once, then compare it with every viewed property
        candidate_vector = self.get_property_embeddings(
            self.create_property_description(property_item)
        )

        scores = []
        for seen in viewed_properties:
            seen_vector = self.get_property_embeddings(self.create_property_description(seen))
            # Calculate cosine similarity
            scores.append(cosine_similarity([candidate_vector], [seen_vector])[0][0])

        if not scores:
            return 0.0
        return max(scores)
    except Exception as e:
        logger.warning(f"⚠️ Feature similarity calculation failed: {e}")
        return 0.5  # Default similarity
def create_property_description(self, property_item: Dict) -> str:
    """Build the one-sentence text used to embed a property.

    Uses ``propertyName``, ``propertyTypeName`` and ``price`` (with defaults
    'Property', 'Residential' and 0) and appends the location when
    ``locationName`` is present and truthy.
    """
    title = property_item.get('propertyName', 'Property')
    kind = property_item.get('propertyTypeName', 'Residential')
    amount = property_item.get('price', 0)

    pieces = [f"{title} is a {kind} property priced at ₹{amount:,.0f}"]

    # Add more features if available
    place = property_item.get('locationName')
    if place:
        pieces.append(f" located in {place}")

    return "".join(pieces)
def generate_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str:
    """Explain a recommendation from the type preference and the strategy.

    A type-match sentence is added when the property's ``propertyTypeName``
    is among ``preferences['preferred_types']``; a strategy sentence is
    always added. The parts are joined with '; '.
    """
    strategy_sentences = {
        'aggressive_immediate': "Premium property perfect for immediate purchase",
        'nurturing_educational': "Great value proposition with excellent features",
        'engagement_building': "Interesting option to explore based on your browsing pattern",
    }

    parts = []

    # Property type match
    type_weights = dict(preferences.get('preferred_types', []))
    type_name = property_item.get('propertyTypeName', '')
    if type_name in type_weights:
        parts.append(f"Matches your preference for {type_name} properties")

    # Strategy sentence; unknown strategies get the budget-friendly wording
    parts.append(strategy_sentences.get(strategy, "Budget-friendly option worth considering"))

    if not parts:
        return "Recommended based on your viewing history"
    return '; '.join(parts)
def filter_recommendations(self, scored_properties: List, viewed_properties: List) -> List[Dict]:
    """Pick up to 10 properties while keeping type and price variety.

    Walks ``scored_properties`` in order and keeps a property unless three
    kept ones already share its type or four already share its price range.
    ``viewed_properties`` is accepted but not read here.
    """
    max_total, max_per_type, max_per_range = 10, 3, 4

    selected = []
    kept_per_type = {}
    kept_per_range = {}

    for candidate in scored_properties:
        # Ensure variety in property types (max 3 per type). A candidate with
        # no type is looked up as 'Unknown' but tallied under None below, so
        # untyped candidates never count against each other.
        lookup_type = candidate.get('propertyTypeName', 'Unknown')
        if kept_per_type.get(lookup_type, 0) >= max_per_type:
            continue

        # Ensure price variety (max 4 properties per price range)
        bucket = self.get_price_range(candidate.get('price', 0))
        if kept_per_range.get(bucket, 0) >= max_per_range:
            continue

        selected.append(candidate)
        stored_type = candidate.get('propertyTypeName')
        kept_per_type[stored_type] = kept_per_type.get(stored_type, 0) + 1
        kept_per_range[bucket] = kept_per_range.get(bucket, 0) + 1

        if len(selected) >= max_total:  # Limit recommendations
            break

    return selected
def get_price_range(self, price: float) -> str:
    """Map a price to 'budget', 'mid-range', 'premium' or 'luxury'.

    Each tier's upper bound is exclusive: below 5,000,000 is budget, below
    15,000,000 mid-range, below 30,000,000 premium, everything else luxury.
    """
    tiers = (
        (5000000, "budget"),
        (15000000, "mid-range"),
        (30000000, "premium"),
    )
    for upper_bound, label in tiers:
        if price < upper_bound:
            return label
    return "luxury"
def generate_personalized_email(self, user_analysis: Dict, ai_insights: Dict,
                                recommendations: List[Dict], recipient_email: str, email_type: str = None) -> Dict:
    """Build a personalised email and send it unless this is a preview.

    The content comes from ``create_email_content``. When ``recipient_email``
    is 'preview@example.com' nothing is sent and ``success`` is True;
    otherwise ``success`` is whatever ``send_email`` returns.

    Returns:
        A dict with ``success``, ``recipient``, ``subject``, ``html_content``,
        ``text_content``, ``recommendations_count`` and ``personalization``;
        or ``{'success': False, 'error': ...}`` if anything raises.
    """
    logger.info(f"📧 Generating personalized email for {recipient_email}")
    try:
        # Extract key information
        customer_id = user_analysis.get('customer_id')
        qualification = user_analysis.get('data', {}).get('lead_qualification', {})
        lead_status = qualification.get('lead_status', 'UNKNOWN')
        personality_type = ai_insights.get('ai_personality_type', 'Valued Customer')
        buying_motivation = ai_insights.get('buying_motivation', 'finding the perfect property')
        urgency_level = ai_insights.get('urgency_level', 'Medium')

        # Generate personalized content
        email_content = self.create_email_content(
            customer_id, lead_status, personality_type, buying_motivation,
            urgency_level, recommendations, email_type
        )

        # Preview mode never sends; real recipients go through send_email
        if recipient_email == 'preview@example.com':
            delivered = True
        else:
            delivered = self.send_email(recipient_email, email_content)

        return {
            'success': delivered,
            'recipient': recipient_email,
            'subject': email_content['subject'],
            'html_content': email_content['html_content'],
            'text_content': email_content['text_content'],
            'recommendations_count': len(recommendations),
            'personalization': {
                'personality_type': personality_type,
                'buying_motivation': buying_motivation,
                'urgency_level': urgency_level
            }
        }
    except Exception as e:
        logger.error(f"❌ Error generating email: {e}")
        return {'success': False, 'error': str(e)}
def create_email_content(self, customer_id: int, lead_status: str, personality_type: str,
buying_motivation: str, urgency_level: str, recommendations: List[Dict], email_type: str = None) -> Dict:
"""Create personalized email content based on AI insights

Builds the greeting (get_personalized_greeting) and subject
(generate_subject_line), renders at most the first 10 recommendations and
returns a dict with the keys 'subject', 'html_content' and 'text_content',
where 'text_content' is html_to_text(html_content).

customer_id is accepted but not read in this method.

NOTE(review): several string literals below span lines inside single-quoted
f-strings and contain no markup even though the result is named
html_content - presumably the HTML tags were stripped from this copy of the
file. Confirm against the original template before editing these strings.
"""
# Personalized greeting
greeting = self.get_personalized_greeting(personality_type, lead_status)
# Subject line based on urgency, personality, and email type
# The subject receives the full recommendation count, not the capped count shown below.
subject = self.generate_subject_line(personality_type, urgency_level, len(recommendations), email_type)
# Prepare recommendations for email - ENSURE MINIMUM 10 PROPERTIES
# Caps the list at 10 entries; nothing here pads a shorter list up to 10.
recommendations_to_show = recommendations[:10] if len(recommendations) >= 10 else recommendations
logger.info(f"📧 Including {len(recommendations_to_show)} properties in email (target: 10 minimum)")
# If we have less than 10, log a warning but still proceed
if len(recommendations_to_show) < 10:
logger.warning(f"⚠️ Only {len(recommendations_to_show)} properties available for email (target was 10)")
# Main content
html_content = f"""
{greeting}
🤖 AI Analysis Insights
Your Profile: {personality_type}
Buying Motivation: {buying_motivation}
Current Status: {lead_status} Lead
Based on your recent property viewing activity and our AI analysis, we've curated these perfect matches for you:
🎯 {len(recommendations_to_show)} Personalized Property Recommendations
Handpicked by our AI based on your preferences and behavior
🏠 Your Property Matches
"""
# Add property recommendations - already defined above
for i, prop in enumerate(recommendations_to_show, 1):
# Try multiple price field names
price = prop.get('price', 0) or prop.get('marketValue', 0) or prop.get('amount', 0)
logger.info(f"Property {i} price fields - price: {prop.get('price')}, marketValue: {prop.get('marketValue')}, amount: {prop.get('amount')}")
if price == 0:
# If still no price, try to extract from other fields
price_text = prop.get('priceText', '') or prop.get('priceDisplay', '')
if price_text:
# Try to extract numeric value from price text
# NOTE(review): `re` is imported on every loop iteration that reaches this branch.
import re
# Commas are removed before matching, so only the first run of digits is used.
price_match = re.findall(r'[\d,]+', str(price_text).replace(',', ''))
if price_match:
try:
price = int(price_match[0])
# NOTE(review): bare except - any failure here silently resets the price to 0.
except:
price = 0
# Extract comprehensive property details
name = prop.get('propertyName', '') or prop.get('name', '') or prop.get('title', '') or f'Premium Property {i}'
prop_type = prop.get('propertyTypeName', '') or prop.get('typeName', '') or prop.get('type', '') or 'Residential'
description = prop.get('description', '') or prop.get('desc', '') or 'Excellent property opportunity with modern amenities and prime location.'
# NOTE(review): the 'area' key is a fallback for both location (here) and sqft (below).
location = prop.get('address', '') or prop.get('location', '') or prop.get('area', '') or 'Prime Location'
# Property specifications
beds = prop.get('beds', 0) or prop.get('bedrooms', 0) or prop.get('bhk', 0)
baths = prop.get('baths', 0) or prop.get('bathrooms', 0) or prop.get('bath', 0)
sqft = prop.get('totalSquareFeet', 0) or prop.get('area', 0) or prop.get('sqft', 0) or prop.get('size', 0)
# Features
features = prop.get('features', []) or prop.get('amenities', []) or []
if isinstance(features, str):
features = [f.strip() for f in features.split(',') if f.strip()]
elif not features:
# Default features based on property type
if 'villa' in prop_type.lower():
features = ['Parking', 'Garden', 'Security', 'Swimming Pool']
elif 'flat' in prop_type.lower() or 'apartment' in prop_type.lower():
features = ['Lift', 'Security', 'Parking', 'Gym']
else:
features = ['Parking', 'Security', 'CCTV', 'Power Backup']
# Create property link
property_id = prop.get('id', '') or prop.get('propertyId', '') or f'prop_{i}'
property_link = f"https://your-property-website.com/property/{property_id}"
reason = prop.get('recommendation_reason', 'Recommended based on your viewing patterns and preferences')
# Format price display
if price > 0:
price_display = f"₹{price:,.0f}"
else:
price_display = "Price on Request"
# Format specifications
# NOTE(review): name, description, property_link and price_display are computed above
# but do not appear in the per-property text visible below - presumably lost with the markup.
specs_html = ""
if beds > 0:
specs_html += f"
{beds} Bed{'s' if beds > 1 else ''}"
if baths > 0:
specs_html += f"
{baths} Bath{'s' if baths > 1 else ''}"
if sqft > 0:
specs_html += f"
{sqft:,.0f} sq ft"
# Format features
features_html = ""
if features:
top_features = features[:4] # Show top 4 features
features_html = "".join([f"
{feature} " for feature in top_features])
if len(features) > 4:
features_html += f"
+{len(features) - 4} more "
html_content += f"""
{prop_type}
{location}
{f'
{specs_html}
' if specs_html else ''}
{f'
{features_html}
' if features_html else ''}
AI Recommendation: {reason}
"""
# Add call-to-action based on urgency
# NOTE(review): cta_text is assigned but not referenced in the text that follows.
cta_text = self.get_cta_text(urgency_level)
html_content += f"""
💡 AI Recommendation
{self.get_ai_recommendation_text(personality_type, urgency_level)}
Our AI system analyzed your viewing patterns, preferences, and behavior to find these properties. Each recommendation is personally curated for your specific needs.
Questions? Reply to this email or call us directly. We're here to help you find your perfect property!
"""
return {
'subject': subject,
'html_content': html_content,
'text_content': self.html_to_text(html_content)
}
def get_personalized_greeting(self, personality_type: str, lead_status: str) -> str:
    """Return the greeting line for a personality type.

    Unknown personality types get the generic greeting. ``lead_status`` is
    accepted but not read here.
    """
    greetings = {
        "Decisive Decision Maker": "Hello, Decisive Property Investor!",
        "Research-Oriented Buyer": "Dear Informed Property Seeker,",
        "Cautious Evaluator": "Hello, Thoughtful Property Explorer,",
    }
    return greetings.get(personality_type, "Dear Valued Property Browser,")
def generate_subject_line(self, personality_type: str, urgency_level: str, count: int, email_type: str = None) -> str:
    """Return the email subject line.

    A recognised ``email_type`` wins. Otherwise the subject depends on high
    urgency first, then on the personality type, with a generic default.
    """
    # Use email type specific subjects if available
    if email_type:
        subjects_by_type = {
            'property_based': f"🏠 {count} Property-Based Recommendations Just for You",
            'price_based': f"💰 {count} Price-Based Property Matches Within Your Budget",
            'location_based': f"📍 {count} Location-Based Properties in Your Preferred Areas",
            'similarity_based': f"🔍 {count} Properties Similar to Your Viewed Listings",
            'behavioral_based': f"🧠 {count} Behavioral-Based Property Recommendations",
            'premium_properties': f"⭐ {count} Premium Luxury Properties Just for You",
            'budget_friendly': f"💵 {count} Budget-Friendly Value Properties",
            'trending_properties': f"📈 {count} Trending Properties in Today's Market",
            'family_oriented': f"👨👩👧👦 {count} Family-Oriented Properties with Great Amenities",
            'investment_opportunities': f"💼 {count} Investment Properties with High ROI Potential"
        }
        typed_subject = subjects_by_type.get(email_type)
        if typed_subject is not None:
            return typed_subject

    # Fallback to original logic: urgency outranks personality
    if urgency_level == "High":
        return f"🔥 {count} Hot Properties - Perfect Match for You!"
    if personality_type == "Decisive Decision Maker":
        return f"⚡ {count} Premium Properties - Ready for Immediate Purchase"
    if personality_type == "Research-Oriented Buyer":
        return f"📊 {count} Carefully Selected Properties + Detailed Analysis"
    return f"🏠 {count} Personalized Property Recommendations Just for You"
def get_cta_text(self, urgency_level: str) -> str:
    """Return the call-to-action label for an urgency level.

    Anything other than "High" or "Medium" gets the low-pressure wording.
    """
    labels = {
        "High": "🔥 View Properties Now - Limited Time!",
        "Medium": "📞 Schedule Viewing Today",
    }
    return labels.get(urgency_level, "💭 Explore These Options")
def get_ai_recommendation_text(self, personality_type: str, urgency_level: str) -> str:
    """Return the closing advice paragraph for a personality type.

    ``urgency_level`` is accepted but not read here.
    """
    advice = {
        "Decisive Decision Maker": "Our AI suggests focusing on 1-2 properties for immediate action. Your profile indicates you prefer quick decisions with clear value propositions.",
        "Research-Oriented Buyer": "Our AI recommends comparing these properties in detail. We can provide additional market analysis and property reports to support your research.",
    }
    default_advice = "Our AI suggests exploring these options at your own pace. Each property offers unique value that aligns with your viewing patterns."
    return advice.get(personality_type, default_advice)
def html_to_text(self, html_content: str) -> str:
    """Strip tags from HTML and collapse whitespace into single spaces.

    This is a plain regex conversion: HTML entities are left as they are.
    """
    import re

    tag_pattern = re.compile('<[^<]+?>')
    whitespace_pattern = re.compile(r'\s+')

    without_tags = tag_pattern.sub('', html_content)
    return whitespace_pattern.sub(' ', without_tags).strip()
def send_email(self, recipient_email: str, email_content: Dict) -> bool:
    """Send an email through SendGrid, falling back to SMTP and then to a file.

    Args:
        recipient_email: Destination address.
        email_content: Dict with ``subject``, ``text_content`` and
            ``html_content``.

    Returns:
        True when SendGrid accepts the message (HTTP 202) or when the SMTP
        fallback succeeds. Also True when SendGrid reports exhausted
        limits/credits and the email was saved to a file, and when talking to
        SendGrid raises (the email is then saved to a file on a best-effort
        basis). False when SendGrid rejects the request for any other reason
        and the SMTP fallback fails as well.
    """
    try:
        logger.info(f"📧 Attempting to send real email to {recipient_email}")
        # SendGrid API endpoint
        url = "https://api.sendgrid.com/v3/mail/send"
        # Prepare email data
        email_data = {
            "personalizations": [
                {
                    "to": [
                        {
                            "email": recipient_email
                        }
                    ]
                }
            ],
            "from": {
                "email": self.email_config['sender_email']
            },
            "subject": email_content['subject'],
            "content": [
                {
                    "type": "text/plain",
                    "value": email_content['text_content']
                },
                {
                    "type": "text/html",
                    "value": email_content['html_content']
                }
            ]
        }
        headers = {
            "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}",
            "Content-Type": "application/json"
        }
        # Without a timeout a stalled connection would block the caller
        # forever; a timeout error lands in the except branch below and the
        # email is saved to a file.
        response = requests.post(url, json=email_data, headers=headers, timeout=30)

        if response.status_code == 202:
            logger.info(f"✅ SendGrid email sent successfully to {recipient_email}")
            return True

        quota_exhausted = (
            (response.status_code == 403 and "messaging limits" in response.text)
            or (response.status_code == 401 and "Maximum credits exceeded" in response.text)
        )
        if quota_exhausted:
            logger.warning("⚠️ SendGrid limits/credits exceeded - trying SMTP fallback...")
        else:
            logger.error(f"❌ SendGrid API error: {response.status_code} - {response.text}")

        # Try SMTP fallback first
        if self._send_via_smtp(recipient_email, email_content):
            logger.info(f"✅ SMTP fallback successful for {recipient_email}")
            return True

        # Save to file as last resort
        logger.info("📁 Both SendGrid and SMTP failed - saving email to file...")
        self._save_email_to_file(recipient_email, email_content)
        # An exhausted quota counts as "processed" once the email is on disk;
        # any other SendGrid rejection is reported as a failure.
        return quota_exhausted
    except Exception as e:
        logger.error(f"❌ Failed to send SendGrid email to {recipient_email}: {e}")
        # Save email to file for manual sending instead of using mock service
        logger.info("📁 Saving email to file for manual sending...")
        try:
            self._save_email_to_file(recipient_email, email_content)
        except Exception as save_error:
            logger.error(f"❌ Failed to save email to file: {save_error}")
        # Return True anyway since email generation was successful
        return True
def _send_mock_email(self, recipient_email: str, email_content: Dict) -> bool:
    """Log an email instead of sending it and save it to a file.

    Returns:
        True once the email has been logged and handed to
        ``_save_email_to_file``; False if anything raises (the error is
        logged).
    """
    try:
        logger.info(f"📧 Mock email service - Logging email content for {recipient_email}")

        # Log email details
        logger.info(f"📧 Email Subject: {email_content['subject']}")
        logger.info(f"📧 Email From: {self.email_config['sender_email']}")
        logger.info(f"📧 Email To: {recipient_email}")

        # Log a preview of the email content (first 500 characters)
        body_text = email_content['text_content']
        if len(body_text) > 500:
            text_preview = body_text[:500] + "..."
        else:
            text_preview = body_text
        logger.info(f"📧 Email Text Preview: {text_preview}")

        # Log the number of properties in the email
        if 'html_content' in email_content:
            property_count = email_content['html_content'].count('property-card')
            logger.info(f"📧 Email contains {property_count} property recommendations")

        # Save email content to a file for manual sending
        self._save_email_to_file(recipient_email, email_content)

        logger.info(f"✅ Mock email logged successfully for {recipient_email}")
        html_length = len(email_content.get('html_content', ''))
        logger.info(f"📧 Email would be sent with {html_length} characters of HTML content")
        return True
    except Exception as e:
        logger.error(f"❌ Failed to log mock email for {recipient_email}: {e}")
        return False
def _save_email_to_file(self, recipient_email: str, email_content: Dict):
    """Save an email as an HTML file under ./saved_emails for manual review.

    The file name is ``<email_type>_<YYYYmmdd_HHMMSS>.html``, where
    ``email_type`` is guessed from keywords in the subject.

    Args:
        recipient_email: Accepted for symmetry with the senders; not written
            to the file or its name.
        email_content: Dict with ``subject`` and ``html_content``.

    Errors are logged and swallowed; nothing is returned.

    NOTE(review): two emails of the same type saved within the same second
    share a file name, so the later one overwrites the earlier one.
    """
    try:
        # Create emails directory if it doesn't exist - use current directory instead of /tmp
        emails_dir = "./saved_emails"
        os.makedirs(emails_dir, exist_ok=True)

        # Create filename with timestamp and email type info
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Extract email type from subject if possible. First matching rule
        # wins. 'Budget-Friendly' must be tested before the generic 'budget'
        # keyword, otherwise the budget-friendly subject line is filed as
        # price_based.
        subject = email_content.get('subject', 'email')
        lowered = subject.lower()
        rules = (
            ('property_based', 'Property-Based' in subject or 'property type' in lowered),
            ('budget_friendly', 'Budget-Friendly' in subject),
            ('price_based', 'Price-Based' in subject or 'budget' in lowered),
            ('location_based', 'Location-Based' in subject or 'location' in lowered),
            ('premium', 'Premium' in subject or 'luxury' in lowered),
            ('budget_friendly', 'affordable' in lowered),
            ('family_oriented', 'family' in lowered),
            ('investment', 'investment' in lowered),
            ('trending', 'trending' in lowered),
        )
        email_type = next((label for label, matched in rules if matched), 'unknown')

        filename = f"{emails_dir}/{email_type}_{timestamp}.html"

        # Subject, a banner line, then the email body, one per line
        full_html = (
            f"\n{email_content['subject']}"
            "\n📧 EMAIL SAVED DUE TO SENDGRID LIMITS - READY FOR MANUAL REVIEW"
            f"\n{email_content['html_content']}\n"
        )

        # Save to file
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(full_html)

        logger.info(f"📁 Email saved to file: {filename}")
        logger.info("💡 Open this file in your browser to view the beautiful email content")
        logger.info(f"🎯 Email type: {email_type.replace('_', ' ').title()}")
    except Exception as e:
        logger.error(f"❌ Failed to save email to file: {e}")
def _send_via_smtp(self, recipient_email: str, email_content: Dict) -> bool:
    """Send an email over SMTP with STARTTLS (Gmail by default).

    Connection settings come from environment variables:
    ``SMTP_SERVER`` (default 'smtp.gmail.com'), ``SMTP_PORT`` (default 587),
    ``SMTP_USERNAME``, ``SMTP_SENDER_EMAIL`` and ``SMTP_PASSWORD``.

    SECURITY: the password is read from ``SMTP_PASSWORD`` only and has no
    default. An app password used to be hard-coded in this method; treat it
    as compromised and rotate it.

    Args:
        recipient_email: Destination address.
        email_content: Dict with optional ``subject``, ``html_content`` and
            ``text_content``.

    Returns:
        True when the message was handed to the SMTP server; False when the
        password is not configured or anything raises (the error is logged).
    """
    try:
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        # SMTP configuration (non-secret defaults keep the previous values)
        smtp_server = os.environ.get('SMTP_SERVER', 'smtp.gmail.com')
        smtp_port = int(os.environ.get('SMTP_PORT', '587'))
        smtp_username = os.environ.get('SMTP_USERNAME', 'sameermujahid7777@gmail.com')
        sender_email = os.environ.get('SMTP_SENDER_EMAIL', 'shaiksameermujahid@gmail.com')
        smtp_password = os.environ.get('SMTP_PASSWORD')
        if not smtp_password:
            logger.error("❌ Gmail SMTP error: SMTP_PASSWORD is not configured")
            return False

        # Extract email components
        subject = email_content.get('subject', 'Property Recommendations')
        html_content = email_content.get('html_content', '')
        text_content = email_content.get('text_content', '')

        # 'alternative' marks the parts as renderings of the same message;
        # clients show the last part they support, so plain text goes first
        # and HTML last.
        msg = MIMEMultipart('alternative')
        msg['From'] = sender_email
        msg['To'] = recipient_email
        msg['Subject'] = subject
        if text_content:
            msg.attach(MIMEText(text_content, 'plain'))
        msg.attach(MIMEText(html_content, 'html'))

        # The context manager closes the connection even when starttls,
        # login or sendmail raises; the timeout bounds a stalled connection.
        with smtplib.SMTP(smtp_server, smtp_port, timeout=30) as server:
            server.starttls()  # Enable TLS
            server.login(smtp_username, smtp_password)
            server.sendmail(sender_email, recipient_email, msg.as_string())

        logger.info(f"✅ Gmail SMTP email sent successfully to {recipient_email}")
        return True
    except Exception as e:
        logger.error(f"❌ Gmail SMTP error: {e}")
        return False
def process_customer_with_ai(self, customer_id: int, recipient_email: str, analysis_data: Dict) -> Dict:
    """Run behaviour analysis, recommendation search and emailing for a customer.

    Returns:
        On success a dict with ``success`` True, ``customer_id``,
        ``ai_insights``, ``recommendations``, ``email_result`` and an ISO
        timestamp under ``processing_time``. If any step raises, a dict with
        ``success`` False, ``customer_id`` and ``error``.
    """
    logger.info(f"🤖 Starting AI processing for customer {customer_id}")
    try:
        # Step 1: AI behavior analysis
        insights = self.analyze_user_behavior_with_ai(analysis_data)
        # Step 2: Find similar properties using AI
        matches = self.find_similar_properties_ai(analysis_data, insights)
        # Step 3: Generate and send personalized email
        outcome = self.generate_personalized_email(analysis_data, insights, matches, recipient_email)
    except Exception as e:
        logger.error(f"❌ AI processing failed for customer {customer_id}: {e}")
        return {
            'success': False,
            'customer_id': customer_id,
            'error': str(e)
        }

    # Return comprehensive results
    return {
        'success': True,
        'customer_id': customer_id,
        'ai_insights': insights,
        'recommendations': matches,
        'email_result': outcome,
        'processing_time': datetime.now().isoformat()
    }
def fetch_properties_page(self, page: int, max_retries: int = 3) -> List[Dict]:
    """Fetch one page of properties, retrying on errors.

    Requests ``self.properties_api`` with ``pageNumber`` and a fixed
    ``pageSize`` of 100. A non-200 status or an exception triggers a retry
    after a 2-second pause, up to ``max_retries`` attempts.

    Returns:
        The page's properties, or an empty list when the page is empty or
        every attempt failed.
    """
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            # Use the correct API endpoint with proper parameters
            response = requests.get(
                f"{self.properties_api}",
                params={'pageNumber': page, 'pageSize': 100},
                verify=False,
                timeout=None,
            )

            if response.status_code != 200:
                logger.warning(f"⚠️ Page {page} returned status {response.status_code}")
                if attempt < final_attempt:
                    time.sleep(2)  # Wait before retry
                    continue
                return []

            # Handle different response structures: a bare list, or a dict
            # wrapping the list under one of the known keys
            payload = response.json()
            if isinstance(payload, list):
                items = payload
            elif isinstance(payload, dict):
                for wrapper_key in ('data', 'properties', 'results'):
                    if wrapper_key in payload:
                        items = payload[wrapper_key]
                        break
                else:
                    items = list(payload.values()) if payload else []
            else:
                items = []

            if not items:
                logger.info(f"📄 Page {page} is empty - reached end")
                return []

            logger.info(f"✅ Page {page}: {len(items)} properties")
            return items
        except Exception as e:
            logger.error(f"❌ Error fetching page {page}, attempt {attempt + 1}: {e}")
            if attempt < final_attempt:
                time.sleep(2)  # Wait before retry
                continue
            return []

    return []
def fetch_all_properties_parallel(self, max_workers: int = 50, page_size: int = 600, max_pages: int = 20) -> bool:
    """Fetch property pages concurrently and store the combined result in ChromaDB.

    All page requests are submitted to a thread pool up front and collected as
    they finish. Because completion order is arbitrary, an empty page does NOT
    stop collection: doing so would throw away pages that are still in flight.

    Args:
        max_workers: Size of the thread pool; must be > 0.
        page_size: Number of properties requested per page; must be > 0.
        max_pages: Upper bound on the number of pages to request; must be > 0.

    Returns:
        True when at least one property was fetched and storage succeeded,
        False otherwise (invalid arguments, nothing fetched, storage failure,
        or an unexpected error, which is logged with its traceback).
    """
    logger.info(f"🚀 Starting ULTRA-ENHANCED parallel property fetch: {max_workers} workers, {page_size} per page, max {max_pages} pages")

    # Validate parameters
    for name, value in (('page_size', page_size), ('max_workers', max_workers), ('max_pages', max_pages)):
        if value <= 0:
            logger.error(f"❌ Invalid {name}: must be greater than 0")
            return False

    try:
        total_count = self._get_total_property_count()

        # Only trust a positive numeric count; anything else means "unknown".
        if isinstance(total_count, (int, float)) and not isinstance(total_count, bool) and total_count > 0:
            # Ceiling division: an exact multiple of page_size needs no extra page.
            estimated_pages = max(1, (int(total_count) + page_size - 1) // page_size)
            max_pages = min(estimated_pages, max_pages)
            logger.info(f"📄 Optimized pages needed: {estimated_pages}, using max: {max_pages}")
        else:
            logger.info(f"📄 Using default max_pages: {max_pages}")

        all_properties = []
        failed_pages = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to the page number it is fetching
            futures = {
                executor.submit(self._fetch_property_page_enhanced, page, page_size): page
                for page in range(1, max_pages + 1)
            }
            logger.info(f"🚀 Submitted {len(futures)} parallel fetch tasks with {max_workers} workers for ultra-fast processing")

            completed = concurrent.futures.as_completed(futures)
            for completed_pages, future in enumerate(completed, start=1):
                page = futures[future]
                progress = (completed_pages / max_pages) * 100
                try:
                    properties = future.result()  # No timeout - let it take as long as needed
                except Exception as e:
                    logger.error(f"❌ Error fetching page {page}: {e} (Progress: {progress:.1f}%)")
                    failed_pages.append(page)
                    continue

                if properties:
                    all_properties.extend(properties)
                    logger.info(f"✅ Page {page}: {len(properties)} properties fetched (Total: {len(all_properties)}, Progress: {progress:.1f}%)")
                else:
                    # Keep collecting: other pages may still be in flight.
                    logger.info(f"📄 Page {page}: No properties found (Progress: {progress:.1f}%)")

        # Retry pages whose fetch task raised
        if failed_pages:
            logger.info(f"🔄 Retrying {len(failed_pages)} failed pages...")
            retry_properties = self._retry_failed_pages(failed_pages, page_size, max_workers)
            all_properties.extend(retry_properties)

        logger.info(f"📦 Total properties fetched: {len(all_properties)} (Failed pages: {len(failed_pages)})")

        if not all_properties:
            logger.warning("⚠️ No properties fetched")
            return False

        if not self._store_properties_in_chromadb_parallel(all_properties):
            logger.error("❌ Failed to store properties in ChromaDB")
            return False

        logger.info(f"💾 Successfully stored {len(all_properties)} properties in ChromaDB")
        self._schedule_property_update()
        return True
    except Exception as e:
        logger.error(f"❌ Error in enhanced parallel property fetch: {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return False
def _get_total_property_count(self) -> int:
    """Ask the backend how many properties exist, trying several endpoints.

    Each candidate endpoint is queried with ``pageNumber=1, pageSize=1``. For
    a dict response the first usable count field wins; values that cannot be
    converted to a non-negative integer are skipped instead of being returned
    as-is, so callers always receive an ``int``.

    Returns:
        The reported (or estimated) total, or 0 when it cannot be determined.
    """
    count_fields = ('totalCount', 'total', 'count', 'totalRecords', 'recordCount')
    try:
        base_url = self.config['backend_url']
        endpoints = [
            f"{base_url}/api/Property/allPropertieswithfulldetails",
            f"{base_url}/api/Property/count",
            f"{base_url}/api/Property/total",
        ]
        for endpoint in endpoints:
            try:
                response = requests.get(
                    endpoint,
                    params={'pageNumber': 1, 'pageSize': 1},
                    timeout=30,
                    headers=self.config.get('headers', {})
                )
                if response.status_code != 200:
                    continue
                data = response.json()

                if isinstance(data, dict):
                    for field in count_fields:
                        if field not in data:
                            continue
                        try:
                            total_count = int(data[field])
                        except (TypeError, ValueError):
                            # e.g. null or a non-numeric string: try the next field
                            continue
                        if total_count < 0:
                            continue
                        logger.info(f"📊 Total properties found via {endpoint}: {total_count}")
                        return total_count
                elif isinstance(data, list) and data:
                    # NOTE(review): rough heuristic carried over unchanged;
                    # the multiplier 50 is not derived from the response.
                    estimated_count = len(data) * 50
                    logger.info(f"📊 Estimated total properties from {endpoint}: {estimated_count}")
                    return estimated_count
            except Exception as e:
                logger.debug(f"⚠️ Failed to get count from {endpoint}: {e}")
                continue
        logger.warning("⚠️ Could not determine total property count, using default")
        return 0
    except Exception as e:
        logger.error(f"❌ Error getting total property count: {e}")
        return 0
def _fetch_property_page_enhanced(self, page: int, page_size: int) -> List[Dict]:
    """Fetch and clean one page of properties, retrying with exponential backoff.

    Up to three attempts are made. A 200 response is parsed and each entry
    that is a dict with an ``id`` or ``propertyId`` is passed through
    ``self._clean_property_data``. A 404 is treated as "no more data". Any
    other status, a timeout, or any other error triggers a retry after
    ``2 * 2**attempt`` seconds, so failed HTTP statuses back off the same way
    exceptions do.

    Args:
        page: Page number sent as ``pageNumber``.
        page_size: Page size sent as ``pageSize``.

    Returns:
        The cleaned properties, or an empty list on 404 or when every attempt
        failed. Errors are logged, never raised.
    """
    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            logger.info(f"📄 Fetching page {page} with {page_size} properties (attempt {attempt + 1}/{max_retries})...")
            url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails"
            params = {'pageNumber': page, 'pageSize': page_size}
            logger.info(f"🌐 Making request to: {url}")
            logger.info(f"📋 Parameters: {params}")
            response = requests.get(
                url,
                params=params,
                headers=self.config.get('headers', {})
            )

            if response.status_code == 200:
                data = response.json()
                logger.info(f"📥 Raw response type: {type(data)}")
                if isinstance(data, dict):
                    logger.info(f"📥 Response keys: {list(data.keys())}")

                # The payload may be a bare list or a dict wrapping the list
                if isinstance(data, list):
                    properties = data
                    logger.info(f"📥 Direct list response with {len(properties)} properties")
                elif isinstance(data, dict):
                    for field in ('data', 'properties', 'items', 'results'):
                        if field in data and isinstance(data[field], list):
                            properties = data[field]
                            logger.info(f"📥 Found properties in '{field}' field: {len(properties)} properties")
                            break
                    else:
                        properties = []
                        logger.warning(f"⚠️ No properties found in response data")
                else:
                    properties = []
                    logger.warning(f"⚠️ Unexpected response type: {type(data)}")

                # Keep only dict entries that carry an identifier
                cleaned_properties = [
                    self._clean_property_data(prop)
                    for prop in properties
                    if isinstance(prop, dict) and (prop.get('id') or prop.get('propertyId'))
                ]
                logger.info(f"✅ Page {page}: {len(cleaned_properties)} valid properties fetched (requested: {page_size})")
                return cleaned_properties

            if response.status_code == 404:
                logger.info(f"📄 Page {page}: No more data (404)")
                return []

            logger.warning(f"⚠️ Page {page}: HTTP {response.status_code}")
        except requests.exceptions.Timeout:
            logger.warning(f"⏰ Page {page}: Timeout on attempt {attempt + 1}")
        except Exception as e:
            logger.error(f"❌ Page {page}: Error on attempt {attempt + 1}: {e}")

        # Every failure kind, including an unexpected HTTP status, backs off
        # before the next attempt.
        if attempt < max_retries - 1:
            time.sleep(retry_delay * (2 ** attempt))

    logger.error(f"❌ Page {page}: All attempts failed")
    return []
def _fetch_property_page(self, page: int, page_size: int) -> List[Dict]:
    """Legacy entry point kept for existing callers; forwards to the enhanced fetcher."""
    fetched = self._fetch_property_page_enhanced(page, page_size)
    return fetched
def calculate_ai_property_score(self, property_data: Dict, viewed_properties: List[Dict],
                                preferences: Dict, strategy: str) -> float:
    """Score a property against the user's preferences on a 0-100 scale.

    Starting from a base of 50 the score is adjusted for: preferred property
    type (+20), price inside the preferred range (+15, -10 below, -20 above),
    each matching preferred feature (+3), plausible size/bed/bath counts
    (+10/+5/+5), the strategy (``'motivated'`` +10, ``'casual'`` +5) and
    similarity to previously viewed properties (up to +15).

    Args:
        property_data: Property record; reads ``propertyTypeName``, ``price``,
            ``features`` (list or JSON-encoded string), ``sqft``, ``beds``
            and ``baths``.
        viewed_properties: Properties the user has viewed; may be empty.
        preferences: Reads ``preferred_types`` (strings or tuples whose first
            element is the type), ``price_range`` (``min``/``max``) and
            ``preferred_features``.
        strategy: Recommendation strategy name.

    Returns:
        The clamped score, or 50.0 if scoring fails unexpectedly.
    """
    score = 50.0
    try:
        # Property type preference (case-insensitive)
        prop_type = str(property_data.get('propertyTypeName') or '').lower()
        preferred_types = set()
        for item in preferences.get('preferred_types') or []:
            if isinstance(item, tuple):
                if item:
                    preferred_types.add(str(item[0]).lower())
            else:
                preferred_types.add(str(item).lower())
        if prop_type and prop_type in preferred_types:
            score += 20

        # Price range matching
        prop_price = float(property_data.get('price', 0) or 0)
        price_range = preferences.get('price_range') or {}
        min_price = price_range.get('min') or 0
        max_price = price_range.get('max')
        if max_price is None:
            max_price = float('inf')
        if min_price <= prop_price <= max_price:
            score += 15
        elif prop_price < min_price:
            score -= 10  # Too cheap might be suspicious
        else:
            score -= 20  # Too expensive

        # Feature matching; features may arrive JSON-encoded
        features = property_data.get('features', [])
        if isinstance(features, str):
            try:
                features = json.loads(features)
            except ValueError:
                features = []
        if not isinstance(features, (list, tuple, set, dict)):
            features = []
        # Lower-case once instead of once per preferred feature
        available = {str(feat).lower() for feat in features}
        preferred_features = preferences.get('preferred_features') or []
        feature_matches = sum(1 for f in preferred_features if str(f).lower() in available)
        score += feature_matches * 3

        # Size scoring; float() accepts fractional counts such as "2.5" baths
        sqft = float(property_data.get('sqft', 0) or 0)
        beds = float(property_data.get('beds', 0) or 0)
        baths = float(property_data.get('baths', 0) or 0)
        if 500 <= sqft <= 5000:
            score += 10
        if 1 <= beds <= 6:
            score += 5
        if 1 <= baths <= 4:
            score += 5

        # Strategy-based scoring
        if strategy == 'motivated':
            score += 10
        elif strategy == 'casual':
            score += 5

        # Similarity to viewed properties
        if viewed_properties:
            similarity_score = self.calculate_property_similarity(property_data, viewed_properties)
            score += similarity_score * 15

        # Keep the score inside the documented range
        score = max(0.0, min(100.0, score))
    except Exception as e:
        logger.error(f"❌ Error calculating AI score: {e}")
        score = 50.0  # Default score
    return score
def calculate_property_similarity(self, property_data: Dict, viewed_properties: List[Dict]) -> float:
    """Return the best similarity between a property and any viewed property.

    For each viewed property the similarity is the sum of:
      * 0.3 when both have the same non-empty ``propertyTypeName``
        (case-insensitive); two missing types are not a match;
      * ``0.4 * ratio`` when the smaller/larger price ratio exceeds 0.5;
      * ``0.3 * ratio`` when the smaller/larger size ratio exceeds 0.7.

    The candidate is read via ``price``/``sqft``; viewed properties via
    ``price`` (falling back to ``marketValue``) and ``totalSquareFeet``.

    Args:
        property_data: Candidate property record.
        viewed_properties: Properties the user has viewed.

    Returns:
        The highest per-property similarity, or 0.0 when nothing was viewed.
    """
    if not viewed_properties:
        return 0.0

    # Candidate values do not change between iterations
    prop_type = str(property_data.get('propertyTypeName') or '').lower()
    prop_price = float(property_data.get('price', 0) or 0)
    prop_sqft = float(property_data.get('sqft', 0) or 0)

    best = 0.0
    for viewed_prop in viewed_properties:
        similarity = 0.0

        # Type similarity: require a real type name on both sides
        viewed_type = str(viewed_prop.get('propertyTypeName') or '').lower()
        if prop_type and prop_type == viewed_type:
            similarity += 0.3

        # Price similarity (within 50% range)
        viewed_price = float(viewed_prop.get('price', 0) or viewed_prop.get('marketValue', 0) or 0)
        if prop_price > 0 and viewed_price > 0:
            price_ratio = min(prop_price, viewed_price) / max(prop_price, viewed_price)
            if price_ratio > 0.5:
                similarity += 0.4 * price_ratio

        # Size similarity (within 30% range)
        viewed_sqft = float(viewed_prop.get('totalSquareFeet', 0) or 0)
        if prop_sqft > 0 and viewed_sqft > 0:
            size_ratio = min(prop_sqft, viewed_sqft) / max(prop_sqft, viewed_sqft)
            if size_ratio > 0.7:
                similarity += 0.3 * size_ratio

        best = max(best, similarity)
    return best
def generate_match_reasons(self, property_data: Dict, preferences: Dict) -> List[str]:
    """Build up to three human-readable reasons why a property suits the user.

    Reasons are considered in this order: preferred property type
    (case-insensitive, matching how the property score is computed), price
    inside the preferred range (only when a price is actually known),
    preferred features present, and bedroom count. If none applies, three
    generic reasons are returned.

    Args:
        property_data: Property record; reads ``propertyTypeName``, ``price``,
            ``features`` (list or JSON-encoded string) and ``beds``.
        preferences: Reads ``preferred_types`` (strings or tuples whose first
            element is the type), ``price_range`` (``min``/``max``) and
            ``preferred_features``.

    Returns:
        At most three reason strings; a single generic reason if generation
        fails unexpectedly.
    """
    reasons = []
    try:
        # Property type match (case-insensitive)
        prop_type = str(property_data.get('propertyTypeName') or '')
        preferred_types = set()
        for item in preferences.get('preferred_types') or []:
            if isinstance(item, tuple):
                if item:
                    preferred_types.add(str(item[0]).lower())
            else:
                preferred_types.add(str(item).lower())
        if prop_type and prop_type.lower() in preferred_types:
            reasons.append(f"Matches your preferred property type: {prop_type}")

        # Price range match; a price of 0 means "unknown", not "free"
        prop_price = float(property_data.get('price', 0) or 0)
        price_range = preferences.get('price_range') or {}
        min_price = price_range.get('min') or 0
        max_price = price_range.get('max')
        if max_price is None:
            max_price = float('inf')
        if prop_price > 0 and min_price <= prop_price <= max_price:
            reasons.append(f"Within your budget range: ₹{prop_price:,.0f}")

        # Feature matches; features may arrive JSON-encoded
        features = property_data.get('features', [])
        if isinstance(features, str):
            try:
                features = json.loads(features)
            except ValueError:
                features = []
        if not isinstance(features, (list, tuple, set, dict)):
            features = []
        available = {str(feat).lower() for feat in features}
        preferred_features = preferences.get('preferred_features') or []
        matching_features = [str(f) for f in preferred_features if str(f).lower() in available]
        if matching_features:
            reasons.append(f"Has your preferred amenities: {', '.join(matching_features[:3])}")

        # Size appropriateness; go through float() so "3.0" is accepted
        beds = int(float(property_data.get('beds', 0) or 0))
        if beds > 0:
            reasons.append(f"Suitable size with {beds} bedroom{'s' if beds != 1 else ''}")

        # Default reasons if none found
        if not reasons:
            reasons = [
                "Good property in desirable location",
                "Competitive pricing for the area",
                "Modern amenities and facilities"
            ]
    except Exception as e:
        logger.error(f"❌ Error generating match reasons: {e}")
        reasons = ["AI-recommended based on your preferences"]
    return reasons[:3]  # Limit to top 3 reasons
def search_properties_with_query(self, query: str, excluded_ids: set, max_results: int) -> List[Dict]:
    """Search the properties collection by text, skipping excluded property ids.

    Twice ``max_results`` hits are requested so that enough remain after the
    exclusion filter. Hits without metadata or without an ``id`` are skipped
    rather than aborting the whole search.

    Args:
        query: Free-text query passed to the collection.
        excluded_ids: Property ids that must not be returned.
        max_results: Maximum number of properties to return.

    Returns:
        Up to ``max_results`` metadata dicts in the order the collection
        returned them; an empty list when no collection is available,
        ``max_results`` is not positive, or the search fails.
    """
    try:
        if self.properties_collection is None or max_results <= 0:
            return []
        results = self.properties_collection.query(
            query_texts=[query],
            n_results=max_results * 2,  # Get extra for filtering
            include=["metadatas", "documents", "distances"]
        )
        properties = []
        if results and results['metadatas']:
            for metadata in results['metadatas'][0]:
                if not metadata:
                    # Entry without metadata: nothing to return for it
                    continue
                prop_id = metadata.get('id', '')
                if prop_id and prop_id not in excluded_ids:
                    properties.append(metadata)
                    if len(properties) >= max_results:
                        break
        return properties
    except Exception as e:
        logger.error(f"❌ Error searching with query '{query}': {e}")
        return []
def test_sendgrid_email(self, recipient_email: str = None) -> Dict:
    """Send a small test message through the SendGrid mail API.

    The API key is validated first via ``self.validate_sendgrid_api_key``.
    When no recipient is given the configured sender address is used, so the
    message is sent to the sender itself.

    Args:
        recipient_email: Destination address; defaults to the sender address.

    Returns:
        A dict with ``success``, ``recipient`` and ``timestamp``; on success
        also ``subject`` and ``message``, on failure ``error``.
    """
    # Send to self for testing when no recipient was supplied
    recipient_email = recipient_email or self.email_config['sender_email']
    logger.info(f"🧪 Testing SendGrid email to {recipient_email}")

    def failure(message):
        # Common shape of every unsuccessful result
        return {
            'success': False,
            'error': message,
            'recipient': recipient_email,
            'timestamp': datetime.now().isoformat()
        }

    try:
        import requests

        # The key must be valid before a send is attempted
        key_check = self.validate_sendgrid_api_key()
        if not key_check['valid']:
            return failure(f"SendGrid API key validation failed: {key_check['error']}")

        endpoint = "https://api.sendgrid.com/v3/mail/send"
        sender = {"email": self.email_config['sender_email']}
        plain_body = f"SendGrid Test Email - AI Lead Analysis System\n\nTest successful at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        html_body = f"""
🧪 SendGrid Test Email
This is a test email to verify SendGrid integration is working correctly.
Test Details:
✅ SendGrid API: Connected
✅ Authentication: Successful
✅ Sender: {self.email_config['sender_email']}
✅ Recipient: {recipient_email}
✅ Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
🎉 SendGrid email system is working!
Sent by AI Lead Analysis System
"""
        payload = {
            "personalizations": [{"to": [{"email": recipient_email}]}],
            "from": sender,
            "subject": "🧪 SendGrid Test Email - AI Lead Analysis System",
            "content": [
                {"type": "text/plain", "value": plain_body},
                {"type": "text/html", "value": html_body},
            ],
        }
        auth_headers = {
            "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}",
            "Content-Type": "application/json"
        }

        response = requests.post(endpoint, json=payload, headers=auth_headers, timeout=30)

        # Anything other than 202 is reported as an API error
        if response.status_code != 202:
            api_error = f"SendGrid API error: {response.status_code} - {response.text}"
            logger.error(f"❌ {api_error}")
            return failure(api_error)

        logger.info(f"✅ SendGrid test email sent successfully to {recipient_email}")
        return {
            'success': True,
            'recipient': recipient_email,
            'subject': payload['subject'],
            'timestamp': datetime.now().isoformat(),
            'message': 'SendGrid test email sent successfully'
        }
    except Exception as e:
        logger.error(f"❌ Failed to send SendGrid test email: {e}")
        return failure(str(e))
def validate_sendgrid_api_key(self) -> Dict:
    """Check the configured SendGrid API key by requesting the user profile.

    The request uses a 30 second timeout, matching the SendGrid send call in
    this class, so an unreachable API cannot block the caller indefinitely.

    Returns:
        A dict with ``success`` and ``valid`` flags. When the key is valid it
        also carries ``account_info`` (``email``, ``first_name``,
        ``last_name``); otherwise ``error`` describes the failure.
    """
    try:
        logger.info("🔑 Validating SendGrid API key...")
        import requests
        url = "https://api.sendgrid.com/v3/user/profile"
        headers = {
            "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}",
            "Content-Type": "application/json"
        }
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code != 200:
            logger.error(f"❌ SendGrid API key validation failed: {response.status_code}")
            return {
                'success': False,
                'valid': False,
                'error': f"API key validation failed: {response.status_code}"
            }

        profile_data = response.json()
        logger.info("✅ SendGrid API key is valid")
        return {
            'success': True,
            'valid': True,
            'account_info': {
                'email': profile_data.get('email', 'Unknown'),
                'first_name': profile_data.get('first_name', 'Unknown'),
                'last_name': profile_data.get('last_name', 'Unknown')
            }
        }
    except Exception as e:
        # Covers network errors, timeouts and malformed responses
        logger.error(f"❌ Error validating SendGrid API key: {e}")
        return {
            'success': False,
            'valid': False,
            'error': str(e)
        }
def analyze_automated_email_triggers(self, customer_analysis: Dict) -> Dict:
    """Decide which automated emails a customer's analysis should trigger.

    Reads ``customer_analysis['data']`` (``analytics``, ``lead_qualification``
    and ``properties``) and evaluates seven trigger rules: inactivity,
    property-type interest, price-range diversity, lead-score milestone,
    viewing pattern, engagement boost and conversion potential.

    Days since the last view are computed from the first ``lead_timeline``
    entry. The current time is taken in the same timezone as the parsed
    timestamp, so timezone-aware values (e.g. ending in ``Z``) are compared
    correctly instead of falling back to the 14-day default.

    Args:
        customer_analysis: Analysis result for one customer.

    Returns:
        On success a dict with ``triggers``, ``total_triggers``,
        ``ai_insights`` and ``analysis_summary``; otherwise
        ``{'success': False, 'error': ...}``.
    """
    logger.info("🤖 Analyzing automated email triggers based on customer behavior...")
    try:
        customer_data = customer_analysis.get('data', {})
        analytics = customer_data.get('analytics', {})
        lead_qual = customer_data.get('lead_qualification', {})
        properties = customer_data.get('properties', [])
        if not properties:
            return {
                'success': False,
                'error': 'No property data available for analysis'
            }

        # Key metrics
        engagement_level = analytics.get('engagement_level', 'Low')
        lead_score = lead_qual.get('lead_score', 0)
        lead_status = lead_qual.get('lead_status', 'UNKNOWN')

        viewing_patterns = analytics.get('viewing_patterns', {})
        total_views = sum(
            viewing_patterns.get(slot, 0)
            for slot in ('morning_views', 'afternoon_views', 'evening_views')
        )

        price_prefs = analytics.get('price_preferences', {})
        avg_price = price_prefs.get('avg_price', 0) or 0
        max_price = price_prefs.get('max_price', 0) or 0
        min_price = price_prefs.get('min_price', 0) or 0
        price_range = price_prefs.get('price_range', 0) or 0

        # Entries may be plain names or tuples whose first element is the
        # name; normalise to names so they can be joined into text.
        preferred_types = []
        for item in analytics.get('preferred_property_types', []) or []:
            if isinstance(item, (tuple, list)):
                if item:
                    preferred_types.append(str(item[0]))
            else:
                preferred_types.append(str(item))

        # Inactivity: days since the first lead_timeline entry
        days_since_last_view = 0
        lead_timeline = analytics.get('lead_timeline', [])
        if lead_timeline:
            last_view_date = lead_timeline[0].get('date', '')
            if last_view_date:
                try:
                    last_view = datetime.fromisoformat(last_view_date.replace('Z', '+00:00'))
                    # Naive and aware datetimes cannot be subtracted, so
                    # take "now" in the timestamp's own timezone.
                    now = datetime.now(tz=last_view.tzinfo)
                    days_since_last_view = max(0, (now - last_view).days)
                except (AttributeError, TypeError, ValueError):
                    days_since_last_view = 14  # Default fallback

        conversion_prob = analytics.get('conversion_probability', {})
        final_prob = conversion_prob.get('final_probability', 0) or 0

        triggers = []

        def add_trigger(trigger_type, condition, reason, priority, recommendation_count):
            # Append one trigger record in the shape consumers expect
            triggers.append({
                'trigger_type': trigger_type,
                'condition': condition,
                'reason': reason,
                'priority': priority,
                'recommendation_count': recommendation_count
            })

        # 1. Inactivity Alert Trigger (High Priority)
        if days_since_last_view > 7:
            add_trigger(
                'inactivity_alert',
                f"Customer hasn't viewed properties in {days_since_last_view} days",
                'Customer may need re-engagement with fresh property listings',
                'high', 5,
            )
        # 2. Property Type Interest Trigger (Medium Priority)
        if len(preferred_types) >= 3:
            add_trigger(
                'property_type_interest',
                f"Customer interested in {len(preferred_types)} property types: {', '.join(preferred_types[:3])}",
                'Customer exploring multiple property categories, showing diverse interests',
                'medium', 7,
            )
        # 3. Price Range Diversity Trigger (Medium Priority)
        if price_range > 10000000:  # 1 crore range
            add_trigger(
                'price_range_diversity',
                f"Customer exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f}",
                'Customer showing interest across different price segments',
                'medium', 6,
            )
        # 4. Lead Score Milestone Trigger (High Priority)
        if lead_score >= 40:  # LUKEWARM or better
            add_trigger(
                'lead_score_milestone',
                f"Customer has {str(lead_status).lower()} lead status with score {lead_score}/100",
                'Customer showing qualified interest, ready for targeted recommendations',
                'high', 8,
            )
        # 5. Viewing Pattern Trigger (Medium Priority)
        if total_views >= 10:
            add_trigger(
                'viewing_pattern',
                f"Customer has viewed {total_views} properties with peak time: {viewing_patterns.get('peak_viewing_time', 'unknown')}",
                'Customer actively researching properties, ready for curated recommendations',
                'medium', 6,
            )
        # 6. Engagement Level Trigger
        if engagement_level == 'Low' and lead_score < 50:
            add_trigger(
                'engagement_boost',
                f"Customer has {engagement_level.lower()} engagement ({lead_score}/100 score)",
                'Customer needs engagement boost with compelling property options',
                'high', 5,
            )
        # 7. Conversion Probability Trigger (High Priority)
        if final_prob >= 50:
            add_trigger(
                'high_conversion_potential',
                f"Customer has {final_prob:.1f}% conversion probability",
                'High-value prospect needing premium property recommendations',
                'high', 10,
            )

        many_types = len(preferred_types) >= 3
        inactive = days_since_last_view > 7
        ai_insights = {
            'personality_type': 'Explorer' if many_types else 'Focused',
            'decision_making_style': 'Analytical' if total_views >= 10 else 'Casual',
            'urgency_level': 'High' if inactive else 'Medium',
            'budget_confidence': 'Flexible' if price_range > 10000000 else 'Specific',
            'location_focus': 'Diverse' if many_types else 'Specific',
            'engagement_trend': 'Declining' if inactive else 'Stable',
            'conversion_readiness': 'High' if final_prob >= 50 else 'Medium'
        }

        return {
            'success': True,
            'triggers': triggers,
            'total_triggers': len(triggers),
            'ai_insights': ai_insights,
            'analysis_summary': {
                'engagement_level': engagement_level,
                'lead_score': lead_score,
                'lead_status': lead_status,
                'total_properties_viewed': len(properties),
                'total_views': total_views,
                'average_price_range': f"₹{avg_price:,.0f}",
                'price_range_diversity': f"₹{min_price:,.0f} - ₹{max_price:,.0f}",
                # Key name kept for consumers; the value is the number of
                # preferred property types.
                'preferred_locations': len(preferred_types),
                'days_since_last_view': days_since_last_view,
                'conversion_probability': f"{final_prob:.1f}%",
                'peak_viewing_time': viewing_patterns.get('peak_viewing_time', 'unknown')
            }
        }
    except Exception as e:
        logger.error(f"❌ Error analyzing automated email triggers: {e}")
        return {
            'success': False,
            'error': str(e)
        }
def generate_automated_email_content(self, trigger: Dict, customer_analysis: Dict, ai_insights: Dict) -> Dict:
    """Build the subject and body of an automated email for one trigger.

    The intro text and subject depend on ``trigger['trigger_type']``; unknown
    types get a generic message. Recommendations come from
    ``self._get_ai_recommendations_from_chromadb`` when an AI engine with a
    properties collection is available, otherwise from the customer's own
    property list. The final HTML/text is produced by
    ``self.create_automated_email_html``.

    Args:
        trigger: Trigger record; ``trigger_type`` and ``priority`` are
            required, ``recommendation_count`` defaults to 5.
        customer_analysis: Analysis result for the customer; reads
            ``customer_id`` and ``data``.
        ai_insights: Accepted for interface compatibility; not read here.

    Returns:
        On success a dict with ``subject``, ``html_content``,
        ``text_content``, ``recommendations_count``, ``trigger_type`` and
        ``priority``; otherwise ``{'success': False, 'error': ...}``.

    Raises:
        KeyError: If ``trigger`` has no ``trigger_type`` (raised before the
            error-handling block, when the start of generation is logged).
    """
    logger.info(f"📝 Generating automated email content for trigger: {trigger['trigger_type']}")
    try:
        customer_data = customer_analysis.get('data', {})
        analytics = customer_data.get('analytics', {})
        lead_qual = customer_data.get('lead_qualification', {})
        properties = customer_data.get('properties', [])

        # Personalized greeting
        customer_name = f"Customer {customer_analysis.get('customer_id', 'Unknown')}"
        greeting = f"Hello {customer_name},"

        trigger_type = trigger['trigger_type']
        if trigger_type == 'inactivity_alert':
            # An empty or missing timeline must not raise on [0]
            timeline = analytics.get('lead_timeline') or [{}]
            last_view_date = timeline[0].get('date', '')
            days_ago = 13  # Fallback when the date is missing or unparsable
            if last_view_date:
                try:
                    last_view = datetime.fromisoformat(last_view_date.replace('Z', '+00:00'))
                    # Take "now" in the timestamp's timezone: naive and
                    # aware datetimes cannot be subtracted.
                    days_ago = (datetime.now(tz=last_view.tzinfo) - last_view).days
                except (AttributeError, TypeError, ValueError):
                    days_ago = 13
            content = f"""
We noticed you haven't been browsing properties for {days_ago} days. The real estate market is constantly evolving with exciting new opportunities!
Based on your previous interest in {len(analytics.get('preferred_property_types', []))} different property types, we've curated some fresh listings that might reignite your search for the perfect property.
"""
            subject = "🆕 Fresh Properties to Rekindle Your Search"
        elif trigger_type == 'property_type_interest':
            # Entries may be names or tuples whose first element is the name
            preferred_types = []
            for item in analytics.get('preferred_property_types', []) or []:
                if isinstance(item, (tuple, list)):
                    if item:
                        preferred_types.append(str(item[0]))
                else:
                    preferred_types.append(str(item))
            content = f"""
Your interest in {', '.join(preferred_types[:3])} properties shows you have excellent taste and diverse preferences!
We've found some amazing properties across these categories that match your sophisticated requirements and might be perfect for your next investment.
"""
            subject = "🏠 Diverse Property Options for You"
        elif trigger_type == 'price_range_diversity':
            price_prefs = analytics.get('price_preferences', {})
            min_price = price_prefs.get('min_price', 0)
            max_price = price_prefs.get('max_price', 0)
            content = f"""
We see you're exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} - that's an impressive range showing you're considering various investment options!
Here are some outstanding properties across your preferred price segments that offer exceptional value and investment potential.
"""
            subject = "💰 Properties Across Your Investment Range"
        elif trigger_type == 'lead_score_milestone':
            lead_score = lead_qual.get('lead_score', 0)
            lead_status = lead_qual.get('lead_status', 'UNKNOWN')
            content = f"""
Congratulations! Your lead score of {lead_score}/100 puts you in our {lead_status.lower()} category of prospects.
As a qualified customer, we're excited to share these exclusive properties that match your refined preferences and investment criteria.
"""
            subject = "⭐ Exclusive Properties for Our Qualified Customer"
        elif trigger_type == 'viewing_pattern':
            viewing_patterns = analytics.get('viewing_patterns', {})
            total_views = viewing_patterns.get('morning_views', 0) + viewing_patterns.get('afternoon_views', 0) + viewing_patterns.get('evening_views', 0)
            peak_time = viewing_patterns.get('peak_viewing_time', 'your preferred time')
            content = f"""
You've viewed {total_views} properties with peak activity during {peak_time} - that's impressive research! You clearly know what you're looking for.
Based on your extensive browsing patterns, here are some properties that align perfectly with your refined preferences and viewing behavior.
"""
            subject = "🎯 Targeted Recommendations Based on Your Research"
        elif trigger_type == 'engagement_boost':
            engagement_level = analytics.get('engagement_level', 'Low')
            lead_score = lead_qual.get('lead_score', 0)
            content = f"""
We noticed your current engagement level is {engagement_level.lower()} with a lead score of {lead_score}/100.
To help you find your perfect property faster, we've curated some compelling options that might boost your interest and engagement with our platform.
"""
            subject = "🚀 Boost Your Property Search with These Gems"
        elif trigger_type == 'high_conversion_potential':
            conversion_prob = analytics.get('conversion_probability', {})
            final_prob = conversion_prob.get('final_probability', 0)
            content = f"""
Excellent news! Our AI analysis shows you have a {final_prob:.1f}% conversion probability - you're a high-value prospect!
As a premium customer with strong buying potential, we're excited to share these exclusive properties that match your sophisticated requirements.
"""
            subject = "💎 Premium Properties for High-Value Prospect"
        else:
            content = """
We've analyzed your property viewing behavior and found some excellent recommendations for you.
These properties are carefully selected based on your preferences, viewing patterns, and market trends.
"""
            subject = "🏠 Personalized Property Recommendations"

        recommendation_count = trigger.get('recommendation_count', 5)
        # Use ChromaDB-backed recommendations only when an engine exposing a
        # properties collection is reachable from this object.
        ai_engine = self.get_ai_engine() if hasattr(self, 'get_ai_engine') else None
        if ai_engine and hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            recommendations = self._get_ai_recommendations_from_chromadb(
                properties, analytics, recommendation_count, trigger_type
            )
        else:
            # Fallback to the customer's existing properties
            recommendations = properties[:recommendation_count]

        email_content = self.create_automated_email_html(
            greeting, content, recommendations, trigger, customer_analysis
        )
        return {
            'success': True,
            'subject': subject,
            'html_content': email_content['html'],
            'text_content': email_content['text'],
            'recommendations_count': len(recommendations),
            'trigger_type': trigger_type,
            'priority': trigger['priority']
        }
    except Exception as e:
        logger.error(f"❌ Error generating automated email content: {e}")
        return {
            'success': False,
            'error': str(e)
        }
def _get_ai_recommendations_from_chromadb(self, viewed_properties: List[Dict], analytics: Dict, count: int, trigger_type: str = None) -> List[Dict]:
    """Query ChromaDB for properties to recommend for an email trigger.

    A trigger-specific natural-language query is built from the customer's
    preferred property types and price statistics, the properties collection
    is searched, and each returned metadata record is normalised into the
    property dict shape used by the email builders.

    Args:
        viewed_properties: Properties the customer already viewed. Hits whose
            ``id`` matches one of these are skipped; the list is also the
            fallback result.
        analytics: Customer analytics. ``preferred_property_types`` and
            ``price_preferences`` (``avg_price``/``min_price``/``max_price``)
            are read from it.
        count: Maximum number of recommendations wanted. Known trigger types
            cap this further (5-10 depending on the trigger).
        trigger_type: Trigger name selecting the query wording; unknown or
            missing values use a generic query and no cap.

    Returns:
        Up to ``count`` property dicts in ChromaDB rank order. Falls back to
        ``viewed_properties[:count]`` when the collection is unavailable, the
        search yields nothing new, or any error occurs. Returns ``[]`` when
        ``count`` is not positive.
    """
    try:
        if not hasattr(self, 'properties_collection') or not self.properties_collection:
            return viewed_properties[:count]
        if count <= 0:
            # n_results must be positive; nothing was requested anyway.
            return []

        # str() guards the join against non-string type entries.
        preferred_types = [str(t) for t in (analytics.get('preferred_property_types') or [])]
        price_prefs = analytics.get('price_preferences') or {}
        fields = {
            'top_types': ', '.join(preferred_types[:2]),
            'all_types': ', '.join(preferred_types),
            'avg_price': price_prefs.get('avg_price', 0),
            'min_price': price_prefs.get('min_price', 0),
            'max_price': price_prefs.get('max_price', 0),
        }

        # trigger type -> (query template, maximum number of recommendations)
        strategies = {
            'inactivity_alert': ("New and fresh properties in {top_types} categories around ₹{avg_price:,.0f}", 5),
            'property_type_interest': ("Diverse properties across {all_types} types with good investment potential", 7),
            'price_range_diversity': ("Properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} with excellent value", 6),
            'lead_score_milestone': ("Premium {top_types} properties for qualified customers around ₹{avg_price:,.0f}", 8),
            'viewing_pattern': ("Highly relevant {all_types} properties matching viewing patterns", 6),
            'engagement_boost': ("Compelling and attractive properties in {top_types} categories", 5),
            'high_conversion_potential': ("Exclusive and premium properties for high-value prospects in {all_types} categories", 10),
        }
        default_template = "Properties similar to {all_types} types around ₹{avg_price:,.0f} price range"
        template, cap = strategies.get(trigger_type, (default_template, None))
        if cap is not None:
            count = min(count, cap)
        # str.format only formats the fields a template references, so a
        # malformed price only matters for templates that mention it.
        query_text = template.format(**fields)

        logger.info(f"🔍 Searching ChromaDB with query: {query_text} for trigger: {trigger_type}")
        results = self.properties_collection.query(
            query_texts=[query_text],
            n_results=count * 2,  # over-fetch so already-viewed hits can be dropped
            include=["metadatas", "documents", "distances"]
        )

        hits = results['metadatas'][0] if results and results.get('metadatas') else []
        if not hits:
            logger.warning(f"⚠️ No results from ChromaDB for trigger: {trigger_type}")
            return viewed_properties[:count]

        distances = results.get('distances')
        hit_distances = distances[0] if distances and distances[0] else None

        # IDs are compared as strings because stored metadata and API payloads
        # may not use the same type for the same property.
        viewed_ids = {
            str(value)
            for prop in (viewed_properties or [])
            if isinstance(prop, dict)
            for value in (prop.get('id'), prop.get('propertyId'))
            if value is not None
        }

        recommendations = []
        for i, metadata in enumerate(hits):
            if len(recommendations) >= count:
                break
            if not isinstance(metadata, dict):
                logger.warning(
                    "⚠️ Error processing metadata field in _get_ai_recommendations_from_chromadb: "
                    f"unexpected metadata type {type(metadata).__name__}"
                )
                continue

            prop_id = metadata.get('id', f'prop_{i}')
            if 'id' in metadata and str(prop_id) in viewed_ids:
                # The customer has already seen this one.
                continue

            try:
                price = float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0)
                address = metadata.get('address', metadata.get('location', ''))
                property_data = {
                    'id': prop_id,
                    'propertyId': prop_id,
                    'propertyName': metadata.get('propertyName', metadata.get('name', 'Property')),
                    'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))),
                    'price': price,
                    'marketValue': price,
                    'address': address,
                    'location': address,
                    'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0),
                    'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0),
                    'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0),
                    'description': metadata.get('description', ''),
                    'features': self.safe_json_parse(metadata.get('features', '[]')),
                    'similarity_score': 1 - hit_distances[i] if hit_distances else 0.5,
                    'chromadb_rank': i + 1
                }
                # Carry over any extra metadata without overriding normalised keys.
                for key, value in metadata.items():
                    property_data.setdefault(key, value)
            except Exception as field_error:
                logger.warning(f"⚠️ Error processing metadata field in _get_ai_recommendations_from_chromadb: {field_error}")
                # Keep the hit, but with neutral placeholder values.
                property_data = {
                    'id': prop_id,
                    'propertyId': prop_id,
                    'propertyName': 'Property',
                    'propertyTypeName': 'Property',
                    'price': 0.0,
                    'marketValue': 0.0,
                    'address': '',
                    'location': '',
                    'beds': 0,
                    'baths': 0,
                    'totalSquareFeet': 0.0,
                    'description': '',
                    'features': [],
                    'similarity_score': 0.5,
                    'chromadb_rank': i + 1
                }
            recommendations.append(property_data)

        if not recommendations:
            logger.warning(f"⚠️ No unseen results from ChromaDB for trigger: {trigger_type}")
            return viewed_properties[:count]

        logger.info(f"✅ Found {len(recommendations)} recommendations for trigger: {trigger_type}")
        return recommendations
    except Exception as e:
        logger.error(f"❌ Error in _get_ai_recommendations_from_chromadb: {e}")
        return viewed_properties[:count]
def create_automated_email_html(self, greeting: str, content: str, recommendations: List[Dict],
trigger: Dict, customer_analysis: Dict) -> Dict:
"""Build the HTML and plain-text bodies of an automated recommendation email.

Args:
greeting: Greeting text placed at the top of both bodies.
content: Trigger-specific message body; concatenated as-is into the HTML
version and passed through two str.replace calls for the text version.
recommendations: Property dicts; only the first five are rendered. The keys
read are 'propertyName', 'address', 'price', 'propertyTypeName' and 'ai_score'.
trigger: Must provide 'condition', 'reason' and 'priority' (all concatenated
as strings; 'priority' is title-cased).
customer_analysis: Accepted but not read anywhere in this method.

Returns:
A dict with 'html' and 'text' keys. If anything raises, the error is logged
and both values carry an "Error generating email content" message instead.

NOTE(review): the literals below contain no markup in this view and two of them
are split mid-quote; it looks like tags were stripped on the way here - confirm
against the real file before editing any of the string content.
"""
try:
logger.info("🎨 Creating automated email HTML content...")
# Create property cards (one f-string fragment per recommendation, appended in order)
property_cards = ""
for i, prop in enumerate(recommendations[:5]): # Limit to 5 properties
# NOTE(review): 'ai_score' is read here while _get_ai_recommendations_from_chromadb
# populates 'similarity_score'; presumably another producer sets 'ai_score' - verify.
# The ':,.0f' / ':.1f' format specs raise if 'price' or 'ai_score' is present but None.
property_cards += f"""
{prop.get('propertyName', f'Property {i+1}')}
📍 {prop.get('address', 'Address not available')}
💰 ₹{prop.get('price', 0):,.0f}
🏠 {prop.get('propertyTypeName', 'Property Type')}
⭐ AI Match Score: {prop.get('ai_score', 0):.1f}%
"""
# Create HTML content using string concatenation to avoid f-string backslash issues
# trigger[...] is indexed directly, so a missing key raises KeyError and lands in the except below.
html_content = """
Automated Property Recommendations
""" + greeting + """
""" + content + """
📊 Why You Received This Email:
Condition: """ + trigger['condition'] + """
Reason: """ + trigger['reason'] + """
Priority: """ + trigger['priority'].title() + """
🏠 Recommended Properties
""" + property_cards + """
"""
# Create text version using string concatenation
# NOTE(review): the replace() arguments below appear truncated in this view
# (presumably they stripped markup from `content`) - confirm the intended patterns.
text_content = """
AI-Powered Property Recommendations
""" + greeting + """
""" + content.replace('', '').replace('
', '\n') + """
Why you received this email:
Condition: """ + trigger['condition'] + """
Reason: """ + trigger['reason'] + """
Priority: """ + trigger['priority'].title() + """
Recommended Properties:
"""
# Numbered plain-text entry per recommendation, mirroring the cards above.
for i, prop in enumerate(recommendations[:5]): # Limit to 5 in text version
text_content += """
""" + str(i+1) + """. """ + prop.get('propertyName', f'Property {i+1}') + """
Address: """ + prop.get('address', 'Address not available') + """
Price: ₹""" + f"{prop.get('price', 0):,.0f}" + """
Type: """ + prop.get('propertyTypeName', 'Property Type') + """
AI Match Score: """ + f"{prop.get('ai_score', 0):.1f}" + """%
"""
# Footer timestamp uses datetime.now() with no timezone argument (naive local time).
text_content += """
Generated at: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """
"""
return {
'html': html_content,
'text': text_content
}
except Exception as e:
# Best-effort: callers always get a dict with both keys, even on failure.
logger.error(f"❌ Error creating automated email HTML: {e}")
return {
'html': f"Error generating email content: {e}
",
'text': f"Error generating email content: {e}"
}
def _store_properties_in_chromadb(self, properties: List[Dict]) -> bool:
    """Write properties into the ChromaDB properties collection in batches of 100.

    Each property is turned into a text document (``create_property_description``)
    plus cleaned metadata (``clean_metadata_for_chromadb``) and stored under a
    freshly generated UUID. Properties that fail preparation are skipped, and a
    failing batch is logged without aborting the remaining batches.

    Args:
        properties: Property dicts to store.

    Returns:
        True when at least one property was stored; False when the collection
        is not initialised, nothing was stored, or an unexpected error occurred.
    """
    try:
        if not self.properties_collection:
            logger.error("❌ Properties collection not initialized")
            return False

        logger.info(f"💾 Storing {len(properties)} properties in ChromaDB...")

        chunk_size = 100
        stored_so_far = 0
        for offset in range(0, len(properties), chunk_size):
            batch_number = offset // chunk_size + 1
            docs, metas, doc_ids = [], [], []

            # Prepare every record of this chunk; bad records are dropped individually.
            for item in properties[offset:offset + chunk_size]:
                try:
                    text = self.create_property_description(item)
                    meta = self.clean_metadata_for_chromadb(item)
                    new_id = str(uuid.uuid4())
                except Exception as e:
                    logger.warning(f"⚠️ Skipping property due to error: {e}")
                    continue
                docs.append(text)
                metas.append(meta)
                doc_ids.append(new_id)

            if not docs:
                continue

            try:
                self.properties_collection.add(
                    documents=docs,
                    metadatas=metas,
                    ids=doc_ids
                )
                stored_so_far += len(docs)
                logger.info(f"✅ Stored batch {batch_number}: {len(docs)} properties")
            except Exception as e:
                logger.error(f"❌ Error storing batch {batch_number}: {e}")
                continue

        logger.info(f"🎉 Successfully stored {stored_so_far} properties in ChromaDB")
        return stored_so_far > 0
    except Exception as e:
        logger.error(f"❌ Error storing properties in ChromaDB: {e}")
        return False
def get_enhanced_ai_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int = 10) -> List[Dict]:
    """Combine three recommendation strategies into one explained list.

    Similarity-, preference- and behaviour-based strategies each contribute a
    share of ``count``; the merged list is de-duplicated, trimmed to ``count``
    and decorated with AI explanations.

    Args:
        customer_analysis: Customer analysis payload; viewed properties are
            read from ``customer_analysis['data']['properties']``.
        ai_insights: AI-derived insights forwarded to the strategies.
        count: Maximum number of recommendations to return.

    Returns:
        Up to ``count`` recommendation dicts. On any error the result of
        ``get_fallback_recommendations(count)`` is returned instead.
    """
    try:
        logger.info("🤖 Generating enhanced AI recommendations...")

        preferences = self.extract_customer_preferences(customer_analysis, ai_insights)
        viewed_properties = customer_analysis.get('data', {}).get('properties', [])

        # Ceiling division: flooring (count // 3) asked each strategy for zero
        # items when count < 3 and could never reach count otherwise (10 -> 9).
        # The surplus is trimmed after de-duplication below.
        per_strategy = (count + 2) // 3 if count > 0 else 0

        recommendations = []
        # Strategy 1: similarity to what the customer already viewed
        recommendations.extend(
            self.get_similarity_based_recommendations(viewed_properties, preferences, per_strategy)
        )
        # Strategy 2: explicit preference match
        recommendations.extend(
            self.get_preference_based_recommendations(preferences, viewed_properties, per_strategy)
        )
        # Strategy 3: behavioural patterns
        recommendations.extend(
            self.get_behavioral_recommendations(customer_analysis, ai_insights, per_strategy)
        )

        unique_recommendations = self.remove_duplicate_recommendations(recommendations)
        final_recommendations = unique_recommendations[:count]

        enhanced_recommendations = self.add_ai_explanations(final_recommendations, preferences, ai_insights)
        logger.info(f"✅ Generated {len(enhanced_recommendations)} enhanced AI recommendations")
        return enhanced_recommendations
    except Exception as e:
        logger.error(f"❌ Error generating enhanced AI recommendations: {e}")
        return self.get_fallback_recommendations(count)
def extract_customer_preferences(self, customer_analysis: Dict, ai_insights: Dict) -> Dict:
    """Assemble the customer's preference profile from the individual analysers.

    The base profile covers price, property type, location, feature,
    behavioural and engagement analysis plus the urgency and personality
    values from ``ai_insights``. For every optional model attribute that is
    set (``property_classifier``, ``price_predictor``,
    ``preference_analyzer``) the matching enrichment is added.

    Returns:
        The preference dict, or ``{}`` if any step raises.
    """
    try:
        profile = {}
        profile['price_range'] = self.analyze_price_preferences(customer_analysis)
        profile['property_types'] = self.analyze_property_type_preferences(customer_analysis)
        profile['locations'] = self.analyze_location_preferences(customer_analysis)
        profile['features'] = self.analyze_feature_preferences(customer_analysis)
        profile['behavioral_patterns'] = self.analyze_behavioral_patterns(customer_analysis, ai_insights)
        profile['engagement_level'] = self.analyze_engagement_level(customer_analysis)
        profile['urgency_level'] = ai_insights.get('urgency_level', 'Medium')
        profile['personality_type'] = ai_insights.get('personality_type', 'Balanced')

        # (model attribute, profile key, enrichment method) - applied in this order
        enrichments = (
            ('property_classifier', 'property_classification', 'classify_property_preferences'),
            ('price_predictor', 'price_prediction', 'predict_price_preferences'),
            ('preference_analyzer', 'preference_weights', 'calculate_preference_weights'),
        )
        for model_attr, profile_key, method_name in enrichments:
            if getattr(self, model_attr):
                profile[profile_key] = getattr(self, method_name)(profile)

        return profile
    except Exception as e:
        logger.error(f"❌ Error extracting customer preferences: {e}")
        return {}
def analyze_price_preferences(self, customer_analysis: Dict) -> Dict:
    """Classify the customer's preferred price band from viewed properties.

    Only positive numeric prices are considered; missing or non-numeric
    prices are ignored rather than aborting the analysis.

    Args:
        customer_analysis: Payload whose ``['data']['properties']`` entries
            may carry a ``price``.

    Returns:
        A dict with ``preferred_range`` and ``confidence``. When usable prices
        exist it also contains ``avg_price``, ``min_price`` and ``max_price``,
        and - when a ``price_predictor`` range matched - ``price_consistency``.
        ``confidence`` and ``price_consistency`` are always within [0, 1].
        Without usable prices, or on error, the neutral default
        ``{'preferred_range': 'mid_range', 'confidence': 0.5}`` is returned.
    """
    try:
        properties = customer_analysis.get('data', {}).get('properties', [])

        prices = []
        for prop in properties or []:
            price = prop.get('price', 0)
            is_number = isinstance(price, (int, float, np.integer, np.floating)) and not isinstance(price, bool)
            if is_number and price > 0:
                prices.append(price)
        if not prices:
            return {'preferred_range': 'mid_range', 'confidence': 0.5}

        avg_price = sum(prices) / len(prices)
        min_price = min(prices)
        max_price = max(prices)
        price_variance = np.var(prices) if len(prices) > 1 else 0

        # Prefer the configured price bands when a predictor is available.
        if self.price_predictor:
            for range_name, (min_val, max_val) in self.price_predictor['price_ranges'].items():
                if min_val <= avg_price <= max_val:
                    # 1 - squared coefficient of variation. Widely spread prices
                    # push this below zero, so clamp it into [0, 1].
                    consistency = 1.0 - (price_variance / (avg_price ** 2))
                    consistency = float(max(0.0, min(consistency, 1.0)))
                    return {
                        'preferred_range': range_name,
                        'avg_price': avg_price,
                        'min_price': min_price,
                        'max_price': max_price,
                        'confidence': consistency,
                        'price_consistency': consistency
                    }

        # Fallback classification by fixed thresholds
        if avg_price > 50000000:
            range_name = 'luxury'
        elif avg_price > 25000000:
            range_name = 'premium'
        elif avg_price > 10000000:
            range_name = 'mid_range'
        else:
            range_name = 'affordable'

        return {
            'preferred_range': range_name,
            'avg_price': avg_price,
            'min_price': min_price,
            'max_price': max_price,
            'confidence': 0.7
        }
    except Exception as e:
        logger.error(f"❌ Error analyzing price preferences: {e}")
        return {'preferred_range': 'mid_range', 'confidence': 0.5}
def analyze_property_type_preferences(self, customer_analysis: Dict) -> Dict:
    """Rank property types by how often and how intensely they were viewed.

    Engagement per property is ``viewCount * totalDuration``. Each type's
    score is its view count times its average engagement times an optional
    weight from ``property_classifier['property_type_weights']``.

    Returns:
        A dict with the top three ``preferred_types``, the raw
        ``type_counts`` and ``type_engagement`` tallies, a ``confidence``
        (share of the three slots filled) and a ``diversity_score`` (distinct
        types per viewed property). With no properties, or on error,
        ``{'preferred_types': [], 'confidence': 0.5}``.
    """
    try:
        viewed = customer_analysis.get('data', {}).get('properties', [])
        if not viewed:
            return {'preferred_types': [], 'confidence': 0.5}

        type_counts = {}
        type_engagement = {}
        for entry in viewed:
            kind = entry.get('propertyTypeName', 'Unknown')
            interaction = entry.get('viewCount', 0) * entry.get('totalDuration', 0)
            type_counts[kind] = type_counts.get(kind, 0) + 1
            type_engagement[kind] = type_engagement.get(kind, 0) + interaction

        # Optional per-type weights; every type weighs 1.0 without a classifier.
        classifier = self.property_classifier
        type_weights = classifier.get('property_type_weights', {}) if classifier else None

        scored = []
        for kind, seen in type_counts.items():
            mean_engagement = type_engagement[kind] / seen
            weight = type_weights.get(kind, 1.0) if classifier else 1.0
            scored.append((kind, seen * mean_engagement * weight))

        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        top_types = [kind for kind, _ in ranked[:3]]

        return {
            'preferred_types': top_types,
            'type_counts': type_counts,
            'type_engagement': type_engagement,
            'confidence': min(len(top_types) / 3, 1.0),
            'diversity_score': len(type_counts) / max(len(viewed), 1)
        }
    except Exception as e:
        logger.error(f"❌ Error analyzing property type preferences: {e}")
        return {'preferred_types': [], 'confidence': 0.5}
def analyze_location_preferences(self, customer_analysis: Dict) -> Dict:
    """Rank locations by how often and how intensely they were viewed.

    Engagement per property is ``viewCount * totalDuration``; a location's
    score is its view count times its average engagement.

    Returns:
        A dict with up to five ``preferred_locations`` (best first), the raw
        ``location_counts`` and a ``confidence`` equal to the share of the
        five slots filled. With no properties, or on error,
        ``{'preferred_locations': [], 'confidence': 0.5}``.
    """
    try:
        viewed = customer_analysis.get('data', {}).get('properties', [])
        if not viewed:
            return {'preferred_locations': [], 'confidence': 0.5}

        location_counts = {}
        engagement_totals = {}
        for entry in viewed:
            place = entry.get('location', 'Unknown')
            interaction = entry.get('viewCount', 0) * entry.get('totalDuration', 0)
            location_counts[place] = location_counts.get(place, 0) + 1
            engagement_totals[place] = engagement_totals.get(place, 0) + interaction

        scored = [
            (place, seen * (engagement_totals[place] / seen))
            for place, seen in location_counts.items()
        ]
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        top_locations = [place for place, _ in ranked[:5]]

        return {
            'preferred_locations': top_locations,
            'location_counts': location_counts,
            'confidence': min(len(top_locations) / 5, 1.0)
        }
    except Exception as e:
        logger.error(f"❌ Error analyzing location preferences: {e}")
        return {'preferred_locations': [], 'confidence': 0.5}
def analyze_feature_preferences(self, customer_analysis: Dict) -> Dict:
    """Count which features recur across the viewed properties.

    For each property the description and name are joined and passed to
    ``extract_features_from_text``; features are then ranked by frequency.

    Returns:
        A dict with up to ten ``preferred_features`` (most frequent first),
        the raw ``feature_counts`` and a ``confidence`` equal to the share of
        the ten slots filled. With no properties, or on error,
        ``{'preferred_features': [], 'confidence': 0.5}``.
    """
    try:
        viewed = customer_analysis.get('data', {}).get('properties', [])
        if not viewed:
            return {'preferred_features': [], 'confidence': 0.5}

        tally = {}
        for entry in viewed:
            blob = entry.get('description', '') + ' ' + entry.get('propertyName', '')
            for feature in self.extract_features_from_text(blob):
                tally[feature] = tally.get(feature, 0) + 1

        ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
        top_features = [name for name, _ in ranked[:10]]

        return {
            'preferred_features': top_features,
            'feature_counts': tally,
            'confidence': min(len(top_features) / 10, 1.0)
        }
    except Exception as e:
        logger.error(f"❌ Error analyzing feature preferences: {e}")
        return {'preferred_features': [], 'confidence': 0.5}
def extract_features_from_text(self, text: str) -> List[str]:
    """Return the feature tags whose keywords occur in ``text``.

    Matching is a case-insensitive substring test, so a keyword also matches
    inside longer words. Tags are returned in the fixed order of the keyword
    table, each at most once. Returns ``[]`` if anything raises.
    """
    try:
        haystack = text.lower()

        # (feature tag, keywords that signal it)
        keyword_table = (
            ('balcony', ('balcony', 'terrace', 'veranda')),
            ('parking', ('parking', 'garage', 'car space')),
            ('garden', ('garden', 'lawn', 'backyard')),
            ('swimming_pool', ('pool', 'swimming')),
            ('gym', ('gym', 'fitness', 'exercise')),
            ('security', ('security', 'guard', 'cctv')),
            ('elevator', ('elevator', 'lift')),
            ('modern', ('modern', 'contemporary', 'new')),
            ('luxury', ('luxury', 'premium', 'exclusive')),
            ('spacious', ('spacious', 'large', 'big')),
            ('corner', ('corner', 'end unit')),
            ('view', ('view', 'panoramic', 'city view')),
        )

        return [
            tag
            for tag, keywords in keyword_table
            if any(word in haystack for word in keywords)
        ]
    except Exception as e:
        logger.error(f"❌ Error extracting features: {e}")
        return []
def analyze_behavioral_patterns(self, customer_analysis: Dict, ai_insights: Dict) -> Dict:
    """Summarise how the customer browses properties.

    Args:
        customer_analysis: Payload whose ``['data']`` holds ``properties``
            and ``analytics``.
        ai_insights: Supplies ``decision_making_style`` and ``urgency_level``.

    Returns:
        A dict with ``viewing_frequency``, ``engagement_timing``,
        ``property_exploration``, ``decision_making_style`` and
        ``urgency_level``. When ``preference_analyzer`` is set, an
        ``engagement_weight`` looked up from its ``engagement_weights`` table
        is added (1.0 for unknown levels). Returns ``{}`` on error.
    """
    try:
        data = customer_analysis.get('data', {})
        properties = data.get('properties', [])
        analytics = data.get('analytics', {})

        patterns = {
            'viewing_frequency': self.analyze_viewing_frequency(properties),
            'engagement_timing': self.analyze_engagement_timing(properties),
            'property_exploration': self.analyze_property_exploration(properties),
            'decision_making_style': ai_insights.get('decision_making_style', 'Balanced'),
            'urgency_level': ai_insights.get('urgency_level', 'Medium')
        }

        if self.preference_analyzer:
            level = analytics.get('engagement_level', 'medium')
            # A numeric or missing level must not raise on .lower() and wipe
            # out the patterns computed above; treat it as 'medium', the same
            # convention analyze_engagement_level uses.
            level_key = level.lower() if isinstance(level, str) else 'medium'
            patterns['engagement_weight'] = self.preference_analyzer['engagement_weights'].get(level_key, 1.0)

        return patterns
    except Exception as e:
        logger.error(f"❌ Error analyzing behavioral patterns: {e}")
        return {}
def analyze_viewing_frequency(self, properties: List[Dict]) -> str:
    """Bucket the average ``viewCount`` per property into a frequency label.

    Returns:
        'high' above 5 views on average, 'medium' above 2, otherwise 'low'
        (also for an empty list). 'medium' is returned if the computation
        raises.
    """
    try:
        if not properties:
            return 'low'

        mean_views = sum(item.get('viewCount', 0) for item in properties) / len(properties)
        if mean_views > 5:
            return 'high'
        return 'medium' if mean_views > 2 else 'low'
    except Exception as e:
        logger.error(f"❌ Error analyzing viewing frequency: {e}")
        return 'medium'
def analyze_engagement_timing(self, properties: List[Dict]) -> str:
    """Find the part of the day in which the customer views properties most.

    Each property's ``viewCount`` is credited to the slot its
    ``lastViewedAt`` hour falls into: morning (06-12), afternoon (12-18) or
    evening (everything else). The hour is taken from the timestamp as
    written; no timezone conversion is applied. Properties with a missing or
    unparseable timestamp, or a non-numeric view count, are ignored.

    Returns:
        'morning', 'afternoon' or 'evening'. Ties resolve towards the later
        slot. An empty list, or an unexpected error, gives 'afternoon'.
    """
    try:
        if not properties:
            return 'afternoon'

        morning_views = 0
        afternoon_views = 0
        evening_views = 0

        for prop in properties:
            stamp = prop.get('lastViewedAt')
            if not stamp:
                continue
            try:
                # A trailing 'Z' is rewritten as an explicit UTC offset before parsing.
                hour = datetime.fromisoformat(stamp.replace('Z', '+00:00')).hour
                views = prop.get('viewCount', 0)
                if 6 <= hour < 12:
                    morning_views += views
                elif 12 <= hour < 18:
                    afternoon_views += views
                else:
                    evening_views += views
            except (ValueError, TypeError, AttributeError) as parse_error:
                # Only data problems are tolerated here; a bare except would
                # also swallow KeyboardInterrupt/SystemExit.
                logger.debug(f"Skipping property with unusable timing data: {parse_error}")

        if morning_views > max(afternoon_views, evening_views):
            return 'morning'
        elif afternoon_views > evening_views:
            return 'afternoon'
        else:
            return 'evening'
    except Exception as e:
        logger.error(f"❌ Error analyzing engagement timing: {e}")
        return 'afternoon'
def analyze_property_exploration(self, properties: List[Dict]) -> str:
    """Describe how widely the customer explores the catalogue.

    Returns:
        'diverse' when more than three distinct property types were viewed,
        'exploratory' when the spread between the highest and lowest price
        exceeds half the average price, otherwise 'focused' (also for an
        empty list or on error).
    """
    try:
        if not properties:
            return 'focused'

        distinct_types = {item.get('propertyTypeName', 'Unknown') for item in properties}
        prices = [item.get('price', 0) for item in properties]
        spread = max(prices) - min(prices)
        mean_price = sum(prices) / len(properties)

        if len(distinct_types) > 3:
            return 'diverse'
        if spread > mean_price * 0.5:
            return 'exploratory'
        return 'focused'
    except Exception as e:
        logger.error(f"❌ Error analyzing property exploration: {e}")
        return 'focused'
def analyze_engagement_level(self, customer_analysis: Dict) -> str:
    """Return the customer's engagement level as a lower-case label.

    The value is read from ``customer_analysis['data']['analytics']
    ['engagement_level']`` (default 'Medium'). Non-string values, and any
    error while reading, yield 'medium'.
    """
    try:
        level = (
            customer_analysis.get('data', {})
            .get('analytics', {})
            .get('engagement_level', 'Medium')
        )
        return level.lower() if isinstance(level, str) else 'medium'
    except Exception as e:
        logger.error(f"❌ Error analyzing engagement level: {e}")
        return 'medium'
def get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]:
    """Recommend properties similar to the ones the customer viewed.

    One similarity query is built for each of the first three viewed
    properties (the caller is presumably expected to pass them most-engaged
    first; no sorting happens here). Every hit is scored with
    ``calculate_similarity_score`` and stored under ``ai_similarity_score``.

    Args:
        viewed_properties: Properties the customer viewed.
        preferences: Preference profile used for queries and scoring.
        count: Maximum number of recommendations to return.

    Returns:
        Up to ``count`` hits, best score first. ``[]`` when there is nothing
        to compare against, ``count`` is not positive, or an error occurs.
    """
    try:
        if not viewed_properties or count <= 0:
            return []

        queries = [
            self.create_similarity_query(prop, preferences)
            for prop in viewed_properties[:3]
        ]

        # Ceiling division: flooring requested 0 results per query whenever
        # count < len(queries). The surplus is trimmed after ranking.
        per_query = (count + len(queries) - 1) // len(queries)

        candidates = []
        for query in queries:
            candidates.extend(
                self.search_similar_properties_chromadb(query, viewed_properties, per_query)
            )

        for prop in candidates:
            prop['ai_similarity_score'] = self.calculate_similarity_score(prop, viewed_properties, preferences)

        candidates.sort(key=lambda item: item.get('ai_similarity_score', 0), reverse=True)
        return candidates[:count]
    except Exception as e:
        logger.error(f"❌ Error getting similarity-based recommendations: {e}")
        return []
def get_preference_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]:
    """Recommend properties that match the customer's stated preferences.

    Queries come from ``create_preference_queries``; every hit is scored with
    ``calculate_preference_match_score`` and stored under
    ``ai_preference_score``.

    Args:
        preferences: Preference profile used for queries and scoring.
        viewed_properties: Forwarded to the ChromaDB search helper.
        count: Maximum number of recommendations to return.

    Returns:
        Up to ``count`` hits, best score first. ``[]`` when there are no
        queries, ``count`` is not positive, or an error occurs.
    """
    try:
        if count <= 0:
            return []

        queries = self.create_preference_queries(preferences)
        if not queries:
            return []

        # Ceiling division: there can be more queries than requested items,
        # in which case flooring asked every search for 0 results.
        per_query = (count + len(queries) - 1) // len(queries)

        candidates = []
        for query in queries:
            candidates.extend(
                self.search_similar_properties_chromadb(query, viewed_properties, per_query)
            )

        for prop in candidates:
            prop['ai_preference_score'] = self.calculate_preference_match_score(prop, preferences)

        candidates.sort(key=lambda item: item.get('ai_preference_score', 0), reverse=True)
        return candidates[:count]
    except Exception as e:
        logger.error(f"❌ Error getting preference-based recommendations: {e}")
        return []
def get_behavioral_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int) -> List[Dict]:
    """Recommend properties that fit the customer's behaviour and personality.

    Queries come from ``create_behavioral_queries`` using the analytics
    section of ``customer_analysis`` plus the personality type and urgency
    level from ``ai_insights``. Every hit is scored with
    ``calculate_behavioral_match_score`` and stored under
    ``ai_behavioral_score``.

    Args:
        customer_analysis: Payload whose ``['data']['analytics']`` describes
            the customer's behaviour.
        ai_insights: Supplies ``personality_type`` and ``urgency_level``.
        count: Maximum number of recommendations to return.

    Returns:
        Up to ``count`` hits, best score first. ``[]`` when no query applies,
        ``count`` is not positive, or an error occurs.
    """
    try:
        if count <= 0:
            return []

        behavioral_patterns = customer_analysis.get('data', {}).get('analytics', {})
        personality_type = ai_insights.get('personality_type', 'Balanced')
        urgency_level = ai_insights.get('urgency_level', 'Medium')

        queries = self.create_behavioral_queries(behavioral_patterns, personality_type, urgency_level)
        if not queries:
            return []

        # Ceiling division so that no search is ever asked for 0 results
        # when there are more queries than requested items.
        per_query = (count + len(queries) - 1) // len(queries)

        candidates = []
        for query in queries:
            # No viewed properties are passed to the search for this strategy.
            candidates.extend(self.search_similar_properties_chromadb(query, [], per_query))

        for prop in candidates:
            prop['ai_behavioral_score'] = self.calculate_behavioral_match_score(
                prop, behavioral_patterns, personality_type, urgency_level
            )

        candidates.sort(key=lambda item: item.get('ai_behavioral_score', 0), reverse=True)
        return candidates[:count]
    except Exception as e:
        logger.error(f"❌ Error getting behavioral recommendations: {e}")
        return []
def create_similarity_query(self, property_data: Dict, preferences: Dict) -> str:
    """Build the search text used to find properties similar to ``property_data``.

    The query joins the property's type, location, price band (from
    ``get_price_range``) and up to three features extracted from its
    description and name, followed by "property similar to <name>".
    ``preferences`` is accepted but not used. If building fails, a shorter
    "<type> in <location>" query is returned instead.
    """
    try:
        kind = property_data.get('propertyTypeName', '')
        place = property_data.get('location', '')
        band = self.get_price_range(property_data.get('price', 0))

        blob = property_data.get('description', '') + ' ' + property_data.get('propertyName', '')
        top_features = self.extract_features_from_text(blob)[:3]

        terms = ' '.join([kind, place, band] + list(top_features))
        return f"{terms} property similar to {property_data.get('propertyName', 'this property')}"
    except Exception as e:
        logger.error(f"❌ Error creating similarity query: {e}")
        return f"{property_data.get('propertyTypeName', 'Property')} in {property_data.get('location', '')}"
def create_preference_queries(self, preferences: Dict) -> List[str]:
    """Turn a preference profile into ChromaDB search phrases.

    Produces one price-band phrase followed by phrases for the top two
    preferred property types, locations and features (in that order). If the
    profile cannot be read, two generic phrases are returned.
    """
    try:
        band = preferences.get('price_range', {}).get('preferred_range', 'mid_range')
        liked_types = preferences.get('property_types', {}).get('preferred_types', [])
        liked_places = preferences.get('locations', {}).get('preferred_locations', [])
        liked_features = preferences.get('features', {}).get('preferred_features', [])

        phrases = [f"{band} price range properties"]
        phrases += [f"{kind} properties" for kind in liked_types[:2]]
        phrases += [f"properties in {place}" for place in liked_places[:2]]
        phrases += [f"{feature} properties" for feature in liked_features[:2]]
        return phrases
    except Exception as e:
        logger.error(f"❌ Error creating preference queries: {e}")
        return ["luxury properties", "premium apartments"]
def create_behavioral_queries(self, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> List[str]:
    """Pick search phrases that suit the customer's personality and behaviour.

    Two phrases are contributed for a recognised personality type
    (Analytical, Impulsive, Conservative), two for High or Low urgency and
    two for a high or low ``viewing_frequency`` in ``behavioral_patterns``.
    Unrecognised values contribute nothing, so the list may be empty. If the
    inputs cannot be read, two generic phrases are returned.
    """
    try:
        viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium')

        # (observed value, ((matching value, phrases), ...)) in output order
        rule_sets = (
            (personality_type, (
                ('Analytical', ["detailed property information", "comprehensive property features"]),
                ('Impulsive', ["exclusive properties", "limited time offers"]),
                ('Conservative', ["established properties", "proven locations"]),
            )),
            (urgency_level, (
                ('High', ["immediate availability", "ready to move properties"]),
                ('Low', ["upcoming projects", "pre-launch properties"]),
            )),
            (viewing_frequency, (
                ('high', ["high engagement properties", "popular properties"]),
                ('low', ["unique properties", "exclusive listings"]),
            )),
        )

        phrases = []
        for observed, options in rule_sets:
            for expected, extra in options:
                if observed == expected:
                    phrases.extend(extra)
                    break
        return phrases
    except Exception as e:
        logger.error(f"❌ Error creating behavioral queries: {e}")
        return ["premium properties", "luxury apartments"]
def calculate_similarity_score(self, property_data: Dict, viewed_properties: List[Dict], preferences: Dict) -> float:
    """Score how closely a property resembles what the customer likes.

    Adds 0.3 for a preferred property type, 0.3 for a price inside the
    preferred range (``is_price_in_range``), 0.2 for a preferred location and
    0.1 per matching feature (capped at 0.2). ``viewed_properties`` is
    accepted but not used.

    Returns:
        The score capped at 1.0, or 0.5 if scoring raises.
    """
    try:
        total = 0.0

        liked_types = preferences.get('property_types', {}).get('preferred_types', [])
        if property_data.get('propertyTypeName', '') in liked_types:
            total += 0.3

        if self.is_price_in_range(property_data.get('price', 0), preferences.get('price_range', {})):
            total += 0.3

        liked_places = preferences.get('locations', {}).get('preferred_locations', [])
        if property_data.get('location', '') in liked_places:
            total += 0.2

        blob = property_data.get('description', '') + ' ' + property_data.get('propertyName', '')
        found = set(self.extract_features_from_text(blob))
        wanted = set(preferences.get('features', {}).get('preferred_features', []))
        total += min(len(found & wanted) * 0.1, 0.2)

        return min(total, 1.0)
    except Exception as e:
        logger.error(f"❌ Error calculating similarity score: {e}")
        return 0.5
def calculate_preference_match_score(self, property_data: Dict, preferences: Dict) -> float:
    """Score how well a property matches the customer's preference profile.

    Adds 0.4 for a price inside the preferred range (``is_price_in_range``),
    0.3 for a preferred property type, 0.2 for a preferred location and 0.1
    when at least one preferred feature is found in the description or name.

    Returns:
        The score capped at 1.0, or 0.5 if scoring raises.
    """
    try:
        total = 0.0

        if self.is_price_in_range(property_data.get('price', 0), preferences.get('price_range', {})):
            total += 0.4

        liked_types = preferences.get('property_types', {}).get('preferred_types', [])
        if property_data.get('propertyTypeName', '') in liked_types:
            total += 0.3

        liked_places = preferences.get('locations', {}).get('preferred_locations', [])
        if property_data.get('location', '') in liked_places:
            total += 0.2

        blob = property_data.get('description', '') + ' ' + property_data.get('propertyName', '')
        found = set(self.extract_features_from_text(blob))
        wanted = set(preferences.get('features', {}).get('preferred_features', []))
        total += min(len(found & wanted) * 0.1, 0.1)

        return min(total, 1.0)
    except Exception as e:
        logger.error(f"❌ Error calculating preference match score: {e}")
        return 0.5
def calculate_behavioral_match_score(self, property_data: Dict, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> float:
    """Score how well a property suits the customer's behaviour.

    Personality (+0.3): Analytical wants a description longer than 100
    characters, Impulsive wants 'exclusive' in the name, Conservative wants
    'established' in the location. Urgency (+0.2): High wants 'ready' in the
    description, Low wants 'upcoming'. Viewing frequency (+0.2): high wants
    a ``viewCount`` above 10, low wants one below 5.

    Returns:
        The score capped at 1.0, or 0.5 if scoring raises.
    """
    try:
        def text_of(key):
            # Read lazily so that only the fields a rule needs are touched.
            return property_data.get(key, '')

        total = 0.0

        if personality_type == 'Analytical':
            personality_hit = len(text_of('description')) > 100
        elif personality_type == 'Impulsive':
            personality_hit = 'exclusive' in text_of('propertyName').lower()
        elif personality_type == 'Conservative':
            personality_hit = 'established' in text_of('location').lower()
        else:
            personality_hit = False
        if personality_hit:
            total += 0.3

        if urgency_level == 'High':
            urgency_word = 'ready'
        elif urgency_level == 'Low':
            urgency_word = 'upcoming'
        else:
            urgency_word = None
        if urgency_word is not None and urgency_word in text_of('description').lower():
            total += 0.2

        viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium')
        if viewing_frequency == 'high':
            frequency_hit = property_data.get('viewCount', 0) > 10
        elif viewing_frequency == 'low':
            frequency_hit = property_data.get('viewCount', 0) < 5
        else:
            frequency_hit = False
        if frequency_hit:
            total += 0.2

        return min(total, 1.0)
    except Exception as e:
        logger.error(f"❌ Error calculating behavioral match score: {e}")
        return 0.5
def is_price_in_range(self, price: float, price_range: Dict) -> bool:
    """Tell whether ``price`` falls inside the user's preferred price band.

    ``price_range['preferred_range']`` (default ``'mid_range'``) selects a
    fixed band: 'luxury' is strictly above 50,000,000; 'premium',
    'mid_range' and 'affordable' are inclusive intervals. Any other label
    falls back to a +/-30% window around ``price_range['avg_price']``.

    Returns True when the price matches, and also True if the check itself
    raises (the error is logged).
    """
    try:
        band = price_range.get('preferred_range', 'mid_range')

        # The luxury band is open-ended, so it is handled on its own.
        if band == 'luxury':
            return price > 50000000

        # Closed bands: (label, inclusive lower bound, inclusive upper bound).
        closed_bands = (
            ('premium', 25000000, 50000000),
            ('mid_range', 10000000, 25000000),
            ('affordable', 5000000, 10000000),
        )
        for label, lower, upper in closed_bands:
            if band == label:
                return lower <= price <= upper

        # Unknown label: accept anything within 30% of the average price.
        tolerance = 0.3
        mean_price = price_range.get('avg_price', 0)
        lower = mean_price * (1 - tolerance)
        upper = mean_price * (1 + tolerance)
        return lower <= price <= upper
    except Exception as e:
        logger.error(f"❌ Error checking price range: {e}")
        return True
def remove_duplicate_recommendations(self, recommendations: List[Dict]) -> List[Dict]:
    """Keep only the first recommendation seen for each property ID.

    The ID is read from ``'id'`` and, when that is missing or falsy, from
    ``'propertyId'``. Entries without a usable ID are left out of the
    result. Input order is preserved. If anything raises, the error is
    logged and the original list is returned unchanged.
    """
    try:
        # A dict keeps insertion order, so it doubles as the "seen" set
        # and the ordered result.
        first_by_id = {}
        for candidate in recommendations:
            key = candidate.get('id') or candidate.get('propertyId')
            if not key or key in first_by_id:
                continue
            first_by_id[key] = candidate
        return list(first_by_id.values())
    except Exception as e:
        logger.error(f"❌ Error removing duplicates: {e}")
        return recommendations
def add_ai_explanations(self, recommendations: List[Dict], preferences: Dict, ai_insights: Dict) -> List[Dict]:
    """Annotate every recommendation with an explanation and a confidence.

    Each dict in ``recommendations`` is mutated in place: ``'ai_explanation'``
    is set from ``generate_ai_explanation`` and ``'ai_confidence'`` from
    ``calculate_ai_confidence``. The same list object is returned. If an
    error occurs part-way through, it is logged and the list is returned
    with whatever annotations were already applied.
    """
    try:
        for item in recommendations:
            item['ai_explanation'] = self.generate_ai_explanation(item, preferences, ai_insights)
            item['ai_confidence'] = self.calculate_ai_confidence(item, preferences)
    except Exception as e:
        logger.error(f"❌ Error adding AI explanations: {e}")
    return recommendations
def generate_ai_explanation(self, property_data: Dict, preferences: Dict, ai_insights: Dict) -> str:
    """Build a human-readable explanation of why a property is recommended.

    Args:
        property_data: Property record; reads 'price', 'propertyTypeName',
            'location', 'description' and 'propertyName'.
        preferences: User preference summary; reads 'price_range',
            'property_types', 'locations' and 'features'.
        ai_insights: AI analysis results; reads 'personality_type'.

    Returns:
        The matching reasons joined with " | ". A generic sentence is
        returned when no reason applies or when an error occurs.
    """
    try:
        explanations = []

        # Price explanation
        price = property_data.get('price', 0)
        price_range = preferences.get('price_range', {})
        if self.is_price_in_range(price, price_range):
            explanations.append("Matches your preferred price range")

        # Property type explanation
        prop_type = property_data.get('propertyTypeName', '')
        preferred_types = preferences.get('property_types', {}).get('preferred_types', [])
        if prop_type in preferred_types:
            explanations.append(f"Matches your interest in {prop_type} properties")

        # Location explanation
        location = property_data.get('location', '')
        preferred_locations = preferences.get('locations', {}).get('preferred_locations', [])
        if location in preferred_locations:
            explanations.append(f"Located in your preferred area: {location}")

        # Feature explanation. Null description/name values are treated as
        # empty text so one missing field does not discard the explanation.
        description = ' '.join((
            str(property_data.get('description') or ''),
            str(property_data.get('propertyName') or ''),
        ))
        features = self.extract_features_from_text(description) or []
        preferred_features = set(preferences.get('features', {}).get('preferred_features', []))
        # Keep the extraction order (deduplicated) instead of iterating a
        # set, so the same inputs always produce the same text.
        matching_features = list(dict.fromkeys(
            feature for feature in features if feature in preferred_features
        ))
        if matching_features:
            feature_list = ', '.join(str(feature) for feature in matching_features[:3])
            explanations.append(f"Features you prefer: {feature_list}")

        # Behavioral explanation
        personality_type = ai_insights.get('personality_type', 'Balanced')
        if personality_type == 'Analytical':
            explanations.append("Suitable for your analytical decision-making style")
        elif personality_type == 'Impulsive':
            explanations.append("Perfect for your quick decision-making preference")

        if not explanations:
            explanations.append("Based on your overall property preferences and behavior patterns")
        return " | ".join(explanations)
    except Exception as e:
        logger.error(f"❌ Error generating AI explanation: {e}")
        return "Recommended based on your preferences and behavior analysis"
def calculate_ai_confidence(self, property_data: Dict, preferences: Dict) -> float:
    """Estimate how confident the engine is in a single recommendation.

    Starts from a base of 0.5 and adds a bonus for each preference the
    property satisfies: price band (+0.2), property type (+0.2) and
    location (+0.1). The result is capped at 1.0; 0.5 is returned if the
    calculation raises.
    """
    try:
        # Each entry pairs "did this preference match?" with its bonus.
        signals = (
            (
                self.is_price_in_range(
                    property_data.get('price', 0),
                    preferences.get('price_range', {}),
                ),
                0.2,
            ),
            (
                property_data.get('propertyTypeName', '')
                in preferences.get('property_types', {}).get('preferred_types', []),
                0.2,
            ),
            (
                property_data.get('location', '')
                in preferences.get('locations', {}).get('preferred_locations', []),
                0.1,
            ),
        )

        score = 0.5  # Base confidence
        for matched, bonus in signals:
            if matched:
                score += bonus
        return min(score, 1.0)
    except Exception as e:
        logger.error(f"❌ Error calculating AI confidence: {e}")
        return 0.5
def get_fallback_recommendations(self, count: int) -> List[Dict]:
    """Return generic recommendations for use when the AI models fail.

    Logs the ChromaDB metadata structure when it can be inspected, then
    runs a fixed "luxury/premium" similarity search for ``count`` results
    and tags each hit with a generic explanation and a confidence of 0.6.
    Returns an empty list if anything raises.
    """
    try:
        # Inspect the stored metadata first; it is only logged on success.
        structure = self.debug_chromadb_metadata()
        if 'error' not in structure:
            logger.info(f"🔍 ChromaDB structure: {structure}")

        # Generic query used in place of a personalized one.
        fallback = self.search_similar_properties_chromadb(
            "luxury properties premium apartments", [], count
        )
        for item in fallback:
            item.update({
                'ai_explanation': "Recommended based on general property preferences",
                'ai_confidence': 0.6,
            })
        return fallback
    except Exception as e:
        logger.error(f"❌ Error getting fallback recommendations: {e}")
        return []
def classify_property_preferences(self, preferences: Dict) -> Dict:
    """Bucket a user's preferences into coarse categories.

    Returns an empty dict when ``self.property_classifier`` is not set or
    when an error occurs. Otherwise returns:
      * 'property_category': 'diverse' (diversity score > 0.7), 'focused'
        (< 0.3) or 'balanced';
      * 'location_preference': always 'established';
      * 'price_sensitivity': 'low' (price consistency > 0.8), 'high'
        (< 0.3) or 'moderate'.
    """
    try:
        if not self.property_classifier:
            return {}

        # How varied are the property types the user looks at?
        diversity = preferences.get('property_types', {}).get('diversity_score', 0)
        if diversity > 0.7:
            category = 'diverse'
        elif diversity < 0.3:
            category = 'focused'
        else:
            category = 'balanced'

        # How tightly do the viewed prices cluster?
        consistency = preferences.get('price_range', {}).get('price_consistency', 0.5)
        if consistency > 0.8:
            sensitivity = 'low'
        elif consistency < 0.3:
            sensitivity = 'high'
        else:
            sensitivity = 'moderate'

        return {
            'property_category': category,
            'location_preference': 'established',
            'price_sensitivity': sensitivity,
        }
    except Exception as e:
        logger.error(f"❌ Error classifying property preferences: {e}")
        return {}
def predict_price_preferences(self, preferences: Dict) -> Dict:
    """Derive a price prediction summary from the user's price history.

    Returns an empty dict when ``self.price_predictor`` is not set or when
    an error occurs. Otherwise returns:
      * 'predicted_range': the stored preferred range (default 'mid_range');
      * 'confidence': the stored confidence (default 0.5);
      * 'price_tolerance': 0.1 for confidence > 0.8, 0.4 for confidence
        < 0.3, otherwise 0.2;
      * 'upsell_potential': 'high' for an average price above 50,000,000,
        'low' below 10,000,000, otherwise 'medium'.
    """
    try:
        if not self.price_predictor:
            return {}

        history = preferences.get('price_range', {})
        mean_price = history.get('avg_price', 0)
        certainty = history.get('confidence', 0.5)

        # Higher average spend suggests more room to upsell.
        if mean_price > 50000000:
            upsell = 'high'
        elif mean_price < 10000000:
            upsell = 'low'
        else:
            upsell = 'medium'

        # The surer we are of the range, the narrower the tolerance.
        if certainty > 0.8:
            tolerance = 0.1
        elif certainty < 0.3:
            tolerance = 0.4
        else:
            tolerance = 0.2

        return {
            'predicted_range': history.get('preferred_range', 'mid_range'),
            'confidence': certainty,
            'price_tolerance': tolerance,
            'upsell_potential': upsell,
        }
    except Exception as e:
        logger.error(f"❌ Error predicting price preferences: {e}")
        return {}
def calculate_preference_weights(self, preferences: Dict) -> Dict:
    """Compute per-dimension weights for scoring recommendations.

    Returns an empty dict when ``self.preference_analyzer`` is not set or
    when an error occurs. All weights default to 1.0 and are adjusted as
    follows:
      * 'engagement_weight' is looked up in the analyzer's
        'engagement_weights' table by the user's engagement level;
      * 'high' viewing frequency raises 'feature_weight' to 1.2, 'low'
        raises 'price_weight' to 1.3;
      * 'diverse' property exploration raises 'type_weight' to 1.2,
        'focused' (the default) raises 'location_weight' to 1.2.
    """
    try:
        analyzer = self.preference_analyzer
        if not analyzer:
            return {}

        level = preferences.get('engagement_level', 'medium')
        engagement = analyzer['engagement_weights'].get(level, 1.0)

        patterns = preferences.get('behavioral_patterns', {})
        frequency = patterns.get('viewing_frequency', 'medium')
        exploration = patterns.get('property_exploration', 'focused')

        return {
            # Low engagement users care more about price
            'price_weight': 1.3 if frequency == 'low' else 1.0,
            'type_weight': 1.2 if exploration == 'diverse' else 1.0,
            'location_weight': 1.2 if exploration == 'focused' else 1.0,
            # High engagement users care more about features
            'feature_weight': 1.2 if frequency == 'high' else 1.0,
            'engagement_weight': engagement,
        }
    except Exception as e:
        logger.error(f"❌ Error calculating preference weights: {e}")
        return {}
def debug_chromadb_metadata(self, sample_count: int = 3) -> Dict:
    """Inspect the metadata structure of properties stored in ChromaDB.

    Args:
        sample_count: Maximum number of stored properties to sample.

    Returns:
        ``{'error': <message>}`` when the collection is missing, empty or
        the inspection fails. Otherwise a dict with:
          * 'total_properties': number of items in the collection;
          * 'sample_metadata_keys': the key list of each sampled item;
          * 'sample_metadata': the sampled metadata dicts;
          * 'field_mapping_issues': one entry per sample that lacks any of
            the expected fields.
    """
    try:
        # Compare against None explicitly (as the storage path does) so the
        # check does not depend on the collection object's truthiness.
        if self.properties_collection is None:
            return {'error': 'ChromaDB collection not initialized'}
        total_count = self.properties_collection.count()
        if total_count == 0:
            return {'error': 'No properties in ChromaDB'}

        # Never request more results than the collection holds (or fewer
        # than one).
        n_results = max(1, min(sample_count, total_count))
        results = self.properties_collection.query(
            query_texts=["property"],
            n_results=n_results,
            include=["metadatas", "documents"]
        )

        debug_info = {
            'total_properties': total_count,
            'sample_metadata_keys': [],
            'sample_metadata': [],
            'field_mapping_issues': []
        }
        expected_fields = ['propertyTypeName', 'propertyName', 'price', 'address', 'beds', 'baths']

        if results and results['metadatas'] and results['metadatas'][0]:
            for i, metadata in enumerate(results['metadatas'][0]):
                # An item stored without metadata is reported as having no keys.
                metadata = metadata or {}
                keys = list(metadata.keys())
                debug_info['sample_metadata_keys'].append(keys)

                # Check for missing expected fields
                missing_fields = [field for field in expected_fields if field not in keys]
                if missing_fields:
                    debug_info['field_mapping_issues'].append({
                        'sample_index': i,
                        'missing_fields': missing_fields,
                        'available_fields': keys
                    })

                # Store sample metadata
                debug_info['sample_metadata'].append(metadata)

        logger.info(f"🔍 ChromaDB Debug Info: {debug_info}")
        return debug_info
    except Exception as e:
        logger.error(f"❌ Error debugging ChromaDB metadata: {e}")
        return {'error': str(e)}
def _clean_property_data(self, prop: Dict) -> Dict:
    """Clean and standardize one raw property record.

    Each output field is taken from the first alias key that holds a
    usable (non-null, non-empty) value, so a primary key that is present
    but null no longer hides a populated alias. Numeric fields that cannot
    be converted become 0 instead of discarding the whole record, and null
    text fields become '' rather than the string 'None'.

    Args:
        prop: Raw property dict as returned by the backend.

    Returns:
        A dict with keys 'id', 'propertyId', 'propertyName',
        'propertyTypeName', 'price', 'address', 'beds', 'baths', 'sqft',
        'description', 'features' (list of str), 'viewCount',
        'totalDuration' and 'lastViewedAt'. If cleaning fails entirely, a
        minimal placeholder record is returned.
    """
    def first_present(*keys, default=''):
        """Return the first value among ``keys`` that is not None or ''."""
        for key in keys:
            value = prop.get(key)
            if value is not None and value != '':
                return value
        return default

    def as_float(value):
        """Convert to float, falling back to 0.0 for unusable values."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def as_int(value):
        """Convert to int (accepting strings such as '3.0'), else 0."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return 0

    def parse_features(raw):
        """Normalize a list, JSON string or delimited string to list[str]."""
        if isinstance(raw, list):
            return [str(item) for item in raw if item]
        if not isinstance(raw, str):
            return []
        text = raw.strip()
        if not text:
            return []
        try:
            parsed = json.loads(text)
        except ValueError:
            # Not JSON: split on the first common delimiter that appears.
            for delimiter in (',', ';'):
                if delimiter in text:
                    return [piece.strip() for piece in text.split(delimiter) if piece.strip()]
            return [text]
        if isinstance(parsed, list):
            return [str(item) for item in parsed if item]
        if parsed is None:
            return []
        return [str(parsed)]

    try:
        property_id = str(first_present('id', 'propertyId'))
        return {
            # Standardized ID fields
            'id': property_id,
            'propertyId': property_id,
            # Standardized basic fields
            'propertyName': str(first_present('propertyName', 'name')),
            'propertyTypeName': str(first_present('propertyTypeName', 'typeName', 'type')),
            'price': as_float(first_present('price', 'marketValue', default=0)),
            'address': str(first_present('address', 'location')),
            'beds': as_int(first_present('beds', 'bedrooms', default=0)),
            'baths': as_int(first_present('baths', 'bathrooms', default=0)),
            'sqft': as_float(first_present('totalSquareFeet', 'area', 'sqft', default=0)),
            'description': str(first_present('description')),
            'features': parse_features(prop.get('features', [])),
            # Additional fields
            'viewCount': as_int(first_present('viewCount', default=0)),
            'totalDuration': as_int(first_present('totalDuration', default=0)),
            'lastViewedAt': str(first_present('lastViewedAt')),
        }
    except Exception as e:
        logger.error(f"❌ Error cleaning property data: {e}")
        # Return minimal valid data; tolerate a non-dict input here too.
        fallback_id = 'unknown'
        if isinstance(prop, dict):
            fallback_id = str(prop.get('id', prop.get('propertyId', 'unknown')))
        return {
            'id': fallback_id,
            'propertyId': fallback_id,
            'propertyName': 'Unknown Property',
            'propertyTypeName': 'Property',
            'price': 0.0,
            'address': '',
            'beds': 0,
            'baths': 0,
            'sqft': 0.0,
            'description': '',
            'features': [],
            'viewCount': 0,
            'totalDuration': 0,
            'lastViewedAt': ''
        }
def _retry_failed_pages(self, failed_pages: List[int], page_size: int, max_workers: int) -> List[Dict]:
    """Fetch previously failed pages again using a smaller thread pool.

    Every page in ``failed_pages`` is resubmitted once to
    ``_fetch_property_page_enhanced`` on a pool of half the original
    worker count (at least one worker). Results are collected in
    submission order; pages that raise or return nothing are logged and
    skipped. Returns all recovered properties as one flat list.
    """
    logger.info(f"🔄 Retrying {len(failed_pages)} failed pages with reduced concurrency...")
    recovered = []
    # Halve the concurrency for the retry pass.
    pool_size = max(1, max_workers // 2)
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        pending = [
            (page, executor.submit(self._fetch_property_page_enhanced, page, page_size))
            for page in failed_pages
        ]
        for page, task in pending:
            try:
                # No timeout - let it take as long as needed
                page_items = task.result()
            except Exception as e:
                logger.error(f"❌ Retry failed for page {page}: {e}")
                continue
            if not page_items:
                logger.warning(f"⚠️ Retry failed for page {page}: No properties returned")
                continue
            recovered.extend(page_items)
            logger.info(f"✅ Retry successful for page {page}: {len(page_items)} properties")
    logger.info(f"🔄 Retry completed: {len(recovered)} properties recovered")
    return recovered
def _store_properties_in_chromadb_parallel(self, properties: List[Dict]) -> bool:
    """Store properties in ChromaDB in small batches on a thread pool.

    When the collection already holds data and the new list is larger than
    80% of the existing count, the existing items are deleted first (the
    collection itself is kept). Properties are then written in batches of
    20 through ``_process_property_batch_memory_efficient``.

    Args:
        properties: Cleaned property dicts to store.

    Returns:
        True if at least one property was stored, otherwise False.
    """
    try:
        logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB with memory-efficient processing...")
        if self.properties_collection is None:
            logger.error("❌ ChromaDB properties collection not initialized")
            return False
        if not properties:
            logger.warning("⚠️ No properties supplied; nothing to store in ChromaDB")
            return False

        # Check existing properties but don't delete collection
        try:
            existing_count = self.properties_collection.count()
            logger.info(f"📊 Collection currently has {existing_count} properties")
            # Only clear if new data is substantial relative to what is stored
            if existing_count > 0 and len(properties) > existing_count * 0.8:
                logger.info(f"🗑️ Clearing {existing_count} existing properties for fresh data...")
                try:
                    # Only the IDs are needed; an empty include list avoids
                    # loading every document and metadata dict into memory.
                    all_ids = self.properties_collection.get(include=[])['ids']
                    # Delete in batches to avoid memory issues
                    delete_batch_size = 100
                    for start in range(0, len(all_ids), delete_batch_size):
                        self.properties_collection.delete(ids=all_ids[start:start + delete_batch_size])
                    if all_ids:
                        logger.info(f"✅ Cleared {len(all_ids)} existing properties")
                except Exception as clear_error:
                    logger.warning(f"⚠️ Error clearing collection: {clear_error}")
            elif existing_count > 0:
                logger.info(f"📦 Keeping existing {existing_count} properties, adding {len(properties)} new ones")
        except Exception as count_error:
            logger.warning(f"⚠️ Could not check existing count: {count_error}")
            # Collection might be corrupted, reinitialize
            self.initialize_chromadb()
            if self.properties_collection is None:
                logger.error("❌ ChromaDB properties collection not initialized")
                return False

        # Small batches and few workers keep memory pressure low.
        batch_size = 20
        max_workers = 5
        total_stored = 0
        failed_batches = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {}
            for start in range(0, len(properties), batch_size):
                batch_num = start // batch_size + 1
                future = executor.submit(
                    self._process_property_batch_memory_efficient,
                    properties[start:start + batch_size],
                    batch_num,
                )
                futures[future] = batch_num
            logger.info(f"🚀 Submitted {len(futures)} memory-efficient storage tasks with {max_workers} workers")

            # Count each finished batch exactly once, whatever its outcome.
            for completed_batches, future in enumerate(as_completed(futures), start=1):
                batch_num = futures[future]
                progress = (completed_batches / len(futures)) * 100
                try:
                    batch_result = future.result()  # No timeout
                    stored = batch_result['count'] if batch_result['success'] else None
                except Exception as e:
                    failed_batches.append(batch_num)
                    logger.error(f"❌ Batch {batch_num}: Error - {e} (Progress: {progress:.1f}%)")
                    continue
                if stored is None:
                    failed_batches.append(batch_num)
                    logger.error(f"❌ Batch {batch_num}: Failed to store properties (Progress: {progress:.1f}%)")
                else:
                    total_stored += stored
                    logger.info(f"📊 Batch {batch_num}: {stored} properties stored (Total: {total_stored}, Progress: {progress:.1f}%)")

        if failed_batches:
            logger.warning(f"⚠️ {len(failed_batches)} batches failed: {failed_batches}")
        # Only report success when something was actually written.
        if total_stored > 0:
            logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!")
        else:
            logger.error("❌ No properties were stored in ChromaDB")
        return total_stored > 0
    except Exception as e:
        logger.error(f"❌ Error in parallel ChromaDB storage: {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return False
def _process_property_batch(self, batch: List[Dict], batch_num: int) -> Dict:
    """Build documents and metadata for one batch and add them to ChromaDB.

    Properties without a usable 'id' or 'propertyId' receive the fallback
    ID ``prop_<batch_num>_<index>``. Including the batch number keeps
    fallback IDs from different batches from colliding in the collection.

    Args:
        batch: Property dicts to store.
        batch_num: 1-based batch number, used for fallback IDs and logging.

    Returns:
        ``{'success': bool, 'count': int, 'error': str | None}`` where
        'count' is the number of properties stored.
    """
    def resolve_id(prop, index):
        """Return the property's ID as a string, or a unique fallback."""
        for key in ('id', 'propertyId'):
            value = prop.get(key)
            if value is not None and value != '':
                return str(value)
        return f'prop_{batch_num}_{index}'

    try:
        documents = []
        metadatas = []
        ids = []
        for index, prop in enumerate(batch):
            # The same resolved ID is used for the metadata and the record.
            prop_id = resolve_id(prop, index)

            # Create simple text description without heavy AI processing
            documents.append(self._create_simple_property_description(prop))

            # Prepare metadata with enhanced JSON handling
            metadata = {
                'id': prop_id,
                'propertyName': str(prop.get('propertyName', '')),
                'propertyTypeName': str(prop.get('propertyTypeName', '')),
                'price': float(prop.get('price', 0) or 0),
                'address': str(prop.get('address', '')),
                'beds': int(prop.get('beds', 0) or 0),
                'baths': int(prop.get('baths', 0) or 0),
                'sqft': float(prop.get('sqft', 0) or 0),
                'description': str(prop.get('description', '')),
                'features': self._safe_json_stringify(prop.get('features', []))
            }
            metadatas.append(self.clean_metadata_for_chromadb(metadata))
            ids.append(prop_id)

        # Add batch to ChromaDB using simple embedding function
        try:
            self.properties_collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            return {'success': True, 'count': len(batch), 'error': None}
        except Exception as e:
            logger.error(f"❌ ChromaDB add error: {e}")
            return {'success': False, 'count': 0, 'error': str(e)}
    except Exception as e:
        logger.error(f"❌ Error processing batch {batch_num}: {e}")
        return {'success': False, 'count': 0, 'error': str(e)}
def _create_simple_property_description(self, prop: Dict) -> str:
    """Create a plain-text description of a property for embedding.

    The description is built from the name, type, price, address,
    bed/bath counts, square footage, the first 200 characters of the
    description and up to 10 features, joined with " | ". Numeric fields
    given as strings (e.g. "1500000") are formatted like numbers; values
    that are not numeric are inserted as-is rather than discarding the
    whole description.

    Returns "Property" when no field is usable or an error occurs.
    """
    def format_number(value):
        """Format with thousands separators; fall back to plain text."""
        try:
            return f"{float(value):,.0f}"
        except (TypeError, ValueError):
            return str(value)

    try:
        parts = []
        # Basic property info
        if prop.get('propertyName'):
            parts.append(f"Property: {prop.get('propertyName')}")
        if prop.get('propertyTypeName'):
            parts.append(f"Type: {prop.get('propertyTypeName')}")
        if prop.get('price'):
            parts.append(f"Price: ${format_number(prop.get('price'))}")
        if prop.get('address'):
            parts.append(f"Address: {prop.get('address')}")
        if prop.get('beds') or prop.get('baths'):
            beds = prop.get('beds', 0)
            baths = prop.get('baths', 0)
            parts.append(f"Bedrooms: {beds}, Bathrooms: {baths}")
        if prop.get('sqft'):
            parts.append(f"Square Feet: {format_number(prop.get('sqft'))}")
        if prop.get('description'):
            desc = str(prop.get('description'))[:200]  # Limit description length
            parts.append(f"Description: {desc}")

        # Features: accept a list, a JSON-encoded list or a plain string.
        features = prop.get('features', [])
        if features:
            if isinstance(features, str):
                try:
                    features = json.loads(features)
                except ValueError:
                    # Not JSON: treat the whole string as a single feature.
                    features = [features]
            if isinstance(features, list) and features:
                feature_list = [str(f) for f in features[:10]]  # Limit to 10 features
                parts.append(f"Features: {', '.join(feature_list)}")

        return " | ".join(parts) if parts else "Property"
    except Exception as e:
        logger.warning(f"⚠️ Error creating simple property description: {e}")
        return "Property"
def _process_property_batch_memory_efficient(self, batch: List[Dict], batch_num: int) -> Dict:
    """Add one batch to ChromaDB using size-limited metadata.

    Text metadata is truncated to fixed lengths to keep memory use low.
    The features value stays valid JSON: when the serialized list exceeds
    500 characters, trailing features are dropped until it fits, instead
    of slicing the JSON text mid-token. Properties without a usable 'id'
    or 'propertyId' receive the fallback ID ``prop_<batch_num>_<index>``,
    which is unique across batches.

    Args:
        batch: Property dicts to store.
        batch_num: 1-based batch number, used for fallback IDs and logging.

    Returns:
        ``{'success': bool, 'count': int, 'error': str | None}`` where
        'count' is the number of properties stored.
    """
    def resolve_id(prop, index):
        """Return the property's ID as a string, or a unique fallback."""
        for key in ('id', 'propertyId'):
            value = prop.get(key)
            if value is not None and value != '':
                return str(value)
        return f'prop_{batch_num}_{index}'

    def bounded_features_json(raw_features, limit=500):
        """Serialize features as a JSON array no longer than ``limit``."""
        serialized = self._safe_json_stringify(raw_features)
        if len(serialized) <= limit:
            return serialized
        try:
            items = json.loads(serialized)
        except ValueError:
            return json.dumps([])
        if not isinstance(items, list):
            return json.dumps([])
        # Drop trailing features until the serialized form fits.
        while items and len(json.dumps(items)) > limit:
            items.pop()
        return json.dumps(items)

    try:
        documents = []
        metadatas = []
        ids = []
        for index, prop in enumerate(batch):
            prop_id = resolve_id(prop, index)

            # Create ultra-simple text description without any AI processing
            documents.append(self._create_ultra_simple_description(prop))

            # Prepare minimal metadata to reduce memory usage
            metadatas.append({
                'id': prop_id,
                'propertyName': str(prop.get('propertyName', ''))[:100],  # Limit string length
                'propertyTypeName': str(prop.get('propertyTypeName', ''))[:50],
                'price': float(prop.get('price', 0) or 0),
                'address': str(prop.get('address', ''))[:200],
                'beds': int(prop.get('beds', 0) or 0),
                'baths': int(prop.get('baths', 0) or 0),
                'sqft': float(prop.get('sqft', 0) or 0),
                'description': str(prop.get('description', ''))[:300],  # Limit description
                'features': bounded_features_json(prop.get('features', []))
            })
            ids.append(prop_id)

        # Add batch to ChromaDB with error handling
        try:
            self.properties_collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            return {'success': True, 'count': len(batch), 'error': None}
        except Exception as e:
            logger.error(f"❌ ChromaDB add error in batch {batch_num}: {e}")
            return {'success': False, 'count': 0, 'error': str(e)}
    except Exception as e:
        logger.error(f"❌ Error processing batch {batch_num}: {e}")
        return {'success': False, 'count': 0, 'error': str(e)}
def _create_ultra_simple_description(self, prop: Dict) -> str:
    """Create a minimal plain-text description of a property.

    Only the name, type, price, address, bed/bath counts, square footage
    and the first three features are included, joined with " | ". Every
    part is converted to text, so non-string values cannot break the
    join. Numeric fields given as strings are formatted like numbers, and
    non-numeric values are inserted as-is.

    Returns "Property" when no field is usable or an error occurs.
    """
    def format_number(value):
        """Format with thousands separators; fall back to plain text."""
        try:
            return f"{float(value):,.0f}"
        except (TypeError, ValueError):
            return str(value)

    try:
        parts = []
        # Only essential information to minimize memory usage
        if prop.get('propertyName'):
            parts.append(str(prop.get('propertyName')))
        if prop.get('propertyTypeName'):
            parts.append(str(prop.get('propertyTypeName')))
        if prop.get('price'):
            parts.append(f"${format_number(prop.get('price'))}")
        if prop.get('address'):
            parts.append(str(prop.get('address')))
        if prop.get('beds') or prop.get('baths'):
            beds = prop.get('beds', 0)
            baths = prop.get('baths', 0)
            parts.append(f"{beds}bed {baths}bath")
        if prop.get('sqft'):
            parts.append(f"{format_number(prop.get('sqft'))}sqft")

        # Minimal features (only first 3); accept list, JSON or plain string.
        features = prop.get('features', [])
        if features:
            if isinstance(features, str):
                try:
                    features = json.loads(features)
                except ValueError:
                    # Not JSON: treat the whole string as a single feature.
                    features = [features]
            if isinstance(features, list) and features:
                feature_list = [str(f) for f in features[:3]]  # Only 3 features
                parts.append(f"Features: {', '.join(feature_list)}")

        return " | ".join(parts) if parts else "Property"
    except Exception as e:
        logger.warning(f"⚠️ Error creating ultra-simple description: {e}")
        return "Property"
def _safe_json_stringify(self, obj) -> str:
    """Serialize a features value as a JSON array of strings.

    The result is always a JSON array, whatever the input shape:
      * list/tuple: items are stringified and stripped, None items dropped;
      * str containing a JSON array: cleaned the same way;
      * str containing a JSON string: wrapped as a one-element array;
      * str containing any other JSON value, or not JSON at all: the
        stripped text itself becomes the single element;
      * empty/blank string, JSON null, or any other type: ``[]``.

    Returns "[]" if serialization fails.
    """
    def clean(items):
        """Stringify and strip items, dropping None entries."""
        return [str(item).strip() for item in items if item is not None]

    try:
        if isinstance(obj, (list, tuple)):
            return json.dumps(clean(obj))

        if isinstance(obj, str):
            text = obj.strip()
            if not text:
                return json.dumps([])
            try:
                parsed = json.loads(text)
            except ValueError:
                # Not JSON: keep the text as a single feature.
                return json.dumps([text])
            if isinstance(parsed, list):
                return json.dumps(clean(parsed))
            if parsed is None:
                return json.dumps([])
            if isinstance(parsed, str):
                return json.dumps([parsed.strip()])
            # Numbers, booleans and objects: keep the original text.
            return json.dumps([text])

        return json.dumps([])
    except Exception as e:
        logger.warning(f"⚠️ Error in safe JSON stringify: {e}")
        return json.dumps([])
def _schedule_property_update(self):
    """Start a background thread that refreshes properties every 24 hours.

    Registers a recurring job with the ``schedule`` library that calls
    ``fetch_all_properties_parallel``, then starts a daemon thread that
    polls for pending jobs once an hour. Failures inside the job or the
    polling loop are logged and do not stop the scheduler thread. If the
    scheduler cannot be set up (for example ``schedule`` is not
    installed), a warning is logged and nothing is scheduled.
    """
    try:
        import schedule

        def update_properties():
            logger.info("🔄 Starting scheduled property update...")
            try:
                self.fetch_all_properties_parallel()
            except Exception as update_error:
                # Contain the failure so the job stays registered and the
                # scheduler thread keeps running.
                logger.error(f"❌ Scheduled property update failed: {update_error}")

        # Run the update every 24 hours
        schedule.every(24).hours.do(update_properties)

        # Start scheduler in background thread
        def run_scheduler():
            while True:
                try:
                    schedule.run_pending()
                except Exception as scheduler_error:
                    logger.error(f"❌ Scheduler loop error: {scheduler_error}")
                time.sleep(3600)  # Check every hour

        scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
        scheduler_thread.start()
        logger.info("⏰ Property update scheduled for every 24 hours")
    except Exception as e:
        logger.warning(f"⚠️ Could not schedule property update: {e}")
def safe_json_parse(self, json_str):
    """Parse a JSON value into a list, tolerating malformed input.

    Args:
        json_str: A JSON string (or bytes), or an already-parsed list or
            dict.

    Returns:
        * ``[]`` for falsy input and for unsupported types such as numbers;
        * the list itself for list input, ``[value]`` for dict input;
        * the parsed list, or ``[parsed]`` for a JSON object, or ``[]`` for
          any other valid JSON value;
        * for malformed text: the outermost ``[...]`` section if it parses
          as a list, otherwise the text split on ',' or ';' with quotes
          stripped, otherwise ``[]``.
    """
    if not json_str:
        return []
    if isinstance(json_str, list):
        return json_str
    if isinstance(json_str, dict):
        return [json_str]
    if isinstance(json_str, (bytes, bytearray)):
        text = bytes(json_str).decode('utf-8', errors='replace')
    elif isinstance(json_str, str):
        text = json_str
    else:
        # Numbers and other objects cannot be parsed or sliced for logging.
        return []

    try:
        parsed = json.loads(text)
    except ValueError as e:
        logger.warning(f"⚠️ Failed to parse JSON: {text[:50]}... Error: {e}")
    else:
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            return [parsed]
        return []

    # Try to extract features from malformed JSON: look for an array-like
    # section between the first '[' and the last ']'.
    start = text.find('[')
    end = text.rfind(']')
    if start != -1 and end != -1:
        try:
            extracted = json.loads(text[start:end + 1])
            if isinstance(extracted, list):
                return extracted
        except ValueError:
            pass

    # Fallback: split by common delimiters
    for delimiter in (',', ';'):
        if delimiter in text:
            return [item.strip().strip('"\'') for item in text.split(delimiter) if item.strip()]
    return []
# Global instance: module-level singleton shared by importers of this module.
# It is constructed at import time, so importing the module runs
# AIRecommendationEngine.__init__ (cache directory setup, ChromaDB
# initialization and AI model initialization).
ai_engine = AIRecommendationEngine()