Spaces:
Runtime error
Runtime error
import os | |
import logging | |
from flask import Flask, request, jsonify, render_template | |
from flask_cors import CORS | |
from flask_limiter import Limiter | |
from flask_limiter.util import get_remote_address | |
import threading | |
from functools import wraps | |
import sys | |
import time | |
from geopy.distance import geodesic | |
import torch | |
# Add the modules directory to Python path | |
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
# Import modules | |
from modules.config import * | |
from modules.models import * | |
from modules.security import * | |
from modules.audio import * | |
from modules.location_processor import LocationProcessor, set_location | |
from modules.response import * | |
from modules.input_tracker import * | |
from modules.chatbot_processor import ChatbotProcessor | |
# Import specific functions | |
from modules.security import with_user_plan | |
from modules.audio import process_audio_file | |
# Configure logging: INFO and above goes both to app.log under BASE_DIR and to
# the console stream.
# NOTE(review): BASE_DIR is not defined in this file; presumably it is provided
# by one of the wildcard module imports above — confirm.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(BASE_DIR, 'app.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Initialize Flask app with correct template folder path
# (templates/ and static/ are resolved relative to BASE_DIR).
app = Flask(__name__,
    template_folder=os.path.join(BASE_DIR, 'templates'),
    static_folder=os.path.join(BASE_DIR, 'static')
)
# In-memory, per-process conversation store keyed by session_id.
# search() stores a list of formatted results under a key, while generate()
# and recommend() store the generated response under the same key.
# Nothing in this file removes entries from it.
conversation_context = {}
# Configure CORS for every route: any origin, GET/POST/OPTIONS, and the two
# request headers listed below.
CORS(app, resources={
    r"/*": {
        "origins": ["*"],  # Allow all origins for Hugging Face Spaces
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "X-Session-ID"]
    }
})
# Initialize rate limiter, keyed by the client's remote address.
# NOTE(review): MAX_REQUESTS_PER_WINDOW is not defined in this file —
# presumably it comes from a wildcard import; confirm.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[f"{MAX_REQUESTS_PER_WINDOW} per minute", "1000 per hour"]
)
# Initialize components in the correct order.
# All of this runs at import time; the retriever below is built from the
# index, embedding model and PCA model, so those three must be loaded first.
print("Loading sentence transformer...")
model_embedding = load_sentence_transformer()
print("Fetching and caching properties...")
properties = fetch_and_cache_properties()
if not properties:
    # Without property data the process terminates instead of serving requests.
    logger.error("Failed to fetch properties. Please check API connection.")
    sys.exit(1)
print("Loading FAISS index...")
# NOTE: the name `index` is rebound later in this file by `def index()`.
# The retriever below receives the loaded object before that happens.
index = load_faiss_index()
print("Loading PCA model...")
pca = load_pca_model()
print("Initializing retriever...")
retriever = CustomRagRetriever(index, model_embedding, pca)
print("Loading tokenizer and LLM model...")
tokenizer, model_llm = load_tokenizer_and_model()
print("Initializing security components...")
security_manager = SecurityManager()
query_validator = QueryValidator(model_embedding)
print("Initializing input tracker...")
input_tracker = UserInputTracker()
# Initialize processors
chatbot_processor = ChatbotProcessor()
def security_check(f):
    """Decorate a view with a per-IP rate-limit check and a JSON content-type check.

    The wrapper answers with:
      * 429 when ``security_manager.check_rate_limit`` rejects the client IP,
      * 415 when a POST request is not JSON,
      * 400 when the checks themselves raise.
    Otherwise the wrapped view is called and its result returned unchanged.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s name and docstring.
    """
    # wraps() keeps f.__name__; Flask derives endpoint names from it, so
    # without this every decorated view would be named "decorated_function".
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            ip_address = request.remote_addr
            if not security_manager.check_rate_limit(ip_address):
                return jsonify({"error": "Rate limit exceeded"}), 429
            if request.method == 'POST' and not request.is_json:
                return jsonify({"error": "Content-Type must be application/json"}), 415
        except Exception:
            logger.exception("Security check failed")
            return jsonify({"error": "Security check failed"}), 400
        # Called outside the try so that a failure inside the view is not
        # misreported as a failed security check.
        return f(*args, **kwargs)
    return decorated_function
