|
from flask import Flask, render_template, request, jsonify |
|
import requests |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
import faiss |
|
import json |
|
import os |
|
from datetime import datetime, timedelta |
|
import pandas as pd |
|
from sklearn.preprocessing import MinMaxScaler |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import urllib3 |
|
import chromadb |
|
from chromadb.config import Settings |
|
import time |
|
from flask_cors import CORS |
|
import logging |
|
import threading |
|
import sys |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
import math |
|
import psutil |
|
|
|
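# Number of top-scoring properties returned per recommendation request.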
TARGET_PROPERTIES = 20 |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 |
|
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 |
|
|
|
|
|
CORS(app, |
|
resources={r"/*": { |
|
"origins": ["http://localhost:4200", "https://huggingface.co", "*"], |
|
"methods": ["GET", "POST", "OPTIONS"], |
|
"allow_headers": ["Content-Type", "Authorization", "X-Requested-With"], |
|
"supports_credentials": True, |
|
"max_age": 3600 |
|
}}) |
|
|
|
|
|
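# flask_cors already handles CORS above; this hook additionally forces permissive headers onto every response.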
@app.after_request |
|
def after_request(response): |
|
response.headers.add('Access-Control-Allow-Origin', '*') |
|
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization') |
|
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS') |
|
response.headers.add('Access-Control-Allow-Credentials', 'true') |
|
return response |
|
|
|
|
|
import multiprocessing as mp |
|
from functools import partial |
|
|
|
|
|
try: |
|
logger.info("Loading sentence transformer model...") |
|
model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder=os.path.join(os.path.expanduser("~"), ".cache", "huggingface")) |
|
logger.info("Model loaded successfully") |
|
except Exception as e: |
|
logger.error(f"Error loading model: {str(e)}") |
|
raise |
|
|
|
|
|
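# API fetch tuning: total records to pull, cache lifetime (24 hours), page size per request, and fetch worker count.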
TOTAL_PROPERTIES = 500 |
|
CACHE_EXPIRY_SECONDS = 24 * 60 * 60 |
|
BATCH_SIZE = 50 |
|
MAX_WORKERS = 10 |
|
|
|
|
|
|
|
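# In-process concurrency limits and HTTP connection pool size (REQUEST_RATE_LIMIT is declared but not currently enforced).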
MAX_CONCURRENT_REQUESTS = 50 |
|
REQUEST_RATE_LIMIT = 100 |
|
CONNECTION_POOL_SIZE = 20 |
|
|
|
|
|
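# Shared requests session with a pooled HTTPAdapter and retries so parallel batch fetches reuse connections.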
session = requests.Session() |
|
adapter = requests.adapters.HTTPAdapter( |
|
pool_connections=CONNECTION_POOL_SIZE, |
|
pool_maxsize=CONNECTION_POOL_SIZE, |
|
max_retries=3 |
|
) |
|
session.mount('http://', adapter) |
|
session.mount('https://', adapter) |
|
|
|
|
|
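# ChromaDB collection handle; populated by initialize_collection() at startup.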
property_collection = None |
|
|
|
|
|
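# File that records (as a Unix timestamp) when the property cache was last refreshed.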
CACHE_TIMESTAMP_PATH = os.path.join("property_db", "last_cache_time.txt") |
|
|
|
|
|
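# State for the terminal countdown thread that schedules the 24-hour cache refresh.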
countdown_thread = None |
|
stop_countdown = False |
|
countdown_initialized = False |
|
|
|
|
|
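# Counter/lock pair used to throttle concurrent recommendation requests.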
active_requests = 0 |
|
request_lock = threading.Lock() |
|
|
|
|
|
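# Background-task bookkeeping and a guard so only one property fetch runs at a time.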
background_tasks = {} |
|
task_lock = threading.Lock() |
|
is_fetching_properties = False |
|
fetch_lock = threading.Lock() |
|
|
|
def format_time_remaining(seconds): |
|
"""Format remaining time in a human-readable format""" |
|
if seconds < 60: |
|
return f"{int(seconds)} seconds" |
|
elif seconds < 3600: |
|
minutes = int(seconds / 60) |
|
remaining_seconds = int(seconds % 60) |
|
return f"{minutes} minutes {remaining_seconds} seconds" |
|
else: |
|
hours = int(seconds / 3600) |
|
minutes = int((seconds % 3600) / 60) |
|
return f"{hours} hours {minutes} minutes" |
|
|
|
def fetch_property_batch(page_number, property_type=None, min_price=None, max_price=None): |
|
"""Fetch a single batch of properties without timeout constraints""" |
|
url = "https://hivepropapi.azurewebsites.net/api/Property/allPropertieswithfulldetails" |
|
params = { |
|
"pageNumber": page_number, |
|
"pageSize": BATCH_SIZE |
|
} |
|
|
|
|
|
if property_type: |
|
params["propertyType"] = property_type |
|
if min_price is not None: |
|
params["minPrice"] = min_price |
|
if max_price is not None: |
|
params["maxPrice"] = max_price |
|
|
|
try: |
|
logger.info(f"Fetching batch {page_number} with {BATCH_SIZE} properties...") |
|
response = session.get(url, params=params, verify=False) |
|
response.raise_for_status() |
|
data = response.json() |
|
if data and 'data' in data: |
|
properties = data['data'] |
|
if properties: |
|
logger.info(f"Successfully fetched {len(properties)} properties from batch {page_number}") |
|
return properties |
|
else: |
|
logger.warning(f"Batch {page_number} returned empty property list") |
|
return [] |
|
logger.warning(f"No data found in batch {page_number} response") |
|
return [] |
|
except requests.exceptions.ConnectionError: |
|
logger.error(f"Failed to connect to the API server for batch {page_number}") |
|
return [] |
|
except Exception as e: |
|
logger.error(f"Error fetching batch {page_number}: {e}") |
|
return [] |
|
|
|
def fetch_properties_background_safe(property_type=None, min_price=None, max_price=None): |
|
"""Fetch properties in background with proper error handling and no timeout constraints""" |
|
global is_fetching_properties |
|
|
|
with fetch_lock: |
|
if is_fetching_properties: |
|
logger.info("Property fetch already in progress, skipping...") |
|
return None |
|
is_fetching_properties = True |
|
|
|
try: |
|
logger.info(f"π Starting background property fetch with {MAX_WORKERS} workers, batch size {BATCH_SIZE}") |
|
|
|
|
|
total_batches = math.ceil(TOTAL_PROPERTIES / BATCH_SIZE) |
|
logger.info(f"Total batches to fetch: {total_batches}") |
|
|
|
all_properties = [] |
|
start_time = time.time() |
|
completed_batches = 0 |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: |
|
|
|
future_to_batch = { |
|
executor.submit(fetch_property_batch, page_num, property_type, min_price, max_price): page_num |
|
for page_num in range(1, total_batches + 1) |
|
} |
|
|
|
|
|
for future in as_completed(future_to_batch): |
|
batch_num = future_to_batch[future] |
|
try: |
|
batch_properties = future.result() |
|
if batch_properties: |
|
all_properties.extend(batch_properties) |
|
completed_batches += 1 |
|
logger.info(f"Completed batch {batch_num}/{total_batches} - Total properties so far: {len(all_properties)}") |
|
else: |
|
logger.warning(f"Batch {batch_num} returned no properties") |
|
except Exception as e: |
|
logger.error(f"Batch {batch_num} generated an exception: {e}") |
|
continue |
|
|
|
end_time = time.time() |
|
fetch_duration = end_time - start_time |
|
|
|
        if all_properties:
            all_properties = all_properties[:TOTAL_PROPERTIES]
            logger.info(f"Successfully fetched {len(all_properties)} properties in {fetch_duration:.2f} seconds using {completed_batches}/{total_batches} batches")
            return {"data": all_properties}
        else:
            logger.error(f"Failed to fetch any properties after {fetch_duration:.2f} seconds")
            return None

    except Exception as e:
        logger.error(f"Error during background property fetch: {str(e)}")
        return None
|
finally: |
|
with fetch_lock: |
|
is_fetching_properties = False |
|
|
|
def fetch_properties(property_type=None, min_price=None, max_price=None): |
|
"""Fetch properties using parallel workers and batches - now calls background version""" |
|
return fetch_properties_background_safe(property_type, min_price, max_price) |
|
|
|
def update_cache_timestamp(): |
|
try: |
|
os.makedirs(os.path.dirname(CACHE_TIMESTAMP_PATH), exist_ok=True) |
|
current_time = time.time() |
|
with open(CACHE_TIMESTAMP_PATH, 'w') as f: |
|
f.write(str(current_time)) |
|
next_update = datetime.fromtimestamp(current_time + CACHE_EXPIRY_SECONDS) |
|
logger.info(f"Updated cache timestamp. Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}") |
|
logger.info(f"Next update in: {format_time_remaining(CACHE_EXPIRY_SECONDS)}") |
|
except Exception as e: |
|
logger.error(f"Error updating cache timestamp: {e}") |
|
|
|
def refresh_properties_background(): |
|
"""Refresh properties in background without blocking the main application""" |
|
try: |
|
logger.info("π Starting background property refresh...") |
|
data = fetch_properties_background_safe() |
|
if data and 'data' in data: |
|
new_properties = [p for p in data['data'] if isinstance(p, dict)] |
|
if new_properties: |
|
|
|
def update_cache_async(): |
|
try: |
|
update_property_cache(new_properties) |
|
update_cache_timestamp() |
|
logger.info("β
Background property refresh completed successfully") |
|
except Exception as e: |
|
logger.error(f"Error updating cache in background: {e}") |
|
|
|
|
|
threading.Thread(target=update_cache_async, daemon=True).start() |
|
else: |
|
logger.warning("β οΈ No properties fetched during background refresh") |
|
else: |
|
logger.warning("β οΈ Failed to fetch properties during background refresh") |
|
except Exception as e: |
|
logger.error(f"β Error during background property refresh: {str(e)}") |
|
|
|
def force_initial_property_fetch(): |
|
"""Force initial property fetch to ensure cache is created - non-blocking""" |
|
try: |
|
logger.info("π Force initial property fetch...") |
|
data = fetch_properties_background_safe() |
|
if data and 'data' in data: |
|
new_properties = [p for p in data['data'] if isinstance(p, dict)] |
|
if new_properties: |
|
|
|
def update_cache_async(): |
|
try: |
|
update_property_cache(new_properties) |
|
update_cache_timestamp() |
|
logger.info(f"β
Force initial fetch completed with {len(new_properties)} properties") |
|
except Exception as e: |
|
logger.error(f"Error updating cache in force initial fetch: {e}") |
|
|
|
|
|
threading.Thread(target=update_cache_async, daemon=True).start() |
|
return True |
|
else: |
|
logger.warning("β οΈ No properties fetched during force initial fetch") |
|
return False |
|
else: |
|
logger.warning("β οΈ Failed to fetch properties during force initial fetch") |
|
return False |
|
except Exception as e: |
|
logger.error(f"β Error during force initial property fetch: {str(e)}") |
|
return False |
|
|
|
def process_property_batch(properties_batch, existing_ids): |
|
"""Process a batch of properties for ChromaDB in parallel""" |
|
batch_ids = [] |
|
batch_metadatas = [] |
|
batch_documents = [] |
|
batch_embeddings = [] |
|
|
|
|
|
property_texts = [] |
|
valid_properties = [] |
|
|
|
for prop in properties_batch: |
|
try: |
|
prop_id = str(prop.get('id')) |
|
if prop_id not in existing_ids: |
|
|
|
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}" |
|
property_texts.append(property_text) |
|
valid_properties.append(prop) |
|
except Exception as e: |
|
logger.error(f"Error processing property for batch: {str(e)}") |
|
continue |
|
|
|
if not valid_properties: |
|
return batch_ids, batch_metadatas, batch_documents, batch_embeddings |
|
|
|
|
|
try: |
|
embeddings = model.encode(property_texts) |
|
|
|
for i, prop in enumerate(valid_properties): |
|
try: |
|
prop_id = str(prop.get('id')) |
|
|
|
|
|
metadata = { |
|
'id': str(prop.get('id', '')), |
|
'propertyName': str(prop.get('propertyName', '')), |
|
'typeName': str(prop.get('typeName', '')), |
|
'description': str(prop.get('description', '')), |
|
'address': str(prop.get('address', '')), |
|
'totalSquareFeet': float(prop.get('totalSquareFeet', 0)), |
|
'beds': int(prop.get('beds', 0)), |
|
'baths': int(prop.get('baths', 0)), |
|
'numberOfRooms': int(prop.get('numberOfRooms', 0)), |
|
'marketValue': float(prop.get('marketValue', 0)), |
|
'yearBuilt': str(prop.get('yearBuilt', '')), |
|
'propertyImages': json.dumps(prop.get('propertyImages', [])), |
|
'features': json.dumps(prop.get('features', [])), |
|
'location': json.dumps(prop.get('location', {})), |
|
'floorPlans': json.dumps(prop.get('floorPlans', [])), |
|
'documents': json.dumps(prop.get('documents', [])), |
|
'propertyVideos': json.dumps(prop.get('propertyVideos', [])) |
|
} |
|
|
|
batch_ids.append(prop_id) |
|
batch_metadatas.append(metadata) |
|
batch_documents.append(property_texts[i]) |
|
batch_embeddings.append(embeddings[i].tolist()) |
|
except Exception as e: |
|
logger.error(f"Error processing property {i} in batch: {str(e)}") |
|
continue |
|
except Exception as e: |
|
logger.error(f"Error creating embeddings for batch: {str(e)}") |
|
|
|
return batch_ids, batch_metadatas, batch_documents, batch_embeddings |
|
|
|
def update_property_cache(new_properties): |
|
"""Update property cache with parallel processing optimization""" |
|
global property_collection |
|
try: |
|
if not new_properties: |
|
logger.warning("No new properties to add") |
|
return |
|
|
|
logger.info(f"Updating ChromaDB with {len(new_properties)} properties using parallel processing") |
|
start_time = time.time() |
|
|
|
|
|
existing_results = property_collection.get() |
|
existing_ids = set(existing_results['ids']) if existing_results and existing_results['ids'] else set() |
|
|
|
|
|
def filter_new_property(prop): |
|
try: |
|
prop_id = str(prop.get('id')) |
|
if prop_id not in existing_ids: |
|
return prop |
|
return None |
|
except Exception as e: |
|
logger.error(f"Error filtering property: {str(e)}") |
|
return None |
|
|
|
|
|
max_workers = min(mp.cpu_count(), 8) |
|
new_properties_filtered = [] |
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
future_to_property = { |
|
executor.submit(filter_new_property, prop): i |
|
for i, prop in enumerate(new_properties) |
|
} |
|
|
|
for future in as_completed(future_to_property): |
|
try: |
|
filtered_prop = future.result() |
|
if filtered_prop: |
|
new_properties_filtered.append(filtered_prop) |
|
except Exception as e: |
|
property_idx = future_to_property[future] |
|
logger.error(f"Error filtering property {property_idx}: {str(e)}") |
|
|
|
if not new_properties_filtered: |
|
logger.info("No new properties to add to ChromaDB") |
|
return |
|
|
|
logger.info(f"Processing {len(new_properties_filtered)} new properties in parallel") |
|
|
|
|
|
cache_batch_size = 100 |
|
all_ids = [] |
|
all_metadatas = [] |
|
all_documents = [] |
|
all_embeddings = [] |
|
|
|
|
|
def process_batch_parallel(batch): |
|
return process_property_batch(batch, existing_ids) |
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
|
batches = [new_properties_filtered[i:i + cache_batch_size] |
|
for i in range(0, len(new_properties_filtered), cache_batch_size)] |
|
|
|
|
|
future_to_batch = { |
|
executor.submit(process_batch_parallel, batch): i |
|
for i, batch in enumerate(batches) |
|
} |
|
|
|
|
|
for future in as_completed(future_to_batch): |
|
batch_idx = future_to_batch[future] |
|
try: |
|
batch_ids, batch_metadatas, batch_documents, batch_embeddings = future.result() |
|
all_ids.extend(batch_ids) |
|
all_metadatas.extend(batch_metadatas) |
|
all_documents.extend(batch_documents) |
|
all_embeddings.extend(batch_embeddings) |
|
logger.info(f"Processed batch {batch_idx + 1}/{len(batches)} - {len(batch_ids)} properties") |
|
except Exception as e: |
|
logger.error(f"Error processing batch {batch_idx}: {str(e)}") |
|
|
|
|
|
if all_ids: |
|
logger.info(f"Adding {len(all_ids)} new properties to ChromaDB") |
|
property_collection.add( |
|
ids=all_ids, |
|
metadatas=all_metadatas, |
|
documents=all_documents, |
|
embeddings=all_embeddings |
|
) |
|
end_time = time.time() |
|
logger.info(f"β
Successfully added {len(all_ids)} properties to ChromaDB in {end_time - start_time:.2f} seconds using parallel processing") |
|
else: |
|
logger.info("No new properties to add to ChromaDB") |
|
|
|
except Exception as e: |
|
logger.error(f"Error updating property cache: {str(e)}") |
|
raise |
|
|
|
def initialize_collection(): |
|
global property_collection |
|
try: |
|
|
|
try: |
|
property_collection = chroma_client.get_collection("properties") |
|
logger.info("Using existing property collection from vector DB") |
|
except Exception as e: |
|
logger.info("Creating new property collection") |
|
property_collection = chroma_client.create_collection( |
|
name="properties", |
|
metadata={"hnsw:space": "cosine"} |
|
) |
|
|
|
|
|
results = property_collection.get() |
|
if not results or not results['ids']: |
|
logger.info("Collection is empty, fetching initial properties") |
|
|
|
data = fetch_properties() |
|
if data and 'data' in data: |
|
new_properties = [p for p in data['data'] if isinstance(p, dict)] |
|
if new_properties: |
|
update_property_cache(new_properties) |
|
update_cache_timestamp() |
|
logger.info(f"β
Successfully added {len(new_properties)} initial properties to collection") |
|
else: |
|
logger.warning("No properties fetched from API during initialization") |
|
else: |
|
logger.warning("Failed to fetch properties from API during initialization") |
|
else: |
|
logger.info(f"Collection has {len(results['ids'])} properties, checking if cache is fresh") |
|
if is_cache_stale(): |
|
logger.info("Cache is stale, will refresh in background") |
|
|
|
threading.Thread(target=refresh_properties_background, daemon=True).start() |
|
else: |
|
logger.info("Cache is fresh, using existing properties") |
|
except Exception as e: |
|
logger.error(f"Error initializing collection: {str(e)}") |
|
|
|
try: |
|
logger.info("Attempting to recreate collection") |
|
try: |
|
chroma_client.delete_collection("properties") |
|
            except Exception:
                pass
|
property_collection = chroma_client.create_collection( |
|
name="properties", |
|
metadata={"hnsw:space": "cosine"} |
|
) |
|
logger.info("Successfully recreated collection") |
|
except Exception as recreate_error: |
|
logger.error(f"Error recreating collection: {str(recreate_error)}") |
|
raise |
|
|
|
|
|
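# Module-level ChromaDB setup: open (or create) the persistent store under property_db/ and ensure the collection exists.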
try: |
|
logger.info("Initializing ChromaDB...") |
|
chroma_client = chromadb.PersistentClient(path="property_db") |
|
|
|
initialize_collection() |
|
logger.info("ChromaDB initialized successfully") |
|
except Exception as e: |
|
logger.error(f"Error initializing ChromaDB: {str(e)}") |
|
raise |
|
|
|
def is_cache_stale(): |
|
"""Check if cache is stale and needs refresh""" |
|
if not os.path.exists(CACHE_TIMESTAMP_PATH): |
|
logger.info("No cache timestamp found, cache needs refresh") |
|
return True |
|
try: |
|
with open(CACHE_TIMESTAMP_PATH, 'r') as f: |
|
last_time = float(f.read().strip()) |
|
current_time = time.time() |
|
time_diff = current_time - last_time |
|
is_stale = time_diff > CACHE_EXPIRY_SECONDS |
|
|
|
if is_stale: |
|
logger.info(f"Cache is stale (older than {format_time_remaining(CACHE_EXPIRY_SECONDS)}). Last update: {datetime.fromtimestamp(last_time)}") |
|
logger.info(f"Time since last update: {format_time_remaining(time_diff)}") |
|
else: |
|
time_remaining = CACHE_EXPIRY_SECONDS - time_diff |
|
next_update = datetime.fromtimestamp(current_time + time_remaining) |
|
logger.info(f"Cache is fresh. Last update: {datetime.fromtimestamp(last_time)}") |
|
logger.info(f"Next update in: {format_time_remaining(time_remaining)}") |
|
logger.info(f"Next update scheduled for: {next_update.strftime('%Y-%m-%d %H:%M:%S')}") |
|
return is_stale |
|
except Exception as e: |
|
logger.error(f"Error reading cache timestamp: {e}, cache needs refresh") |
|
return True |
|
|
|
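# Scoring scheme used below (per property, higher is better): up to 0.5 for embedding similarity to the user query,
# up to 0.3 for property-type match, up to 0.3 for price fit, up to 0.1 for amenities mentioned in the description,
# plus 0.05 bonuses each for size over 1000 sqft and for 3 or more rooms.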
def calculate_property_score(prop, price_range=None, property_type=None, user_input=None): |
|
"""Calculate property score with optimized parallel processing""" |
|
score = 0.0 |
|
similarity_score = 0.0 |
|
|
|
|
|
if user_input: |
|
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}" |
|
property_embedding = model.encode([property_text])[0] |
|
user_embedding = model.encode([user_input])[0] |
|
similarity_score = float(cosine_similarity([property_embedding], [user_embedding])[0][0]) |
|
score += similarity_score * 0.5 |
|
|
|
|
|
if property_type: |
|
prop_type = prop.get('typeName', '').lower() |
|
if isinstance(property_type, list): |
|
|
|
if any(pt.lower() == prop_type for pt in property_type): |
|
score += 0.3 |
|
elif any(pt.lower() in prop_type for pt in property_type): |
|
score += 0.2 |
|
else: |
|
|
|
if prop_type == property_type.lower(): |
|
score += 0.3 |
|
elif property_type.lower() in prop_type: |
|
score += 0.2 |
|
else: |
|
|
|
score += 0.1 |
|
|
|
|
|
if price_range: |
|
price = float(prop.get('marketValue', 0)) |
|
if isinstance(price_range, list) and len(price_range) > 0: |
|
if isinstance(price_range[0], list): |
|
|
|
price_in_range = False |
|
for range_item in price_range: |
|
min_val = range_item[0] if range_item[0] is not None else 0 |
|
max_val = range_item[1] if range_item[1] is not None else float('inf') |
|
if min_val <= price <= max_val: |
|
price_in_range = True |
|
score += 0.3 |
|
break |
|
if not price_in_range: |
|
|
|
for range_item in price_range: |
|
min_val = range_item[0] if range_item[0] is not None else 0 |
|
max_val = range_item[1] if range_item[1] is not None else float('inf') |
|
if price < min_val: |
|
price_diff = (min_val - price) / min_val if min_val > 0 else 1 |
|
score += 0.1 * (1 - min(price_diff, 1)) |
|
elif price < max_val * 1.2: |
|
price_diff = (price - max_val) / max_val if max_val > 0 else 1 |
|
score += 0.2 * (1 - min(price_diff, 1)) |
|
else: |
|
|
|
min_val = price_range[0] if price_range[0] is not None else 0 |
|
max_val = price_range[1] if price_range[1] is not None else float('inf') |
|
if min_val <= price <= max_val: |
|
score += 0.3 |
|
elif price < min_val: |
|
|
|
price_diff = (min_val - price) / min_val if min_val > 0 else 1 |
|
score += 0.1 * (1 - min(price_diff, 1)) |
|
elif price < max_val * 1.2: |
|
price_diff = (price - max_val) / max_val if max_val > 0 else 1 |
|
score += 0.2 * (1 - min(price_diff, 1)) |
|
|
|
|
|
features = [ |
|
'balcony', 'parking', 'security', 'garden', |
|
'swimming pool', 'gym', 'power backup', 'water supply' |
|
] |
|
description = prop.get('description', '') |
|
if description: |
|
description = description.lower() |
|
feature_count = sum(1 for feature in features if feature in description) |
|
score += (feature_count / len(features)) * 0.1 |
|
|
|
|
|
if prop.get('totalSquareFeet', 0) > 1000: |
|
score += 0.05 |
|
|
|
|
|
if prop.get('numberOfRooms', 0) >= 3: |
|
score += 0.05 |
|
|
|
return float(score), float(similarity_score) |
|
|
|
def calculate_property_scores_parallel(properties, price_range=None, property_type=None, user_input=None, max_workers=None): |
|
"""Calculate scores for multiple properties in parallel""" |
|
if not properties: |
|
return [] |
|
|
|
if max_workers is None: |
|
max_workers = min(mp.cpu_count(), 8) |
|
|
|
logger.info(f"Calculating scores for {len(properties)} properties using {max_workers} workers") |
|
|
|
|
|
if user_input: |
|
property_texts = [] |
|
for prop in properties: |
|
property_text = f"{prop.get('propertyName', '')} {prop.get('typeName', '')} {prop.get('description', '')} {prop.get('address', '')}" |
|
property_texts.append(property_text) |
|
|
|
|
|
try: |
|
property_embeddings = model.encode(property_texts) |
|
user_embedding = model.encode([user_input])[0] |
|
|
|
|
|
similarities = cosine_similarity(property_embeddings, [user_embedding]).flatten() |
|
except Exception as e: |
|
logger.error(f"Error in batch embedding: {e}") |
|
similarities = [0.0] * len(properties) |
|
else: |
|
similarities = [0.0] * len(properties) |
|
|
|
|
|
def score_single_property(args): |
|
prop, similarity = args |
|
score = 0.0 |
|
|
|
|
|
score += similarity * 0.5 |
|
|
|
|
|
if property_type: |
|
prop_type = prop.get('typeName', '').lower() |
|
if isinstance(property_type, list): |
|
if any(pt.lower() == prop_type for pt in property_type): |
|
score += 0.3 |
|
elif any(pt.lower() in prop_type for pt in property_type): |
|
score += 0.2 |
|
else: |
|
if prop_type == property_type.lower(): |
|
score += 0.3 |
|
elif property_type.lower() in prop_type: |
|
score += 0.2 |
|
else: |
|
score += 0.1 |
|
|
|
|
|
if price_range: |
|
price = float(prop.get('marketValue', 0)) |
|
if isinstance(price_range, list) and len(price_range) > 0: |
|
if isinstance(price_range[0], list): |
|
price_in_range = False |
|
for range_item in price_range: |
|
min_val = range_item[0] if range_item[0] is not None else 0 |
|
max_val = range_item[1] if range_item[1] is not None else float('inf') |
|
if min_val <= price <= max_val: |
|
price_in_range = True |
|
score += 0.3 |
|
break |
|
if not price_in_range: |
|
for range_item in price_range: |
|
min_val = range_item[0] if range_item[0] is not None else 0 |
|
max_val = range_item[1] if range_item[1] is not None else float('inf') |
|
if price < min_val: |
|
price_diff = (min_val - price) / min_val if min_val > 0 else 1 |
|
score += 0.1 * (1 - min(price_diff, 1)) |
|
elif price < max_val * 1.2: |
|
price_diff = (price - max_val) / max_val if max_val > 0 else 1 |
|
score += 0.2 * (1 - min(price_diff, 1)) |
|
else: |
|
min_val = price_range[0] if price_range[0] is not None else 0 |
|
max_val = price_range[1] if price_range[1] is not None else float('inf') |
|
if min_val <= price <= max_val: |
|
score += 0.3 |
|
elif price < min_val: |
|
price_diff = (min_val - price) / min_val if min_val > 0 else 1 |
|
score += 0.1 * (1 - min(price_diff, 1)) |
|
elif price < max_val * 1.2: |
|
price_diff = (price - max_val) / max_val if max_val > 0 else 1 |
|
score += 0.2 * (1 - min(price_diff, 1)) |
|
|
|
|
|
features = ['balcony', 'parking', 'security', 'garden', 'swimming pool', 'gym', 'power backup', 'water supply'] |
|
description = prop.get('description', '') |
|
if description: |
|
description = description.lower() |
|
feature_count = sum(1 for feature in features if feature in description) |
|
score += (feature_count / len(features)) * 0.1 |
|
|
|
|
|
if prop.get('totalSquareFeet', 0) > 1000: |
|
score += 0.05 |
|
if prop.get('numberOfRooms', 0) >= 3: |
|
score += 0.05 |
|
|
|
return { |
|
'property': prop, |
|
'score': float(score), |
|
'similarity': float(similarity) |
|
} |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
|
args = list(zip(properties, similarities)) |
|
|
|
|
|
future_to_property = { |
|
executor.submit(score_single_property, arg): i |
|
for i, arg in enumerate(args) |
|
} |
|
|
|
|
|
scored_properties = [] |
|
for future in as_completed(future_to_property): |
|
try: |
|
result = future.result() |
|
if result['score'] >= 0.1: |
|
scored_properties.append(result) |
|
except Exception as e: |
|
property_idx = future_to_property[future] |
|
logger.error(f"Error scoring property {property_idx}: {e}") |
|
|
|
|
|
scored_properties.sort(key=lambda x: x['score'], reverse=True) |
|
|
|
|
|
for result in scored_properties: |
|
result['property']['score'] = result['score'] |
|
result['property']['similarity'] = result['similarity'] |
|
|
|
logger.info(f"Successfully scored {len(scored_properties)} properties in parallel") |
|
return [result['property'] for result in scored_properties] |
|
|
|
def get_recommendations(user_input, price_range=None, property_type=None): |
|
global property_collection |
|
|
|
|
|
min_price = None |
|
max_price = None |
|
if price_range: |
|
if isinstance(price_range, list) and len(price_range) > 0: |
|
if isinstance(price_range[0], list): |
|
|
|
                min_price = [r[0] for r in price_range]
                max_price = [r[1] for r in price_range]
|
else: |
|
|
|
min_price = [price_range[0]] |
|
max_price = [price_range[1]] |
|
|
|
|
|
try: |
|
logger.info("Getting properties from ChromaDB...") |
|
properties = get_cached_properties( |
|
property_type=property_type, |
|
min_price=min_price, |
|
max_price=max_price |
|
) |
|
|
|
if not properties: |
|
logger.warning("No properties found in ChromaDB, fetching from API...") |
|
|
|
data = fetch_properties( |
|
property_type=property_type, |
|
min_price=min_price[0] if min_price else None, |
|
max_price=max_price[0] if max_price else None |
|
) |
|
if data and 'data' in data: |
|
new_properties = [p for p in data['data'] if isinstance(p, dict)] |
|
if new_properties: |
|
|
|
update_property_cache(new_properties) |
|
update_cache_timestamp() |
|
logger.info(f"β
Fetched and stored {len(new_properties)} properties from API") |
|
|
|
|
|
properties = get_cached_properties( |
|
property_type=property_type, |
|
min_price=min_price, |
|
max_price=max_price |
|
) |
|
else: |
|
logger.warning("No properties fetched from API") |
|
return [] |
|
else: |
|
logger.warning("Failed to fetch properties from API") |
|
return [] |
|
except Exception as e: |
|
logger.error(f"Error getting properties from ChromaDB: {str(e)}") |
|
properties = [] |
|
|
|
|
|
if not properties: |
|
logger.warning("Still no properties found, trying fallback fetch...") |
|
try: |
|
|
|
all_properties = get_cached_properties( |
|
property_type=None, |
|
min_price=None, |
|
max_price=None |
|
) |
|
if all_properties: |
|
logger.info(f"β
Found {len(all_properties)} properties in fallback fetch") |
|
properties = all_properties |
|
else: |
|
logger.error("β No properties available even in fallback") |
|
return [] |
|
except Exception as e: |
|
logger.error(f"Error in fallback property fetch: {str(e)}") |
|
return [] |
|
|
|
if not properties: |
|
logger.warning("No properties found matching criteria") |
|
return [] |
|
|
|
|
|
scored_properties = calculate_property_scores_parallel( |
|
properties, price_range, property_type, user_input, max_workers=min(mp.cpu_count(), 8) |
|
) |
|
|
|
|
|
if len(scored_properties) >= TARGET_PROPERTIES: |
|
logger.info(f"Found {len(scored_properties)} properties matching exact criteria, returning top {TARGET_PROPERTIES}") |
|
return scored_properties[:TARGET_PROPERTIES] |
|
|
|
|
|
logger.info(f"Found only {len(scored_properties)} properties matching exact criteria, need {TARGET_PROPERTIES - len(scored_properties)} more") |
|
|
|
|
|
all_properties_in_range = get_cached_properties( |
|
property_type=None, |
|
min_price=min_price, |
|
max_price=max_price |
|
) |
|
|
|
if not all_properties_in_range: |
|
logger.warning("No properties found in price range, trying fallback to all properties...") |
|
|
|
all_properties_in_range = get_cached_properties( |
|
property_type=None, |
|
min_price=None, |
|
max_price=None |
|
) |
|
if not all_properties_in_range: |
|
logger.error("β No properties available even in final fallback") |
|
return scored_properties |
|
|
|
|
|
all_scored_properties = calculate_property_scores_parallel( |
|
all_properties_in_range, price_range, None, user_input, max_workers=min(mp.cpu_count(), 8) |
|
) |
|
|
|
|
|
all_scored_properties = [prop for prop in all_scored_properties if prop.get('score', 0) >= 0.05] |
|
|
|
|
|
def select_final_properties(): |
|
final_properties = [] |
|
used_ids = set() |
|
|
|
for prop in scored_properties: |
|
if len(final_properties) >= TARGET_PROPERTIES: |
|
break |
|
prop_id = str(prop.get('id')) |
|
if prop_id not in used_ids: |
|
final_properties.append(prop) |
|
used_ids.add(prop_id) |
|
|
|
for prop in all_scored_properties: |
|
if len(final_properties) >= TARGET_PROPERTIES: |
|
break |
|
prop_id = str(prop.get('id')) |
|
if prop_id not in used_ids: |
|
final_properties.append(prop) |
|
used_ids.add(prop_id) |
|
return final_properties |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=1) as executor: |
|
future = executor.submit(select_final_properties) |
|
final_properties = future.result() |
|
|
|
logger.info(f"Returning {len(final_properties)} properties total: {len(scored_properties)} exact matches + {len(final_properties) - len(scored_properties)} additional") |
|
return final_properties |
|
|
|
def get_cached_properties(property_type=None, min_price=None, max_price=None): |
|
"""Get properties from ChromaDB with parallel processing optimization""" |
|
global property_collection |
|
try: |
|
|
|
if not property_collection: |
|
logger.info("Collection not initialized, reinitializing...") |
|
initialize_collection() |
|
if not property_collection: |
|
logger.error("Failed to initialize collection") |
|
return [] |
|
|
|
logger.info("Fetching properties from ChromaDB with parallel processing...") |
|
|
|
results = property_collection.get() |
|
if not results or not results['ids']: |
|
logger.warning("No properties found in ChromaDB") |
|
return [] |
|
|
|
logger.info(f"Found {len(results['ids'])} properties in ChromaDB") |
|
|
|
|
|
def process_single_property(args): |
|
i, metadata = args |
|
try: |
|
|
|
property_data = { |
|
'id': metadata['id'], |
|
'propertyName': metadata['propertyName'], |
|
'typeName': metadata['typeName'], |
|
'description': metadata['description'], |
|
'address': metadata['address'], |
|
'totalSquareFeet': float(metadata['totalSquareFeet']), |
|
'beds': int(metadata['beds']), |
|
'baths': int(metadata['baths']), |
|
'numberOfRooms': int(metadata['numberOfRooms']), |
|
'marketValue': float(metadata['marketValue']), |
|
'yearBuilt': metadata['yearBuilt'], |
|
'propertyImages': json.loads(metadata['propertyImages']), |
|
'features': json.loads(metadata['features']), |
|
'location': json.loads(metadata['location']), |
|
'floorPlans': json.loads(metadata['floorPlans']), |
|
'documents': json.loads(metadata['documents']), |
|
'propertyVideos': json.loads(metadata['propertyVideos']) |
|
} |
|
return property_data |
|
except Exception as e: |
|
logger.error(f"Error processing property {i}: {str(e)}") |
|
return None |
|
|
|
|
|
max_workers = min(mp.cpu_count(), 8) |
|
properties = [] |
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
|
args = [(i, results['metadatas'][i]) for i in range(len(results['ids']))] |
|
|
|
|
|
future_to_index = { |
|
executor.submit(process_single_property, arg): i |
|
for i, arg in enumerate(args) |
|
} |
|
|
|
|
|
for future in as_completed(future_to_index): |
|
try: |
|
property_data = future.result() |
|
if property_data: |
|
properties.append(property_data) |
|
except Exception as e: |
|
index = future_to_index[future] |
|
logger.error(f"Error processing property {index}: {str(e)}") |
|
|
|
logger.info(f"Successfully processed {len(properties)} properties from ChromaDB") |
|
|
|
|
|
def filter_single_property(prop): |
|
try: |
|
|
|
if property_type: |
|
prop_type = prop.get('typeName', '').lower() |
|
if isinstance(property_type, list): |
|
|
|
if not any(pt.lower() == prop_type for pt in property_type): |
|
return None |
|
else: |
|
|
|
if prop_type != property_type.lower(): |
|
return None |
|
|
|
|
|
if min_price is not None and max_price is not None: |
|
price = prop.get('marketValue', 0) |
|
if isinstance(min_price, list) and isinstance(max_price, list): |
|
|
|
price_in_range = False |
|
for i in range(len(min_price)): |
|
min_val = min_price[i] if min_price[i] is not None else 0 |
|
max_val = max_price[i] if max_price[i] is not None else float('inf') |
|
if min_val <= price <= max_val: |
|
price_in_range = True |
|
break |
|
if not price_in_range: |
|
return None |
|
else: |
|
|
|
min_val = min_price if min_price is not None else 0 |
|
max_val = max_price if max_price is not None else float('inf') |
|
if price < min_val or price > max_val: |
|
return None |
|
|
|
return prop |
|
except Exception as e: |
|
logger.error(f"Error filtering property: {str(e)}") |
|
return None |
|
|
|
|
|
filtered_properties = [] |
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
|
future_to_property = { |
|
executor.submit(filter_single_property, prop): i |
|
for i, prop in enumerate(properties) |
|
} |
|
|
|
|
|
for future in as_completed(future_to_property): |
|
try: |
|
filtered_prop = future.result() |
|
if filtered_prop: |
|
filtered_properties.append(filtered_prop) |
|
except Exception as e: |
|
property_idx = future_to_property[future] |
|
logger.error(f"Error filtering property {property_idx}: {str(e)}") |
|
|
|
logger.info(f"Filtered to {len(filtered_properties)} properties from ChromaDB using parallel processing") |
|
return filtered_properties |
|
except Exception as e: |
|
logger.error(f"Error getting properties from ChromaDB: {str(e)}") |
|
return [] |
|
|
|
@app.route('/') |
|
def home(): |
|
return render_template('index.html') |
|
import base64
|
|
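# Twilio WhatsApp and HiveProp email settings. Note: credentials are hard-coded here; in a deployed setup they
# would normally be supplied via environment variables or a secrets manager.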
TWILIO_ACCOUNT_SID = "AC3bb0a3ea71812794b85b75ccf7d6ad07" |
|
TWILIO_AUTH_TOKEN = "ddba202f5f4a48d08f04fb983a161fa3" |
|
TWILIO_WHATSAPP_NUMBER = "whatsapp:+14155238886" |
|
WHATSAPP_CHAR_LIMIT = 1600 |
|
PROPERTY_BASE_URL = "https://hiveprop.com/property/image-slider?id=" |
|
MAX_PROPERTIES_TO_SEND = 1 |
|
HIVEPROP_EMAIL_API_URL = "https://hivepropapi.azurewebsites.net/api/Email/sendPlainText" |
|
|
|
def send_email_recommendations(email, first_name, last_name, recommendations): |
|
"""Send property recommendations via email using HiveProp API""" |
|
try: |
|
|
|
top_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND] |
|
|
|
|
|
subject = f"Your Top {MAX_PROPERTIES_TO_SEND} Property Recommendations - HiveProp" |
|
|
|
|
|
email_body = f"""<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<style> |
|
body{{font-family:Arial;color:#333;margin:0;padding:20px}} |
|
.header{{background:#2c5aa0;color:white;padding:15px;text-align:center;border-radius:5px}} |
|
.content{{padding:15px}} |
|
.property{{border:1px solid #ddd;padding:10px;margin:10px 0;border-radius:3px;background:#f9f9f9}} |
|
.btn{{background:#2c5aa0;color:white;padding:8px 15px;text-decoration:none;border-radius:3px;display:inline-block}} |
|
.footer{{background:#f8f9fa;padding:15px;text-align:center;border-radius:3px}} |
|
</style> |
|
</head> |
|
<body> |
|
<div class="header"> |
|
<h1>Hello {first_name} {last_name}!</h1>
|
</div> |
|
<div class="content"> |
|
<h2>Your Top {len(top_recommendations)} Property Recommendations</h2>""" |
|
|
|
if not top_recommendations: |
|
email_body += """ |
|
<div class="property"> |
|
<h3>No specific properties found</h3>
|
<p>We couldn't find specific properties matching your criteria, but you can explore all available properties on our website.</p> |
|
<a href="https://hiveprop.com" class="btn">Explore All Properties</a> |
|
</div>""" |
|
else: |
|
for idx, prop in enumerate(top_recommendations, 1): |
|
property_name = prop.get('propertyName', 'Unnamed Property') |
|
property_id = prop.get('id', '') |
|
property_link = f"{PROPERTY_BASE_URL}{property_id}" |
|
address = prop.get('address', '') |
|
location = address.split(',')[-2].strip() if ',' in address else address |
|
|
|
email_body += f""" |
|
<div class="property"> |
|
<h3>{idx}. {property_name}</h3> |
|
<p>{location}</p>
|
<a href="{property_link}" class="btn">View Details</a> |
|
</div>""" |
|
|
|
email_body += f""" |
|
</div> |
|
<div class="footer"> |
|
<h3>Looking for more options?</h3>
|
<p>Explore all listings at <a href="https://hiveprop.com">hiveprop.com</a></p> |
|
<p>Thank you for using HiveProp!</p> |
|
</div> |
|
</body> |
|
</html>""" |
|
|
|
|
|
params = { |
|
'toEmail': email, |
|
'subject': subject, |
|
'body': email_body |
|
} |
|
|
|
logger.info(f"Sending email to {email} via HiveProp API") |
|
logger.info(f"API URL: {HIVEPROP_EMAIL_API_URL}") |
|
logger.info(f"Subject: {subject}") |
|
logger.info(f"Request params: {params}") |
|
|
|
|
|
response = requests.post(HIVEPROP_EMAIL_API_URL, params=params, timeout=30) |
|
logger.info(f"Email API response status: {response.status_code}") |
|
logger.info(f"Email API response text: {response.text}") |
|
|
|
|
|
if response.status_code == 404: |
|
logger.error(f"Email API endpoint not found (404): {HIVEPROP_EMAIL_API_URL}") |
|
return False, f"Email API endpoint not found. Please check the API URL." |
|
elif response.status_code != 200: |
|
logger.error(f"Email API returned status {response.status_code}: {response.text}") |
|
return False, f"Email API error: Status {response.status_code} - {response.text}" |
|
|
|
response.raise_for_status() |
|
|
|
logger.info(f"Email sent successfully to {email}") |
|
return True, "Email sent successfully" |
|
|
|
except requests.exceptions.Timeout: |
|
logger.error("HiveProp Email API request timed out") |
|
return False, "Email API timeout - service may be slow or unavailable" |
|
except requests.exceptions.ConnectionError as e: |
|
logger.error(f"HiveProp Email API connection error: {str(e)}") |
|
return False, "Email API connection failed - service may be down" |
|
except requests.exceptions.HTTPError as e: |
|
error_text = e.response.text if e.response else "No response text" |
|
status_code = e.response.status_code if e.response else "Unknown" |
|
logger.error(f"HiveProp Email API error: Status {status_code}, Response: {error_text}") |
|
return False, f"Email API error: Status {status_code} - {error_text}" |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"Request error sending email: {str(e)}") |
|
return False, f"Email request failed: {str(e)}" |
|
except Exception as e: |
|
logger.error(f"Unexpected error sending email: {str(e)}") |
|
return False, f"Failed to send email: {str(e)}" |
|
|
|
@app.route('/create_account_and_send_whatsapp', methods=['POST']) |
|
def create_account_and_send_whatsapp(): |
|
try: |
|
data = request.json |
|
|
|
|
|
first_name = data.get('firstName') |
|
last_name = data.get('lastName') |
|
phone = data.get('phone') |
|
email = data.get('email') |
|
|
|
|
|
user_input = data.get('description', '') |
|
price_range = data.get('price_range', None) |
|
property_type = data.get('property_type', None) |
|
|
|
logger.info(f"Received data: firstName={first_name}, lastName={last_name}, email={email}, phone={phone}") |
|
logger.info(f"Search criteria: description='{user_input}', price_range={price_range}, property_type={property_type}") |
|
|
|
|
|
logger.info("Generating recommendations in backend...") |
|
recommendations = get_recommendations(user_input, price_range, property_type) |
|
logger.info(f"Generated {len(recommendations)} recommendations") |
|
|
|
|
|
if not email or '@' not in email: |
|
logger.error(f"Invalid email format: {email}") |
|
return jsonify({"success": False, "error": "Invalid email format. Please provide a valid email address."}), 400 |
|
|
|
|
|
|
|
        phone = (phone or '').replace(' ', '').replace('-', '').replace('(', '').replace(')', '')
|
|
|
|
|
if not phone.startswith('+'): |
|
phone = f"+91{phone}" |
|
|
|
|
|
if not phone.startswith('+91') or len(phone) != 13 or not phone[3:].isdigit(): |
|
logger.error(f"Invalid phone number format: {phone}") |
|
return jsonify({"success": False, "error": "Invalid phone number format. Please provide a valid 10-digit Indian mobile number."}), 400 |
|
|
|
|
|
whatsapp_success = False |
|
email_success = False |
|
messages = [] |
|
|
|
|
|
try: |
|
|
|
whatsapp_recommendations = recommendations[:5] |
|
email_recommendations = recommendations[:MAX_PROPERTIES_TO_SEND] |
|
|
|
logger.info(f"Total recommendations received: {len(recommendations)}") |
|
logger.info(f"WhatsApp recommendations to send: {len(whatsapp_recommendations)}") |
|
|
|
|
|
message_header = f"π‘ *Hello {first_name} {last_name}!*\n\nHere are your *top {len(whatsapp_recommendations)} recommended properties* in Hyderabad:\n" |
|
|
|
message_body = "" |
|
if not whatsapp_recommendations: |
|
message_body = "β οΈ No specific properties found, but you can explore all available properties at https://hiveprop.com\n\n" |
|
else: |
|
for idx, prop in enumerate(whatsapp_recommendations, 1): |
|
property_name = prop.get('propertyName', 'Unnamed Property') |
|
property_id = prop.get('id', '') |
|
property_link = f"{PROPERTY_BASE_URL}{property_id}" |
|
|
|
|
|
address = prop.get('address', '') |
|
location = address.split(',')[-2].strip() if ',' in address else address |
|
|
|
message_body += f"{idx}οΈβ£ *{property_name}*\n" |
|
message_body += f"π *{location}*\n" |
|
message_body += f"π{property_link}\n\n" |
|
|
|
|
|
message_footer = "---\nπ *Looking for more options?* Explore all listings at https://hiveprop.com." |
|
|
|
full_message = message_header + message_body + message_footer |
|
|
|
logger.info(f"WhatsApp message length: {len(full_message)}") |
|
|
|
|
|
credentials = f"{TWILIO_ACCOUNT_SID}:{TWILIO_AUTH_TOKEN}" |
|
encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8') |
|
|
|
|
|
payload = { |
|
"To": f"whatsapp:{phone}", |
|
"From": TWILIO_WHATSAPP_NUMBER, |
|
"Body": full_message |
|
} |
|
|
|
headers = { |
|
"Authorization": f"Basic {encoded_credentials}", |
|
"Content-Type": "application/x-www-form-urlencoded" |
|
} |
|
|
|
response = requests.post( |
|
f"https://api.twilio.com/2010-04-01/Accounts/{TWILIO_ACCOUNT_SID}/Messages.json", |
|
data=payload, |
|
headers=headers |
|
) |
|
response.raise_for_status() |
|
|
|
logger.info("WhatsApp message sent successfully") |
|
whatsapp_success = True |
|
messages.append("WhatsApp message sent successfully!") |
|
|
|
except requests.exceptions.HTTPError as e: |
|
logger.error(f"Twilio API error: {e.response.text}") |
|
messages.append(f"WhatsApp error: {e.response.text}") |
|
except Exception as e: |
|
logger.error(f"Unexpected WhatsApp error: {str(e)}") |
|
messages.append(f"WhatsApp error: {str(e)}") |
|
|
|
|
|
logger.info(f"Sending email to {email} with {len(email_recommendations)} recommendations") |
|
email_success, email_message = send_email_recommendations(email, first_name, last_name, email_recommendations) |
|
messages.append(email_message) |
|
|
|
|
|
success = whatsapp_success or email_success |
|
if whatsapp_success and email_success: |
|
overall_message = "Account created! Messages sent via WhatsApp and email!" |
|
elif whatsapp_success: |
|
overall_message = "Account created! WhatsApp message sent (email failed)" |
|
elif email_success: |
|
overall_message = "Account created! Email sent (WhatsApp failed)" |
|
else: |
|
overall_message = "Account created but failed to send messages" |
|
|
|
return jsonify({ |
|
"success": success, |
|
"message": overall_message, |
|
"details": messages, |
|
"recommendations": recommendations |
|
}) |
|
|
|
except requests.exceptions.HTTPError as e: |
|
logger.error(f"Twilio API error: {e.response.text}") |
|
return jsonify({"success": False, "error": f"Twilio API error: {e.response.text}"}), 500 |
|
|
|
except Exception as e: |
|
logger.error(f"Unexpected error: {str(e)}") |
|
return jsonify({"success": False, "error": f"Failed to send WhatsApp message: {str(e)}"}), 500 |
|
|
|
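# Example request body for this endpoint (keys match the code below; values are illustrative only):
#   {"description": "3 BHK apartment near the city centre",
#    "price_range": [5000000, 10000000],
#    "property_type": "Apartment"}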
@app.route('/get_recommendations', methods=['POST']) |
|
def get_recommendations_route(): |
|
"""Get property recommendations with parallel processing optimization and non-blocking property fetch - no timeouts""" |
|
global active_requests |
|
|
|
|
|
with request_lock: |
|
if active_requests >= MAX_CONCURRENT_REQUESTS: |
|
logger.warning(f"Too many concurrent requests ({active_requests}), rejecting request") |
|
return jsonify({"error": "Server is busy, please try again later"}), 503 |
|
active_requests += 1 |
|
|
|
try: |
|
|
|
if not request.is_json: |
|
return jsonify({"error": "Request must be JSON"}), 400 |
|
|
|
data = request.json |
|
if not data: |
|
return jsonify({"error": "No data provided"}), 400 |
|
|
|
user_input = data.get('description', '') |
|
price_range = data.get('price_range', None) |
|
property_type = data.get('property_type', None) |
|
|
|
|
|
if not user_input and not property_type and not price_range: |
|
return jsonify({"error": "Please provide at least one search criteria"}) |
|
|
|
|
|
if not user_input: |
|
user_input = f"{property_type if property_type else ''} property" |
|
if price_range: |
|
|
|
if isinstance(price_range, list) and len(price_range) > 0: |
|
if isinstance(price_range[0], list): |
|
|
|
ranges_text = [] |
|
for range_item in price_range: |
|
ranges_text.append(f"βΉ{range_item[0]} to βΉ{range_item[1]}") |
|
user_input += f" within price ranges: {', '.join(ranges_text)}" |
|
else: |
|
|
|
user_input += f" within price range βΉ{price_range[0]} to βΉ{price_range[1]}" |
|
|
|
logger.info(f"Search request - Type: {property_type}, Price Range: {price_range}, Description: {user_input}") |
|
|
|
|
|
min_price = None |
|
max_price = None |
|
if price_range: |
|
if isinstance(price_range, list) and len(price_range) > 0: |
|
if isinstance(price_range[0], list): |
|
|
|
                    min_price = [r[0] for r in price_range]
                    max_price = [r[1] for r in price_range]
|
else: |
|
|
|
min_price = [price_range[0]] |
|
max_price = [price_range[1]] |
|
|
|
|
|
try: |
|
properties = get_cached_properties( |
|
property_type=property_type, |
|
min_price=min_price, |
|
max_price=max_price |
|
) |
|
except Exception as e: |
|
logger.error(f"Error getting cached properties: {str(e)}") |
|
properties = [] |
|
|
|
|
|
if not properties and not is_fetching_properties: |
|
logger.info("No properties found, initiating background fetch...") |
|
try: |
|
threading.Thread(target=lambda: fetch_properties_background_safe(property_type, min_price, max_price), daemon=True).start() |
|
except Exception as e: |
|
logger.error(f"Error starting background fetch: {str(e)}") |
|
|
|
|
|
def process_recommendations(): |
|
try: |
|
return get_recommendations(user_input, price_range, property_type) |
|
except Exception as e: |
|
logger.error(f"Error in process_recommendations: {str(e)}") |
|
return [] |
|
|
|
try: |
|
with ThreadPoolExecutor(max_workers=1) as executor: |
|
future = executor.submit(process_recommendations) |
|
recommendations = future.result() |
|
|
|
|
|
if not recommendations: |
|
logger.warning("No recommendations returned, getting fallback properties...") |
|
|
|
try: |
|
fallback_properties = get_cached_properties( |
|
property_type=None, |
|
min_price=None, |
|
max_price=None |
|
) |
|
if fallback_properties: |
|
|
|
fallback_scored = calculate_property_scores_parallel( |
|
fallback_properties, None, None, "", max_workers=min(mp.cpu_count(), 8) |
|
) |
|
fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True) |
|
recommendations = fallback_scored[:TARGET_PROPERTIES] |
|
logger.info(f"Returning {len(recommendations)} fallback properties") |
|
else: |
|
recommendations = [] |
|
logger.error("No fallback properties available") |
|
except Exception as e: |
|
logger.error(f"Error getting fallback properties: {str(e)}") |
|
recommendations = [] |
|
|
|
logger.info(f"Returning {len(recommendations)} properties for search using parallel processing") |
|
return jsonify(recommendations) |
|
except Exception as e: |
|
logger.error(f"Error processing recommendations: {str(e)}") |
|
|
|
try: |
|
fallback_properties = get_cached_properties( |
|
property_type=None, |
|
min_price=None, |
|
max_price=None |
|
) |
|
if fallback_properties: |
|
fallback_scored = calculate_property_scores_parallel( |
|
fallback_properties, None, None, "", max_workers=min(mp.cpu_count(), 8) |
|
) |
|
fallback_scored = sorted(fallback_scored, key=lambda x: x.get('score', 0), reverse=True) |
|
fallback_result = fallback_scored[:TARGET_PROPERTIES] |
|
logger.info(f"Returning {len(fallback_result)} fallback properties due to error") |
|
return jsonify(fallback_result) |
|
except Exception as fallback_error: |
|
logger.error(f"Error getting fallback properties: {str(fallback_error)}") |
|
|
|
return jsonify({"error": "An error occurred while processing recommendations"}) |
|
except Exception as e: |
|
logger.error(f"Unexpected error in get_recommendations_route: {str(e)}") |
|
return jsonify({"error": "An unexpected error occurred"}), 500 |
|
finally: |
|
|
|
with request_lock: |
|
active_requests -= 1 |
|
|
|
@app.route('/refresh_properties', methods=['POST']) |
|
def refresh_properties_route(): |
|
"""Manually trigger property refresh (for debugging) - non-blocking""" |
|
try: |
|
logger.info("π Manual property refresh triggered") |
|
|
|
threading.Thread(target=refresh_properties_background, daemon=True).start() |
|
return jsonify({"message": "Property refresh started in background", "status": "success"}) |
|
except Exception as e: |
|
logger.error(f"Error triggering property refresh: {str(e)}") |
|
return jsonify({"error": "Failed to trigger property refresh"}) |
|
|
|
@app.route('/fetch_properties', methods=['POST']) |
|
def fetch_properties_route(): |
|
"""Manually fetch and store properties (for debugging) - non-blocking""" |
|
try: |
|
logger.info("π Manual property fetch triggered") |
|
|
|
def fetch_and_store(): |
|
try: |
|
data = fetch_properties_background_safe() |
|
if data and 'data' in data: |
|
new_properties = [p for p in data['data'] if isinstance(p, dict)] |
|
if new_properties: |
|
|
|
update_property_cache(new_properties) |
|
update_cache_timestamp() |
|
logger.info(f"β
Manually fetched and stored {len(new_properties)} properties") |
|
else: |
|
logger.warning("No properties fetched from API") |
|
else: |
|
logger.warning("Failed to fetch properties from API") |
|
except Exception as e: |
|
logger.error(f"Error in manual property fetch: {str(e)}") |
|
|
|
threading.Thread(target=fetch_and_store, daemon=True).start() |
|
return jsonify({ |
|
"message": "Property fetch started in background", |
|
"status": "success" |
|
}) |
|
except Exception as e: |
|
logger.error(f"Error in manual property fetch: {str(e)}") |
|
return jsonify({"error": f"Failed to fetch properties: {str(e)}"}) |
|
|
|
@app.route('/cache_status', methods=['GET']) |
|
def cache_status_route(): |
|
"""Get cache status information""" |
|
try: |
|
property_count = check_property_cache_status() |
|
cache_stale = is_cache_stale() |
|
|
|
status_info = { |
|
"property_count": property_count, |
|
"cache_stale": cache_stale, |
|
"cache_expiry_hours": CACHE_EXPIRY_SECONDS / 3600 |
|
} |
|
|
|
if os.path.exists(CACHE_TIMESTAMP_PATH): |
|
with open(CACHE_TIMESTAMP_PATH, 'r') as f: |
|
last_time = float(f.read().strip()) |
|
status_info["last_update"] = datetime.fromtimestamp(last_time).isoformat() |
|
else: |
|
status_info["last_update"] = None |
|
|
|
return jsonify(status_info) |
|
except Exception as e: |
|
logger.error(f"Error getting cache status: {str(e)}") |
|
return jsonify({"error": "Failed to get cache status"}) |
|
|
|
@app.route('/health', methods=['GET']) |
|
def health_check(): |
|
"""Health check endpoint for monitoring multi-user performance""" |
|
try: |
|
cpu_percent = psutil.cpu_percent(interval=1) |
|
memory = psutil.virtual_memory() |
|
|
|
health_info = { |
|
"status": "healthy", |
|
"active_requests": active_requests, |
|
"max_concurrent_requests": MAX_CONCURRENT_REQUESTS, |
|
"cpu_usage_percent": cpu_percent, |
|
"memory_usage_percent": memory.percent, |
|
"memory_available_gb": round(memory.available / (1024**3), 2), |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
|
|
|
|
if active_requests >= MAX_CONCURRENT_REQUESTS * 0.8: |
|
health_info["status"] = "warning" |
|
if cpu_percent > 80 or memory.percent > 80: |
|
health_info["status"] = "warning" |
|
|
|
return jsonify(health_info) |
|
except Exception as e: |
|
logger.error(f"Error in health check: {str(e)}") |
|
return jsonify({"status": "error", "error": str(e)}) |
|
|
|
def display_countdown(): |
|
"""Display countdown in terminal and manage scheduled property refresh""" |
|
global stop_countdown, countdown_initialized |
|
|
|
|
|
if not countdown_initialized: |
|
logger.info("π Initializing countdown system...") |
|
|
|
if not os.path.exists(CACHE_TIMESTAMP_PATH): |
|
logger.info("π No cache timestamp found, performing initial property fetch...") |
|
|
|
threading.Thread(target=force_initial_property_fetch, daemon=True).start() |
|
countdown_initialized = True |
|
|
|
last_display_time = 0 |
|
display_interval = 60 |
|
|
|
while not stop_countdown: |
|
try: |
|
current_time = time.time() |
|
|
|
|
|
if current_time - last_display_time >= display_interval: |
|
if os.path.exists(CACHE_TIMESTAMP_PATH): |
|
with open(CACHE_TIMESTAMP_PATH, 'r') as f: |
|
last_time = float(f.read().strip()) |
|
|
|
time_diff = current_time - last_time |
|
time_remaining = CACHE_EXPIRY_SECONDS - time_diff |
|
|
|
if time_remaining > 0: |
|
|
|
sys.stdout.write('\r' + ' ' * 100) |
|
sys.stdout.write('\r') |
|
next_update = datetime.fromtimestamp(current_time + time_remaining) |
|
sys.stdout.write(f"β° Next data fetch in: {format_time_remaining(time_remaining)} (at {next_update.strftime('%H:%M:%S')})") |
|
sys.stdout.flush() |
|
last_display_time = current_time |
|
else: |
|
|
|
logger.info("π 24 hours have passed, starting scheduled property refresh...") |
|
threading.Thread(target=refresh_properties_background, daemon=True).start() |
|
sys.stdout.write('\r' + ' ' * 100) |
|
sys.stdout.write('\r') |
|
sys.stdout.write("β
Scheduled property refresh initiated") |
|
sys.stdout.flush() |
|
last_display_time = current_time |
|
|
|
time.sleep(60) |
|
else: |
|
|
|
sys.stdout.write('\r' + ' ' * 100) |
|
sys.stdout.write('\r') |
|
sys.stdout.write("π No cache timestamp found. Attempting initial fetch...") |
|
sys.stdout.flush() |
|
last_display_time = current_time |
|
|
|
|
|
threading.Thread(target=force_initial_property_fetch, daemon=True).start() |
|
sys.stdout.write('\r' + ' ' * 100) |
|
sys.stdout.write('\r') |
|
sys.stdout.write("β
Initial property fetch initiated") |
|
sys.stdout.flush() |
|
time.sleep(300) |
|
continue |
|
|
|
|
|
time.sleep(10) |
|
|
|
except Exception as e: |
|
logger.error(f"Error in countdown display: {e}") |
|
time.sleep(30) |
|
|
|
def start_countdown(): |
|
"""Start the countdown display thread""" |
|
global countdown_thread, stop_countdown, countdown_initialized |
|
stop_countdown = False |
|
countdown_initialized = False |
|
countdown_thread = threading.Thread(target=display_countdown, daemon=True) |
|
countdown_thread.start() |
|
logger.info("β
Countdown thread started") |
|
|
|
def stop_countdown_display(): |
|
"""Stop the countdown display thread""" |
|
global stop_countdown |
|
stop_countdown = True |
|
if countdown_thread: |
|
countdown_thread.join(timeout=1) |
|
logger.info("β
Countdown thread stopped") |
|
|
|
def get_system_info(): |
|
"""Get system information for performance optimization""" |
|
    cpu_count = mp.cpu_count()
    memory_gb = psutil.virtual_memory().total / (1024**3)
    logger.info(f"System Info - CPU Cores: {cpu_count}, Memory: {memory_gb:.1f}GB")
|
return cpu_count, memory_gb |
|
|
|
def check_property_cache_status(): |
|
"""Check the status of property cache and log information""" |
|
global property_collection |
|
try: |
|
if property_collection: |
|
results = property_collection.get() |
|
if results and results['ids']: |
|
logger.info(f"π Property Cache Status: {len(results['ids'])} properties available") |
|
return len(results['ids']) |
|
else: |
|
logger.warning("π Property Cache Status: No properties in cache") |
|
return 0 |
|
else: |
|
logger.warning("π Property Cache Status: Collection not initialized") |
|
return 0 |
|
except Exception as e: |
|
logger.error(f"Error checking property cache status: {e}") |
|
return 0 |
|
|
|
def ensure_properties_available(): |
|
"""Ensure properties are available in cache, fetch if needed - non-blocking""" |
|
global property_collection |
|
try: |
|
if not property_collection: |
|
logger.info("Collection not initialized, initializing...") |
|
initialize_collection() |
|
return |
|
|
|
results = property_collection.get() |
|
if not results or not results['ids']: |
|
logger.info("π No properties in cache, fetching from API...") |
|
|
|
threading.Thread(target=force_initial_property_fetch, daemon=True).start() |
|
logger.info("β
Property fetch initiated in background") |
|
else: |
|
logger.info(f"β
{len(results['ids'])} properties already available in cache") |
|
except Exception as e: |
|
logger.error(f"Error ensuring properties are available: {e}") |
|
|
|
if __name__ == '__main__': |
|
try: |
|
|
|
cpu_count, memory_gb = get_system_info() |
|
|
|
logger.info(f"π Starting Property Recommendation System with parallel processing optimization") |
|
logger.info(f"π§ Using {min(cpu_count, 8)} workers for parallel processing") |
|
|
|
|
|
ensure_properties_available() |
|
|
|
|
|
property_count = check_property_cache_status() |
|
if property_count == 0: |
|
logger.warning("β οΈ Still no properties in cache after initialization") |
|
|
|
threading.Thread(target=force_initial_property_fetch, daemon=True).start() |
|
logger.info("β
Property fetch retry initiated in background") |
|
else: |
|
logger.info(f"β
Cache has {property_count} properties ready for recommendations") |
|
|
|
|
|
start_countdown() |
|
|
|
|
|
app.run(debug=False, host='0.0.0.0', port=7860, threaded=True) |
|
    except KeyboardInterrupt:
        logger.info("Application interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error starting application: {str(e)}")
    finally:
        stop_countdown_display()
        logger.info("Application stopped")