# Source: testpropertyai / app.py
# Uploaded by sksameermujahid ("Upload 21 files"), commit 5279fd6 (verified).
import os
import logging
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import threading
from functools import wraps
import sys
import time
from geopy.distance import geodesic
import torch
# Add the modules directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import modules
from modules.config import *
from modules.models import *
from modules.security import *
from modules.audio import *
from modules.location_processor import LocationProcessor, set_location
from modules.response import *
from modules.input_tracker import *
from modules.chatbot_processor import ChatbotProcessor
# Import specific functions
from modules.security import with_user_plan
from modules.audio import process_audio_file
# Configure logging: INFO level, one shared format, written both to
# <BASE_DIR>/app.log and to the console stream.
# NOTE(review): BASE_DIR is not defined in this file; presumably it comes from
# one of the `from modules... import *` lines above -- confirm.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(BASE_DIR, 'app.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Initialize Flask app with template and static folders resolved under BASE_DIR
app = Flask(__name__,
    template_folder=os.path.join(BASE_DIR, 'templates'),
    static_folder=os.path.join(BASE_DIR, 'static')
)
# In-memory, per-process store keyed by session_id. /search writes a list of
# formatted results into it; /generate and /recommend write the generated
# response, so the value type depends on which endpoint wrote last.
conversation_context = {}
# Configure CORS for every route: any origin, GET/POST/OPTIONS, and the two
# request headers listed below.
CORS(app, resources={
    r"/*": {
        "origins": ["*"], # Allow all origins for Hugging Face Spaces
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "X-Session-ID"]
    }
})
# Initialize rate limiter, keyed by the client's remote address.
# NOTE(review): MAX_REQUESTS_PER_WINDOW is not defined in this file; presumably
# it comes from a star import above -- confirm.
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[f"{MAX_REQUESTS_PER_WINDOW} per minute", "1000 per hour"]
)
# Initialize components in the correct order. All of this runs at import time,
# so importing this module loads every model and index below.
print("Loading sentence transformer...")
model_embedding = load_sentence_transformer()
print("Fetching and caching properties...")
properties = fetch_and_cache_properties()
# Abort start-up (exit status 1) when no properties could be loaded.
if not properties:
    logger.error("Failed to fetch properties. Please check API connection.")
    sys.exit(1)