def handle_preflight():
    """Build the reply for a CORS preflight (OPTIONS) request.

    Returns the app's default OPTIONS response with the allowed headers and
    methods added; returns None for every other HTTP method.

    NOTE(review): no registration decorator is visible on this function in
    this view of the file — confirm how the hook is attached to the app.
    """
    if request.method != 'OPTIONS':
        return None
    preflight = app.make_default_options_response()
    cors_headers = (
        ('Access-Control-Allow-Headers', 'Content-Type, X-Session-ID'),
        ('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
    )
    for header_name, header_value in cors_headers:
        preflight.headers.add(header_name, header_value)
    return preflight
def index():
    """Render and return the ``index.html`` template.

    NOTE: defining this function rebinds the module-level name ``index``,
    which earlier in the file holds the result of ``load_faiss_index()``.
    """
    print("Rendering index page")
    page = render_template('index.html')
    return page
def _to_float(value, default=0.0):
    """Coerce *value* to float, returning *default* for None or unparseable input."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _normalize_property_images(raw_images):
    """Return images as a list; accepts a list, a comma-separated string, or None."""
    if raw_images is None:
        return []
    if isinstance(raw_images, str):
        if ',' in raw_images:
            return [img.strip() for img in raw_images.split(',')]
        return [raw_images]
    return raw_images


def _format_search_result(result):
    """Map one retriever hit (``property`` + ``distance``) to the search response shape."""
    property_info = result['property']
    # Read the images before the numeric conversion, as the record is passed
    # through convert_numeric_fields_to_int afterwards.
    property_images = _normalize_property_images(property_info.get('propertyImages', []))
    property_info = convert_numeric_fields_to_int(property_info)
    return {
        "PropertyName": property_info.get('PropertyName', 'N/A'),
        "Address": property_info.get('Address', 'N/A'),
        "ZipCode": property_info.get('ZipCode', 0),
        "LeasableSquareFeet": property_info.get('LeasableSquareFeet', 0),
        "YearBuilt": property_info.get('YearBuilt', 0),
        "NumberOfRooms": property_info.get('NumberOfRooms', 0),
        "ParkingSpaces": property_info.get('ParkingSpaces', 0),
        "PropertyManager": property_info.get('PropertyManager', 'N/A'),
        # A null or malformed numeric value must not fail the whole search.
        "MarketValue": _to_float(property_info.get('MarketValue', 0)),
        "TaxAssessmentNumber": property_info.get('TaxAssessmentNumber', 'N/A'),
        "Latitude": _to_float(property_info.get('Latitude', 0)),
        "Longitude": _to_float(property_info.get('Longitude', 0)),
        "CreateDate": property_info.get('CreateDate', 'N/A'),
        "LastModifiedDate": property_info.get('LastModifiedDate', 'N/A'),
        "City": property_info.get('City', 'N/A'),
        "State": property_info.get('State', 'N/A'),
        "Country": property_info.get('Country', 'N/A'),
        "PropertyType": property_info.get('PropertyType', 'N/A'),
        "PropertyStatus": property_info.get('PropertyStatus', 'N/A'),
        "Description": property_info.get('Description', 'N/A'),
        "ViewNumber": property_info.get('ViewNumber', 0),
        "Contact": property_info.get('Contact', 0),
        "TotalSquareFeet": property_info.get('TotalSquareFeet', 0),
        "IsDeleted": bool(property_info.get('IsDeleted', False)),
        "Beds": property_info.get('Beds', 0),
        "Baths": property_info.get('Baths', 0),
        "AgentName": property_info.get('AgentName', 'N/A'),
        "AgentPhoneNumber": property_info.get('AgentPhoneNumber', 'N/A'),
        "AgentEmail": property_info.get('AgentEmail', 'N/A'),
        "KeyFeatures": property_info.get('KeyFeatures', 'N/A'),
        "NearbyAmenities": property_info.get('NearbyAmenities', 'N/A'),
        "propertyImages": property_images,
        "Distance": result['distance']
    }


def search():
    """Search properties for the query in the JSON request body.

    Reads ``query`` (required), ``session_id`` and ``continue`` from the body.
    When ``continue`` is truthy and a result list is already stored for the
    session, that list is returned; otherwise the retriever is queried, the
    hits are formatted, stored under ``session_id`` and returned.

    Returns:
        A JSON list of formatted properties, or a JSON error with status 400
        (bad body, missing or too-long query) or 500 (unexpected failure).
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON
        # body, so the client gets a 400 rather than the generic 500 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        cleaned_query = query_validator.clean_input(query)
        if not query_validator.validate_query_length(cleaned_query):
            return jsonify({"error": "Query too long"}), 400
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        cached = conversation_context.get(session_id) if continue_conversation else None
        # Other endpoints store a generated response (not a result list) under
        # the same session key; only reuse the cache when it really is a list.
        if isinstance(cached, list):
            formatted_results = cached
        else:
            formatted_results = [
                _format_search_result(result)
                for result in retriever.retrieve(cleaned_query)
            ]
            conversation_context[session_id] = formatted_results
        print(f"Returning {len(formatted_results)} search results")
        if formatted_results:
            print(f"Sample property images array: {formatted_results[0]['propertyImages']}")
        return jsonify(formatted_results)
    except Exception:
        logger.exception("Error in search endpoint")
        return jsonify({"error": "An error occurred processing your request"}), 500
def _uploaded_file_size(file_storage):
    """Return the upload's size in bytes, measured on its stream when possible."""
    stream = file_storage.stream
    try:
        position = stream.tell()
        stream.seek(0, os.SEEK_END)
        size = stream.tell()
        stream.seek(position)  # leave the stream where the consumer expects it
    except (AttributeError, OSError, ValueError):
        # Stream cannot be measured; fall back to the declared length, if any.
        return file_storage.content_length
    return size


def transcribe():
    """Validate an uploaded ``audio`` file and pass it to ``process_audio_file``.

    The upload must be at most 10MB and have a wav/mp3/ogg/webm extension.
    When ``process_audio_file`` returns a ``(payload, status)`` pair, both are
    forwarded; any other result is returned as JSON with the default status.

    Returns:
        The transcription result as JSON, or a JSON error with status 400
        (missing, oversized or unsupported file) or 500 (unexpected failure).
    """
    max_audio_bytes = 10 * 1024 * 1024
    allowed_extensions = {'wav', 'mp3', 'ogg', 'webm'}
    try:
        if 'audio' not in request.files:
            return jsonify({"error": "No audio file provided"}), 400
        audio_file = request.files['audio']
        # Validate file size (max 10MB). The per-part content length is often
        # not sent by clients, so the stream itself is measured.
        size = _uploaded_file_size(audio_file)
        if size and size > max_audio_bytes:
            return jsonify({"error": "Audio file too large. Maximum size is 10MB"}), 400
        # Validate file type; a missing filename is treated as "no extension".
        filename = audio_file.filename or ''
        extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
        if extension not in allowed_extensions:
            return jsonify({"error": "Invalid audio file format. Supported formats: WAV, MP3, OGG, WEBM"}), 400
        result = process_audio_file(audio_file)
        if isinstance(result, tuple) and len(result) == 2:
            response, status_code = result
            return jsonify(response), status_code
        return jsonify(result)
    except Exception:
        logger.exception("Error in transcribe endpoint")
        return jsonify({"error": "An error occurred processing your audio file"}), 500
def generate():
    """Generate an LLM response for the query in the JSON request body.

    Reads ``query`` (required), ``session_id`` and ``continue``. When
    ``continue`` is truthy and something is stored for the session, the stored
    value is prepended to the query. The generated response replaces the
    session's entry in ``conversation_context``.

    Returns:
        JSON with ``response`` and ``duration``, or a JSON error with status
        400 (bad body or missing query) or 500 (unexpected failure).
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        if continue_conversation and session_id in conversation_context:
            previous_results = conversation_context[session_id]
            prompt = f"Based on previous results:{previous_results}New Query: {query}"
        else:
            prompt = query
        response, duration = generate_response(prompt, tokenizer, model_llm)
        conversation_context[session_id] = response
        print(f"Generated response: {response}")
        print(f"Time taken to generate response: {duration:.2f} seconds\n")
        return jsonify({"response": response, "duration": duration})
    except Exception:
        # Same generic failure shape as the other query endpoints in this file.
        logger.exception("Error in generate endpoint")
        return jsonify({"error": "An error occurred processing your request"}), 500
def handle_set_location():
    """Handle location setting and nearby property search.

    Reads ``latitude``, ``longitude`` and ``session_id`` from the JSON body and
    forwards them to ``LocationProcessor.set_location``.

    Returns:
        The processor's result as JSON, or a JSON ``status: error`` payload
        with status 400 (bad body or invalid coordinates) or 500 (unexpected
        failure).
    """
    invalid_coordinates = {
        "status": "error",
        "message": "Invalid coordinates"
    }
    try:
        # Get request data; silent=True returns None for a non-JSON body.
        data = request.get_json(silent=True)
        print(f"Received data: {data}")
        if not isinstance(data, dict):
            return jsonify({
                "status": "error",
                "message": "Request body must be a JSON object"
            }), 400
        # Extract values; non-numeric input is a client error, not a 500.
        try:
            latitude = float(data.get('latitude', 0))
            longitude = float(data.get('longitude', 0))
        except (TypeError, ValueError):
            return jsonify(invalid_coordinates), 400
        session_id = data.get('session_id', '')
        print(f"Extracted values - latitude: {latitude}, longitude: {longitude}, session_id: {session_id}")
        # Validate coordinates. Zero doubles as the "missing" default above.
        # The range comparisons are False for NaN and infinities as well.
        in_range = -90 <= latitude <= 90 and -180 <= longitude <= 180
        if latitude == 0 or longitude == 0 or not in_range:
            return jsonify(invalid_coordinates), 400
        # Initialize location processor
        location_processor = LocationProcessor()
        # Set location and find nearby properties
        result = location_processor.set_location(latitude, longitude, session_id)
        return jsonify(result)
    except Exception:
        logger.exception("Error in set_location")
        return jsonify({
            "status": "error",
            "message": "Error processing location"
        }), 500
def check_input_limit():
    """Report the input quota for the session given in the query string.

    Requires a ``session_id`` query parameter (400 when absent). Responds
    with the current plan's value, the remaining inputs, the plan's total
    limit and the tracker's usage statistics; any failure yields a 500.
    """
    try:
        session_id = request.args.get('session_id')
        if session_id:
            plan = get_current_plan()
            remaining = input_tracker.get_remaining_inputs(session_id, plan)
            stats = input_tracker.get_usage_stats(session_id)
            payload = {
                "plan": plan.value,
                "remaining_inputs": remaining,
                "total_limit": PLAN_INPUT_LIMITS[plan],
                "usage_stats": stats
            }
            return jsonify(payload)
        return jsonify({"error": "session_id is required"}), 400
    except Exception as e:
        logging.error(f"Error checking input limit: {str(e)}")
        return jsonify({"error": "Error checking input limit"}), 500
def _nearby_properties_response(data, session_id):
    """Answer a "yes" reply: look up properties near the coordinates in *data*."""
    # Get location from the request
    latitude = data.get('latitude')
    longitude = data.get('longitude')
    if not latitude or not longitude:
        return jsonify({
            "error": "Location not available. Please allow location access or set your location first.",
            "needs_location": True
        }), 400
    result = LocationProcessor().set_location(latitude, longitude, session_id)
    if result["status"] != "success":
        return jsonify({
            "error": "No properties found near your location",
            "status": "error"
        }), 404
    return jsonify({
        "properties": result["properties"],
        "location": result["location"],
        "is_location_based": True,
        "status": "success"
    })


def _filter_results_for_plan(raw_results, current_plan):
    """Apply plan-based field filtering to retriever hits and strip image fields."""
    filtered_results = []
    for result in raw_results:
        hit = result['property']
        property_dict = hit.to_dict() if hasattr(hit, 'to_dict') else hit
        property_dict = convert_numeric_fields_to_int(property_dict)
        filtered_property = filter_property_by_plan(property_dict, current_plan)
        # Images are returned separately (PRO plan only), never inside the record.
        for image_key in ('propertyImages', 'property_image', 'image_url'):
            if image_key in filtered_property:
                del filtered_property[image_key]
        filtered_results.append({
            'property': filtered_property,
            'propertyImages': result.get('image_url', []) if current_plan == UserPlan.PRO else [],
            'distance': result.get('distance')
        })
    return filtered_results


def recommend():
    """Recommend properties for the query in the JSON request body.

    Flow:
      1. Validate the body and the cleaned query (400 on failure).
      2. Queries the validator does not consider real-estate related get a
         fixed explanatory reply.
      3. The exact replies "hi" and "yes" drive the location dialogue.
      4. Everything else is answered by retrieval (top 5), plan-based
         filtering and LLM generation; the generated response is stored under
         ``session_id`` in ``conversation_context``.

    Returns:
        A JSON payload for the matching branch, or a JSON error with status
        400, 404 (no nearby properties) or 500 (unexpected failure).
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        current_plan = get_current_plan()
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        # Clean and validate input
        cleaned_query = query_validator.clean_input(query)
        if not query_validator.validate_query_length(cleaned_query):
            return jsonify({"error": "Query too long"}), 400
        # Check if query is related to real estate
        if not query_validator.is_real_estate_query(cleaned_query):
            return jsonify({
                "response": "I'm a real estate chatbot. I can help you with property-related queries like finding apartments, PG accommodations, hostels, or commercial properties. Please ask me about properties!",
                "is_real_estate": False
            })
        # NOTE(review): the two greeting branches below are reached only when
        # the validator above accepted the text — confirm it lets "hi"/"yes"
        # through.
        normalized_query = cleaned_query.lower()
        # Special handling for "hi" query
        if normalized_query == 'hi':
            return jsonify({
                "response": "Do you want to know the properties located near you? (yes/no):",
                "is_location_query": True
            })
        # Special handling for "yes" after "hi"
        if normalized_query == 'yes':
            return _nearby_properties_response(data, session_id)
        # Handle regular queries with RAG-based recommendation.
        # The history-augmented query is used only when history exists, so a
        # `continue` request on a fresh session falls back to the plain query.
        if continue_conversation and session_id in conversation_context:
            previous_results = conversation_context[session_id]
            effective_query = f"Based on previous results:{previous_results}New Query: {cleaned_query}"
        else:
            effective_query = cleaned_query
        raw_results = retriever.retrieve(effective_query, top_k=5)
        # Filter results based on user plan
        filtered_results = _filter_results_for_plan(raw_results, current_plan)
        # Generate response
        response_text, _has_restricted_request = format_llm_prompt(
            query=effective_query,
            filtered_results=filtered_results,
            user_plan=current_plan,
            original_query=cleaned_query
        )
        response, duration = generate_response(
            response_text,
            tokenizer=tokenizer,
            model_llm=model_llm,
            max_new_tokens=512,
            temperature=0.7,
            top_k=30,
            top_p=0.8,
            repetition_penalty=1.05
        )
        # Store the response in conversation context
        conversation_context[session_id] = response
        return jsonify({
            "response": response,
            "duration": duration,
            "plan_level": current_plan.value,
            "filtered_results": filtered_results,
            "input_limit_info": {
                "remaining_inputs": input_tracker.get_remaining_inputs(session_id, current_plan),
                "total_limit": PLAN_INPUT_LIMITS[current_plan],
                "usage_stats": input_tracker.get_usage_stats(session_id)
            }
        })
    except Exception:
        logger.exception("Error in recommend endpoint")
        return jsonify({"error": "An error occurred processing your request"}), 500
def search_properties():
    """Run the chatbot processor's query search over the property set.

    Reads ``query`` (default ``''``) and ``user_location`` from the JSON body
    and passes them, with the properties from ``get_properties()``, to
    ``chatbot_processor.process_query``.

    Returns:
        JSON ``{'status': 'success', 'results': ...}``, or a JSON
        ``status: error`` payload with status 400 (bad body) or 500.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        query = data.get('query', '')
        user_location = data.get('user_location')  # (latitude, longitude)
        # Get properties from database or external source.
        # NOTE(review): get_properties is not defined in this file; confirm
        # that one of the wildcard imports provides it.
        available_properties = get_properties()
        # Process query and get filtered properties
        results = chatbot_processor.process_query(
            query, available_properties, user_location
        )
        return jsonify({
            'status': 'success',
            'results': results
        })
    except Exception:
        # Details go to the log only; exception text is not sent to clients.
        logger.exception("Error searching properties")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while searching properties'
        }), 500
def find_similar_properties():
    """Find properties similar to the reference property in the JSON body.

    Reads ``property`` and ``top_k`` (default 5) and passes them, with the
    properties from ``get_properties()``, to
    ``chatbot_processor.get_similar_properties``.

    Returns:
        JSON ``{'status': 'success', 'results': ...}``, or a JSON
        ``status: error`` payload with status 400 (bad body) or 500.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        reference_property = data.get('property')
        top_k = data.get('top_k', 5)
        # Get properties from database or external source.
        # NOTE(review): get_properties is not defined in this file; confirm
        # that one of the wildcard imports provides it.
        available_properties = get_properties()
        # Find similar properties
        results = chatbot_processor.get_similar_properties(
            reference_property, available_properties, top_k
        )
        return jsonify({
            'status': 'success',
            'results': results
        })
    except Exception:
        # Details go to the log only; exception text is not sent to clients.
        logger.exception("Error finding similar properties")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while finding similar properties'
        }), 500
