from flask import Flask, render_template, request, jsonify
import requests
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import json
import os
from datetime import datetime, timedelta
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity
import urllib3
import chromadb
from chromadb.config import Settings
import time
from flask_cors import CORS
import logging
import threading
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import math
import psutil
TARGET_PROPERTIES = 20 # Always return 20 properties
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Suppress InsecureRequestWarning: external API requests below are made with verify=False
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app = Flask(__name__)
# Multi-user support configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 # Disable caching for development
# Configure CORS with specific settings for multi-user support
CORS(app,
resources={r"/*": {
"origins": ["http://localhost:4200", "https://huggingface.co", "*"],
"methods": ["GET", "POST", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization", "X-Requested-With"],
"supports_credentials": True,
"max_age": 3600
}})
# Add CORS headers to all responses
@app.after_request
def after_request(response):
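    """Attach permissive CORS headers to every response (supplements the flask_cors setup above)."""
    # Note: browsers reject credentialed requests when Access-Control-Allow-Origin is the
    # wildcard '*'; a specific origin is required if credentials are actually needed.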
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
response.headers.add('Access-Control-Allow-Credentials', 'true')
return response
# Initialize the sentence transformer model with error handling
import multiprocessing as mp
# Load model directly to avoid multiprocessing issues
try:
logger.info("Loading sentence transformer model...")
model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder=os.path.join(os.path.expanduser("~"), ".cache", "huggingface"))
logger.info("Model loaded successfully")
except Exception as e:
logger.error(f"Error loading model: {str(e)}")
raise
# Configuration
TOTAL_PROPERTIES = 500 # Total number of properties to fetch
CACHE_EXPIRY_SECONDS = 24 * 60 * 60 # 24 hours in seconds
BATCH_SIZE = 50 # Number of properties per batch
MAX_WORKERS = 10 # Number of parallel workers
# REQUEST_TIMEOUT = None # No timeout for requests - removed
# Multi-user support configuration
MAX_CONCURRENT_REQUESTS = 50 # Maximum concurrent requests
REQUEST_RATE_LIMIT = 100 # Requests per minute per IP
CONNECTION_POOL_SIZE = 20 # Connection pool size for external API calls
# Create connection pool for multi-user support
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=CONNECTION_POOL_SIZE,
pool_maxsize=CONNECTION_POOL_SIZE,
max_retries=3
)
session.mount('http://', adapter)
session.mount('https://', adapter)
# Global variable for property collection
property_collection = None
# Path for cache timestamp
CACHE_TIMESTAMP_PATH = os.path.join("property_db", "last_cache_time.txt")
# Global variable for countdown thread
countdown_thread = None
stop_countdown = False
countdown_initialized = False  # Flag tracking whether the countdown thread has been initialized
# Multi-user request tracking
active_requests = 0
request_lock = threading.Lock()
# Background task management
background_tasks = {}
task_lock = threading.Lock()
is_fetching_properties = False
fetch_lock = threading.Lock()
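# The locks above guard shared mutable state across Flask request threads and background
# workers: request_lock protects the active_requests counter, task_lock protects
# background_tasks, and fetch_lock makes property fetching single-flight via is_fetching_properties.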
def format_time_remaining(seconds):
"""Format remaining time in a human-readable format"""
if seconds < 60:
return f"{int(seconds)} seconds"
elif seconds < 3600:
minutes = int(seconds / 60)
remaining_seconds = int(seconds % 60)
return f"{minutes} minutes {remaining_seconds} seconds"
else:
hours = int(seconds / 3600)
minutes = int((seconds % 3600) / 60)
return f"{hours} hours {minutes} minutes"
def fetch_property_batch(page_number, property_type=None, min_price=None, max_price=None):
"""Fetch a single batch of properties without timeout constraints"""
url = "https://hivepropapi.azurewebsites.net/api/Property/allPropertieswithfulldetails"
params = {
"pageNumber": page_number,
"pageSize": BATCH_SIZE
}
# Add filters to params if provided
if property_type:
params["propertyType"] = property_type
if min_price is not None:
params["minPrice"] = min_price
if max_price is not None:
params["maxPrice"] = max_price
try:
logger.info(f"Fetching batch {page_number} with {BATCH_SIZE} properties...")
response = session.get(url, params=params, verify=False) # Removed timeout
response.raise_for_status()
data = response.json()
if data and 'data' in data:
properties = data['data']
if properties:
logger.info(f"Successfully fetched {len(properties)} properties from batch {page_number}")
return properties
else:
logger.warning(f"Batch {page_number} returned empty property list")
return []
logger.warning(f"No data found in batch {page_number} response")
return []
except requests.exceptions.ConnectionError:
logger.error(f"Failed to connect to the API server for batch {page_number}")
return []
except Exception as e:
logger.error(f"Error fetching batch {page_number}: {e}")
return []
def fetch_properties_background_safe(property_type=None, min_price=None, max_price=None):
"""Fetch properties in background with proper error handling and no timeout constraints"""
global is_fetching_properties
with fetch_lock:
if is_fetching_properties:
logger.info("Property fetch already in progress, skipping...")
return None
is_fetching_properties = True
try:
logger.info(f"πŸ”„ Starting background property fetch with {MAX_WORKERS} workers, batch size {BATCH_SIZE}")
# Calculate number of batches needed
total_batches = math.ceil(TOTAL_PROPERTIES / BATCH_SIZE)
logger.info(f"Total batches to fetch: {total_batches}")
all_properties = []
start_time = time.time()
completed_batches = 0
# Use ThreadPoolExecutor without timeout for parallel fetching
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Submit all batch requests
future_to_batch = {
executor.submit(fetch_property_batch, page_num, property_type, min_price, max_price): page_num
for page_num in range(1, total_batches + 1)
}
# Collect results as they complete without timeout
for future in as_completed(future_to_batch): # Removed timeout
batch_num = future_to_batch[future]
try:
batch_properties = future.result() # Removed timeout
if batch_properties:
all_properties.extend(batch_properties)
completed_batches += 1
logger.info(f"Completed batch {batch_num}/{total_batches} - Total properties so far: {len(all_properties)}")
else:
logger.warning(f"Batch {batch_num} returned no properties")
except Exception as e:
logger.error(f"Batch {batch_num} generated an exception: {e}")
continue # Continue with other batches
end_time = time.time()
fetch_duration = end_time - start_time
if all_properties:
# Limit to total properties requested
all_properties = all_properties[:TOTAL_PROPERTIES]
logger.info(f"βœ… Successfully fetched {len(all_properties)} properties in {fetch_duration:.2f} seconds using {completed_batches}/{total_batches} batches")
return {"data": all_properties}
else:
logger.error(f"❌ Failed to fetch any properties after {fetch_duration:.2f} seconds")
return None
except Exception as e:
logger.error(f"❌ Error during background property fetch: {str(e)}")
return None
finally:
with fetch_lock:
is_fetching_properties = False
def fetch_properties(property_type=None, min_price=None, max_price=None):
"""Fetch properties using parallel workers and batches - now calls background version"""
return fetch_properties_background_safe(property_type, min_price, max_price)
def update_cache_timestamp():
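    """Persist the current time to CACHE_TIMESTAMP_PATH so staleness checks can compare against it."""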
try:
os.makedirs(os.path.dirname(CACHE_TIMESTAMP_PATH), exist_ok=True)
current_time = time.time()
with open(CACHE_TIMESTAMP_PATH, 'w') as f:
f.write(str(current_time))
next_update = datetime.fromtimestamp(current_time + CACHE_EXPIRY_SECONDS)
logger.info(f"Updated cache timestamp. Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"Next update in: {format_time_remaining(CACHE_EXPIRY_SECONDS)}")
except Exception as e:
logger.error(f"Error updating cache timestamp: {e}")
def refresh_properties_background():
"""Refresh properties in background without blocking the main application"""
try:
logger.info("πŸ”„ Starting background property refresh...")
data = fetch_properties_background_safe()
if data and 'data' in data:
new_properties = [p for p in data['data'] if isinstance(p, dict)]
if new_properties:
# Use ThreadPoolExecutor for non-blocking cache update
def update_cache_async():
try:
update_property_cache(new_properties)
update_cache_timestamp()
logger.info("βœ… Background property refresh completed successfully")
except Exception as e:
logger.error(f"Error updating cache in background: {e}")
# Start cache update in background thread
threading.Thread(target=update_cache_async, daemon=True).start()
else:
logger.warning("⚠️ No properties fetched during background refresh")
else:
logger.warning("⚠️ Failed to fetch properties during background refresh")
except Exception as e:
logger.error(f"❌ Error during background property refresh: {str(e)}")
def force_initial_property_fetch():
"""Force initial property fetch to ensure cache is created - non-blocking"""
try:
logger.info("πŸ”„ Force initial property fetch...")
data = fetch_properties_background_safe()
if data and 'data' in data:
new_properties = [p for p in data['data'] if isinstance(p, dict)]
if new_properties:
# Use ThreadPoolExecutor for non-blocking cache update
def update_cache_async():
try:
update_property_cache(new_properties)
update_cache_timestamp()
logger.info(f"βœ… Force initial fetch completed with {len(new_properties)} properties")
except Exception as e:
logger.error(f"Error updating cache in force initial fetch: {e}")
# Start cache update in background thread
threading.Thread(target=update_cache_async, daemon=True).start()
return True
else:
logger.warning("⚠️ No properties fetched during force initial fetch")
return False
else:
logger.warning("⚠️ Failed to fetch properties during force initial fetch")
return False
except Exception as e:
logger.error(f"❌ Error during force initial property fetch: {str(e)}")
return False
def process_property_batch(properties_batch, existing_ids):
"""Process a batch of properties for ChromaDB in parallel"""
batch_ids = []
batch_metadatas = []
batch_documents = []
batch_embeddings = []
# Prepare property texts for batch embedding
property_texts = []
valid_properties = []
for prop in properties_batch:
try:
prop_id = str(prop.get('id'))
if prop_id not in existing_ids:
# Create property text for embedding
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
property_texts.append(property_text)
valid_properties.append(prop)
except Exception as e:
logger.error(f"Error processing property for batch: {str(e)}")
continue
if not valid_properties:
return batch_ids, batch_metadatas, batch_documents, batch_embeddings
# Create embeddings in batch for better performance
try:
embeddings = model.encode(property_texts)
for i, prop in enumerate(valid_properties):
try:
prop_id = str(prop.get('id'))
# Convert property to metadata dictionary with proper type handling
metadata = {
'id': str(prop.get('id', '')),
'propertyName': str(prop.get('propertyName', '')),
'typeName': str(prop.get('typeName', '')),
'description': str(prop.get('description', '')),
'address': str(prop.get('address', '')),
'totalSquareFeet': float(prop.get('totalSquareFeet', 0)),
'beds': int(prop.get('beds', 0)),
'baths': int(prop.get('baths', 0)),
'numberOfRooms': int(prop.get('numberOfRooms', 0)),
'marketValue': float(prop.get('marketValue', 0)),
'yearBuilt': str(prop.get('yearBuilt', '')),
'propertyImages': json.dumps(prop.get('propertyImages', [])),
'features': json.dumps(prop.get('features', [])),
'location': json.dumps(prop.get('location', {})),
'floorPlans': json.dumps(prop.get('floorPlans', [])),
'documents': json.dumps(prop.get('documents', [])),
'propertyVideos': json.dumps(prop.get('propertyVideos', []))
}
batch_ids.append(prop_id)
batch_metadatas.append(metadata)
batch_documents.append(property_texts[i])
batch_embeddings.append(embeddings[i].tolist())
except Exception as e:
logger.error(f"Error processing property {i} in batch: {str(e)}")
continue
except Exception as e:
logger.error(f"Error creating embeddings for batch: {str(e)}")
return batch_ids, batch_metadatas, batch_documents, batch_embeddings
def update_property_cache(new_properties):
"""Update property cache with parallel processing optimization"""
global property_collection
try:
if not new_properties:
logger.warning("No new properties to add")
return
logger.info(f"Updating ChromaDB with {len(new_properties)} properties using parallel processing")
start_time = time.time()
# Get existing property IDs
existing_results = property_collection.get()
existing_ids = set(existing_results['ids']) if existing_results and existing_results['ids'] else set()
# Filter out existing properties in parallel
def filter_new_property(prop):
try:
prop_id = str(prop.get('id'))
if prop_id not in existing_ids:
return prop
return None
except Exception as e:
logger.error(f"Error filtering property: {str(e)}")
return None
# Use ThreadPoolExecutor for parallel filtering
max_workers = min(mp.cpu_count(), 8)
new_properties_filtered = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_property = {
executor.submit(filter_new_property, prop): i
for i, prop in enumerate(new_properties)
}
for future in as_completed(future_to_property):
try:
filtered_prop = future.result()
if filtered_prop:
new_properties_filtered.append(filtered_prop)
except Exception as e:
property_idx = future_to_property[future]
logger.error(f"Error filtering property {property_idx}: {str(e)}")
if not new_properties_filtered:
logger.info("No new properties to add to ChromaDB")
return
logger.info(f"Processing {len(new_properties_filtered)} new properties in parallel")
# Process properties in batches for better performance
cache_batch_size = 100 # Process 100 properties at a time for embedding
all_ids = []
all_metadatas = []
all_documents = []
all_embeddings = []
# Process in batches with parallel processing
def process_batch_parallel(batch):
return process_property_batch(batch, existing_ids)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Create batches
batches = [new_properties_filtered[i:i + cache_batch_size]
for i in range(0, len(new_properties_filtered), cache_batch_size)]
# Submit batch processing tasks
future_to_batch = {
executor.submit(process_batch_parallel, batch): i
for i, batch in enumerate(batches)
}
# Collect results
for future in as_completed(future_to_batch):
batch_idx = future_to_batch[future]
try:
batch_ids, batch_metadatas, batch_documents, batch_embeddings = future.result()
all_ids.extend(batch_ids)
all_metadatas.extend(batch_metadatas)
all_documents.extend(batch_documents)
all_embeddings.extend(batch_embeddings)
logger.info(f"Processed batch {batch_idx + 1}/{len(batches)} - {len(batch_ids)} properties")
except Exception as e:
logger.error(f"Error processing batch {batch_idx}: {str(e)}")
# Add all new properties to ChromaDB
if all_ids:
logger.info(f"Adding {len(all_ids)} new properties to ChromaDB")
property_collection.add(
ids=all_ids,
metadatas=all_metadatas,
documents=all_documents,
embeddings=all_embeddings
)
end_time = time.time()
logger.info(f"βœ… Successfully added {len(all_ids)} properties to ChromaDB in {end_time - start_time:.2f} seconds using parallel processing")
else:
logger.info("No new properties to add to ChromaDB")
except Exception as e:
logger.error(f"Error updating property cache: {str(e)}")
raise
def initialize_collection():
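    """Get or create the ChromaDB 'properties' collection and populate it from the API if it is empty or stale."""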
global property_collection
try:
# First try to get the collection
try:
property_collection = chroma_client.get_collection("properties")
logger.info("Using existing property collection from vector DB")
except Exception as e:
logger.info("Creating new property collection")
property_collection = chroma_client.create_collection(
name="properties",
metadata={"hnsw:space": "cosine"}
)
# Check if collection is empty or needs refresh
results = property_collection.get()
if not results or not results['ids']:
logger.info("Collection is empty, fetching initial properties")
# Always fetch initial properties if collection is empty
data = fetch_properties()
if data and 'data' in data:
new_properties = [p for p in data['data'] if isinstance(p, dict)]
if new_properties:
update_property_cache(new_properties)
update_cache_timestamp()
logger.info(f"βœ… Successfully added {len(new_properties)} initial properties to collection")
else:
logger.warning("No properties fetched from API during initialization")
else:
logger.warning("Failed to fetch properties from API during initialization")
else:
logger.info(f"Collection has {len(results['ids'])} properties, checking if cache is fresh")
if is_cache_stale():
logger.info("Cache is stale, will refresh in background")
# Start background refresh
threading.Thread(target=refresh_properties_background, daemon=True).start()
else:
logger.info("Cache is fresh, using existing properties")
except Exception as e:
logger.error(f"Error initializing collection: {str(e)}")
# Try to recreate the collection
try:
logger.info("Attempting to recreate collection")
try:
chroma_client.delete_collection("properties")
            except Exception:
                pass
property_collection = chroma_client.create_collection(
name="properties",
metadata={"hnsw:space": "cosine"}
)
logger.info("Successfully recreated collection")
except Exception as recreate_error:
logger.error(f"Error recreating collection: {str(recreate_error)}")
raise
def is_cache_stale():
    """Check if cache is stale and needs refresh"""
    if not os.path.exists(CACHE_TIMESTAMP_PATH):
        logger.info("No cache timestamp found, cache needs refresh")
        return True
    try:
        with open(CACHE_TIMESTAMP_PATH, 'r') as f:
            last_time = float(f.read().strip())
        current_time = time.time()
        time_diff = current_time - last_time
        is_stale = time_diff > CACHE_EXPIRY_SECONDS
        if is_stale:
            logger.info(f"Cache is stale (older than {format_time_remaining(CACHE_EXPIRY_SECONDS)}). Last update: {datetime.fromtimestamp(last_time)}")
            logger.info(f"Time since last update: {format_time_remaining(time_diff)}")
        else:
            time_remaining = CACHE_EXPIRY_SECONDS - time_diff
            next_update = datetime.fromtimestamp(current_time + time_remaining)
            logger.info(f"Cache is fresh. Last update: {datetime.fromtimestamp(last_time)}")
            logger.info(f"Next update in: {format_time_remaining(time_remaining)}")
            logger.info(f"Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}")
        return is_stale
    except Exception as e:
        logger.error(f"Error reading cache timestamp: {e}, cache needs refresh")
        return True
# Initialize ChromaDB with persistent storage.
# is_cache_stale() is defined above this block because initialize_collection(), called here
# at import time, references it; defining it afterwards raised a NameError on startup with an
# existing collection and caused the collection to be recreated (and emptied) on every restart.
try:
    logger.info("Initializing ChromaDB...")
    chroma_client = chromadb.PersistentClient(path="property_db")
    # Initialize collection at startup
    initialize_collection()
    logger.info("ChromaDB initialized successfully")
except Exception as e:
    logger.error(f"Error initializing ChromaDB: {str(e)}")
    raise
def calculate_property_score(prop, price_range=None, property_type=None, user_input=None):
"""Calculate property score with optimized parallel processing"""
score = 0.0
similarity_score = 0.0
# Calculate semantic similarity if user input is provided
if user_input:
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
property_embedding = model.encode([property_text])[0]
user_embedding = model.encode([user_input])[0]
similarity_score = float(cosine_similarity([property_embedding], [user_embedding])[0][0]) # Convert to Python float
score += similarity_score * 0.5 # 50% weight for semantic similarity
# Base score for property type match (only if property_type is specified)
if property_type:
prop_type = prop.get('typeName', '').lower()
if isinstance(property_type, list):
# Check if property type matches any in the list
if any(pt.lower() == prop_type for pt in property_type):
score += 0.3 # 30% weight for exact type match
elif any(pt.lower() in prop_type for pt in property_type):
score += 0.2 # 20% weight for partial type match
else:
# Single property type
if prop_type == property_type.lower():
score += 0.3 # 30% weight for exact type match
elif property_type.lower() in prop_type:
score += 0.2 # 20% weight for partial type match
else:
# For additional properties (no type filter), give base score
score += 0.1 # Base score for any property type
# Score for price range match
if price_range:
price = float(prop.get('marketValue', 0)) # Convert to Python float
if isinstance(price_range, list) and len(price_range) > 0:
if isinstance(price_range[0], list):
# Multiple price ranges - check if price falls in any range
price_in_range = False
for range_item in price_range:
min_val = range_item[0] if range_item[0] is not None else 0
max_val = range_item[1] if range_item[1] is not None else float('inf')
if min_val <= price <= max_val:
price_in_range = True
score += 0.3 # 30% weight for exact price match
break
if not price_in_range:
# Check if price is close to any range
for range_item in price_range:
min_val = range_item[0] if range_item[0] is not None else 0
max_val = range_item[1] if range_item[1] is not None else float('inf')
if price < min_val:
price_diff = (min_val - price) / min_val if min_val > 0 else 1
score += 0.1 * (1 - min(price_diff, 1)) # Up to 10% for properties below range
elif price < max_val * 1.2: # Allow 20% above max price
price_diff = (price - max_val) / max_val if max_val > 0 else 1
score += 0.2 * (1 - min(price_diff, 1)) # Up to 20% for properties above range
else:
# Single price range
min_val = price_range[0] if price_range[0] is not None else 0
max_val = price_range[1] if price_range[1] is not None else float('inf')
if min_val <= price <= max_val:
score += 0.3 # 30% weight for exact price match
elif price < min_val:
# Properties below range get a small score
price_diff = (min_val - price) / min_val if min_val > 0 else 1
score += 0.1 * (1 - min(price_diff, 1)) # Up to 10% for properties below range
elif price < max_val * 1.2: # Allow 20% above max price
price_diff = (price - max_val) / max_val if max_val > 0 else 1
score += 0.2 * (1 - min(price_diff, 1)) # Up to 20% for properties above range
# Score for property features
features = [
'balcony', 'parking', 'security', 'garden',
'swimming pool', 'gym', 'power backup', 'water supply'
]
description = prop.get('description', '')
if description: # Only process features if description exists
description = description.lower()
feature_count = sum(1 for feature in features if feature in description)
score += (feature_count / len(features)) * 0.1 # Up to 10% for features
# Score for property size
if prop.get('totalSquareFeet', 0) > 1000:
score += 0.05 # 5% for larger properties
# Score for number of rooms
if prop.get('numberOfRooms', 0) >= 3:
score += 0.05 # 5% for properties with more rooms
return float(score), float(similarity_score) # Convert to Python floats
def calculate_property_scores_parallel(properties, price_range=None, property_type=None, user_input=None, max_workers=None):
"""Calculate scores for multiple properties in parallel"""
if not properties:
return []
if max_workers is None:
max_workers = min(mp.cpu_count(), 8) # Use CPU cores but cap at 8
logger.info(f"Calculating scores for {len(properties)} properties using {max_workers} workers")
# Prepare property texts for batch embedding if user_input is provided
if user_input:
property_texts = []
for prop in properties:
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
property_texts.append(property_text)
# Batch encode all property texts at once
try:
property_embeddings = model.encode(property_texts)
user_embedding = model.encode([user_input])[0]
# Calculate similarities in batch
similarities = cosine_similarity(property_embeddings, [user_embedding]).flatten()
except Exception as e:
logger.error(f"Error in batch embedding: {e}")
similarities = [0.0] * len(properties)
else:
similarities = [0.0] * len(properties)
# Process properties in parallel
def score_single_property(args):
prop, similarity = args
score = 0.0
# Add similarity score
score += similarity * 0.5
# Base score for property type match
if property_type:
prop_type = prop.get('typeName', '').lower()
if isinstance(property_type, list):
if any(pt.lower() == prop_type for pt in property_type):
score += 0.3
elif any(pt.lower() in prop_type for pt in property_type):
score += 0.2
else:
if prop_type == property_type.lower():
score += 0.3
elif property_type.lower() in prop_type:
score += 0.2
else:
score += 0.1
# Price range scoring
if price_range:
price = float(prop.get('marketValue', 0))
if isinstance(price_range, list) and len(price_range) > 0:
if isinstance(price_range[0], list):
price_in_range = False
for range_item in price_range:
min_val = range_item[0] if range_item[0] is not None else 0
max_val = range_item[1] if range_item[1] is not None else float('inf')
if min_val <= price <= max_val:
price_in_range = True
score += 0.3
break
if not price_in_range:
for range_item in price_range:
min_val = range_item[0] if range_item[0] is not None else 0
max_val = range_item[1] if range_item[1] is not None else float('inf')
if price < min_val:
price_diff = (min_val - price) / min_val if min_val > 0 else 1
score += 0.1 * (1 - min(price_diff, 1))
elif price < max_val * 1.2: # Allow 20% above max price
price_diff = (price - max_val) / max_val if max_val > 0 else 1
score += 0.2 * (1 - min(price_diff, 1))
else:
min_val = price_range[0] if price_range[0] is not None else 0
max_val = price_range[1] if price_range[1] is not None else float('inf')
if min_val <= price <= max_val:
score += 0.3
elif price < min_val:
price_diff = (min_val - price) / min_val if min_val > 0 else 1
score += 0.1 * (1 - min(price_diff, 1))
elif price < max_val * 1.2: # Allow 20% above max price
price_diff = (price - max_val) / max_val if max_val > 0 else 1
score += 0.2 * (1 - min(price_diff, 1))
# Feature scoring
features = ['balcony', 'parking', 'security', 'garden', 'swimming pool', 'gym', 'power backup', 'water supply']
description = prop.get('description', '')
if description:
description = description.lower()
feature_count = sum(1 for feature in features if feature in description)
score += (feature_count / len(features)) * 0.1
# Size and room scoring
if prop.get('totalSquareFeet', 0) > 1000:
score += 0.05
if prop.get('numberOfRooms', 0) >= 3:
score += 0.05
return {
'property': prop,
'score': float(score),
'similarity': float(similarity)
}
# Use ThreadPoolExecutor for parallel scoring
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Create arguments for each property
args = list(zip(properties, similarities))
# Submit all scoring tasks
future_to_property = {
executor.submit(score_single_property, arg): i
for i, arg in enumerate(args)
}
# Collect results
scored_properties = []
for future in as_completed(future_to_property):
try:
result = future.result()
if result['score'] >= 0.1: # Filter low scores
scored_properties.append(result)
except Exception as e:
property_idx = future_to_property[future]
logger.error(f"Error scoring property {property_idx}: {e}")
# Sort by score
scored_properties.sort(key=lambda x: x['score'], reverse=True)
# Convert back to original format
for result in scored_properties:
result['property']['score'] = result['score']
result['property']['similarity'] = result['similarity']
logger.info(f"Successfully scored {len(scored_properties)} properties in parallel")
return [result['property'] for result in scored_properties]
def get_recommendations(user_input, price_range=None, property_type=None):
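    """Score cached properties against the user's input and filters and return up to TARGET_PROPERTIES results, padding with other in-range properties when exact matches are scarce."""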
global property_collection
# Handle multiple price ranges - convert to min/max arrays
min_price = None
max_price = None
if price_range:
if isinstance(price_range, list) and len(price_range) > 0:
if isinstance(price_range[0], list):
# Array of price ranges
                min_price = [r[0] for r in price_range]
                max_price = [r[1] for r in price_range]
else:
# Single price range
min_price = [price_range[0]]
max_price = [price_range[1]]
# Get properties from ChromaDB, fetch from API if empty
try:
logger.info("Getting properties from ChromaDB...")
properties = get_cached_properties(
property_type=property_type,
min_price=min_price,
max_price=max_price
)
if not properties:
logger.warning("No properties found in ChromaDB, fetching from API...")
# Fetch properties from API and store in ChromaDB
data = fetch_properties(
property_type=property_type,
min_price=min_price[0] if min_price else None,
max_price=max_price[0] if max_price else None
)
if data and 'data' in data:
new_properties = [p for p in data['data'] if isinstance(p, dict)]
if new_properties:
# Store properties in ChromaDB
update_property_cache(new_properties)
update_cache_timestamp()
logger.info(f"βœ… Fetched and stored {len(new_properties)} properties from API")
# Now get properties from ChromaDB again
properties = get_cached_properties(
property_type=property_type,
min_price=min_price,
max_price=max_price
)
else:
logger.warning("No properties fetched from API")
return []
else:
logger.warning("Failed to fetch properties from API")
return []
except Exception as e:
logger.error(f"Error getting properties from ChromaDB: {str(e)}")
properties = []
# Final fallback: if still no properties, try to get all properties without filters
if not properties:
logger.warning("Still no properties found, trying fallback fetch...")
try:
# Try to get all properties without filters
all_properties = get_cached_properties(
property_type=None,
min_price=None,
max_price=None
)
if all_properties:
logger.info(f"βœ… Found {len(all_properties)} properties in fallback fetch")
properties = all_properties
else:
logger.error("❌ No properties available even in fallback")
return []
except Exception as e:
logger.error(f"Error in fallback property fetch: {str(e)}")
return []
if not properties:
logger.warning("No properties found matching criteria")
return []
# Calculate scores for properties using parallel processing
scored_properties = calculate_property_scores_parallel(
properties, price_range, property_type, user_input, max_workers=min(mp.cpu_count(), 8)
)
# If we have enough properties matching exact criteria, return them
if len(scored_properties) >= TARGET_PROPERTIES:
logger.info(f"Found {len(scored_properties)} properties matching exact criteria, returning top {TARGET_PROPERTIES}")
return scored_properties[:TARGET_PROPERTIES]
# If we don't have enough properties, get additional properties from other types in same price range
logger.info(f"Found only {len(scored_properties)} properties matching exact criteria, need {TARGET_PROPERTIES - len(scored_properties)} more")
# Get all properties in the same price range (regardless of type)
all_properties_in_range = get_cached_properties(
property_type=None, # No type filter
min_price=min_price,
max_price=max_price
)
if not all_properties_in_range:
logger.warning("No properties found in price range, trying fallback to all properties...")
# Final fallback: get all properties regardless of price range
all_properties_in_range = get_cached_properties(
property_type=None,
min_price=None,
max_price=None
)
if not all_properties_in_range:
logger.error("❌ No properties available even in final fallback")
return scored_properties # Return what we have
# Calculate scores for all properties in price range using parallel processing
all_scored_properties = calculate_property_scores_parallel(
all_properties_in_range, price_range, None, user_input, max_workers=min(mp.cpu_count(), 8)
)
# Filter for lower threshold for additional properties
all_scored_properties = [prop for prop in all_scored_properties if prop.get('score', 0) >= 0.05]
# Create final list: exact matches first, then additional properties using parallel processing
def select_final_properties():
final_properties = []
used_ids = set()
# Add exact matches first
for prop in scored_properties:
if len(final_properties) >= TARGET_PROPERTIES:
break
prop_id = str(prop.get('id'))
if prop_id not in used_ids:
final_properties.append(prop)
used_ids.add(prop_id)
# Add additional properties to reach target
for prop in all_scored_properties:
if len(final_properties) >= TARGET_PROPERTIES:
break
prop_id = str(prop.get('id'))
if prop_id not in used_ids:
final_properties.append(prop)
used_ids.add(prop_id)
return final_properties
    # Build the final list directly; no parallelism is needed for this simple selection
    final_properties = select_final_properties()
logger.info(f"Returning {len(final_properties)} properties total: {len(scored_properties)} exact matches + {len(final_properties) - len(scored_properties)} additional")
return final_properties
def get_cached_properties(property_type=None, min_price=None, max_price=None):
"""Get properties from ChromaDB with parallel processing optimization"""
global property_collection
try:
# Verify collection exists and has data
if not property_collection:
logger.info("Collection not initialized, reinitializing...")
initialize_collection()
if not property_collection:
logger.error("Failed to initialize collection")
return []
logger.info("Fetching properties from ChromaDB with parallel processing...")
# Get all properties from ChromaDB
results = property_collection.get()
if not results or not results['ids']:
logger.warning("No properties found in ChromaDB")
return []
logger.info(f"Found {len(results['ids'])} properties in ChromaDB")
# Process ChromaDB results in parallel
def process_single_property(args):
i, metadata = args
try:
# Convert JSON strings back to Python objects and handle type conversion
property_data = {
'id': metadata['id'],
'propertyName': metadata['propertyName'],
'typeName': metadata['typeName'],
'description': metadata['description'],
'address': metadata['address'],
'totalSquareFeet': float(metadata['totalSquareFeet']),
'beds': int(metadata['beds']),
'baths': int(metadata['baths']),
'numberOfRooms': int(metadata['numberOfRooms']),
'marketValue': float(metadata['marketValue']),
'yearBuilt': metadata['yearBuilt'],
'propertyImages': json.loads(metadata['propertyImages']),
'features': json.loads(metadata['features']),
'location': json.loads(metadata['location']),
'floorPlans': json.loads(metadata['floorPlans']),
'documents': json.loads(metadata['documents']),
'propertyVideos': json.loads(metadata['propertyVideos'])
}
return property_data
except Exception as e:
logger.error(f"Error processing property {i}: {str(e)}")
return None
# Use ThreadPoolExecutor for parallel processing
max_workers = min(mp.cpu_count(), 8) # Use CPU cores but cap at 8
properties = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Create arguments for each property
args = [(i, results['metadatas'][i]) for i in range(len(results['ids']))]
# Submit all property processing tasks
future_to_index = {
executor.submit(process_single_property, arg): i
for i, arg in enumerate(args)
}
# Collect results as they complete
for future in as_completed(future_to_index):
try:
property_data = future.result()
if property_data:
properties.append(property_data)
except Exception as e:
index = future_to_index[future]
logger.error(f"Error processing property {index}: {str(e)}")
logger.info(f"Successfully processed {len(properties)} properties from ChromaDB")
# Apply filters in parallel
def filter_single_property(prop):
try:
# Apply property type filter (handle both single string and array)
if property_type:
prop_type = prop.get('typeName', '').lower()
if isinstance(property_type, list):
# Check if property type matches any in the list
if not any(pt.lower() == prop_type for pt in property_type):
return None
else:
# Single property type
if prop_type != property_type.lower():
return None
# Apply price range filter (handle both single range and array of ranges)
if min_price is not None and max_price is not None:
price = prop.get('marketValue', 0)
if isinstance(min_price, list) and isinstance(max_price, list):
# Multiple price ranges - check if price falls in any range
price_in_range = False
for i in range(len(min_price)):
min_val = min_price[i] if min_price[i] is not None else 0
max_val = max_price[i] if max_price[i] is not None else float('inf')
if min_val <= price <= max_val:
price_in_range = True
break
if not price_in_range:
return None
else:
# Single price range
min_val = min_price if min_price is not None else 0
max_val = max_price if max_price is not None else float('inf')
if price < min_val or price > max_val:
return None
return prop
except Exception as e:
logger.error(f"Error filtering property: {str(e)}")
return None
# Filter properties in parallel
filtered_properties = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all filtering tasks
future_to_property = {
executor.submit(filter_single_property, prop): i
for i, prop in enumerate(properties)
}
# Collect filtered results
for future in as_completed(future_to_property):
try:
filtered_prop = future.result()
if filtered_prop:
filtered_properties.append(filtered_prop)
except Exception as e:
property_idx = future_to_property[future]
logger.error(f"Error filtering property {property_idx}: {str(e)}")
logger.info(f"Filtered to {len(filtered_properties)} properties from ChromaDB using parallel processing")
return filtered_properties
except Exception as e:
logger.error(f"Error getting properties from ChromaDB: {str(e)}")
return []
@app.route('/')
def home():
return render_template('index.html')
import base64  # requests, logging and the Flask helpers used below are already imported at the top of the file
# Twilio credentials: read from the environment rather than hard-coding secrets in source.
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID", "")
TWILIO_AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN", "")
TWILIO_WHATSAPP_NUMBER = "whatsapp:+14155238886"
WHATSAPP_CHAR_LIMIT = 1600
PROPERTY_BASE_URL = "https://hiveprop.com/property/image-slider?id="
MAX_PROPERTIES_TO_SEND = 1 # Send only 1 property in email
HIVEPROP_EMAIL_API_URL = "https://hivepropapi.azurewebsites.net/api/Email/sendPlainText"
def send_email_recommendations(email, first_name, last_name, recommendations):
"""Send property recommendations via email using HiveProp API"""
try:
# Take only the top properties
top_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND]
# Prepare email content
subject = f"Your Top {MAX_PROPERTIES_TO_SEND} Property Recommendations - HiveProp"
        # Create a compact HTML email body (kept small to respect the email API's size limits)
email_body = f"""<!DOCTYPE html>
<html>
<head>
<style>
body{{font-family:Arial;color:#333;margin:0;padding:20px}}
.header{{background:#2c5aa0;color:white;padding:15px;text-align:center;border-radius:5px}}
.content{{padding:15px}}
.property{{border:1px solid #ddd;padding:10px;margin:10px 0;border-radius:3px;background:#f9f9f9}}
.btn{{background:#2c5aa0;color:white;padding:8px 15px;text-decoration:none;border-radius:3px;display:inline-block}}
.footer{{background:#f8f9fa;padding:15px;text-align:center;border-radius:3px}}
</style>
</head>
<body>
<div class="header">
<h1>🏑 Hello {first_name} {last_name}!</h1>
</div>
<div class="content">
<h2>Your Top {len(top_recommendations)} Property Recommendations</h2>"""
if not top_recommendations:
email_body += """
<div class="property">
<h3>⚠️ No specific properties found</h3>
<p>We couldn't find specific properties matching your criteria, but you can explore all available properties on our website.</p>
<a href="https://hiveprop.com" class="btn">Explore All Properties</a>
</div>"""
else:
for idx, prop in enumerate(top_recommendations, 1):
property_name = prop.get('propertyName', 'Unnamed Property')
property_id = prop.get('id', '')
property_link = f"{PROPERTY_BASE_URL}{property_id}"
address = prop.get('address', '')
location = address.split(',')[-2].strip() if ',' in address else address
email_body += f"""
<div class="property">
<h3>{idx}. {property_name}</h3>
<p>πŸ“ {location}</p>
<a href="{property_link}" class="btn">View Details</a>
</div>"""
email_body += f"""
</div>
<div class="footer">
<h3>πŸ”Ž Looking for more options?</h3>
<p>Explore all listings at <a href="https://hiveprop.com">hiveprop.com</a></p>
<p>Thank you for using HiveProp!</p>
</div>
</body>
</html>"""
# Send email via HiveProp API using query parameters
params = {
'toEmail': email,
'subject': subject,
'body': email_body
}
logger.info(f"Sending email to {email} via HiveProp API")
logger.info(f"API URL: {HIVEPROP_EMAIL_API_URL}")
logger.info(f"Subject: {subject}")
logger.info(f"Request params: {params}")
# Send POST request with query parameters
response = requests.post(HIVEPROP_EMAIL_API_URL, params=params, timeout=30)
logger.info(f"Email API response status: {response.status_code}")
logger.info(f"Email API response text: {response.text}")
# Check for 404 or other errors before raising
if response.status_code == 404:
logger.error(f"Email API endpoint not found (404): {HIVEPROP_EMAIL_API_URL}")
return False, f"Email API endpoint not found. Please check the API URL."
elif response.status_code != 200:
logger.error(f"Email API returned status {response.status_code}: {response.text}")
return False, f"Email API error: Status {response.status_code} - {response.text}"
response.raise_for_status()
logger.info(f"Email sent successfully to {email}")
return True, "Email sent successfully"
except requests.exceptions.Timeout:
logger.error("HiveProp Email API request timed out")
return False, "Email API timeout - service may be slow or unavailable"
except requests.exceptions.ConnectionError as e:
logger.error(f"HiveProp Email API connection error: {str(e)}")
return False, "Email API connection failed - service may be down"
except requests.exceptions.HTTPError as e:
error_text = e.response.text if e.response else "No response text"
status_code = e.response.status_code if e.response else "Unknown"
logger.error(f"HiveProp Email API error: Status {status_code}, Response: {error_text}")
return False, f"Email API error: Status {status_code} - {error_text}"
except requests.exceptions.RequestException as e:
logger.error(f"Request error sending email: {str(e)}")
return False, f"Email request failed: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error sending email: {str(e)}")
return False, f"Failed to send email: {str(e)}"
@app.route('/create_account_and_send_whatsapp', methods=['POST'])
def create_account_and_send_whatsapp():
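    """Generate recommendations for a new lead and deliver them via Twilio WhatsApp and the HiveProp email API.

    Illustrative request body (values are examples only): {"firstName": "Asha", "lastName": "Rao",
    "phone": "9876543210", "email": "asha@example.com", "description": "2 BHK near Gachibowli",
    "price_range": [4000000, 6000000], "property_type": "Apartment"}
    """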
try:
data = request.json
# Extract user details
first_name = data.get('firstName')
last_name = data.get('lastName')
phone = data.get('phone')
email = data.get('email')
# Extract search criteria to generate recommendations
user_input = data.get('description', '')
price_range = data.get('price_range', None)
property_type = data.get('property_type', None)
logger.info(f"Received data: firstName={first_name}, lastName={last_name}, email={email}, phone={phone}")
logger.info(f"Search criteria: description='{user_input}', price_range={price_range}, property_type={property_type}")
# Generate recommendations directly in backend
logger.info("Generating recommendations in backend...")
recommendations = get_recommendations(user_input, price_range, property_type)
logger.info(f"Generated {len(recommendations)} recommendations")
# Validate email format
if not email or '@' not in email:
logger.error(f"Invalid email format: {email}")
return jsonify({"success": False, "error": "Invalid email format. Please provide a valid email address."}), 400
# Clean and format phone number in E.164
# Remove all spaces, dashes, and parentheses
phone = phone.replace(' ', '').replace('-', '').replace('(', '').replace(')', '')
# Add +91 if not already present
if not phone.startswith('+'):
phone = f"+91{phone}"
# Validate phone number format (should be +91 followed by 10 digits)
if not phone.startswith('+91') or len(phone) != 13 or not phone[3:].isdigit():
logger.error(f"Invalid phone number format: {phone}")
return jsonify({"success": False, "error": "Invalid phone number format. Please provide a valid 10-digit Indian mobile number."}), 400
# Track success status for both email and WhatsApp
whatsapp_success = False
email_success = False
messages = []
# Send via WhatsApp
try:
            # Take only the top properties for WhatsApp (email uses MAX_PROPERTIES_TO_SEND, computed below)
            whatsapp_recommendations = recommendations[:5]  # WhatsApp can handle more
logger.info(f"Total recommendations received: {len(recommendations)}")
logger.info(f"WhatsApp recommendations to send: {len(whatsapp_recommendations)}")
# Prepare the WhatsApp message with improved design
message_header = f"🏑 *Hello {first_name} {last_name}!*\n\nHere are your *top {len(whatsapp_recommendations)} recommended properties* in Hyderabad:\n"
message_body = ""
if not whatsapp_recommendations:
message_body = "⚠️ No specific properties found, but you can explore all available properties at https://hiveprop.com\n\n"
else:
for idx, prop in enumerate(whatsapp_recommendations, 1):
property_name = prop.get('propertyName', 'Unnamed Property')
property_id = prop.get('id', '')
property_link = f"{PROPERTY_BASE_URL}{property_id}"
# Extract location from address or description
address = prop.get('address', '')
location = address.split(',')[-2].strip() if ',' in address else address
message_body += f"{idx}️⃣ *{property_name}*\n"
message_body += f"πŸ“ *{location}*\n"
message_body += f"πŸ”—{property_link}\n\n"
# Add a call-to-action
message_footer = "---\nπŸ”Ž *Looking for more options?* Explore all listings at https://hiveprop.com."
full_message = message_header + message_body + message_footer
logger.info(f"WhatsApp message length: {len(full_message)}")
# Encode credentials for Basic Auth
credentials = f"{TWILIO_ACCOUNT_SID}:{TWILIO_AUTH_TOKEN}"
encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
# Send the message
payload = {
"To": f"whatsapp:{phone}",
"From": TWILIO_WHATSAPP_NUMBER,
"Body": full_message
}
headers = {
"Authorization": f"Basic {encoded_credentials}",
"Content-Type": "application/x-www-form-urlencoded"
}
response = requests.post(
f"https://api.twilio.com/2010-04-01/Accounts/{TWILIO_ACCOUNT_SID}/Messages.json",
data=payload,
headers=headers
)
response.raise_for_status()
logger.info("WhatsApp message sent successfully")
whatsapp_success = True
messages.append("WhatsApp message sent successfully!")
except requests.exceptions.HTTPError as e:
logger.error(f"Twilio API error: {e.response.text}")
messages.append(f"WhatsApp error: {e.response.text}")
except Exception as e:
logger.error(f"Unexpected WhatsApp error: {str(e)}")
messages.append(f"WhatsApp error: {str(e)}")
        # Send via Email (recompute the slice here so it is defined even if the WhatsApp block failed early)
        email_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND]
        logger.info(f"Sending email to {email} with {len(email_recommendations)} recommendations")
        email_success, email_message = send_email_recommendations(email, first_name, last_name, email_recommendations)
messages.append(email_message)
# Determine overall success and response message
success = whatsapp_success or email_success
if whatsapp_success and email_success:
overall_message = "Account created! Messages sent via WhatsApp and email!"
elif whatsapp_success:
overall_message = "Account created! WhatsApp message sent (email failed)"
elif email_success:
overall_message = "Account created! Email sent (WhatsApp failed)"
else:
overall_message = "Account created but failed to send messages"
return jsonify({
"success": success,
"message": overall_message,
"details": messages,
"recommendations": recommendations # Return generated recommendations
})
except requests.exceptions.HTTPError as e:
logger.error(f"Twilio API error: {e.response.text}")
return jsonify({"success": False, "error": f"Twilio API error: {e.response.text}"}), 500
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return jsonify({"success": False, "error": f"Failed to send WhatsApp message: {str(e)}"}), 500
@app.route('/get_recommendations', methods=['POST'])
def get_recommendations_route():
"""Get property recommendations with parallel processing optimization and non-blocking property fetch - no timeouts"""
global active_requests
# Check concurrent request limit
with request_lock:
if active_requests >= MAX_CONCURRENT_REQUESTS:
logger.warning(f"Too many concurrent requests ({active_requests}), rejecting request")
return jsonify({"error": "Server is busy, please try again later"}), 503
active_requests += 1
try:
# Validate request data
if not request.is_json:
return jsonify({"error": "Request must be JSON"}), 400
data = request.json
if not data:
return jsonify({"error": "No data provided"}), 400
user_input = data.get('description', '')
price_range = data.get('price_range', None)
property_type = data.get('property_type', None)
# Handle partial inputs
if not user_input and not property_type and not price_range:
return jsonify({"error": "Please provide at least one search criteria"})
# If no description is provided, create one based on available inputs
if not user_input:
user_input = f"{property_type if property_type else ''} property"
if price_range:
# Handle multiple price ranges
if isinstance(price_range, list) and len(price_range) > 0:
if isinstance(price_range[0], list):
# Array of price ranges
ranges_text = []
for range_item in price_range:
ranges_text.append(f"β‚Ή{range_item[0]} to β‚Ή{range_item[1]}")
user_input += f" within price ranges: {', '.join(ranges_text)}"
else:
# Single price range
user_input += f" within price range β‚Ή{price_range[0]} to β‚Ή{price_range[1]}"
logger.info(f"Search request - Type: {property_type}, Price Range: {price_range}, Description: {user_input}")
# Handle multiple price ranges - convert to min/max arrays
min_price = None
max_price = None
if price_range:
if isinstance(price_range, list) and len(price_range) > 0:
if isinstance(price_range[0], list):
# Array of price ranges
                    min_price = [r[0] for r in price_range]
                    max_price = [r[1] for r in price_range]
else:
# Single price range
min_price = [price_range[0]]
max_price = [price_range[1]]
# Check if we have properties in cache first
try:
properties = get_cached_properties(
property_type=property_type,
min_price=min_price,
max_price=max_price
)
except Exception as e:
logger.error(f"Error getting cached properties: {str(e)}")
properties = []
# If no properties found and we're not currently fetching, start background fetch
if not properties and not is_fetching_properties:
logger.info("No properties found, initiating background fetch...")
try:
threading.Thread(target=lambda: fetch_properties_background_safe(property_type, min_price, max_price), daemon=True).start()
except Exception as e:
logger.error(f"Error starting background fetch: {str(e)}")
# Use ThreadPoolExecutor for parallel recommendation processing without timeout
def process_recommendations():
try:
return get_recommendations(user_input, price_range, property_type)
except Exception as e:
logger.error(f"Error in process_recommendations: {str(e)}")
return []
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(process_recommendations)
recommendations = future.result() # Removed timeout
# Ensure we always return properties, even if empty
if not recommendations:
logger.warning("No recommendations returned, getting fallback properties...")
# Get fallback properties (top 20 by score)
try:
fallback_properties = get_cached_properties(
property_type=None,
min_price=None,
max_price=None
)
if fallback_properties:
# Score and return top 20 fallback properties
fallback_scored = calculate_property_scores_parallel(
fallback_properties, None, None, "", max_workers=min(mp.cpu_count(), 8)
)
fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True)
recommendations = fallback_scored[:TARGET_PROPERTIES]
logger.info(f"Returning {len(recommendations)} fallback properties")
else:
recommendations = []
logger.error("No fallback properties available")
except Exception as e:
logger.error(f"Error getting fallback properties: {str(e)}")
recommendations = []
logger.info(f"Returning {len(recommendations)} properties for search using parallel processing")
return jsonify(recommendations)
except Exception as e:
logger.error(f"Error processing recommendations: {str(e)}")
# Try to return fallback properties even on error
try:
fallback_properties = get_cached_properties(
property_type=None,
min_price=None,
max_price=None
)
if fallback_properties:
fallback_scored = calculate_property_scores_parallel(
fallback_properties, None, None, "", max_workers=min(mp.cpu_count(), 8)
)
fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True)
fallback_result = fallback_scored[:TARGET_PROPERTIES]
logger.info(f"Returning {len(fallback_result)} fallback properties due to error")
return jsonify(fallback_result)
except Exception as fallback_error:
logger.error(f"Error getting fallback properties: {str(fallback_error)}")
return jsonify({"error": "An error occurred while processing recommendations"})
except Exception as e:
logger.error(f"Unexpected error in get_recommendations_route: {str(e)}")
return jsonify({"error": "An unexpected error occurred"}), 500
finally:
# Decrease active request count
with request_lock:
active_requests -= 1
@app.route('/refresh_properties', methods=['POST'])
def refresh_properties_route():
"""Manually trigger property refresh (for debugging) - non-blocking"""
try:
logger.info("πŸ”„ Manual property refresh triggered")
# Start background refresh - non-blocking
threading.Thread(target=refresh_properties_background, daemon=True).start()
return jsonify({"message": "Property refresh started in background", "status": "success"})
except Exception as e:
logger.error(f"Error triggering property refresh: {str(e)}")
return jsonify({"error": "Failed to trigger property refresh"})
@app.route('/fetch_properties', methods=['POST'])
def fetch_properties_route():
"""Manually fetch and store properties (for debugging) - non-blocking"""
try:
logger.info("πŸ”„ Manual property fetch triggered")
# Start background fetch - non-blocking
def fetch_and_store():
try:
data = fetch_properties_background_safe()
if data and 'data' in data:
new_properties = [p for p in data['data'] if isinstance(p, dict)]
if new_properties:
# Store properties in ChromaDB
update_property_cache(new_properties)
update_cache_timestamp()
logger.info(f"βœ… Manually fetched and stored {len(new_properties)} properties")
else:
logger.warning("No properties fetched from API")
else:
logger.warning("Failed to fetch properties from API")
except Exception as e:
logger.error(f"Error in manual property fetch: {str(e)}")
threading.Thread(target=fetch_and_store, daemon=True).start()
return jsonify({
"message": "Property fetch started in background",
"status": "success"
})
except Exception as e:
logger.error(f"Error in manual property fetch: {str(e)}")
return jsonify({"error": f"Failed to fetch properties: {str(e)}"})
@app.route('/cache_status', methods=['GET'])
def cache_status_route():
"""Get cache status information"""
try:
property_count = check_property_cache_status()
cache_stale = is_cache_stale()
status_info = {
"property_count": property_count,
"cache_stale": cache_stale,
"cache_expiry_hours": CACHE_EXPIRY_SECONDS / 3600
}
if os.path.exists(CACHE_TIMESTAMP_PATH):
with open(CACHE_TIMESTAMP_PATH, 'r') as f:
last_time = float(f.read().strip())
status_info["last_update"] = datetime.fromtimestamp(last_time).isoformat()
else:
status_info["last_update"] = None
return jsonify(status_info)
except Exception as e:
logger.error(f"Error getting cache status: {str(e)}")
return jsonify({"error": "Failed to get cache status"})
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint for monitoring multi-user performance"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
health_info = {
"status": "healthy",
"active_requests": active_requests,
"max_concurrent_requests": MAX_CONCURRENT_REQUESTS,
"cpu_usage_percent": cpu_percent,
"memory_usage_percent": memory.percent,
"memory_available_gb": round(memory.available / (1024**3), 2),
"timestamp": datetime.now().isoformat()
}
# Check if system is overloaded
if active_requests >= MAX_CONCURRENT_REQUESTS * 0.8:
health_info["status"] = "warning"
if cpu_percent > 80 or memory.percent > 80:
health_info["status"] = "warning"
return jsonify(health_info)
except Exception as e:
logger.error(f"Error in health check: {str(e)}")
return jsonify({"status": "error", "error": str(e)})
def display_countdown():
"""Display countdown in terminal and manage scheduled property refresh"""
global stop_countdown, countdown_initialized
# Initialize countdown properly
if not countdown_initialized:
logger.info("πŸ”„ Initializing countdown system...")
# Force initial property fetch if no cache exists - non-blocking
if not os.path.exists(CACHE_TIMESTAMP_PATH):
logger.info("πŸ”„ No cache timestamp found, performing initial property fetch...")
# Start initial fetch in background thread
threading.Thread(target=force_initial_property_fetch, daemon=True).start()
countdown_initialized = True
last_display_time = 0
display_interval = 60 # Update display every 60 seconds
while not stop_countdown:
try:
current_time = time.time()
# Only update display every minute to reduce spam
if current_time - last_display_time >= display_interval:
if os.path.exists(CACHE_TIMESTAMP_PATH):
with open(CACHE_TIMESTAMP_PATH, 'r') as f:
last_time = float(f.read().strip())
time_diff = current_time - last_time
time_remaining = CACHE_EXPIRY_SECONDS - time_diff
if time_remaining > 0:
# Clear the current line and print countdown
sys.stdout.write('\r' + ' ' * 100)
sys.stdout.write('\r')
next_update = datetime.fromtimestamp(current_time + time_remaining)
sys.stdout.write(f"⏰ Next data fetch in: {format_time_remaining(time_remaining)} (at {next_update.strftime('%H:%M:%S')})")
sys.stdout.flush()
last_display_time = current_time
else:
# Time to refresh properties - non-blocking
logger.info("πŸ”„ 24 hours have passed, starting scheduled property refresh...")
threading.Thread(target=refresh_properties_background, daemon=True).start()
sys.stdout.write('\r' + ' ' * 100)
sys.stdout.write('\r')
sys.stdout.write("βœ… Scheduled property refresh initiated")
sys.stdout.flush()
last_display_time = current_time
# Wait 60 seconds after initiating refresh before continuing
time.sleep(60)
else:
# No cache timestamp - try to create one - non-blocking
sys.stdout.write('\r' + ' ' * 100)
sys.stdout.write('\r')
sys.stdout.write("πŸ”„ No cache timestamp found. Attempting initial fetch...")
sys.stdout.flush()
last_display_time = current_time
# Try to fetch properties and create timestamp - non-blocking
threading.Thread(target=force_initial_property_fetch, daemon=True).start()
sys.stdout.write('\r' + ' ' * 100)
sys.stdout.write('\r')
sys.stdout.write("βœ… Initial property fetch initiated")
sys.stdout.flush()
time.sleep(300) # Wait 5 minutes before checking again
continue
# Sleep for a shorter interval
time.sleep(10) # Check every 10 seconds instead of 1 second
except Exception as e:
logger.error(f"Error in countdown display: {e}")
time.sleep(30) # Wait 30 seconds on error before retrying
def start_countdown():
"""Start the countdown display thread"""
global countdown_thread, stop_countdown, countdown_initialized
stop_countdown = False
countdown_initialized = False
countdown_thread = threading.Thread(target=display_countdown, daemon=True)
countdown_thread.start()
logger.info("βœ… Countdown thread started")
def stop_countdown_display():
"""Stop the countdown display thread"""
global stop_countdown
stop_countdown = True
if countdown_thread:
countdown_thread.join(timeout=1)
logger.info("βœ… Countdown thread stopped")
def get_system_info():
    """Get system information for performance optimization"""
    cpu_count = mp.cpu_count()
    memory_gb = psutil.virtual_memory().total / (1024**3)
    logger.info(f"πŸš€ System Info - CPU Cores: {cpu_count}, Memory: {memory_gb:.1f}GB")
    return cpu_count, memory_gb
def check_property_cache_status():
"""Check the status of property cache and log information"""
global property_collection
try:
if property_collection:
results = property_collection.get()
if results and results['ids']:
logger.info(f"πŸ“Š Property Cache Status: {len(results['ids'])} properties available")
return len(results['ids'])
else:
logger.warning("πŸ“Š Property Cache Status: No properties in cache")
return 0
else:
logger.warning("πŸ“Š Property Cache Status: Collection not initialized")
return 0
except Exception as e:
logger.error(f"Error checking property cache status: {e}")
return 0
def ensure_properties_available():
"""Ensure properties are available in cache, fetch if needed - non-blocking"""
global property_collection
try:
if not property_collection:
logger.info("Collection not initialized, initializing...")
initialize_collection()
return
results = property_collection.get()
if not results or not results['ids']:
logger.info("πŸ”„ No properties in cache, fetching from API...")
# Force fetch properties - non-blocking
threading.Thread(target=force_initial_property_fetch, daemon=True).start()
logger.info("βœ… Property fetch initiated in background")
else:
logger.info(f"βœ… {len(results['ids'])} properties already available in cache")
except Exception as e:
logger.error(f"Error ensuring properties are available: {e}")
if __name__ == '__main__':
try:
# Get system information
cpu_count, memory_gb = get_system_info()
logger.info(f"πŸš€ Starting Property Recommendation System with parallel processing optimization")
logger.info(f"πŸ”§ Using {min(cpu_count, 8)} workers for parallel processing")
# Ensure properties are available in cache - non-blocking
ensure_properties_available()
# Check property cache status
property_count = check_property_cache_status()
if property_count == 0:
logger.warning("⚠️ Still no properties in cache after initialization")
# Try one more time to fetch properties - non-blocking
threading.Thread(target=force_initial_property_fetch, daemon=True).start()
logger.info("βœ… Property fetch retry initiated in background")
else:
logger.info(f"βœ… Cache has {property_count} properties ready for recommendations")
# Start the countdown display
start_countdown()
# Change port to 7860 for Hugging Face Spaces with multi-user support
app.run(debug=False, host='0.0.0.0', port=7860, threaded=True)
except KeyboardInterrupt:
logger.info("πŸ›‘ Application interrupted by user")
except Exception as e:
logger.error(f"❌ Fatal error starting application: {str(e)}")
finally:
# Stop the countdown display when the app stops
stop_countdown_display()
logger.info("πŸ›‘ Application stopped")