print("Loading FAISS index...")
# NOTE(review): the module-level name `index` is rebound later in this file by
# the `index()` view function for route '/'. The retriever built below receives
# the loaded object as an argument before that rebinding happens.
index = load_faiss_index()
print("Loading PCA model...")
pca = load_pca_model()
print("Initializing retriever...")
retriever = CustomRagRetriever(index, model_embedding, pca)
print("Loading tokenizer and LLM model...")
tokenizer, model_llm = load_tokenizer_and_model()
print("Initializing security components...")
security_manager = SecurityManager()
# The query validator is given the sentence-embedding model loaded above.
query_validator = QueryValidator(model_embedding)
print("Initializing input tracker...")
input_tracker = UserInputTracker()
# Initialize processors
chatbot_processor = ChatbotProcessor()
def security_check(f):
    """Decorate a view with the shared per-request security checks.

    Before the wrapped view runs, the decorator:

    * asks ``security_manager.check_rate_limit`` about the caller's remote
      address and answers ``429`` when it refuses;
    * for POST requests, requires a JSON body or a ``multipart/form-data``
      upload and answers ``415`` otherwise.

    If one of the checks itself raises, the failure is logged and a ``400``
    "Security check failed" response is returned.

    Args:
        f: The Flask view function to protect.

    Returns:
        The wrapped view function.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            ip_address = request.remote_addr
            if not security_manager.check_rate_limit(ip_address):
                return jsonify({"error": "Rate limit exceeded"}), 429
            if request.method == 'POST':
                # File uploads (the /transcribe endpoint reads request.files)
                # arrive as multipart/form-data and can never be JSON, so they
                # must not be rejected by the JSON-only rule.
                is_upload = request.mimetype == 'multipart/form-data'
                if not (request.is_json or is_upload):
                    return jsonify({"error": "Content-Type must be application/json"}), 415
        except Exception as e:
            logging.error(f"Security check failed: {str(e)}")
            return jsonify({"error": "Security check failed"}), 400
        # The view is called outside the try block on purpose: errors raised by
        # the view (or by decorators nested inside it) are not security-check
        # failures and must keep their own status code and handling.
        return f(*args, **kwargs)
    return decorated_function
@app.before_request
def handle_preflight():
    """Answer OPTIONS requests directly with the allowed headers and methods.

    Returns ``None`` for every other method so that normal request handling
    continues.
    """
    if request.method != 'OPTIONS':
        return None
    preflight = app.make_default_options_response()
    cors_headers = (
        ('Access-Control-Allow-Headers', 'Content-Type, X-Session-ID'),
        ('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
    )
    for header_name, header_value in cors_headers:
        preflight.headers.add(header_name, header_value)
    return preflight
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    print("Rendering index page")
    rendered_page = render_template('index.html')
    return rendered_page
def _normalize_property_images(raw_images):
    """Return a property's image references as a list.

    A comma-separated string is split into stripped, non-empty entries,
    ``None`` becomes an empty list, and any other value is returned unchanged.
    """
    if raw_images is None:
        return []
    if isinstance(raw_images, str):
        return [img.strip() for img in raw_images.split(',') if img.strip()]
    return raw_images


def _float_or_default(value, default=0.0):
    """Convert ``value`` to float, returning ``default`` when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _format_search_result(result):
    """Build the JSON-ready dictionary for one retriever hit.

    ``result`` must provide ``'property'`` (a mapping supporting ``.get``) and
    ``'distance'``.
    """
    property_info = result['property']
    property_images = _normalize_property_images(
        property_info.get('propertyImages', [])
    )
    property_info = convert_numeric_fields_to_int(property_info)
    return {
        "PropertyName": property_info.get('PropertyName', 'N/A'),
        "Address": property_info.get('Address', 'N/A'),
        "ZipCode": property_info.get('ZipCode', 0),
        "LeasableSquareFeet": property_info.get('LeasableSquareFeet', 0),
        "YearBuilt": property_info.get('YearBuilt', 0),
        "NumberOfRooms": property_info.get('NumberOfRooms', 0),
        "ParkingSpaces": property_info.get('ParkingSpaces', 0),
        "PropertyManager": property_info.get('PropertyManager', 'N/A'),
        # A key that is present but null or non-numeric must not fail the
        # whole search, so these three fall back to 0.0.
        "MarketValue": _float_or_default(property_info.get('MarketValue', 0)),
        "TaxAssessmentNumber": property_info.get('TaxAssessmentNumber', 'N/A'),
        "Latitude": _float_or_default(property_info.get('Latitude', 0)),
        "Longitude": _float_or_default(property_info.get('Longitude', 0)),
        "CreateDate": property_info.get('CreateDate', 'N/A'),
        "LastModifiedDate": property_info.get('LastModifiedDate', 'N/A'),
        "City": property_info.get('City', 'N/A'),
        "State": property_info.get('State', 'N/A'),
        "Country": property_info.get('Country', 'N/A'),
        "PropertyType": property_info.get('PropertyType', 'N/A'),
        "PropertyStatus": property_info.get('PropertyStatus', 'N/A'),
        "Description": property_info.get('Description', 'N/A'),
        "ViewNumber": property_info.get('ViewNumber', 0),
        "Contact": property_info.get('Contact', 0),
        "TotalSquareFeet": property_info.get('TotalSquareFeet', 0),
        "IsDeleted": bool(property_info.get('IsDeleted', False)),
        "Beds": property_info.get('Beds', 0),
        "Baths": property_info.get('Baths', 0),
        "AgentName": property_info.get('AgentName', 'N/A'),
        "AgentPhoneNumber": property_info.get('AgentPhoneNumber', 'N/A'),
        "AgentEmail": property_info.get('AgentEmail', 'N/A'),
        "KeyFeatures": property_info.get('KeyFeatures', 'N/A'),
        "NearbyAmenities": property_info.get('NearbyAmenities', 'N/A'),
        "propertyImages": property_images,
        "Distance": result['distance']
    }