def get_property_landmarks():
    """Return landmarks near the property given in the JSON body.

    Reads ``property`` and ``radius_miles`` (default 5.0) and passes them to
    ``chatbot_processor.get_nearby_landmarks``.

    Returns:
        JSON ``{'status': 'success', 'landmarks': ...}``, or a JSON
        ``status: error`` payload with status 400 (bad body) or 500.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        property_data = data.get('property')
        radius_miles = data.get('radius_miles', 5.0)
        # Get nearby landmarks
        landmarks = chatbot_processor.get_nearby_landmarks(
            property_data, radius_miles
        )
        return jsonify({
            'status': 'success',
            'landmarks': landmarks
        })
    except Exception:
        # Details go to the log only; exception text is not sent to clients.
        logger.exception("Error getting property landmarks")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while getting property landmarks'
        }), 500
def get_property_location():
    """Return location details for the property given in the JSON body.

    Reads ``property`` and passes it to
    ``chatbot_processor.get_location_details``.

    Returns:
        JSON ``{'status': 'success', 'location': ...}``, or a JSON
        ``status: error`` payload with status 400 (bad body) or 500.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        property_data = data.get('property')
        # Get location details
        location_details = chatbot_processor.get_location_details(property_data)
        return jsonify({
            'status': 'success',
            'location': location_details
        })
    except Exception:
        # Details go to the log only; exception text is not sent to clients.
        logger.exception("Error getting property location")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while getting property location'
        }), 500
