from flask import Flask, render_template, request, jsonify
import requests
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import json
import os
from datetime import datetime, timedelta
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity
import urllib3
import chromadb
from chromadb.config import Settings
import time
from flask_cors import CORS
import logging
import threading
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import math
import psutil

TARGET_PROPERTIES = 20  # Always return 20 properties

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Disable SSL warning since we're using https://localhost
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

app = Flask(__name__)

# Multi-user support configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0  # Disable caching for development

# Configure CORS with specific settings for multi-user support
CORS(app, resources={r"/*": {
    "origins": ["http://localhost:4200", "https://huggingface.co", "*"],
    "methods": ["GET", "POST", "OPTIONS"],
    "allow_headers": ["Content-Type", "Authorization", "X-Requested-With"],
    "supports_credentials": True,
    "max_age": 3600
}})


# Add CORS headers to all responses
@app.after_request
def after_request(response):
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
    response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    return response


# Initialize the sentence transformer model with error handling
import multiprocessing as mp
from functools import partial

# Load model directly to avoid multiprocessing issues
try:
    logger.info("Loading sentence transformer model...")
    model = SentenceTransformer(
        'all-MiniLM-L6-v2',
        cache_folder=os.path.join(os.path.expanduser("~"), ".cache", "huggingface")
    )
    logger.info("Model loaded successfully")
except Exception as e:
    logger.error(f"Error loading model: {str(e)}")
    raise

# Configuration
TOTAL_PROPERTIES = 500  # Total number of properties to fetch
CACHE_EXPIRY_SECONDS = 24 * 60 * 60  # 24 hours in seconds
BATCH_SIZE = 50  # Number of properties per batch
MAX_WORKERS = 10  # Number of parallel workers
# REQUEST_TIMEOUT = None  # No timeout for requests - removed

# Multi-user support configuration
MAX_CONCURRENT_REQUESTS = 50  # Maximum concurrent requests
REQUEST_RATE_LIMIT = 100  # Requests per minute per IP
CONNECTION_POOL_SIZE = 20  # Connection pool size for external API calls

# Create connection pool for multi-user support
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=CONNECTION_POOL_SIZE,
    pool_maxsize=CONNECTION_POOL_SIZE,
    max_retries=3
)
session.mount('http://', adapter)
session.mount('https://', adapter)

# Global variable for property collection
property_collection = None

# Path for cache timestamp
CACHE_TIMESTAMP_PATH = os.path.join("property_db", "last_cache_time.txt")

# Global variables for the countdown thread
countdown_thread = None
stop_countdown = False
countdown_initialized = False  # Flag to track initialization

# Multi-user request tracking
active_requests = 0
request_lock = threading.Lock()

# Background task management
background_tasks = {}
task_lock = threading.Lock()
is_fetching_properties = False
fetch_lock = threading.Lock()


def format_time_remaining(seconds):
    """Format remaining time in a human-readable format"""
    if seconds < 60:
        return f"{int(seconds)} seconds"
    elif seconds < 3600:
        minutes = int(seconds / 60)
        remaining_seconds = int(seconds % 60)
        return f"{minutes} minutes {remaining_seconds} seconds"
    else:
        hours = int(seconds / 3600)
        minutes = int((seconds % 3600) / 60)
        return f"{hours} hours {minutes} minutes"


def fetch_property_batch(page_number, property_type=None, min_price=None, max_price=None):
    """Fetch a single batch of properties without timeout constraints"""
    url = "https://hivepropapi.azurewebsites.net/api/Property/allPropertieswithfulldetails"
    params = {
        "pageNumber": page_number,
        "pageSize": BATCH_SIZE
    }

    # Add filters to params if provided
    if property_type:
        params["propertyType"] = property_type
    if min_price is not None:
        params["minPrice"] = min_price
    if max_price is not None:
        params["maxPrice"] = max_price

    try:
        logger.info(f"Fetching batch {page_number} with {BATCH_SIZE} properties...")
        response = session.get(url, params=params, verify=False)  # Removed timeout
        response.raise_for_status()
        data = response.json()

        if data and 'data' in data:
            properties = data['data']
            if properties:
                logger.info(f"Successfully fetched {len(properties)} properties from batch {page_number}")
                return properties
            else:
                logger.warning(f"Batch {page_number} returned empty property list")
                return []

        logger.warning(f"No data found in batch {page_number} response")
        return []
    except requests.exceptions.ConnectionError:
        logger.error(f"Failed to connect to the API server for batch {page_number}")
        return []
    except Exception as e:
        logger.error(f"Error fetching batch {page_number}: {e}")
        return []


def fetch_properties_background_safe(property_type=None, min_price=None, max_price=None):
    """Fetch properties in background with proper error handling and no timeout constraints"""
    global is_fetching_properties

    with fetch_lock:
        if is_fetching_properties:
            logger.info("Property fetch already in progress, skipping...")
            return None
        is_fetching_properties = True

    try:
        logger.info(f"šŸ”„ Starting background property fetch with {MAX_WORKERS} workers, batch size {BATCH_SIZE}")

        # Calculate number of batches needed
        total_batches = math.ceil(TOTAL_PROPERTIES / BATCH_SIZE)
        logger.info(f"Total batches to fetch: {total_batches}")

        all_properties = []
        start_time = time.time()
        completed_batches = 0

        # Use ThreadPoolExecutor without timeout for parallel fetching
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Submit all batch requests
            future_to_batch = {
                executor.submit(fetch_property_batch, page_num, property_type, min_price, max_price): page_num
                for page_num in range(1, total_batches + 1)
            }

            # Collect results as they complete without timeout
            for future in as_completed(future_to_batch):  # Removed timeout
                batch_num = future_to_batch[future]
                try:
                    batch_properties = future.result()  # Removed timeout
                    if batch_properties:
                        all_properties.extend(batch_properties)
                        completed_batches += 1
                        logger.info(f"Completed batch {batch_num}/{total_batches} - Total properties so far: {len(all_properties)}")
                    else:
                        logger.warning(f"Batch {batch_num} returned no properties")
                except Exception as e:
                    logger.error(f"Batch {batch_num} generated an exception: {e}")
                    continue  # Continue with other batches

        end_time = time.time()
        fetch_duration = end_time - start_time

        if all_properties:
            # Limit to total properties requested
            all_properties = all_properties[:TOTAL_PROPERTIES]
            logger.info(f"āœ… Successfully fetched {len(all_properties)} properties in {fetch_duration:.2f} seconds using {completed_batches}/{total_batches} batches")
            return {"data": all_properties}
        else:
            logger.error(f"āŒ Failed to fetch any properties after {fetch_duration:.2f} seconds")
            return None
    except Exception as e:
        logger.error(f"āŒ Error during background property fetch: {str(e)}")
        return None
    finally:
        with fetch_lock:
            is_fetching_properties = False


def fetch_properties(property_type=None, min_price=None, max_price=None):
    """Fetch properties using parallel workers and batches - now calls background version"""
    return fetch_properties_background_safe(property_type, min_price, max_price)


def update_cache_timestamp():
    try:
        os.makedirs(os.path.dirname(CACHE_TIMESTAMP_PATH), exist_ok=True)
        current_time = time.time()
        with open(CACHE_TIMESTAMP_PATH, 'w') as f:
            f.write(str(current_time))
        next_update = datetime.fromtimestamp(current_time + CACHE_EXPIRY_SECONDS)
        logger.info(f"Updated cache timestamp. Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"Next update in: {format_time_remaining(CACHE_EXPIRY_SECONDS)}")
    except Exception as e:
        logger.error(f"Error updating cache timestamp: {e}")


def refresh_properties_background():
    """Refresh properties in background without blocking the main application"""
    try:
        logger.info("šŸ”„ Starting background property refresh...")
        data = fetch_properties_background_safe()
        if data and 'data' in data:
            new_properties = [p for p in data['data'] if isinstance(p, dict)]
            if new_properties:
                # Use a background thread for a non-blocking cache update
                def update_cache_async():
                    try:
                        update_property_cache(new_properties)
                        update_cache_timestamp()
                        logger.info("āœ… Background property refresh completed successfully")
                    except Exception as e:
                        logger.error(f"Error updating cache in background: {e}")

                # Start cache update in background thread
                threading.Thread(target=update_cache_async, daemon=True).start()
            else:
                logger.warning("āš ļø No properties fetched during background refresh")
        else:
            logger.warning("āš ļø Failed to fetch properties during background refresh")
    except Exception as e:
        logger.error(f"āŒ Error during background property refresh: {str(e)}")


def force_initial_property_fetch():
    """Force initial property fetch to ensure cache is created - non-blocking"""
    try:
        logger.info("šŸ”„ Force initial property fetch...")
        data = fetch_properties_background_safe()
        if data and 'data' in data:
            new_properties = [p for p in data['data'] if isinstance(p, dict)]
            if new_properties:
                # Use a background thread for a non-blocking cache update
                def update_cache_async():
                    try:
                        update_property_cache(new_properties)
                        update_cache_timestamp()
                        logger.info(f"āœ… Force initial fetch completed with {len(new_properties)} properties")
                    except Exception as e:
                        logger.error(f"Error updating cache in force initial fetch: {e}")

                # Start cache update in background thread
                threading.Thread(target=update_cache_async, daemon=True).start()
                return True
            else:
                logger.warning("āš ļø No properties fetched during force initial fetch")
                return False
        else:
            logger.warning("āš ļø Failed to fetch properties during force initial fetch")
            return False
    except Exception as e:
        logger.error(f"āŒ Error during force initial property fetch: {str(e)}")
        return False


def process_property_batch(properties_batch, existing_ids):
    """Process a batch of properties into ChromaDB ids, metadata, documents and embeddings"""
    batch_ids = []
    batch_metadatas = []
    batch_documents = []
    batch_embeddings = []

    # Prepare property texts for batch embedding
    property_texts = []
    valid_properties = []

    for prop in properties_batch:
        try:
            prop_id = str(prop.get('id'))
            if prop_id not in existing_ids:
                # Create property text for embedding
                property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
                property_texts.append(property_text)
                valid_properties.append(prop)
        except Exception as e:
            logger.error(f"Error processing property for batch: {str(e)}")
            continue

    if not valid_properties:
        return batch_ids, batch_metadatas, batch_documents, batch_embeddings

    # Create embeddings in batch for better performance
    try:
        embeddings = model.encode(property_texts)

        for i, prop in enumerate(valid_properties):
            try:
                prop_id = str(prop.get('id'))

                # Convert property to metadata dictionary with proper type handling
                metadata = {
                    'id': str(prop.get('id', '')),
                    'propertyName': str(prop.get('propertyName', '')),
                    'typeName': str(prop.get('typeName', '')),
                    'description': str(prop.get('description', '')),
                    'address': str(prop.get('address', '')),
                    'totalSquareFeet': float(prop.get('totalSquareFeet', 0)),
                    'beds': int(prop.get('beds', 0)),
                    'baths': int(prop.get('baths', 0)),
                    'numberOfRooms': int(prop.get('numberOfRooms', 0)),
                    'marketValue': float(prop.get('marketValue', 0)),
                    'yearBuilt': str(prop.get('yearBuilt', '')),
                    'propertyImages': json.dumps(prop.get('propertyImages', [])),
                    'features': json.dumps(prop.get('features', [])),
                    'location': json.dumps(prop.get('location', {})),
                    'floorPlans': json.dumps(prop.get('floorPlans', [])),
                    'documents': json.dumps(prop.get('documents', [])),
                    'propertyVideos': json.dumps(prop.get('propertyVideos', []))
                }

                batch_ids.append(prop_id)
                batch_metadatas.append(metadata)
                batch_documents.append(property_texts[i])
                batch_embeddings.append(embeddings[i].tolist())
            except Exception as e:
                logger.error(f"Error processing property {i} in batch: {str(e)}")
                continue
    except Exception as e:
        logger.error(f"Error creating embeddings for batch: {str(e)}")

    return batch_ids, batch_metadatas, batch_documents, batch_embeddings


def update_property_cache(new_properties):
    """Update property cache with parallel processing optimization"""
    global property_collection
    try:
        if not new_properties:
            logger.warning("No new properties to add")
            return

        logger.info(f"Updating ChromaDB with {len(new_properties)} properties using parallel processing")
        start_time = time.time()

        # Get existing property IDs
        existing_results = property_collection.get()
        existing_ids = set(existing_results['ids']) if existing_results and existing_results['ids'] else set()

        # Filter out existing properties in parallel
        def filter_new_property(prop):
            try:
                prop_id = str(prop.get('id'))
                if prop_id not in existing_ids:
                    return prop
                return None
            except Exception as e:
                logger.error(f"Error filtering property: {str(e)}")
                return None

        # Use ThreadPoolExecutor for parallel filtering
        max_workers = min(mp.cpu_count(), 8)
        new_properties_filtered = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_property = {
                executor.submit(filter_new_property, prop): i
                for i, prop in enumerate(new_properties)
            }

            for future in as_completed(future_to_property):
                try:
                    filtered_prop = future.result()
                    if filtered_prop:
                        new_properties_filtered.append(filtered_prop)
                except Exception as e:
                    property_idx = future_to_property[future]
                    logger.error(f"Error filtering property {property_idx}: {str(e)}")

        if not new_properties_filtered:
            logger.info("No new properties to add to ChromaDB")
            return

        logger.info(f"Processing {len(new_properties_filtered)} new properties in parallel")
        # Process properties in batches for better performance
        cache_batch_size = 100  # Process 100 properties at a time for embedding

        all_ids = []
        all_metadatas = []
        all_documents = []
        all_embeddings = []

        # Process in batches with parallel processing
        def process_batch_parallel(batch):
            return process_property_batch(batch, existing_ids)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Create batches
            batches = [new_properties_filtered[i:i + cache_batch_size]
                       for i in range(0, len(new_properties_filtered), cache_batch_size)]

            # Submit batch processing tasks
            future_to_batch = {
                executor.submit(process_batch_parallel, batch): i
                for i, batch in enumerate(batches)
            }

            # Collect results
            for future in as_completed(future_to_batch):
                batch_idx = future_to_batch[future]
                try:
                    batch_ids, batch_metadatas, batch_documents, batch_embeddings = future.result()
                    all_ids.extend(batch_ids)
                    all_metadatas.extend(batch_metadatas)
                    all_documents.extend(batch_documents)
                    all_embeddings.extend(batch_embeddings)
                    logger.info(f"Processed batch {batch_idx + 1}/{len(batches)} - {len(batch_ids)} properties")
                except Exception as e:
                    logger.error(f"Error processing batch {batch_idx}: {str(e)}")

        # Add all new properties to ChromaDB
        if all_ids:
            logger.info(f"Adding {len(all_ids)} new properties to ChromaDB")
            property_collection.add(
                ids=all_ids,
                metadatas=all_metadatas,
                documents=all_documents,
                embeddings=all_embeddings
            )
            end_time = time.time()
            logger.info(f"āœ… Successfully added {len(all_ids)} properties to ChromaDB in {end_time - start_time:.2f} seconds using parallel processing")
        else:
            logger.info("No new properties to add to ChromaDB")
    except Exception as e:
        logger.error(f"Error updating property cache: {str(e)}")
        raise


def is_cache_stale():
    """Check if cache is stale and needs refresh"""
    if not os.path.exists(CACHE_TIMESTAMP_PATH):
        logger.info("No cache timestamp found, cache needs refresh")
        return True

    try:
        with open(CACHE_TIMESTAMP_PATH, 'r') as f:
            last_time = float(f.read().strip())
        current_time = time.time()
        time_diff = current_time - last_time
        is_stale = time_diff > CACHE_EXPIRY_SECONDS

        if is_stale:
            logger.info(f"Cache is stale (older than {format_time_remaining(CACHE_EXPIRY_SECONDS)}). Last update: {datetime.fromtimestamp(last_time)}")
            logger.info(f"Time since last update: {format_time_remaining(time_diff)}")
        else:
            time_remaining = CACHE_EXPIRY_SECONDS - time_diff
            next_update = datetime.fromtimestamp(current_time + time_remaining)
            logger.info(f"Cache is fresh. Last update: {datetime.fromtimestamp(last_time)}")
            logger.info(f"Next update in: {format_time_remaining(time_remaining)}")
            logger.info(f"Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}")
        return is_stale
    except Exception as e:
        logger.error(f"Error reading cache timestamp: {e}, cache needs refresh")
        return True


def initialize_collection():
    global property_collection
    try:
        # First try to get the collection
        try:
            property_collection = chroma_client.get_collection("properties")
            logger.info("Using existing property collection from vector DB")
        except Exception as e:
            logger.info("Creating new property collection")
            property_collection = chroma_client.create_collection(
                name="properties",
                metadata={"hnsw:space": "cosine"}
            )

        # Check if collection is empty or needs refresh
        results = property_collection.get()
        if not results or not results['ids']:
            logger.info("Collection is empty, fetching initial properties")
            # Always fetch initial properties if collection is empty
            data = fetch_properties()
            if data and 'data' in data:
                new_properties = [p for p in data['data'] if isinstance(p, dict)]
                if new_properties:
                    update_property_cache(new_properties)
                    update_cache_timestamp()
                    logger.info(f"āœ… Successfully added {len(new_properties)} initial properties to collection")
                else:
                    logger.warning("No properties fetched from API during initialization")
            else:
                logger.warning("Failed to fetch properties from API during initialization")
        else:
            logger.info(f"Collection has {len(results['ids'])} properties, checking if cache is fresh")
            if is_cache_stale():
                logger.info("Cache is stale, will refresh in background")
                # Start background refresh
                threading.Thread(target=refresh_properties_background, daemon=True).start()
            else:
                logger.info("Cache is fresh, using existing properties")
    except Exception as e:
        logger.error(f"Error initializing collection: {str(e)}")
        # Try to recreate the collection
        try:
            logger.info("Attempting to recreate collection")
            try:
                chroma_client.delete_collection("properties")
            except Exception:
                pass
            property_collection = chroma_client.create_collection(
                name="properties",
                metadata={"hnsw:space": "cosine"}
            )
            logger.info("Successfully recreated collection")
        except Exception as recreate_error:
            logger.error(f"Error recreating collection: {str(recreate_error)}")
            raise


# Initialize ChromaDB with persistent storage
try:
    logger.info("Initializing ChromaDB...")
    chroma_client = chromadb.PersistentClient(path="property_db")
    # Initialize collection at startup
    initialize_collection()
    logger.info("ChromaDB initialized successfully")
except Exception as e:
    logger.error(f"Error initializing ChromaDB: {str(e)}")
    raise


def calculate_property_score(prop, price_range=None, property_type=None, user_input=None):
    """Calculate the match score and semantic similarity for a single property"""
    score = 0.0
    similarity_score = 0.0

    # Calculate semantic similarity if user input is provided
    if user_input:
        property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
        property_embedding = model.encode([property_text])[0]
        user_embedding = model.encode([user_input])[0]
        similarity_score = float(cosine_similarity([property_embedding], [user_embedding])[0][0])  # Convert to Python float
        score += similarity_score * 0.5  # 50% weight for semantic similarity

    # Base score for property type match (only if property_type is specified)
    if property_type:
        prop_type = prop.get('typeName', '').lower()
        if isinstance(property_type, list):
            # Check if property type matches any in the list
            if any(pt.lower() == prop_type for pt in property_type):
                score += 0.3  # 30% weight for exact type match
            elif any(pt.lower() in prop_type for pt in property_type):
                score += 0.2  # 20% weight for partial type match
        else:
            # Single property type
            if prop_type == property_type.lower():
                score += 0.3  # 30% weight for exact type match
            elif property_type.lower() in prop_type:
                score += 0.2  # 20% weight for partial type match
    else:
        # For additional properties (no type filter), give base score
        score += 0.1  # Base score for any property type

    # Score for price range match
    if price_range:
        price = float(prop.get('marketValue', 0))  # Convert to Python float
        if isinstance(price_range, list) and len(price_range) > 0:
            if isinstance(price_range[0], list):
                # Multiple price ranges - check if price falls in any range
                price_in_range = False
                for range_item in price_range:
                    min_val = range_item[0] if range_item[0] is not None else 0
                    max_val = range_item[1] if range_item[1] is not None else float('inf')
                    if min_val <= price <= max_val:
                        price_in_range = True
                        score += 0.3  # 30% weight for exact price match
                        break
                if not price_in_range:
                    # Check if price is close to any range
                    for range_item in price_range:
                        min_val = range_item[0] if range_item[0] is not None else 0
                        max_val = range_item[1] if range_item[1] is not None else float('inf')
                        if price < min_val:
                            price_diff = (min_val - price) / min_val if min_val > 0 else 1
                            score += 0.1 * (1 - min(price_diff, 1))  # Up to 10% for properties below range
                        elif price < max_val * 1.2:  # Allow 20% above max price
                            price_diff = (price - max_val) / max_val if max_val > 0 else 1
                            score += 0.2 * (1 - min(price_diff, 1))  # Up to 20% for properties above range
            else:
                # Single price range
                min_val = price_range[0] if price_range[0] is not None else 0
                max_val = price_range[1] if price_range[1] is not None else float('inf')
                if min_val <= price <= max_val:
                    score += 0.3  # 30% weight for exact price match
                elif price < min_val:
                    # Properties below range get a small score
                    price_diff = (min_val - price) / min_val if min_val > 0 else 1
                    score += 0.1 * (1 - min(price_diff, 1))  # Up to 10% for properties below range
                elif price < max_val * 1.2:  # Allow 20% above max price
                    price_diff = (price - max_val) / max_val if max_val > 0 else 1
                    score += 0.2 * (1 - min(price_diff, 1))  # Up to 20% for properties above range

    # Score for property features
    features = [
        'balcony', 'parking', 'security', 'garden',
        'swimming pool', 'gym', 'power backup', 'water supply'
    ]
    description = prop.get('description', '')
    if description:  # Only process features if description exists
        description = description.lower()
        feature_count = sum(1 for feature in features if feature in description)
        score += (feature_count / len(features)) * 0.1  # Up to 10% for features

    # Score for property size
    if prop.get('totalSquareFeet', 0) > 1000:
        score += 0.05  # 5% for larger properties

    # Score for number of rooms
    if prop.get('numberOfRooms', 0) >= 3:
        score += 0.05  # 5% for properties with more rooms

    return float(score), float(similarity_score)  # Convert to Python floats


def calculate_property_scores_parallel(properties, price_range=None, property_type=None, user_input=None, max_workers=None):
    """Calculate scores for multiple properties in parallel"""
    if not properties:
        return []

    if max_workers is None:
        max_workers = min(mp.cpu_count(), 8)  # Use CPU cores but cap at 8

    logger.info(f"Calculating scores for {len(properties)} properties using {max_workers} workers")

    # Prepare property texts for batch embedding if user_input is provided
    if user_input:
        property_texts = []
        for prop in properties:
            property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}"
            property_texts.append(property_text)

        # Batch encode all property texts at once
        try:
            property_embeddings = model.encode(property_texts)
            user_embedding = model.encode([user_input])[0]
            # Calculate similarities in batch
            similarities = cosine_similarity(property_embeddings, [user_embedding]).flatten()
        except Exception as e:
            logger.error(f"Error in batch embedding: {e}")
            similarities = [0.0] * len(properties)
    else:
        similarities = [0.0] * len(properties)

    # Process properties in parallel
    def score_single_property(args):
        prop, similarity = args
        score = 0.0

        # Add similarity score
        score += similarity * 0.5

        # Base score for property type match
        if property_type:
            prop_type = prop.get('typeName', '').lower()
            if isinstance(property_type, list):
                if any(pt.lower() == prop_type for pt in property_type):
                    score += 0.3
                elif any(pt.lower() in prop_type for pt in property_type):
                    score += 0.2
            else:
                if prop_type == property_type.lower():
                    score += 0.3
                elif property_type.lower() in prop_type:
                    score += 0.2
        else:
            score += 0.1

        # Price range scoring
        if price_range:
            price = float(prop.get('marketValue', 0))
            if isinstance(price_range, list) and len(price_range) > 0:
                if isinstance(price_range[0], list):
                    price_in_range = False
                    for range_item in price_range:
                        min_val = range_item[0] if range_item[0] is not None else 0
                        max_val = range_item[1] if range_item[1] is not None else float('inf')
                        if min_val <= price <= max_val:
                            price_in_range = True
                            score += 0.3
                            break
                    if not price_in_range:
                        for range_item in price_range:
                            min_val = range_item[0] if range_item[0] is not None else 0
                            max_val = range_item[1] if range_item[1] is not None else float('inf')
                            if price < min_val:
                                price_diff = (min_val - price) / min_val if min_val > 0 else 1
                                score += 0.1 * (1 - min(price_diff, 1))
                            elif price < max_val * 1.2:  # Allow 20% above max price
                                price_diff = (price - max_val) / max_val if max_val > 0 else 1
                                score += 0.2 * (1 - min(price_diff, 1))
                else:
                    min_val = price_range[0] if price_range[0] is not None else 0
                    max_val = price_range[1] if price_range[1] is not None else float('inf')
                    if min_val <= price <= max_val:
                        score += 0.3
                    elif price < min_val:
                        price_diff = (min_val - price) / min_val if min_val > 0 else 1
                        score += 0.1 * (1 - min(price_diff, 1))
                    elif price < max_val * 1.2:  # Allow 20% above max price
                        price_diff = (price - max_val) / max_val if max_val > 0 else 1
                        score += 0.2 * (1 - min(price_diff, 1))

        # Feature scoring
        features = ['balcony', 'parking', 'security', 'garden', 'swimming pool', 'gym', 'power backup', 'water supply']
        description = prop.get('description', '')
        if description:
            description = description.lower()
            feature_count = sum(1 for feature in features if feature in description)
            score += (feature_count / len(features)) * 0.1

        # Size and room scoring
        if prop.get('totalSquareFeet', 0) > 1000:
            score += 0.05
        if prop.get('numberOfRooms', 0) >= 3:
            score += 0.05

        return {
            'property': prop,
            'score': float(score),
            'similarity': float(similarity)
        }

    # Use ThreadPoolExecutor for parallel scoring
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Create arguments for each property
        args = list(zip(properties, similarities))

        # Submit all scoring tasks
        future_to_property = {
            executor.submit(score_single_property, arg): i
            for i, arg in enumerate(args)
        }

        # Collect results
        scored_properties = []
        for future in as_completed(future_to_property):
            try:
                result = future.result()
                if result['score'] >= 0.1:  # Filter low scores
                    scored_properties.append(result)
            except Exception as e:
                property_idx = future_to_property[future]
                logger.error(f"Error scoring property {property_idx}: {e}")

    # Sort by score
    scored_properties.sort(key=lambda x: x['score'], reverse=True)

    # Convert back to original format
    for result in scored_properties:
        result['property']['score'] = result['score']
        result['property']['similarity'] = result['similarity']

    logger.info(f"Successfully scored {len(scored_properties)} properties in parallel")
    return [result['property'] for result in scored_properties]


def get_recommendations(user_input, price_range=None, property_type=None):
    global property_collection

    # Handle multiple price ranges - convert to min/max arrays
    min_price = None
    max_price = None
    if price_range:
        if isinstance(price_range, list) and len(price_range) > 0:
            if isinstance(price_range[0], list):
                # Array of price ranges
                min_price = [r[0] for r in price_range]
                max_price = [r[1] for r in price_range]
            else:
                # Single price range
                min_price = [price_range[0]]
                max_price = [price_range[1]]
    # Get properties from ChromaDB, fetch from API if empty
    try:
        logger.info("Getting properties from ChromaDB...")
        properties = get_cached_properties(
            property_type=property_type,
            min_price=min_price,
            max_price=max_price
        )

        if not properties:
            logger.warning("No properties found in ChromaDB, fetching from API...")
            # Fetch properties from API and store in ChromaDB
            data = fetch_properties(
                property_type=property_type,
                min_price=min_price[0] if min_price else None,
                max_price=max_price[0] if max_price else None
            )
            if data and 'data' in data:
                new_properties = [p for p in data['data'] if isinstance(p, dict)]
                if new_properties:
                    # Store properties in ChromaDB
                    update_property_cache(new_properties)
                    update_cache_timestamp()
                    logger.info(f"āœ… Fetched and stored {len(new_properties)} properties from API")
                    # Now get properties from ChromaDB again
                    properties = get_cached_properties(
                        property_type=property_type,
                        min_price=min_price,
                        max_price=max_price
                    )
                else:
                    logger.warning("No properties fetched from API")
                    return []
            else:
                logger.warning("Failed to fetch properties from API")
                return []
    except Exception as e:
        logger.error(f"Error getting properties from ChromaDB: {str(e)}")
        properties = []

    # Final fallback: if still no properties, try to get all properties without filters
    if not properties:
        logger.warning("Still no properties found, trying fallback fetch...")
        try:
            # Try to get all properties without filters
            all_properties = get_cached_properties(
                property_type=None,
                min_price=None,
                max_price=None
            )
            if all_properties:
                logger.info(f"āœ… Found {len(all_properties)} properties in fallback fetch")
                properties = all_properties
            else:
                logger.error("āŒ No properties available even in fallback")
                return []
        except Exception as e:
            logger.error(f"Error in fallback property fetch: {str(e)}")
            return []

    if not properties:
        logger.warning("No properties found matching criteria")
        return []

    # Calculate scores for properties using parallel processing
    scored_properties = calculate_property_scores_parallel(
        properties, price_range, property_type, user_input,
        max_workers=min(mp.cpu_count(), 8)
    )

    # If we have enough properties matching exact criteria, return them
    if len(scored_properties) >= TARGET_PROPERTIES:
        logger.info(f"Found {len(scored_properties)} properties matching exact criteria, returning top {TARGET_PROPERTIES}")
        return scored_properties[:TARGET_PROPERTIES]

    # If we don't have enough properties, get additional properties from other types in same price range
    logger.info(f"Found only {len(scored_properties)} properties matching exact criteria, need {TARGET_PROPERTIES - len(scored_properties)} more")

    # Get all properties in the same price range (regardless of type)
    all_properties_in_range = get_cached_properties(
        property_type=None,  # No type filter
        min_price=min_price,
        max_price=max_price
    )

    if not all_properties_in_range:
        logger.warning("No properties found in price range, trying fallback to all properties...")
        # Final fallback: get all properties regardless of price range
        all_properties_in_range = get_cached_properties(
            property_type=None,
            min_price=None,
            max_price=None
        )
        if not all_properties_in_range:
            logger.error("āŒ No properties available even in final fallback")
            return scored_properties  # Return what we have

    # Calculate scores for all properties in price range using parallel processing
    all_scored_properties = calculate_property_scores_parallel(
        all_properties_in_range, price_range, None, user_input,
        max_workers=min(mp.cpu_count(), 8)
    )

    # Filter with a lower threshold for additional properties
    all_scored_properties = [prop for prop in all_scored_properties if prop.get('score', 0) >= 0.05]

    # Create final list: exact matches first, then additional properties
    def select_final_properties():
        final_properties = []
        used_ids = set()

        # Add exact matches first
        for prop in scored_properties:
            if len(final_properties) >= TARGET_PROPERTIES:
                break
            prop_id = str(prop.get('id'))
            if prop_id not in used_ids:
                final_properties.append(prop)
                used_ids.add(prop_id)

        # Add additional properties to reach target
        for prop in all_scored_properties:
            if len(final_properties) >= TARGET_PROPERTIES:
                break
            prop_id = str(prop.get('id'))
            if prop_id not in used_ids:
                final_properties.append(prop)
                used_ids.add(prop_id)

        return final_properties

    # Use ThreadPoolExecutor for final property selection
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(select_final_properties)
        final_properties = future.result()

    logger.info(f"Returning {len(final_properties)} properties total: {len(scored_properties)} exact matches + {len(final_properties) - len(scored_properties)} additional")
    return final_properties


def get_cached_properties(property_type=None, min_price=None, max_price=None):
    """Get properties from ChromaDB with parallel processing optimization"""
    global property_collection
    try:
        # Verify collection exists and has data
        if not property_collection:
            logger.info("Collection not initialized, reinitializing...")
            initialize_collection()
            if not property_collection:
                logger.error("Failed to initialize collection")
                return []

        logger.info("Fetching properties from ChromaDB with parallel processing...")

        # Get all properties from ChromaDB
        results = property_collection.get()

        if not results or not results['ids']:
            logger.warning("No properties found in ChromaDB")
            return []

        logger.info(f"Found {len(results['ids'])} properties in ChromaDB")

        # Process ChromaDB results in parallel
        def process_single_property(args):
            i, metadata = args
            try:
                # Convert JSON strings back to Python objects and handle type conversion
                property_data = {
                    'id': metadata['id'],
                    'propertyName': metadata['propertyName'],
                    'typeName': metadata['typeName'],
                    'description': metadata['description'],
                    'address': metadata['address'],
                    'totalSquareFeet': float(metadata['totalSquareFeet']),
                    'beds': int(metadata['beds']),
                    'baths': int(metadata['baths']),
                    'numberOfRooms': int(metadata['numberOfRooms']),
                    'marketValue': float(metadata['marketValue']),
                    'yearBuilt': metadata['yearBuilt'],
                    'propertyImages': json.loads(metadata['propertyImages']),
                    'features': json.loads(metadata['features']),
                    'location': json.loads(metadata['location']),
                    'floorPlans': json.loads(metadata['floorPlans']),
                    'documents': json.loads(metadata['documents']),
                    'propertyVideos': json.loads(metadata['propertyVideos'])
                }
                return property_data
            except Exception as e:
                logger.error(f"Error processing property {i}: {str(e)}")
                return None

        # Use ThreadPoolExecutor for parallel processing
        max_workers = min(mp.cpu_count(), 8)  # Use CPU cores but cap at 8
        properties = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Create arguments for each property
            args = [(i, results['metadatas'][i]) for i in range(len(results['ids']))]

            # Submit all property processing tasks
            future_to_index = {
                executor.submit(process_single_property, arg): i
                for i, arg in enumerate(args)
            }

            # Collect results as they complete
            for future in as_completed(future_to_index):
                try:
                    property_data = future.result()
                    if property_data:
                        properties.append(property_data)
                except Exception as e:
                    index = future_to_index[future]
                    logger.error(f"Error processing property {index}: {str(e)}")

        logger.info(f"Successfully processed {len(properties)} properties from ChromaDB")

        # Apply filters in parallel
        def filter_single_property(prop):
            try:
                # Apply property type filter (handle both single string and array)
                if property_type:
                    prop_type = prop.get('typeName', '').lower()
                    if isinstance(property_type, list):
                        # Check if property type matches any in the list
                        if not any(pt.lower() == prop_type for pt in property_type):
                            return None
                    else:
                        # Single property type
                        if prop_type != property_type.lower():
                            return None

                # Apply price range filter (handle both single range and array of ranges)
                if min_price is not None and max_price is not None:
                    price = prop.get('marketValue', 0)
                    if isinstance(min_price, list) and isinstance(max_price, list):
                        # Multiple price ranges - check if price falls in any range
                        price_in_range = False
                        for i in range(len(min_price)):
                            min_val = min_price[i] if min_price[i] is not None else 0
                            max_val = max_price[i] if max_price[i] is not None else float('inf')
                            if min_val <= price <= max_val:
                                price_in_range = True
                                break
                        if not price_in_range:
                            return None
                    else:
                        # Single price range
                        min_val = min_price if min_price is not None else 0
                        max_val = max_price if max_price is not None else float('inf')
                        if price < min_val or price > max_val:
                            return None

                return prop
            except Exception as e:
                logger.error(f"Error filtering property: {str(e)}")
                return None

        # Filter properties in parallel
        filtered_properties = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all filtering tasks
            future_to_property = {
                executor.submit(filter_single_property, prop): i
                for i, prop in enumerate(properties)
            }

            # Collect filtered results
            for future in as_completed(future_to_property):
                try:
                    filtered_prop = future.result()
                    if filtered_prop:
                        filtered_properties.append(filtered_prop)
                except Exception as e:
                    property_idx = future_to_property[future]
                    logger.error(f"Error filtering property {property_idx}: {str(e)}")

        logger.info(f"Filtered to {len(filtered_properties)} properties from ChromaDB using parallel processing")
        return filtered_properties
    except Exception as e:
        logger.error(f"Error getting properties from ChromaDB: {str(e)}")
        return []


@app.route('/')
def home():
    return render_template('index.html')


import base64
import logging
from flask import request, jsonify
import requests

TWILIO_ACCOUNT_SID = "AC3bb0a3ea71812794b85b75ccf7d6ad07"
TWILIO_AUTH_TOKEN = "ddba202f5f4a48d08f04fb983a161fa3"
TWILIO_WHATSAPP_NUMBER = "whatsapp:+14155238886"
WHATSAPP_CHAR_LIMIT = 1600
PROPERTY_BASE_URL = "https://hiveprop.com/property/image-slider?id="
MAX_PROPERTIES_TO_SEND = 1  # Send only 1 property in email
HIVEPROP_EMAIL_API_URL = "https://hivepropapi.azurewebsites.net/api/Email/sendPlainText"


def send_email_recommendations(email, first_name, last_name, recommendations):
    """Send property recommendations via email using the HiveProp API"""
    try:
        # Take only the top properties
        top_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND]

        # Prepare email content
        subject = f"Your Top {MAX_PROPERTIES_TO_SEND} Property Recommendations - HiveProp"

        # Create a concise plain-text email body (kept under the 1800 character limit)
        email_body = f"""
šŸ” Hello {first_name} {last_name}!

Your Top {len(top_recommendations)} Property Recommendations

"""

        if not top_recommendations:
            email_body += """
āš ļø No specific properties found

We couldn't find specific properties matching your criteria, but you can explore all available properties on our website.

Explore All Properties
"""
        else:
            for idx, prop in enumerate(top_recommendations, 1):
                property_name = prop.get('propertyName', 'Unnamed Property')
                property_id = prop.get('id', '')
                property_link = f"{PROPERTY_BASE_URL}{property_id}"
                address = prop.get('address', '')
                location = address.split(',')[-2].strip() if ',' in address else address

                email_body += f"""
{idx}. {property_name}

šŸ“ {location}

View Details: {property_link}
"""

        email_body += """
"""

        # Send email via HiveProp API using query parameters
        params = {
            'toEmail': email,
            'subject': subject,
            'body': email_body
        }

        logger.info(f"Sending email to {email} via HiveProp API")
        logger.info(f"API URL: {HIVEPROP_EMAIL_API_URL}")
        logger.info(f"Subject: {subject}")
        logger.info(f"Request params: {params}")

        # Send POST request with query parameters
        response = requests.post(HIVEPROP_EMAIL_API_URL, params=params, timeout=30)

        logger.info(f"Email API response status: {response.status_code}")
        logger.info(f"Email API response text: {response.text}")

        # Check for 404 or other errors before raising
        if response.status_code == 404:
            logger.error(f"Email API endpoint not found (404): {HIVEPROP_EMAIL_API_URL}")
            return False, "Email API endpoint not found. Please check the API URL."
        elif response.status_code != 200:
            logger.error(f"Email API returned status {response.status_code}: {response.text}")
            return False, f"Email API error: Status {response.status_code} - {response.text}"

        response.raise_for_status()
        logger.info(f"Email sent successfully to {email}")
        return True, "Email sent successfully"

    except requests.exceptions.Timeout:
        logger.error("HiveProp Email API request timed out")
        return False, "Email API timeout - service may be slow or unavailable"
    except requests.exceptions.ConnectionError as e:
        logger.error(f"HiveProp Email API connection error: {str(e)}")
        return False, "Email API connection failed - service may be down"
    except requests.exceptions.HTTPError as e:
        error_text = e.response.text if e.response else "No response text"
        status_code = e.response.status_code if e.response else "Unknown"
        logger.error(f"HiveProp Email API error: Status {status_code}, Response: {error_text}")
        return False, f"Email API error: Status {status_code} - {error_text}"
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error sending email: {str(e)}")
        return False, f"Email request failed: {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error sending email: {str(e)}")
        return False, f"Failed to send email: {str(e)}"


@app.route('/create_account_and_send_whatsapp', methods=['POST'])
def create_account_and_send_whatsapp():
    try:
        data = request.json

        # Extract user details
        first_name = data.get('firstName')
        last_name = data.get('lastName')
        phone = data.get('phone')
        email = data.get('email')

        # Extract search criteria to generate recommendations
        user_input = data.get('description', '')
        price_range = data.get('price_range', None)
        property_type = data.get('property_type', None)

        logger.info(f"Received data: firstName={first_name}, lastName={last_name}, email={email}, phone={phone}")
        logger.info(f"Search criteria: description='{user_input}', price_range={price_range}, property_type={property_type}")

        # Generate recommendations directly in backend
        logger.info("Generating recommendations in backend...")
        recommendations = get_recommendations(user_input, price_range, property_type)
        logger.info(f"Generated {len(recommendations)} recommendations")

        # Validate email format
        if not email or '@' not in email:
            logger.error(f"Invalid email format: {email}")
            return jsonify({"success": False, "error": "Invalid email format. Please provide a valid email address."}), 400

        # Clean and format phone number in E.164
        # Remove all spaces, dashes, and parentheses
        phone = phone.replace(' ', '').replace('-', '').replace('(', '').replace(')', '')

        # Add +91 if not already present
        if not phone.startswith('+'):
            phone = f"+91{phone}"

        # Validate phone number format (should be +91 followed by 10 digits)
        if not phone.startswith('+91') or len(phone) != 13 or not phone[3:].isdigit():
            logger.error(f"Invalid phone number format: {phone}")
            return jsonify({"success": False, "error": "Invalid phone number format. Please provide a valid 10-digit Indian mobile number."}), 400

        # Track success status for both email and WhatsApp
        whatsapp_success = False
        email_success = False
        messages = []

        # Take only the top properties (1 for email, 5 for WhatsApp)
        whatsapp_recommendations = recommendations[:5]  # WhatsApp can handle more
        email_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND]  # Email gets 1

        # Send via WhatsApp
        try:
            logger.info(f"Total recommendations received: {len(recommendations)}")
            logger.info(f"WhatsApp recommendations to send: {len(whatsapp_recommendations)}")

            # Prepare the WhatsApp message with improved design
            message_header = f"šŸ” *Hello {first_name} {last_name}!*\n\nHere are your *top {len(whatsapp_recommendations)} recommended properties* in Hyderabad:\n"
            message_body = ""

            if not whatsapp_recommendations:
                message_body = "āš ļø No specific properties found, but you can explore all available properties at https://hiveprop.com\n\n"
            else:
                for idx, prop in enumerate(whatsapp_recommendations, 1):
                    property_name = prop.get('propertyName', 'Unnamed Property')
                    property_id = prop.get('id', '')
                    property_link = f"{PROPERTY_BASE_URL}{property_id}"

                    # Extract location from address or description
                    address = prop.get('address', '')
                    location = address.split(',')[-2].strip() if ',' in address else address

                    message_body += f"{idx}ļøāƒ£ *{property_name}*\n"
                    message_body += f"šŸ“ *{location}*\n"
                    message_body += f"šŸ”—{property_link}\n\n"

            # Add a call-to-action
            message_footer = "---\nšŸ”Ž *Looking for more options?* Explore all listings at https://hiveprop.com."

            full_message = message_header + message_body + message_footer
            logger.info(f"WhatsApp message length: {len(full_message)}")

            # Encode credentials for Basic Auth
            credentials = f"{TWILIO_ACCOUNT_SID}:{TWILIO_AUTH_TOKEN}"
            encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')

            # Send the message
            payload = {
                "To": f"whatsapp:{phone}",
                "From": TWILIO_WHATSAPP_NUMBER,
                "Body": full_message
            }
            headers = {
                "Authorization": f"Basic {encoded_credentials}",
                "Content-Type": "application/x-www-form-urlencoded"
            }

            response = requests.post(
                f"https://api.twilio.com/2010-04-01/Accounts/{TWILIO_ACCOUNT_SID}/Messages.json",
                data=payload,
                headers=headers
            )
            response.raise_for_status()

            logger.info("WhatsApp message sent successfully")
            whatsapp_success = True
            messages.append("WhatsApp message sent successfully!")

        except requests.exceptions.HTTPError as e:
            logger.error(f"Twilio API error: {e.response.text}")
            messages.append(f"WhatsApp error: {e.response.text}")
        except Exception as e:
            logger.error(f"Unexpected WhatsApp error: {str(e)}")
            messages.append(f"WhatsApp error: {str(e)}")

        # Send via Email
        logger.info(f"Sending email to {email} with {len(email_recommendations)} recommendations")
        email_success, email_message = send_email_recommendations(email, first_name, last_name, email_recommendations)
        messages.append(email_message)

        # Determine overall success and response message
        success = whatsapp_success or email_success
        if whatsapp_success and email_success:
            overall_message = "Account created! Messages sent via WhatsApp and email!"
        elif whatsapp_success:
            overall_message = "Account created! WhatsApp message sent (email failed)"
        elif email_success:
            overall_message = "Account created! Email sent (WhatsApp failed)"
        else:
            overall_message = "Account created but failed to send messages"

        return jsonify({
            "success": success,
            "message": overall_message,
            "details": messages,
            "recommendations": recommendations  # Return generated recommendations
        })

    except requests.exceptions.HTTPError as e:
        logger.error(f"Twilio API error: {e.response.text}")
        return jsonify({"success": False, "error": f"Twilio API error: {e.response.text}"}), 500
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return jsonify({"success": False, "error": f"Failed to send WhatsApp message: {str(e)}"}), 500


@app.route('/get_recommendations', methods=['POST'])
def get_recommendations_route():
    """Get property recommendations with parallel processing optimization and non-blocking property fetch - no timeouts"""
    global active_requests

    # Check concurrent request limit
    with request_lock:
        if active_requests >= MAX_CONCURRENT_REQUESTS:
            logger.warning(f"Too many concurrent requests ({active_requests}), rejecting request")
            return jsonify({"error": "Server is busy, please try again later"}), 503
        active_requests += 1

    try:
        # Validate request data
        if not request.is_json:
            return jsonify({"error": "Request must be JSON"}), 400

        data = request.json
        if not data:
            return jsonify({"error": "No data provided"}), 400

        user_input = data.get('description', '')
        price_range = data.get('price_range', None)
        property_type = data.get('property_type', None)

        # Handle partial inputs
        if not user_input and not property_type and not price_range:
            return jsonify({"error": "Please provide at least one search criteria"})

        # If no description is provided, create one based on available inputs
        if not user_input:
            user_input = f"{property_type if property_type else ''} property"
            if price_range:
                # Handle multiple price ranges
                if isinstance(price_range, list) and len(price_range) > 0:
                    if isinstance(price_range[0], list):
                        # Array of price ranges
                        ranges_text = []
                        for range_item in price_range:
                            ranges_text.append(f"₹{range_item[0]} to ₹{range_item[1]}")
                        user_input += f" within price ranges: {', '.join(ranges_text)}"
                    else:
                        # Single price range
                        user_input += f" within price range ₹{price_range[0]} to ₹{price_range[1]}"

        logger.info(f"Search request - Type: {property_type}, Price Range: {price_range}, Description: {user_input}")

        # Handle multiple price ranges - convert to min/max arrays
        min_price = None
        max_price = None
        if price_range:
            if isinstance(price_range, list) and len(price_range) > 0:
                if isinstance(price_range[0], list):
                    # Array of price ranges
                    min_price = [r[0] for r in price_range]
                    max_price = [r[1] for r in price_range]
                else:
                    # Single price range
                    min_price = [price_range[0]]
                    max_price = [price_range[1]]

        # Check if we have properties in cache first
        try:
            properties = get_cached_properties(
                property_type=property_type,
                min_price=min_price,
                max_price=max_price
            )
        except Exception as e:
            logger.error(f"Error getting cached properties: {str(e)}")
            properties = []

        # If no properties found and we're not currently fetching, start background fetch
        if not properties and not is_fetching_properties:
            logger.info("No properties found, initiating background fetch...")
            try:
                threading.Thread(
                    target=lambda: fetch_properties_background_safe(property_type, min_price, max_price),
                    daemon=True
                ).start()
            except Exception as e:
                logger.error(f"Error starting background fetch: {str(e)}")

        # Use ThreadPoolExecutor for parallel recommendation processing without timeout
        def process_recommendations():
            try:
                return get_recommendations(user_input, price_range, property_type)
            except Exception as e:
                logger.error(f"Error in process_recommendations: {str(e)}")
                return []

        try:
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(process_recommendations)
                recommendations = future.result()  # Removed timeout

            # Ensure we always return properties, even if empty
            if not recommendations:
                logger.warning("No recommendations returned, getting fallback properties...")
                # Get fallback properties (top 20 by score)
                try:
                    fallback_properties = get_cached_properties(
                        property_type=None,
                        min_price=None,
                        max_price=None
                    )
                    if fallback_properties:
                        # Score and return top 20 fallback properties
                        fallback_scored = calculate_property_scores_parallel(
                            fallback_properties, None, None, "",
                            max_workers=min(mp.cpu_count(), 8)
                        )
                        fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True)
                        recommendations = fallback_scored[:TARGET_PROPERTIES]
                        logger.info(f"Returning {len(recommendations)} fallback properties")
                    else:
                        recommendations = []
                        logger.error("No fallback properties available")
                except Exception as e:
                    logger.error(f"Error getting fallback properties: {str(e)}")
                    recommendations = []

            logger.info(f"Returning {len(recommendations)} properties for search using parallel processing")
            return jsonify(recommendations)

        except Exception as e:
            logger.error(f"Error processing recommendations: {str(e)}")
            # Try to return fallback properties even on error
            try:
                fallback_properties = get_cached_properties(
                    property_type=None,
                    min_price=None,
                    max_price=None
                )
                if fallback_properties:
                    fallback_scored = calculate_property_scores_parallel(
                        fallback_properties, None, None, "",
                        max_workers=min(mp.cpu_count(), 8)
                    )
                    fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True)
                    fallback_result = fallback_scored[:TARGET_PROPERTIES]
                    logger.info(f"Returning {len(fallback_result)} fallback properties due to error")
                    return jsonify(fallback_result)
            except Exception as fallback_error:
                logger.error(f"Error getting fallback properties: {str(fallback_error)}")

            return jsonify({"error": "An error occurred while processing recommendations"})

    except Exception as e:
        logger.error(f"Unexpected error in get_recommendations_route: {str(e)}")
        return jsonify({"error": "An unexpected error occurred"}), 500
    finally:
        # Decrease active request count
        with request_lock:
            active_requests -= 1


@app.route('/refresh_properties', methods=['POST'])
def refresh_properties_route():
    """Manually trigger property refresh (for debugging) - non-blocking"""
    try:
        logger.info("šŸ”„ Manual property refresh triggered")
        # Start background refresh - non-blocking
        threading.Thread(target=refresh_properties_background, daemon=True).start()
        return jsonify({"message": "Property refresh started in background", "status": "success"})
    except Exception as e:
        logger.error(f"Error triggering property refresh: {str(e)}")
        return jsonify({"error": "Failed to trigger property refresh"})


@app.route('/fetch_properties', methods=['POST'])
def fetch_properties_route():
    """Manually fetch and store properties (for debugging) - non-blocking"""
    try:
        logger.info("šŸ”„ Manual property fetch triggered")

        # Start background fetch - non-blocking
        def fetch_and_store():
            try:
                data = fetch_properties_background_safe()
                if data and 'data' in data:
                    new_properties = [p for p in data['data'] if isinstance(p, dict)]
                    if new_properties:
                        # Store properties in ChromaDB
                        update_property_cache(new_properties)
                        update_cache_timestamp()
                        logger.info(f"āœ… Manually fetched and stored {len(new_properties)} properties")
                    else:
                        logger.warning("No properties fetched from API")
                else:
                    logger.warning("Failed to fetch properties from API")
            except Exception as e:
                logger.error(f"Error in manual property fetch: {str(e)}")

        threading.Thread(target=fetch_and_store, daemon=True).start()

        return jsonify({
            "message": "Property fetch started in background",
            "status": "success"
        })
    except Exception as e:
        logger.error(f"Error in manual property fetch: {str(e)}")
        return jsonify({"error": f"Failed to fetch properties: {str(e)}"})


@app.route('/cache_status', methods=['GET'])
def cache_status_route():
    """Get cache status information"""
    try:
        property_count = check_property_cache_status()
        cache_stale = is_cache_stale()

        status_info = {
            "property_count": property_count,
            "cache_stale": cache_stale,
            "cache_expiry_hours": CACHE_EXPIRY_SECONDS / 3600
        }

        if os.path.exists(CACHE_TIMESTAMP_PATH):
            with open(CACHE_TIMESTAMP_PATH, 'r') as f:
                last_time = float(f.read().strip())
            status_info["last_update"] = datetime.fromtimestamp(last_time).isoformat()
        else:
            status_info["last_update"] = None

        return jsonify(status_info)
    except Exception as e:
        logger.error(f"Error getting cache status: {str(e)}")
        return jsonify({"error": "Failed to get cache status"})


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint for monitoring multi-user performance"""
    try:
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()

        health_info = {
            "status": "healthy",
            "active_requests": active_requests,
            "max_concurrent_requests": MAX_CONCURRENT_REQUESTS,
            "cpu_usage_percent": cpu_percent,
            "memory_usage_percent": memory.percent,
            "memory_available_gb": round(memory.available / (1024**3), 2),
            "timestamp": datetime.now().isoformat()
        }

        # Check if system is overloaded
        if active_requests >= MAX_CONCURRENT_REQUESTS * 0.8:
            health_info["status"] = "warning"
        if cpu_percent > 80 or memory.percent > 80:
            health_info["status"] = "warning"

        return jsonify(health_info)
    except Exception as e:
        logger.error(f"Error in health check: {str(e)}")
        return jsonify({"status": "error", "error": str(e)})


def display_countdown():
    """Display countdown in terminal and manage scheduled property refresh"""
    global stop_countdown, countdown_initialized

    # Initialize countdown properly
    if not countdown_initialized:
        logger.info("šŸ”„ Initializing countdown system...")

        # Force initial property fetch if no cache exists - non-blocking
        if not os.path.exists(CACHE_TIMESTAMP_PATH):
            logger.info("šŸ”„ No cache timestamp found, performing initial property fetch...")
            # Start initial fetch in background thread
            threading.Thread(target=force_initial_property_fetch, daemon=True).start()

        countdown_initialized = True

    last_display_time = 0
    display_interval = 60  # Update display every 60 seconds

    while not stop_countdown:
        try:
            current_time = time.time()

            # Only update display every minute to reduce spam
            if current_time - last_display_time >= display_interval:
                if os.path.exists(CACHE_TIMESTAMP_PATH):
                    with open(CACHE_TIMESTAMP_PATH, 'r') as f:
                        last_time = float(f.read().strip())

                    time_diff = current_time - last_time
                    time_remaining = CACHE_EXPIRY_SECONDS - time_diff

                    if time_remaining > 0:
                        # Clear the current line and print countdown
                        sys.stdout.write('\r' + ' ' * 100)
                        sys.stdout.write('\r')
                        next_update = datetime.fromtimestamp(current_time + time_remaining)
                        sys.stdout.write(f"ā° Next data fetch in: {format_time_remaining(time_remaining)} (at {next_update.strftime('%H:%M:%S')})")
                        sys.stdout.flush()
                        last_display_time = current_time
                    else:
                        # Time to refresh properties - non-blocking
                        logger.info("šŸ”„ 24 hours have passed, starting scheduled property refresh...")
                        threading.Thread(target=refresh_properties_background, daemon=True).start()

                        sys.stdout.write('\r' + ' ' * 100)
                        sys.stdout.write('\r')
                        sys.stdout.write("āœ… Scheduled property refresh initiated")
                        sys.stdout.flush()
                        last_display_time = current_time

                        # Wait 60 seconds after initiating refresh before continuing
                        time.sleep(60)
                else:
                    # No cache timestamp - try to create one - non-blocking
                    sys.stdout.write('\r' + ' ' * 100)
                    sys.stdout.write('\r')
                    sys.stdout.write("šŸ”„ No cache timestamp found. Attempting initial fetch...")
                    sys.stdout.flush()
                    last_display_time = current_time

                    # Try to fetch properties and create timestamp - non-blocking
                    threading.Thread(target=force_initial_property_fetch, daemon=True).start()

                    sys.stdout.write('\r' + ' ' * 100)
                    sys.stdout.write('\r')
                    sys.stdout.write("āœ… Initial property fetch initiated")
                    sys.stdout.flush()

                    time.sleep(300)  # Wait 5 minutes before checking again
                    continue

            # Sleep for a shorter interval
            time.sleep(10)  # Check every 10 seconds instead of 1 second

        except Exception as e:
            logger.error(f"Error in countdown display: {e}")
            time.sleep(30)  # Wait 30 seconds on error before retrying


def start_countdown():
    """Start the countdown display thread"""
    global countdown_thread, stop_countdown, countdown_initialized
    stop_countdown = False
    countdown_initialized = False
    countdown_thread = threading.Thread(target=display_countdown, daemon=True)
    countdown_thread.start()
    logger.info("āœ… Countdown thread started")


def stop_countdown_display():
    """Stop the countdown display thread"""
    global stop_countdown
    stop_countdown = True
    if countdown_thread:
        countdown_thread.join(timeout=1)
    logger.info("āœ… Countdown thread stopped")


def get_system_info():
    """Get system information for performance optimization"""
    cpu_count = mp.cpu_count()
    memory_gb = psutil.virtual_memory().total / (1024**3)
    logger.info(f"šŸš€ System Info - CPU Cores: {cpu_count}, Memory: {memory_gb:.1f}GB")
    return cpu_count, memory_gb


def check_property_cache_status():
    """Check the status of property cache and log information"""
    global property_collection
    try:
        if property_collection:
            results = property_collection.get()
            if results and results['ids']:
                logger.info(f"šŸ“Š Property Cache Status: {len(results['ids'])} properties available")
                return len(results['ids'])
            else:
                logger.warning("šŸ“Š Property Cache Status: No properties in cache")
                return 0
        else:
            logger.warning("šŸ“Š Property Cache Status: Collection not initialized")
            return 0
    except Exception as e:
        logger.error(f"Error checking property cache status: {e}")
        return 0


def ensure_properties_available():
    """Ensure properties are available in cache, fetch if needed - non-blocking"""
    global property_collection
    try:
        if not property_collection:
            logger.info("Collection not initialized, initializing...")
            initialize_collection()
            return

        results = property_collection.get()
        if not results or not results['ids']:
            logger.info("šŸ”„ No properties in cache, fetching from API...")
            # Force fetch properties - non-blocking
            threading.Thread(target=force_initial_property_fetch, daemon=True).start()
            logger.info("āœ… Property fetch initiated in background")
        else:
            logger.info(f"āœ… {len(results['ids'])} properties already available in cache")
    except Exception as e:
        logger.error(f"Error ensuring properties are available: {e}")


if __name__ == '__main__':
    try:
        # Get system information
        cpu_count, memory_gb = get_system_info()

        logger.info("šŸš€ Starting Property Recommendation System with parallel processing optimization")
        logger.info(f"šŸ”§ Using {min(cpu_count, 8)} workers for parallel processing")

        # Ensure properties are available in cache - non-blocking
        ensure_properties_available()

        # Check property cache status
        property_count = check_property_cache_status()
        if property_count == 0:
            logger.warning("āš ļø Still no properties in cache after initialization")
            # Try one more time to fetch properties - non-blocking
            threading.Thread(target=force_initial_property_fetch, daemon=True).start()
            logger.info("āœ… Property fetch retry initiated in background")
        else:
            logger.info(f"āœ… Cache has {property_count} properties ready for recommendations")

        # Start the countdown display
        start_countdown()

        # Change port to 7860 for Hugging Face Spaces with multi-user support
        app.run(debug=False, host='0.0.0.0', port=7860, threaded=True)

    except KeyboardInterrupt:
        logger.info("šŸ›‘ Application interrupted by user")
    except Exception as e:
        logger.error(f"āŒ Fatal error starting application: {str(e)}")
    finally:
        # Stop the countdown display when the app stops
        stop_countdown_display()
        logger.info("šŸ›‘ Application stopped")
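# ---------------------------------------------------------------------------
# Illustrative usage note (not part of the original application; the field
# values below are made-up examples). A client would POST JSON like the
# following to /get_recommendations. Based on how get_recommendations_route
# parses the request, "price_range" may be a single [min, max] pair or a list
# of [min, max] pairs, and "property_type" may be a single string or a list
# of strings:
#
#   {
#       "description": "3 BHK apartment with parking and gym",
#       "property_type": ["Apartment", "Villa"],
#       "price_range": [[5000000, 8000000], [10000000, 15000000]]
#   }
#
# /create_account_and_send_whatsapp expects the same search fields plus
# "firstName", "lastName", "email" and "phone" (a 10-digit Indian mobile
# number, which the handler normalizes to E.164 with a +91 prefix).
# ---------------------------------------------------------------------------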