@app.route('/search', methods=['POST'])
@security_check
@limiter.limit("30 per minute")
@with_user_plan
def search():
    """Search properties for the posted query and return them as a JSON list.

    Request JSON fields:
        query: Required search text.
        session_id: Optional key for caching results in ``conversation_context``.
        continue: When true and the session already holds a cached result
            list, that list is returned instead of running a new search.

    Returns:
        A JSON list of formatted properties; ``400`` for a missing or too
        long query or a non-object body; ``500`` on unexpected errors.
    """
    try:
        data = request.json
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        cleaned_query = query_validator.clean_input(query)
        if not query_validator.validate_query_length(cleaned_query):
            return jsonify({"error": "Query too long"}), 400
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        # Requests without a session id never read or write the cache;
        # otherwise every anonymous client would share the `None` entry.
        cached = conversation_context.get(session_id) if session_id else None
        # Other endpoints store a generated response under the same session
        # key, so only a cached list can be reused as search results.
        if continue_conversation and isinstance(cached, list):
            formatted_results = cached
        else:
            formatted_results = [
                _format_search_result(result)
                for result in retriever.retrieve(cleaned_query)
            ]
            if session_id:
                conversation_context[session_id] = formatted_results
        print(f"Returning {len(formatted_results)} search results")
        if formatted_results:
            print(f"Sample property images array: {formatted_results[0]['propertyImages']}")
        return jsonify(formatted_results)
    except Exception as e:
        logging.error(f"Error in search endpoint: {str(e)}")
        return jsonify({"error": "An error occurred processing your request"}), 500
@app.route('/transcribe', methods=['POST'])
@security_check
def transcribe():
    """Validate an uploaded audio file and hand it to ``process_audio_file``.

    The upload is read from the ``audio`` form field. Files larger than 10MB
    or with an extension other than wav/mp3/ogg/webm are rejected with ``400``.

    Returns:
        The JSON-encoded result of ``process_audio_file``. When that result is
        a ``(body, status)`` pair, the status code is used for the response.
        Unexpected errors produce ``500``.
    """
    max_audio_bytes = 10 * 1024 * 1024
    try:
        if 'audio' not in request.files:
            return jsonify({"error": "No audio file provided"}), 400
        audio_file = request.files['audio']
        # Validate file size (max 10MB). content_length is only set when the
        # uploaded part carries its own length header, so measure the stream
        # itself and restore the read position afterwards.
        try:
            stream = audio_file.stream
            start = stream.tell()
            stream.seek(0, os.SEEK_END)
            size = stream.tell()
            stream.seek(start)
        except (AttributeError, OSError, ValueError):
            # Stream cannot be measured; fall back to the declared length.
            size = audio_file.content_length or 0
        if size > max_audio_bytes:
            return jsonify({"error": "Audio file too large. Maximum size is 10MB"}), 400
        # Validate file type; a missing filename is treated as "no extension"
        # instead of raising.
        allowed_extensions = {'wav', 'mp3', 'ogg', 'webm'}
        filename = audio_file.filename or ''
        extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
        if extension not in allowed_extensions:
            return jsonify({"error": "Invalid audio file format. Supported formats: WAV, MP3, OGG, WEBM"}), 400
        result = process_audio_file(audio_file)
        if isinstance(result, tuple) and len(result) == 2:
            response, status_code = result
            return jsonify(response), status_code
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in transcribe endpoint: {str(e)}")
        return jsonify({"error": "An error occurred processing your audio file"}), 500
@app.route('/generate', methods=['POST'])
@security_check
@limiter.limit("30 per minute")
@with_user_plan
def generate():
    """Generate an LLM response for the posted query.

    Request JSON fields:
        query: Required prompt text.
        session_id: Optional key into ``conversation_context``.
        continue: When true and the session has stored context, that context
            is prepended to the prompt.

    Returns:
        JSON with ``response`` and ``duration``; ``400`` for a missing query
        or a non-object body; ``500`` on unexpected errors.
    """
    try:
        data = request.json
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        prompt = query
        # Context is only read or written for requests that carry a session
        # id, so anonymous clients never share one `None` entry.
        if session_id and continue_conversation and session_id in conversation_context:
            previous_results = conversation_context[session_id]
            prompt = f"Based on previous results:{previous_results}New Query: {query}"
        response, duration = generate_response(prompt, tokenizer, model_llm)
        if session_id:
            conversation_context[session_id] = response
        print(f"Generated response: {response}")
        print(f"Time taken to generate response: {duration:.2f} seconds\n")
        return jsonify({"response": response, "duration": duration})
    except Exception as e:
        # Same error shape as the /search and /recommend endpoints.
        logging.error(f"Error in generate endpoint: {str(e)}")
        return jsonify({"error": "An error occurred processing your request"}), 500