def ratelimit_handler(e):
    """Answer with a JSON "Rate limit exceeded" body and HTTP 429; *e* is not used.

    NOTE(review): no error-handler registration is visible for this function
    in this view of the file — confirm how it is attached to the app.
    """
    payload = {"error": "Rate limit exceeded"}
    return jsonify(payload), 429
def bad_request_handler(e):
    """Answer with a JSON "Bad request" body and HTTP 400; *e* is not used.

    NOTE(review): no error-handler registration is visible for this function
    in this view of the file — confirm how it is attached to the app.
    """
    payload = {"error": "Bad request"}
    return jsonify(payload), 400
def internal_error_handler(e):
    """Answer with a JSON "Internal server error" body and HTTP 500; *e* is not used.

    NOTE(review): no error-handler registration is visible for this function
    in this view of the file — confirm how it is attached to the app.
    """
    payload = {"error": "Internal server error"}
    return jsonify(payload), 500
# Add helper functions | |
def convert_numeric_fields_to_int(property_dict, numeric_fields=None):
    """Convert numeric fields to integers in property dictionary.

    Each listed field that is present and not None is replaced by
    ``int(float(value))`` (fractions are truncated). Values that cannot be
    converted — non-numeric text, unsupported types, NaN or infinities —
    are replaced by None. Fields that are absent or already None are left
    untouched.

    Args:
        property_dict: Mapping to update. It is modified in place.
        numeric_fields: Optional iterable of field names to convert. Defaults
            to ``Bedrooms``, ``Bathrooms``, ``SquareFeet``, ``YearBuilt`` and
            ``Price``.

    Returns:
        The same ``property_dict`` object that was passed in.
    """
    if numeric_fields is None:
        numeric_fields = ('Bedrooms', 'Bathrooms', 'SquareFeet', 'YearBuilt', 'Price')
    for field in numeric_fields:
        if field in property_dict and property_dict[field] is not None:
            try:
                property_dict[field] = int(float(property_dict[field]))
            # OverflowError covers infinities, e.g. "inf" or "1e400".
            except (ValueError, TypeError, OverflowError):
                property_dict[field] = None
    return property_dict
if __name__ == '__main__':
    # Listen on every interface. The port is taken from the PORT environment
    # variable, falling back to 7860 (the Hugging Face Spaces port).
    port = int(os.environ.get('PORT', 7860))
    server_options = {'host': '0.0.0.0', 'port': port}
    app.run(**server_options)