@app.route('/set-location', methods=['POST'])
@security_check
def handle_set_location():
    """Handle location setting and nearby property search.

    Request JSON fields:
        latitude: Required number in [-90, 90].
        longitude: Required number in [-180, 180].
        session_id: Optional session identifier (defaults to ``''``).

    Returns:
        The JSON-encoded result of ``LocationProcessor.set_location``; ``400``
        when the body is not a JSON object or the coordinates are missing,
        non-numeric, out of range, or both zero; ``500`` on unexpected errors.
    """
    invalid_coordinates = {
        "status": "error",
        "message": "Invalid coordinates"
    }
    try:
        # silent=True turns a malformed body into None so it is answered with
        # 400 below instead of ending up in the generic 500 handler.
        data = request.get_json(silent=True)
        print(f"Received data: {data}")
        if not isinstance(data, dict):
            return jsonify({
                "status": "error",
                "message": "Invalid request body"
            }), 400
        # Missing, null or non-numeric coordinates are a client error.
        try:
            latitude = float(data.get('latitude'))
            longitude = float(data.get('longitude'))
        except (TypeError, ValueError):
            return jsonify(invalid_coordinates), 400
        session_id = data.get('session_id', '')
        print(f"Extracted values - latitude: {latitude}, longitude: {longitude}, session_id: {session_id}")
        # Validate coordinates. The range comparisons also reject NaN and
        # infinities. (0, 0) is still treated as "not set", but a single zero
        # coordinate is a valid position and is accepted.
        in_range = -90.0 <= latitude <= 90.0 and -180.0 <= longitude <= 180.0
        if not in_range or (latitude == 0 and longitude == 0):
            return jsonify(invalid_coordinates), 400
        # Set location and find nearby properties
        location_processor = LocationProcessor()
        result = location_processor.set_location(latitude, longitude, session_id)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in set_location: {str(e)}")
        return jsonify({
            "status": "error",
            "message": "Error processing location"
        }), 500
@app.route('/check-input-limit', methods=['GET'])
@security_check
def check_input_limit():
    """Report the current plan's input allowance for a session.

    Reads ``session_id`` from the query string and answers ``400`` when it is
    absent. Any exception is logged and answered with ``500``.
    """
    try:
        sid = request.args.get('session_id')
        if sid:
            plan = get_current_plan()
            remaining = input_tracker.get_remaining_inputs(sid, plan)
            stats = input_tracker.get_usage_stats(sid)
            payload = {
                "plan": plan.value,
                "remaining_inputs": remaining,
                "total_limit": PLAN_INPUT_LIMITS[plan],
                "usage_stats": stats
            }
            return jsonify(payload)
        return jsonify({"error": "session_id is required"}), 400
    except Exception as err:
        logging.error(f"Error checking input limit: {str(err)}")
        return jsonify({"error": "Error checking input limit"}), 500
@app.route('/recommend', methods=['POST'])
@security_check
@limiter.limit("30 per minute")
@with_user_plan
def recommend():
    """Answer a property question with plan-filtered retrieval plus the LLM.

    Request JSON fields:
        query: Required question text.
        session_id: Optional key into ``conversation_context``.
        continue: When true and the session has stored context, that context
            is prepended to the retrieval query and the prompt.
        latitude, longitude: Read only when the query is ``yes``.

    Returns:
        JSON with the generated response, the plan-filtered results and input
        limit information. The ``hi`` and ``yes`` queries and non-real-estate
        queries get the dedicated responses below. ``400`` for invalid input
        and ``500`` on unexpected errors.
    """
    try:
        data = request.json
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        query = data.get('query')
        session_id = data.get('session_id')
        continue_conversation = data.get('continue', False)
        current_plan = get_current_plan()
        if not query:
            return jsonify({"error": "Query parameter is missing"}), 400
        # Clean and validate input
        cleaned_query = query_validator.clean_input(query)
        if not query_validator.validate_query_length(cleaned_query):
            return jsonify({"error": "Query too long"}), 400
        # Check if query is related to real estate
        # NOTE(review): the "hi"/"yes" shortcuts below are only reachable if
        # is_real_estate_query accepts those words -- confirm.
        if not query_validator.is_real_estate_query(cleaned_query):
            return jsonify({
                "response": "I'm a real estate chatbot. I can help you with property-related queries like finding apartments, PG accommodations, hostels, or commercial properties. Please ask me about properties!",
                "is_real_estate": False
            })
        # Special handling for "hi" query
        if cleaned_query.lower() == 'hi':
            return jsonify({
                "response": "Do you want to know the properties located near you? (yes/no):",
                "is_location_query": True
            })
        # Special handling for "yes" after "hi"
        if cleaned_query.lower() == 'yes':
            # Get location from the request
            latitude = data.get('latitude')
            longitude = data.get('longitude')
            if not latitude or not longitude:
                return jsonify({
                    "error": "Location not available. Please allow location access or set your location first.",
                    "needs_location": True
                }), 400
            # Get nearby properties
            location_processor = LocationProcessor()
            result = location_processor.set_location(latitude, longitude, session_id)
            if result["status"] != "success":
                return jsonify({
                    "error": "No properties found near your location",
                    "status": "error"
                }), 404
            # The text summary that used to be built here was never sent to
            # the client, and its price formatting could raise on non-numeric
            # values; only the structured data is returned.
            return jsonify({
                "properties": result["properties"],
                "location": result["location"],
                "is_location_based": True,
                "status": "success"
            })
        # Handle regular queries with RAG-based recommendation.
        # effective_query is always bound, including when `continue` is
        # requested for a session that has no stored context yet.
        effective_query = cleaned_query
        if session_id and continue_conversation and session_id in conversation_context:
            previous_results = conversation_context[session_id]
            effective_query = f"Based on previous results:{previous_results}New Query: {cleaned_query}"
        raw_results = retriever.retrieve(effective_query, top_k=5)
        # Filter results based on user plan
        filtered_results = []
        for result in raw_results:
            raw_property = result['property']
            property_dict = raw_property.to_dict() if hasattr(raw_property, 'to_dict') else raw_property
            property_dict = convert_numeric_fields_to_int(property_dict)
            filtered_property = filter_property_by_plan(property_dict, current_plan)
            # Image fields are removed from the property itself and returned
            # separately, for the PRO plan only.
            for image_key in ('propertyImages', 'property_image', 'image_url'):
                if image_key in filtered_property:
                    del filtered_property[image_key]
            filtered_results.append({
                'property': filtered_property,
                'propertyImages': result.get('image_url', []) if current_plan == UserPlan.PRO else [],
                'distance': result.get('distance')
            })
        # Generate response
        response_text, has_restricted_request = format_llm_prompt(
            query=effective_query,
            filtered_results=filtered_results,
            user_plan=current_plan,
            original_query=cleaned_query
        )
        response, duration = generate_response(
            response_text,
            tokenizer=tokenizer,
            model_llm=model_llm,
            max_new_tokens=512,
            temperature=0.7,
            top_k=30,
            top_p=0.8,
            repetition_penalty=1.05
        )
        # Store the response in conversation context. Requests without a
        # session id are skipped so anonymous clients do not share an entry.
        if session_id:
            conversation_context[session_id] = response
        return jsonify({
            "response": response,
            "duration": duration,
            "plan_level": current_plan.value,
            "filtered_results": filtered_results,
            "input_limit_info": {
                "remaining_inputs": input_tracker.get_remaining_inputs(session_id, current_plan),
                "total_limit": PLAN_INPUT_LIMITS[current_plan],
                "usage_stats": input_tracker.get_usage_stats(session_id)
            }
        })
    except Exception as e:
        logging.error(f"Error in recommend endpoint: {str(e)}")
        return jsonify({"error": "An error occurred processing your request"}), 500
@app.route('/api/properties/search', methods=['POST'])
def search_properties():
    """Run ``chatbot_processor.process_query`` over the available properties.

    Request JSON fields:
        query: Search text (defaults to ``''``).
        user_location: Optional (latitude, longitude) pair.

    Returns:
        ``{'status': 'success', 'results': ...}``; ``400`` when the body is
        not a JSON object; ``500`` with a generic message on unexpected errors.
    """
    try:
        # silent=True yields None for a malformed body so it is answered with
        # 400 below instead of falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        query = data.get('query', '')
        user_location = data.get('user_location')  # (latitude, longitude)
        # Get properties from database or external source
        # NOTE(review): get_properties is not defined in this file; presumably
        # it comes from one of the star imports -- confirm it exists.
        all_properties = get_properties()
        # Process query and get filtered properties
        results = chatbot_processor.process_query(
            query, all_properties, user_location
        )
        return jsonify({
            'status': 'success',
            'results': results
        })
    except Exception as e:
        # Full details go to the log only; the client gets a generic message
        # so internal error text is not exposed.
        logging.error(f"Error searching properties: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while searching properties'
        }), 500
@app.route('/api/properties/similar', methods=['POST'])
def find_similar_properties():
    """Return properties similar to the posted reference property.

    Request JSON fields:
        property: The reference property.
        top_k: Number of results to request (defaults to 5).

    Returns:
        ``{'status': 'success', 'results': ...}``; ``400`` when the body is
        not a JSON object; ``500`` with a generic message on unexpected errors.
    """
    try:
        # silent=True yields None for a malformed body so it is answered with
        # 400 below instead of falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        reference_property = data.get('property')
        top_k = data.get('top_k', 5)
        # Get properties from database or external source
        # NOTE(review): get_properties is not defined in this file; presumably
        # it comes from one of the star imports -- confirm it exists.
        all_properties = get_properties()
        # Find similar properties
        results = chatbot_processor.get_similar_properties(
            reference_property, all_properties, top_k
        )
        return jsonify({
            'status': 'success',
            'results': results
        })
    except Exception as e:
        # Full details go to the log only; the client gets a generic message
        # so internal error text is not exposed.
        logging.error(f"Error finding similar properties: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while finding similar properties'
        }), 500
@app.route('/api/properties/landmarks', methods=['POST'])
def get_property_landmarks():
    """Return landmarks near the posted property.

    Request JSON fields:
        property: The property to search around.
        radius_miles: Search radius (defaults to 5.0).

    Returns:
        ``{'status': 'success', 'landmarks': ...}``; ``400`` when the body is
        not a JSON object; ``500`` with a generic message on unexpected errors.
    """
    try:
        # silent=True yields None for a malformed body so it is answered with
        # 400 below instead of falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        property_data = data.get('property')
        radius_miles = data.get('radius_miles', 5.0)
        # Get nearby landmarks
        landmarks = chatbot_processor.get_nearby_landmarks(
            property_data, radius_miles
        )
        return jsonify({
            'status': 'success',
            'landmarks': landmarks
        })
    except Exception as e:
        # Full details go to the log only; the client gets a generic message
        # so internal error text is not exposed.
        logging.error(f"Error getting property landmarks: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while getting property landmarks'
        }), 500
@app.route('/api/properties/location', methods=['POST'])
def get_property_location():
    """Return location details for the posted property.

    Request JSON fields:
        property: The property to describe.

    Returns:
        ``{'status': 'success', 'location': ...}``; ``400`` when the body is
        not a JSON object; ``500`` with a generic message on unexpected errors.
    """
    try:
        # silent=True yields None for a malformed body so it is answered with
        # 400 below instead of falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400
        property_data = data.get('property')
        # Get location details
        location_details = chatbot_processor.get_location_details(property_data)
        return jsonify({
            'status': 'success',
            'location': location_details
        })
    except Exception as e:
        # Full details go to the log only; the client gets a generic message
        # so internal error text is not exposed.
        logging.error(f"Error getting property location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'An error occurred while getting property location'
        }), 500
@app.errorhandler(429)
def ratelimit_handler(e):
    """Return the JSON error body used for HTTP 429 responses."""
    body = jsonify({"error": "Rate limit exceeded"})
    return body, 429
@app.errorhandler(400)
def bad_request_handler(e):
    """Return the JSON error body used for HTTP 400 responses."""
    body = jsonify({"error": "Bad request"})
    return body, 400
@app.errorhandler(500)
def internal_error_handler(e):
    """Return the JSON error body used for HTTP 500 responses."""
    body = jsonify({"error": "Internal server error"})
    return body, 500
# Add helper functions
def convert_numeric_fields_to_int(property_dict, numeric_fields=None):
    """Convert numeric fields to integers in a property dictionary.

    The mapping is modified in place and also returned. A listed field is
    converted only when it is present and not ``None``. A value that cannot be
    represented as an integer (non-numeric text, NaN, infinity) is replaced
    with ``None``.

    Args:
        property_dict: Mapping holding the property's fields.
        numeric_fields: Optional iterable of field names to convert. Defaults
            to ``Bedrooms``, ``Bathrooms``, ``SquareFeet``, ``YearBuilt`` and
            ``Price``.

    Returns:
        The same ``property_dict`` object, after conversion.
    """
    if numeric_fields is None:
        numeric_fields = ('Bedrooms', 'Bathrooms', 'SquareFeet', 'YearBuilt', 'Price')
    for field in numeric_fields:
        if field in property_dict and property_dict[field] is not None:
            try:
                property_dict[field] = int(float(property_dict[field]))
            except (ValueError, TypeError, OverflowError):
                # OverflowError is what int() raises for +/-infinity; it is
                # handled like any other unusable value.
                property_dict[field] = None
    return property_dict
if __name__ == '__main__':
    # Listen on the port named by the PORT environment variable, falling back
    # to 7860 (the Hugging Face Spaces default), on all interfaces.
    port = int(os.getenv('PORT', 7860))
    app.run(port=port, host='0.0.0.0')