sksameermujahid's picture
Update newapp.py
f558e11 verified
raw
history blame
111 kB
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
import torch
from transformers import pipeline, CLIPProcessor, CLIPModel, BitsAndBytesConfig
import base64
import io
import re
import json
import numpy as np
from PIL import Image
import fitz # PyMuPDF
import os
from datetime import datetime
import uuid
import requests
from geopy.geocoders import Nominatim
from sentence_transformers import SentenceTransformer, util
import spacy
import pytesseract
from langdetect import detect
from deep_translator import GoogleTranslator
import logging
from functools import lru_cache
import time
import math
from pyngrok import ngrok
import threading
import gc
import psutil
app = Flask(__name__)
CORS(app) # Enable CORS for frontend
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Initialize geocoder
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)
# Add memory monitoring function
def monitor_memory():
while True:
process = psutil.Process()
memory_info = process.memory_info()
logger.info(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")
if memory_info.rss > 2 * 1024 * 1024 * 1024: # If using more than 2GB
logger.warning("High memory usage detected, clearing cache")
clear_model_cache()
time.sleep(300) # Check every 5 minutes
# Start memory monitoring in a separate thread
memory_monitor_thread = threading.Thread(target=monitor_memory, daemon=True)
memory_monitor_thread.start()
# Initialize CLIP model
try:
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
has_clip_model = True
logger.info("CLIP model loaded successfully")
except Exception as e:
logger.error(f"Error loading CLIP model: {str(e)}")
has_clip_model = False
# Initialize sentence transformer
try:
sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
logger.info("Sentence transformer loaded successfully")
except Exception as e:
logger.error(f"Error loading sentence transformer: {str(e)}")
sentence_model = None
# Initialize spaCy
try:
nlp = spacy.load('en_core_web_md')
logger.info("spaCy model loaded successfully")
except Exception as e:
logger.error(f"Error loading spaCy model: {str(e)}")
nlp = None
def make_json_serializable(obj):
try:
if isinstance(obj, (bool, int, float, str, type(None))):
return obj
elif isinstance(obj, (list, tuple)):
return [make_json_serializable(item) for item in obj]
elif isinstance(obj, dict):
return {str(key): make_json_serializable(value) for key, value in obj.items()}
elif torch.is_tensor(obj):
return obj.item() if obj.numel() == 1 else obj.tolist()
elif np.isscalar(obj):
return obj.item() if hasattr(obj, 'item') else float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return str(obj)
except Exception as e:
logger.error(f"Error serializing object: {str(e)}")
return str(obj)
@app.route('/')
def index():
return render_template('newindex.html')
@app.route('/get-location', methods=['POST'])
def get_location():
try:
data = request.json or {}
latitude = data.get('latitude')
longitude = data.get('longitude')
if not latitude or not longitude:
logger.warning("Missing latitude or longitude")
return jsonify({
'status': 'error',
'message': 'Latitude and longitude are required'
}), 400
# Retry geocoding up to 3 times
for attempt in range(3):
try:
location = geocoder.reverse((latitude, longitude), exactly_one=True)
if location:
address_components = location.raw.get('address', {})
return jsonify({
'status': 'success',
'address': location.address,
'street': address_components.get('road', ''),
'city': address_components.get('city', address_components.get('town', address_components.get('village', ''))),
'state': address_components.get('state', ''),
'country': address_components.get('country', 'India'),
'postal_code': address_components.get('postcode', ''),
'latitude': latitude,
'longitude': longitude
})
logger.warning(f"Geocoding failed on attempt {attempt + 1}")
time.sleep(1) # Wait before retry
except Exception as e:
logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
time.sleep(1)
return jsonify({
'status': 'error',
'message': 'Could not determine location after retries'
}), 500
except Exception as e:
logger.error(f"Error in get_location: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/verify', methods=['POST'])
def verify_property():
try:
if not request.form and not request.files:
logger.warning("No form data or files provided")
return jsonify({
'error': 'No data provided',
'status': 'error'
}), 400
data = {
'property_name': request.form.get('property_name', '').strip(),
'property_type': request.form.get('property_type', '').strip(),
'status': request.form.get('status', '').strip(),
'description': request.form.get('description', '').strip(),
'address': request.form.get('address', '').strip(),
'city': request.form.get('city', '').strip(),
'state': request.form.get('state', '').strip(),
'country': request.form.get('country', 'India').strip(),
'zip': request.form.get('zip', '').strip(),
'latitude': request.form.get('latitude', '').strip(),
'longitude': request.form.get('longitude', '').strip(),
'bedrooms': request.form.get('bedrooms', '').strip(),
'bathrooms': request.form.get('bathrooms', '').strip(),
'total_rooms': request.form.get('total_rooms', '').strip(),
'year_built': request.form.get('year_built', '').strip(),
'parking': request.form.get('parking', '').strip(),
'sq_ft': request.form.get('sq_ft', '').strip(),
'market_value': request.form.get('market_value', '').strip(),
'amenities': request.form.get('amenities', '').strip(),
'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
'legal_details': request.form.get('legal_details', '').strip()
}
required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
missing_fields = [field for field in required_fields if not data[field]]
if missing_fields:
logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
return jsonify({
'error': f"Missing required fields: {', '.join(missing_fields)}",
'status': 'error'
}), 400
images = []
image_analysis = []
if 'images' in request.files:
image_files = request.files.getlist('images')
for img_file in image_files:
if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
try:
img = Image.open(img_file)
buffered = io.BytesIO()
img.save(buffered, format="JPEG")
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
images.append(img_str)
image_analysis.append(analyze_image(img))
except Exception as e:
logger.error(f"Error processing image {img_file.filename}: {str(e)}")
image_analysis.append({'error': str(e), 'is_property_related': False})
pdf_texts = []
pdf_analysis = []
if 'documents' in request.files:
pdf_files = request.files.getlist('documents')
for pdf_file in pdf_files:
if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
try:
pdf_text = extract_pdf_text(pdf_file)
pdf_texts.append({
'filename': pdf_file.filename,
'text': pdf_text
})
pdf_analysis.append(analyze_pdf_content(pdf_text, data))
except Exception as e:
logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}")
pdf_analysis.append({'error': str(e)})
consolidated_text = f"""
Property Name: {data['property_name']}
Property Type: {data['property_type']}
Status: {data['status']}
Description: {data['description']}
Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
Coordinates: Lat {data['latitude']}, Long {data['longitude']}
Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
Year Built: {data['year_built']}
Parking: {data['parking']}
Size: {data['sq_ft']} sq. ft.
Market Value: ₹{data['market_value']}
Amenities: {data['amenities']}
Nearby Landmarks: {data['nearby_landmarks']}
Legal Details: {data['legal_details']}
"""
try:
description = data['description']
if description and len(description) > 10:
text_language = detect(description)
if text_language != 'en':
translated_description = GoogleTranslator(source=text_language, target='en').translate(description)
data['description_translated'] = translated_description
else:
data['description_translated'] = description
else:
data['description_translated'] = description
except Exception as e:
logger.error(f"Error in language detection/translation: {str(e)}")
data['description_translated'] = data['description']
summary = generate_property_summary(data)
fraud_classification = classify_fraud(consolidated_text, data)
trust_score, trust_reasoning = generate_trust_score(consolidated_text, image_analysis, pdf_analysis)
suggestions = generate_suggestions(consolidated_text, data)
quality_assessment = assess_text_quality(data['description_translated'])
address_verification = verify_address(data)
cross_validation = perform_cross_validation(data)
location_analysis = analyze_location(data)
price_analysis = analyze_price(data)
legal_analysis = analyze_legal_details(data['legal_details'])
specs_verification = verify_property_specs(data)
market_analysis = analyze_market_value(data)
document_analysis = {
'pdf_count': len(pdf_texts),
'pdf_texts': pdf_texts,
'pdf_analysis': pdf_analysis
}
image_results = {
'image_count': len(images),
'image_analysis': image_analysis
}
report_id = str(uuid.uuid4())
results = {
'report_id': report_id,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'summary': summary,
'fraud_classification': fraud_classification,
'trust_score': {
'score': trust_score,
'reasoning': trust_reasoning
},
'suggestions': suggestions,
'quality_assessment': quality_assessment,
'address_verification': address_verification,
'cross_validation': cross_validation,
'location_analysis': location_analysis,
'price_analysis': price_analysis,
'legal_analysis': legal_analysis,
'document_analysis': document_analysis,
'image_analysis': image_results,
'specs_verification': specs_verification,
'market_analysis': market_analysis,
'images': images
}
return jsonify(make_json_serializable(results))
except Exception as e:
logger.error(f"Error in verify_property: {str(e)}")
return jsonify({
'error': 'Server error occurred. Please try again later.',
'status': 'error',
'details': str(e)
}), 500
def extract_pdf_text(pdf_file):
try:
pdf_document = fitz.Document(stream=pdf_file.read(), filetype="pdf")
text = ""
for page in pdf_document:
text += page.get_text()
pdf_document.close()
return text
except Exception as e:
logger.error(f"Error extracting PDF text: {str(e)}")
return ""
def analyze_image(image):
try:
if has_clip_model:
img_rgb = image.convert('RGB')
inputs = clip_processor(
text=[
"real estate property interior",
"real estate property exterior",
"non-property-related image",
"office space",
"landscape"
],
images=img_rgb,
return_tensors="pt",
padding=True
)
outputs = clip_model(**inputs)
logits_per_image = outputs.logits_per_image
probs = logits_per_image.softmax(dim=1).detach().numpy()[0]
property_related_score = probs[0] + probs[1]
is_property_related = property_related_score > 0.5
quality = assess_image_quality(image)
is_ai_generated = detect_ai_generated_image(image)
return {
'is_property_related': is_property_related,
'property_confidence': float(property_related_score),
'top_predictions': [
{'label': 'property interior', 'confidence': float(probs[0])},
{'label': 'property exterior', 'confidence': float(probs[1])},
{'label': 'non-property', 'confidence': float(probs[2])}
],
'image_quality': quality,
'is_ai_generated': is_ai_generated,
'authenticity_score': 0.95 if not is_ai_generated else 0.60
}
else:
logger.warning("CLIP model unavailable")
return {
'is_property_related': False,
'property_confidence': 0.0,
'top_predictions': [],
'image_quality': assess_image_quality(image),
'is_ai_generated': False,
'authenticity_score': 0.5
}
except Exception as e:
logger.error(f"Error analyzing image: {str(e)}")
return {
'is_property_related': False,
'property_confidence': 0.0,
'top_predictions': [],
'image_quality': {'resolution': 'unknown', 'quality_score': 0},
'is_ai_generated': False,
'authenticity_score': 0.0,
'error': str(e)
}
def detect_ai_generated_image(image):
try:
img_array = np.array(image)
if len(img_array.shape) == 3:
gray = np.mean(img_array, axis=2)
else:
gray = img_array
noise = gray - np.mean(gray)
noise_std = np.std(noise)
width, height = image.size
perfect_dimensions = (width % 64 == 0 and height % 64 == 0)
has_exif = hasattr(image, '_getexif') and image._getexif() is not None
return noise_std < 0.05 or perfect_dimensions or not has_exif
except Exception as e:
logger.error(f"Error detecting AI-generated image: {str(e)}")
return False
def analyze_pdf_content(document_text, property_data):
try:
if not document_text:
return {
'document_type': {'classification': 'unknown', 'confidence': 0.0},
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
'key_info': {},
'consistency_score': 0.0,
'is_property_related': False,
'summary': 'Empty document',
'has_signatures': False,
'has_dates': False,
'verification_score': 0.0
}
# Use a more sophisticated model for document classification
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
# Enhanced document types with more specific categories
doc_types = [
"property deed", "sales agreement", "mortgage document",
"property tax record", "title document", "khata certificate",
"encumbrance certificate", "lease agreement", "rental agreement",
"property registration document", "building permit", "other document"
]
# Analyze document type with context
doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}"
doc_result = classifier(doc_context, doc_types)
doc_type = doc_result['labels'][0]
doc_confidence = doc_result['scores'][0]
# Enhanced authenticity check with multiple aspects
authenticity_aspects = [
"authentic legal document",
"questionable document",
"forged document",
"template document",
"official document"
]
authenticity_result = classifier(document_text[:1000], authenticity_aspects)
authenticity = "likely authentic" if authenticity_result['labels'][0] == "authentic legal document" else "questionable"
authenticity_confidence = authenticity_result['scores'][0]
# Extract key information using NLP
key_info = extract_document_key_info(document_text)
# Enhanced consistency check
consistency_score = check_document_consistency(document_text, property_data)
# Property relation check with context
property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}"
is_property_related = check_if_property_related(property_context)['is_related']
# Generate summary using BART
summary = summarize_text(document_text[:2000])
# Enhanced signature and date detection
has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower()))
has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text))
# Calculate verification score with weighted components
verification_weights = {
'doc_type': 0.3,
'authenticity': 0.3,
'consistency': 0.2,
'property_relation': 0.1,
'signatures_dates': 0.1
}
verification_score = (
doc_confidence * verification_weights['doc_type'] +
authenticity_confidence * verification_weights['authenticity'] +
consistency_score * verification_weights['consistency'] +
float(is_property_related) * verification_weights['property_relation'] +
float(has_signatures and has_dates) * verification_weights['signatures_dates']
)
return {
'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)},
'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)},
'key_info': key_info,
'consistency_score': float(consistency_score),
'is_property_related': is_property_related,
'summary': summary,
'has_signatures': has_signatures,
'has_dates': has_dates,
'verification_score': float(verification_score)
}
except Exception as e:
logger.error(f"Error analyzing PDF content: {str(e)}")
return {
'document_type': {'classification': 'unknown', 'confidence': 0.0},
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
'key_info': {},
'consistency_score': 0.0,
'is_property_related': False,
'summary': 'Could not analyze document',
'has_signatures': False,
'has_dates': False,
'verification_score': 0.0,
'error': str(e)
}
def check_document_consistency(document_text, property_data):
try:
if not sentence_model:
logger.warning("Sentence model unavailable")
return 0.5
property_text = ' '.join([
property_data.get(key, '') for key in [
'property_name', 'property_type', 'address', 'city',
'state', 'market_value', 'sq_ft', 'bedrooms'
]
])
property_embedding = sentence_model.encode(property_text)
document_embedding = sentence_model.encode(document_text[:1000])
similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item()
return max(0.0, min(1.0, float(similarity)))
except Exception as e:
logger.error(f"Error checking document consistency: {str(e)}")
return 0.0
def extract_document_key_info(text):
try:
info = {}
patterns = {
'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)',
'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)',
'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})',
'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)',
'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)',
'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))',
'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)'
}
for key, pattern in patterns.items():
match = re.search(pattern, text, re.IGNORECASE)
if match:
info[key] = match.group(1).strip()
return info
except Exception as e:
logger.error(f"Error extracting document key info: {str(e)}")
return {}
def generate_property_summary(data):
try:
# Create a detailed context for summary generation
property_context = f"""
Property Name: {data.get('property_name', '')}
Type: {data.get('property_type', '')}
Status: {data.get('status', '')}
Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}, {data.get('country', '')}
Size: {data.get('sq_ft', '')} sq. ft.
Price: ₹{data.get('market_value', '0')}
Bedrooms: {data.get('bedrooms', '')}
Bathrooms: {data.get('bathrooms', '')}
Year Built: {data.get('year_built', '')}
Description: {data.get('description', '')}
"""
# Use BART for summary generation
summarizer = load_model("summarization", "facebook/bart-large-cnn")
# Generate initial summary
summary_result = summarizer(property_context, max_length=150, min_length=50, do_sample=False)
initial_summary = summary_result[0]['summary_text']
# Enhance summary with key features
key_features = []
# Add property type and status
if data.get('property_type') and data.get('status'):
key_features.append(f"{data['property_type']} is {data['status'].lower()}")
# Add location if available
location_parts = []
if data.get('city'):
location_parts.append(data['city'])
if data.get('state'):
location_parts.append(data['state'])
if location_parts:
key_features.append(f"Located in {', '.join(location_parts)}")
# Add size and price if available
if data.get('sq_ft'):
key_features.append(f"Spans {data['sq_ft']} sq. ft.")
if data.get('market_value'):
key_features.append(f"Valued at ₹{data['market_value']}")
# Add rooms information
rooms_info = []
if data.get('bedrooms'):
rooms_info.append(f"{data['bedrooms']} bedroom{'s' if data['bedrooms'] != '1' else ''}")
if data.get('bathrooms'):
rooms_info.append(f"{data['bathrooms']} bathroom{'s' if data['bathrooms'] != '1' else ''}")
if rooms_info:
key_features.append(f"Features {' and '.join(rooms_info)}")
# Add amenities if available
if data.get('amenities'):
key_features.append(f"Amenities: {data['amenities']}")
# Combine initial summary with key features
enhanced_summary = initial_summary
if key_features:
enhanced_summary += " " + ". ".join(key_features) + "."
# Clean up the summary
enhanced_summary = enhanced_summary.replace(" ", " ").strip()
return enhanced_summary
except Exception as e:
logger.error(f"Error generating property summary: {str(e)}")
return "Could not generate summary."
def summarize_text(text):
try:
if not text or len(text.strip()) < 10:
return "No text to summarize."
summarizer = load_model("summarization", "facebook/bart-large-cnn")
input_length = len(text.split())
max_length = max(50, min(150, input_length // 2))
min_length = max(20, input_length // 4)
summary = summarizer(text[:2000], max_length=max_length, min_length=min_length, do_sample=False)
return summary[0]['summary_text']
except Exception as e:
logger.error(f"Error summarizing text: {str(e)}")
return text[:200] + "..." if len(text) > 200 else text
def classify_fraud(text, data=None):
try:
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
categories = [
"suspicious pricing pattern",
"potentially fraudulent listing",
"ownership verification issues",
"location verification issues",
"document authenticity issues",
"image authenticity issues",
"urgent pressure tactics",
"inconsistent information",
"missing critical details",
"suspicious contact information"
]
# Create a comprehensive context for analysis
context = f"""
Property Details:
- Name: {data.get('property_name', 'Not provided')}
- Type: {data.get('property_type', 'Not provided')}
- Status: {data.get('property_status', 'Not provided')}
- Price: {data.get('market_value', 'Not provided')}
- Square Footage: {data.get('square_footage', 'Not provided')}
- Year Built: {data.get('year_built', 'Not provided')}
- Location: {data.get('address', 'Not provided')}
- Description: {text}
"""
result = classifier(context, categories, multi_label=True)
# Lower threshold to catch more potential issues
threshold = 0.2
# Categorize risks with more granular levels
high_risk = []
medium_risk = []
low_risk = []
for label, score in zip(result['labels'], result['scores']):
if score > threshold:
if score > 0.7:
high_risk.append((label, score))
elif score > 0.5:
medium_risk.append((label, score))
else:
low_risk.append((label, score))
# Calculate alert score with adjusted weights
alert_score = (
sum(score * 1.0 for _, score in high_risk) +
sum(score * 0.7 for _, score in medium_risk) +
sum(score * 0.4 for _, score in low_risk)
) / max(1, len(result['scores']))
# More granular alert levels
if alert_score > 0.8:
alert_level = 'critical'
elif alert_score > 0.6:
alert_level = 'high'
elif alert_score > 0.4:
alert_level = 'medium'
elif alert_score > 0.2:
alert_level = 'low'
else:
alert_level = 'minimal'
# Enhanced fraud indicators with more specific patterns
fraud_indicators = []
# Price-related patterns
price_patterns = [
(r'suspiciously low price', 0.8),
(r'unusually high price', 0.7),
(r'price too good to be true', 0.9),
(r'urgent sale', 0.6),
(r'must sell quickly', 0.7)
]
# Location-related patterns
location_patterns = [
(r'location mismatch', 0.8),
(r'address inconsistency', 0.7),
(r'wrong neighborhood', 0.6),
(r'incorrect zip code', 0.7)
]
# Document-related patterns
document_patterns = [
(r'missing documents', 0.8),
(r'unverified documents', 0.7),
(r'fake documents', 0.9),
(r'photoshopped documents', 0.8)
]
# Urgency-related patterns
urgency_patterns = [
(r'act now', 0.6),
(r'limited time offer', 0.5),
(r'first come first served', 0.4),
(r'won\'t last long', 0.5)
]
# Check all patterns
all_patterns = price_patterns + location_patterns + document_patterns + urgency_patterns
for pattern, weight in all_patterns:
if re.search(pattern, text.lower()):
fraud_indicators.append({
'pattern': pattern,
'weight': weight,
'context': text[max(0, text.lower().find(pattern)-50):min(len(text), text.lower().find(pattern)+50)]
})
# Additional checks for data inconsistencies
if data:
# Check for suspiciously low price per square foot
try:
price = float(data.get('market_value', 0))
sqft = float(data.get('square_footage', 1))
price_per_sqft = price / sqft
if price_per_sqft < 50: # Unusually low price per square foot
fraud_indicators.append({
'pattern': 'suspiciously low price per square foot',
'weight': 0.8,
'context': f'Price per square foot: ${price_per_sqft:.2f}'
})
except (ValueError, ZeroDivisionError):
pass
# Check for impossible values
try:
year_built = int(data.get('year_built', 0))
if year_built < 1800 or year_built > 2024:
fraud_indicators.append({
'pattern': 'impossible year built',
'weight': 0.9,
'context': f'Year built: {year_built}'
})
except ValueError:
pass
# Check for missing critical information
critical_fields = ['property_name', 'property_type', 'address', 'market_value', 'square_footage']
missing_fields = [field for field in critical_fields if not data.get(field)]
if missing_fields:
fraud_indicators.append({
'pattern': 'missing critical information',
'weight': 0.7,
'context': f'Missing fields: {", ".join(missing_fields)}'
})
return {
'alert_level': alert_level,
'alert_score': alert_score,
'high_risk': high_risk,
'medium_risk': medium_risk,
'low_risk': low_risk,
'fraud_indicators': fraud_indicators
}
except Exception as e:
logger.error(f"Error in fraud classification: {str(e)}")
return {
'alert_level': 'error',
'alert_score': 1.0,
'high_risk': [],
'medium_risk': [],
'low_risk': [],
'fraud_indicators': []
}
def generate_trust_score(text, image_analysis, pdf_analysis):
try:
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
aspects = [
"complete information provided",
"verified location",
"consistent data",
"authentic documents",
"authentic images",
"reasonable pricing",
"verified ownership",
"proper documentation"
]
result = classifier(text[:1000], aspects, multi_label=True)
# Much stricter weights with higher emphasis on critical aspects
weights = {
"complete information provided": 0.25,
"verified location": 0.20,
"consistent data": 0.15,
"authentic documents": 0.15,
"authentic images": 0.10,
"reasonable pricing": 0.05,
"verified ownership": 0.05,
"proper documentation": 0.05
}
score = 0
reasoning_parts = []
# Much stricter scoring for each aspect
for label, confidence in zip(result['labels'], result['scores']):
adjusted_confidence = confidence
# Stricter document verification
if label == "authentic documents":
if not pdf_analysis or len(pdf_analysis) == 0:
adjusted_confidence = 0.0
else:
doc_scores = [p.get('verification_score', 0) for p in pdf_analysis]
adjusted_confidence = sum(doc_scores) / max(1, len(doc_scores))
# Heavily penalize if any document has low verification score
if any(score < 0.7 for score in doc_scores):
adjusted_confidence *= 0.4
# Additional penalty for missing documents
if len(doc_scores) < 2:
adjusted_confidence *= 0.5
# Stricter image verification
elif label == "authentic images":
if not image_analysis or len(image_analysis) == 0:
adjusted_confidence = 0.0
else:
img_scores = [i.get('authenticity_score', 0) for i in image_analysis]
adjusted_confidence = sum(img_scores) / max(1, len(img_scores))
# Heavily penalize if any image has low authenticity score
if any(score < 0.8 for score in img_scores):
adjusted_confidence *= 0.4
# Additional penalty for AI-generated images
if any(i.get('is_ai_generated', False) for i in image_analysis):
adjusted_confidence *= 0.5
# Additional penalty for non-property related images
if any(not i.get('is_property_related', False) for i in image_analysis):
adjusted_confidence *= 0.6
# Stricter consistency check
elif label == "consistent data":
# Check for inconsistencies in the data
if "inconsistent" in text.lower() or "suspicious" in text.lower():
adjusted_confidence *= 0.3
# Check for impossible values
if "impossible" in text.lower() or "invalid" in text.lower():
adjusted_confidence *= 0.2
# Check for missing critical information
if "missing" in text.lower() or "not provided" in text.lower():
adjusted_confidence *= 0.4
# Stricter completeness check
elif label == "complete information provided":
# Check for missing critical information
if len(text) < 300 or "not provided" in text.lower() or "missing" in text.lower():
adjusted_confidence *= 0.4
# Check for vague or generic descriptions
if "generic" in text.lower() or "vague" in text.lower():
adjusted_confidence *= 0.5
# Check for suspiciously short descriptions
if len(text) < 150:
adjusted_confidence *= 0.3
score += adjusted_confidence * weights.get(label, 0.1)
reasoning_parts.append(f"{label} ({adjusted_confidence:.0%})")
# Apply additional penalties for suspicious patterns
if "suspicious" in text.lower() or "fraudulent" in text.lower():
score *= 0.5
# Apply penalties for suspiciously low values
if "suspiciously low" in text.lower() or "unusually small" in text.lower():
score *= 0.6
# Apply penalties for inconsistencies
if "inconsistent" in text.lower() or "mismatch" in text.lower():
score *= 0.6
# Apply penalties for missing critical information
if "missing critical" in text.lower() or "incomplete" in text.lower():
score *= 0.7
# Ensure score is between 0 and 100
score = min(100, max(0, int(score * 100)))
reasoning = f"Based on: {', '.join(reasoning_parts)}"
return score, reasoning
except Exception as e:
logger.error(f"Error generating trust score: {str(e)}")
return 20, "Could not assess trust."
def generate_suggestions(text, data=None):
try:
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
# Create comprehensive context for analysis
suggestion_context = text
if data:
suggestion_context += f"""
Additional Context:
Property Type: {data.get('property_type', '')}
Location: {data.get('city', '')}, {data.get('state', '')}
Size: {data.get('sq_ft', '')} sq.ft.
Year Built: {data.get('year_built', '')}
"""
# Enhanced suggestion categories based on property context
base_suggestions = {
'documentation': {
'label': "add more documentation",
'categories': [
"complete documentation provided",
"missing essential documents",
"incomplete paperwork",
"documentation needs verification"
],
'weight': 2.0,
'improvements': {
'missing essential documents': [
"Add property deed or title documents",
"Include recent property tax records",
"Attach property registration documents"
],
'incomplete paperwork': [
"Complete all required legal documents",
"Add missing ownership proof",
"Include property survey documents"
]
}
},
'details': {
'label': "enhance property details",
'categories': [
"detailed property information",
"basic information only",
"missing key details",
"comprehensive description"
],
'weight': 1.8,
'improvements': {
'basic information only': [
"Add more details about property features",
"Include information about recent renovations",
"Describe unique selling points"
],
'missing key details': [
"Specify exact built-up area",
"Add floor plan details",
"Include maintenance costs"
]
}
},
'images': {
'label': "improve visual content",
'categories': [
"high quality images provided",
"poor image quality",
"insufficient images",
"missing key area photos"
],
'weight': 1.5,
'improvements': {
'poor image quality': [
"Add high-resolution property photos",
"Include better lighting in images",
"Provide professional photography"
],
'insufficient images': [
"Add more interior photos",
"Include exterior and surrounding area images",
"Add photos of amenities"
]
}
},
'pricing': {
'label': "pricing information",
'categories': [
"detailed pricing breakdown",
"basic price only",
"missing price details",
"unclear pricing terms"
],
'weight': 1.7,
'improvements': {
'basic price only': [
"Add detailed price breakdown",
"Include maintenance charges",
"Specify additional costs"
],
'missing price details': [
"Add price per square foot",
"Include tax implications",
"Specify payment terms"
]
}
},
'location': {
'label': "location details",
'categories': [
"comprehensive location info",
"basic location only",
"missing location details",
"unclear accessibility info"
],
'weight': 1.6,
'improvements': {
'basic location only': [
"Add nearby landmarks and distances",
"Include transportation options",
"Specify neighborhood facilities"
],
'missing location details': [
"Add exact GPS coordinates",
"Include area development plans",
"Specify distance to key facilities"
]
}
}
}
suggestions = []
confidence_scores = []
for aspect, config in base_suggestions.items():
# Analyze each aspect with context
result = classifier(suggestion_context[:1000], config['categories'])
# Get the most relevant category
top_category = result['labels'][0]
confidence = float(result['scores'][0])
# If the category indicates improvement needed (confidence < 0.6)
if confidence < 0.6 and top_category in config['improvements']:
weighted_confidence = confidence * config['weight']
for improvement in config['improvements'][top_category]:
suggestions.append({
'aspect': aspect,
'category': top_category,
'suggestion': improvement,
'confidence': weighted_confidence
})
confidence_scores.append(weighted_confidence)
# Sort suggestions by confidence and priority
suggestions.sort(key=lambda x: x['confidence'], reverse=True)
# Property type specific suggestions
if data and data.get('property_type'):
property_type = data['property_type'].lower()
type_specific_suggestions = {
'residential': [
"Add information about school districts",
"Include details about neighborhood safety",
"Specify parking arrangements"
],
'commercial': [
"Add foot traffic statistics",
"Include zoning information",
"Specify business licenses required"
],
'industrial': [
"Add power supply specifications",
"Include environmental clearances",
"Specify loading/unloading facilities"
],
'land': [
"Add soil testing reports",
"Include development potential analysis",
"Specify available utilities"
]
}
for type_key, type_suggestions in type_specific_suggestions.items():
if type_key in property_type:
for suggestion in type_suggestions:
suggestions.append({
'aspect': 'property_type_specific',
'category': 'type_specific_requirements',
'suggestion': suggestion,
'confidence': 0.8 # High confidence for type-specific suggestions
})
# Add market-based suggestions
if data and data.get('market_value'):
try:
market_value = float(data['market_value'].replace('₹', '').replace(',', ''))
if market_value > 10000000: # High-value property
premium_suggestions = [
"Add virtual tour of the property",
"Include detailed investment analysis",
"Provide historical price trends"
]
for suggestion in premium_suggestions:
suggestions.append({
'aspect': 'premium_property',
'category': 'high_value_requirements',
'suggestion': suggestion,
'confidence': 0.9
})
except ValueError:
pass
# Calculate overall completeness score
completeness_score = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0
completeness_score = min(100, max(0, completeness_score * 100))
return {
'suggestions': suggestions[:10], # Return top 10 suggestions
'completeness_score': completeness_score,
'priority_aspects': [s['aspect'] for s in suggestions[:3]],
'improvement_summary': f"Focus on improving {', '.join([s['aspect'] for s in suggestions[:3]])}",
'total_suggestions': len(suggestions)
}
except Exception as e:
logger.error(f"Error generating suggestions: {str(e)}")
return {
'suggestions': [
{
'aspect': 'general',
'category': 'basic_requirements',
'suggestion': 'Please provide more property details',
'confidence': 0.5
}
],
'completeness_score': 0,
'priority_aspects': ['general'],
'improvement_summary': "Add basic property information",
'total_suggestions': 1
}
def assess_text_quality(text):
try:
if not text or len(text.strip()) < 20:
return {
'assessment': 'insufficient',
'score': 0,
'reasoning': 'Text too short.',
'is_ai_generated': False,
'quality_metrics': {}
}
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
# Enhanced quality categories with more specific indicators
quality_categories = [
"detailed and informative",
"adequately detailed",
"basic information",
"vague description",
"misleading content",
"professional listing",
"amateur listing",
"spam-like content",
"template-based content",
"authentic description"
]
# Analyze text with multiple aspects
quality_result = classifier(text[:1000], quality_categories, multi_label=True)
# Get top classifications with confidence scores
top_classifications = []
for label, score in zip(quality_result['labels'][:3], quality_result['scores'][:3]):
if score > 0.3: # Only include if confidence is above 30%
top_classifications.append({
'classification': label,
'confidence': float(score)
})
# AI generation detection with multiple models
ai_check = classifier(text[:1000], ["human-written", "AI-generated", "template-based", "authentic"])
is_ai_generated = (
(ai_check['labels'][0] == "AI-generated" and ai_check['scores'][0] > 0.6) or
(ai_check['labels'][0] == "template-based" and ai_check['scores'][0] > 0.7)
)
# Calculate quality metrics
quality_metrics = {
'detail_level': sum(score for label, score in zip(quality_result['labels'], quality_result['scores'])
if label in ['detailed and informative', 'adequately detailed']),
'professionalism': sum(score for label, score in zip(quality_result['labels'], quality_result['scores'])
if label in ['professional listing', 'authentic description']),
'clarity': sum(score for label, score in zip(quality_result['labels'], quality_result['scores'])
if label not in ['vague description', 'misleading content', 'spam-like content']),
'authenticity': 1.0 - sum(score for label, score in zip(quality_result['labels'], quality_result['scores'])
if label in ['template-based content', 'spam-like content'])
}
# Calculate overall score with weighted metrics
weights = {
'detail_level': 0.3,
'professionalism': 0.25,
'clarity': 0.25,
'authenticity': 0.2
}
score = sum(metric * weights[metric_name] for metric_name, metric in quality_metrics.items())
score = score * 100 # Convert to percentage
# Adjust score for AI-generated content
if is_ai_generated:
score = score * 0.7 # Reduce score by 30% for AI-generated content
# Generate detailed reasoning
reasoning_parts = []
if top_classifications:
primary_class = top_classifications[0]['classification']
reasoning_parts.append(f"Primary assessment: {primary_class}")
if quality_metrics['detail_level'] > 0.7:
reasoning_parts.append("Contains comprehensive details")
elif quality_metrics['detail_level'] > 0.4:
reasoning_parts.append("Contains adequate details")
else:
reasoning_parts.append("Lacks important details")
if quality_metrics['professionalism'] > 0.7:
reasoning_parts.append("Professional listing style")
elif quality_metrics['professionalism'] < 0.4:
reasoning_parts.append("Amateur listing style")
if quality_metrics['clarity'] < 0.5:
reasoning_parts.append("Content clarity issues detected")
if is_ai_generated:
reasoning_parts.append("Content appears to be AI-generated")
return {
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess',
'score': int(score),
'reasoning': '. '.join(reasoning_parts),
'is_ai_generated': is_ai_generated,
'quality_metrics': quality_metrics,
'top_classifications': top_classifications
}
except Exception as e:
logger.error(f"Error assessing text quality: {str(e)}")
return {
'assessment': 'could not assess',
'score': 50,
'reasoning': 'Technical error.',
'is_ai_generated': False,
'quality_metrics': {},
'top_classifications': []
}
def verify_address(data):
try:
address_results = {
'address_exists': False,
'pincode_valid': False,
'city_state_match': False,
'coordinates_match': False,
'confidence': 0.0,
'issues': [],
'verification_score': 0.0
}
if data['zip']:
try:
response = requests.get(f"https://api.postalpincode.in/pincode/{data['zip']}", timeout=5)
if response.status_code == 200:
pin_data = response.json()
if pin_data[0]['Status'] == 'Success':
address_results['pincode_valid'] = True
post_offices = pin_data[0]['PostOffice']
cities = {po['Name'].lower() for po in post_offices}
states = {po['State'].lower() for po in post_offices}
if data['city'].lower() in cities or data['state'].lower() in states:
address_results['city_state_match'] = True
else:
address_results['issues'].append("City/state may not match pincode")
else:
address_results['issues'].append(f"Invalid pincode: {data['zip']}")
else:
address_results['issues'].append("Pincode API error")
except Exception as e:
logger.error(f"Pincode API error: {str(e)}")
address_results['issues'].append("Pincode validation failed")
full_address = ', '.join(filter(None, [data['address'], data['city'], data['state'], data['country'], data['zip']]))
for attempt in range(3):
try:
location = geocoder.geocode(full_address)
if location:
address_results['address_exists'] = True
address_results['confidence'] = 0.9
if data['latitude'] and data['longitude']:
try:
provided_coords = (float(data['latitude']), float(data['longitude']))
geocoded_coords = (location.latitude, location.longitude)
from geopy.distance import distance
dist = distance(provided_coords, geocoded_coords).km
address_results['coordinates_match'] = dist < 1.0
if not address_results['coordinates_match']:
address_results['issues'].append(f"Coordinates {dist:.2f}km off")
except:
address_results['issues'].append("Invalid coordinates")
break
time.sleep(1)
except Exception as e:
logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
time.sleep(1)
else:
address_results['issues'].append("Address geocoding failed")
verification_points = (
address_results['address_exists'] * 0.4 +
address_results['pincode_valid'] * 0.3 +
address_results['city_state_match'] * 0.2 +
address_results['coordinates_match'] * 0.1
)
address_results['verification_score'] = verification_points
return address_results
except Exception as e:
logger.error(f"Error verifying address: {str(e)}")
address_results['issues'].append(str(e))
return address_results
def perform_cross_validation(data):
try:
cross_checks = []
# Check bedroom count consistency
try:
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0
desc_bhk = re.findall(r'(\d+)\s*bhk', data['description'].lower())
if desc_bhk and int(desc_bhk[0]) != bedrooms:
cross_checks.append({
'check': 'bedroom_count',
'status': 'inconsistent',
'message': f"Description mentions {desc_bhk[0]} BHK, form says {bedrooms}"
})
else:
cross_checks.append({
'check': 'bedroom_count',
'status': 'consistent',
'message': f"Bedrooms: {bedrooms}"
})
except:
cross_checks.append({
'check': 'bedroom_count',
'status': 'invalid',
'message': 'Invalid bedroom data'
})
# Check room count consistency
try:
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0
bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0
total_rooms = int(data['total_rooms']) if data['total_rooms'] else 0
# More thorough room count validation
if total_rooms > 0:
if total_rooms < bedrooms + bathrooms:
cross_checks.append({
'check': 'room_count',
'status': 'inconsistent',
'message': f"Total rooms ({total_rooms}) less than bedrooms ({bedrooms}) + bathrooms ({bathrooms})"
})
elif total_rooms > bedrooms + bathrooms + 5: # Allow for some extra rooms
cross_checks.append({
'check': 'room_count',
'status': 'suspicious',
'message': f"Total rooms ({total_rooms}) seems unusually high compared to bedrooms ({bedrooms}) + bathrooms ({bathrooms})"
})
else:
cross_checks.append({
'check': 'room_count',
'status': 'consistent',
'message': f"Rooms consistent: {total_rooms} total, {bedrooms} bedrooms, {bathrooms} bathrooms"
})
else:
cross_checks.append({
'check': 'room_count',
'status': 'missing',
'message': 'Total room count not provided'
})
except:
cross_checks.append({
'check': 'room_count',
'status': 'invalid',
'message': 'Invalid room count data'
})
# Check year built consistency
try:
year_built = int(data['year_built']) if data['year_built'] else 0
current_year = datetime.now().year
if year_built > 0:
if year_built > current_year:
cross_checks.append({
'check': 'year_built',
'status': 'invalid',
'message': f"Year built ({year_built}) is in the future"
})
elif year_built < 1800:
cross_checks.append({
'check': 'year_built',
'status': 'suspicious',
'message': f"Year built ({year_built}) seems unusually old"
})
elif current_year - year_built > 200:
cross_checks.append({
'check': 'year_built',
'status': 'suspicious',
'message': f"Property age ({current_year - year_built} years) seems unusually old"
})
else:
cross_checks.append({
'check': 'year_built',
'status': 'reasonable',
'message': f"Year built reasonable: {year_built}"
})
else:
cross_checks.append({
'check': 'year_built',
'status': 'missing',
'message': 'Year built not provided'
})
except:
cross_checks.append({
'check': 'year_built',
'status': 'invalid',
'message': 'Invalid year built data'
})
# Check square footage consistency
try:
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0
if sq_ft > 0 and bedrooms > 0:
sq_ft_per_bedroom = sq_ft / bedrooms
if sq_ft_per_bedroom < 50: # Unusually small per bedroom
cross_checks.append({
'check': 'sq_ft_per_bedroom',
'status': 'suspicious',
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually small"
})
elif sq_ft_per_bedroom > 1000: # Unusually large per bedroom
cross_checks.append({
'check': 'sq_ft_per_bedroom',
'status': 'suspicious',
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually large"
})
else:
cross_checks.append({
'check': 'sq_ft_per_bedroom',
'status': 'reasonable',
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) is reasonable"
})
elif sq_ft > 0:
cross_checks.append({
'check': 'sq_ft',
'status': 'incomplete',
'message': f"Square footage provided: {sq_ft} sq.ft., but bedroom count missing"
})
elif bedrooms > 0:
cross_checks.append({
'check': 'sq_ft',
'status': 'missing',
'message': f"Square footage not provided, but {bedrooms} bedrooms listed"
})
else:
cross_checks.append({
'check': 'sq_ft',
'status': 'missing',
'message': 'Square footage not provided'
})
except:
cross_checks.append({
'check': 'sq_ft',
'status': 'invalid',
'message': 'Invalid square footage data'
})
# Check price per square foot
try:
market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0
if market_value > 0 and sq_ft > 0:
price_per_sqft = market_value / sq_ft
# Check for suspiciously low price per sq ft
if price_per_sqft < 100:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'suspiciously low',
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously low"
})
# Check for suspiciously high price per sq ft
elif price_per_sqft > 50000:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'suspiciously high',
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously high"
})
else:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'reasonable',
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is reasonable"
})
elif market_value > 0:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'incomplete',
'message': f"Market value provided: ₹{market_value:,.2f}, but square footage missing"
})
elif sq_ft > 0:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'incomplete',
'message': f"Square footage provided: {sq_ft} sq.ft., but market value missing"
})
else:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'missing',
'message': 'Price per sq.ft. cannot be calculated (missing data)'
})
except:
cross_checks.append({
'check': 'price_per_sqft',
'status': 'invalid',
'message': 'Invalid price per sq.ft. data'
})
# Check location consistency
try:
latitude = float(data['latitude']) if data['latitude'] else 0
longitude = float(data['longitude']) if data['longitude'] else 0
address = data['address'].lower() if data['address'] else ''
city = data['city'].lower() if data['city'] else ''
state = data['state'].lower() if data['state'] else ''
country = data['country'].lower() if data['country'] else 'india'
# Check if coordinates are within India
if latitude != 0 and longitude != 0:
if 6.5 <= latitude <= 35.5 and 68.1 <= longitude <= 97.4:
cross_checks.append({
'check': 'coordinates',
'status': 'valid',
'message': 'Coordinates within India'
})
else:
cross_checks.append({
'check': 'coordinates',
'status': 'invalid',
'message': 'Coordinates outside India'
})
else:
cross_checks.append({
'check': 'coordinates',
'status': 'missing',
'message': 'Coordinates not provided'
})
# Check if address contains city and state
if address and city and state:
if city in address and state in address:
cross_checks.append({
'check': 'address_consistency',
'status': 'consistent',
'message': 'Address contains city and state'
})
else:
cross_checks.append({
'check': 'address_consistency',
'status': 'inconsistent',
'message': 'Address does not contain city or state'
})
else:
cross_checks.append({
'check': 'address_consistency',
'status': 'incomplete',
'message': 'Address consistency check incomplete (missing data)'
})
except:
cross_checks.append({
'check': 'location',
'status': 'invalid',
'message': 'Invalid location data'
})
# Check property type consistency
try:
property_type = data['property_type'].lower() if data['property_type'] else ''
description = data['description'].lower() if data['description'] else ''
if property_type and description:
property_types = ['apartment', 'house', 'condo', 'townhouse', 'villa', 'land', 'commercial']
found_types = [pt for pt in property_types if pt in description]
if found_types and property_type not in found_types:
cross_checks.append({
'check': 'property_type',
'status': 'inconsistent',
'message': f"Description mentions {', '.join(found_types)}, but property type is {property_type}"
})
else:
cross_checks.append({
'check': 'property_type',
'status': 'consistent',
'message': f"Property type consistent: {property_type}"
})
else:
cross_checks.append({
'check': 'property_type',
'status': 'incomplete',
'message': 'Property type consistency check incomplete (missing data)'
})
except:
cross_checks.append({
'check': 'property_type',
'status': 'invalid',
'message': 'Invalid property type data'
})
# Check for suspiciously low market value
try:
market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0
property_type = data['property_type'].lower() if data['property_type'] else ''
if market_value > 0 and property_type:
# Define minimum reasonable values for different property types
min_values = {
'apartment': 500000,
'house': 1000000,
'condo': 800000,
'townhouse': 900000,
'villa': 2000000,
'land': 300000,
'commercial': 2000000
}
min_value = min_values.get(property_type, 500000)
if market_value < min_value:
cross_checks.append({
'check': 'market_value',
'status': 'suspiciously low',
'message': f"Market value (₹{market_value:,.2f}) seems suspiciously low for a {property_type}"
})
else:
cross_checks.append({
'check': 'market_value',
'status': 'reasonable',
'message': f"Market value (₹{market_value:,.2f}) is reasonable for a {property_type}"
})
elif market_value > 0:
cross_checks.append({
'check': 'market_value',
'status': 'incomplete',
'message': f"Market value provided: ₹{market_value:,.2f}, but property type missing"
})
else:
cross_checks.append({
'check': 'market_value',
'status': 'missing',
'message': 'Market value not provided'
})
except:
cross_checks.append({
'check': 'market_value',
'status': 'invalid',
'message': 'Invalid market value data'
})
return cross_checks
except Exception as e:
logger.error(f"Error performing cross validation: {str(e)}")
return [{
'check': 'cross_validation',
'status': 'error',
'message': f'Error performing cross validation: {str(e)}'
}]
def analyze_location(data):
try:
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
location_text = ' '.join(filter(None, [
data['address'], data['city'], data['state'], data['country'],
data['zip'], f"Lat: {data['latitude']}", f"Long: {data['longitude']}",
data['nearby_landmarks']
]))
categories = ["complete", "partial", "minimal", "missing"]
result = classifier(location_text, categories)
location_quality = "unknown"
if data['city'] and data['state']:
for attempt in range(3):
try:
location = geocoder.geocode(f"{data['city']}, {data['state']}, India")
if location:
location_quality = "verified"
break
time.sleep(1)
except:
time.sleep(1)
else:
location_quality = "unverified"
coord_check = "missing"
if data['latitude'] and data['longitude']:
try:
lat, lng = float(data['latitude']), float(data['longitude'])
coord_check = "in_india" if 6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5 else "outside_india"
except:
coord_check = "invalid"
completeness = calculate_location_completeness(data)
assessment = "complete" if completeness >= 80 else "partial" if completeness >= 50 else "minimal"
return {
'assessment': assessment,
'confidence': float(result['scores'][0]),
'coordinates_check': coord_check,
'landmarks_provided': bool(data['nearby_landmarks']),
'completeness_score': completeness,
'location_quality': location_quality
}
except Exception as e:
logger.error(f"Error analyzing location: {str(e)}")
return {
'assessment': 'error',
'confidence': 0.0,
'coordinates_check': 'error',
'landmarks_provided': False,
'completeness_score': 0,
'location_quality': 'error'
}
def calculate_location_completeness(data):
fields = ['address', 'city', 'state', 'country', 'zip', 'latitude', 'longitude', 'nearby_landmarks']
return int((sum(1 for f in fields if data[f]) / len(fields)) * 100)
def analyze_price(data):
try:
price_str = data['market_value'].replace('$', '').replace(',', '').strip()
price = float(price_str) if price_str else 0
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0
price_per_sqft = price / sq_ft if sq_ft else 0
if not price:
return {
'assessment': 'no price',
'confidence': 0.0,
'price': 0,
'formatted_price': '₹0',
'price_per_sqft': 0,
'formatted_price_per_sqft': '₹0',
'price_range': 'unknown',
'location_price_assessment': 'cannot assess',
'has_price': False,
'market_trends': {},
'price_factors': {},
'risk_indicators': []
}
# Use a more sophisticated model for price analysis
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
# Create a detailed context for price analysis
price_context = f"""
Property Type: {data.get('property_type', '')}
Location: {data.get('city', '')}, {data.get('state', '')}
Size: {sq_ft} sq.ft.
Price: ₹{price:,.2f}
Price per sq.ft.: ₹{price_per_sqft:,.2f}
Property Status: {data.get('status', '')}
Year Built: {data.get('year_built', '')}
Bedrooms: {data.get('bedrooms', '')}
Bathrooms: {data.get('bathrooms', '')}
Amenities: {data.get('amenities', '')}
"""
# Enhanced price categories with more specific indicators
price_categories = [
"reasonable market price",
"suspiciously low price",
"suspiciously high price",
"average market price",
"luxury property price",
"budget property price",
"premium property price",
"mid-range property price",
"overpriced for location",
"underpriced for location",
"price matches amenities",
"price matches property age",
"price matches location value",
"price matches property condition",
"price matches market trends"
]
# Analyze price with multiple aspects
price_result = classifier(price_context, price_categories, multi_label=True)
# Get top classifications with enhanced confidence calculation
top_classifications = []
for label, score in zip(price_result['labels'][:5], price_result['scores'][:5]):
if score > 0.25: # Lower threshold for better sensitivity
top_classifications.append({
'classification': label,
'confidence': float(score)
})
# Determine price range based on AI classification and market data
price_range = 'unknown'
if top_classifications:
primary_class = top_classifications[0]['classification']
if 'luxury' in primary_class:
price_range = 'luxury'
elif 'premium' in primary_class:
price_range = 'premium'
elif 'mid-range' in primary_class:
price_range = 'mid_range'
elif 'budget' in primary_class:
price_range = 'budget'
# Enhanced location-specific price assessment
location_assessment = "unknown"
market_trends = {}
if data.get('city') and price_per_sqft:
city_lower = data['city'].lower()
metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"]
# Define price ranges for different city tiers
if any(city in city_lower for city in metro_cities):
market_trends = {
'city_tier': 'metro',
'avg_price_range': {
'min': 5000,
'max': 30000,
'trend': 'stable'
},
'price_per_sqft': {
'current': price_per_sqft,
'market_avg': 15000,
'deviation': abs(price_per_sqft - 15000) / 15000 * 100
}
}
location_assessment = (
"reasonable" if 5000 <= price_per_sqft <= 30000 else
"suspiciously low" if price_per_sqft < 5000 else
"suspiciously high"
)
else:
market_trends = {
'city_tier': 'non-metro',
'avg_price_range': {
'min': 1500,
'max': 15000,
'trend': 'stable'
},
'price_per_sqft': {
'current': price_per_sqft,
'market_avg': 7500,
'deviation': abs(price_per_sqft - 7500) / 7500 * 100
}
}
location_assessment = (
"reasonable" if 1500 <= price_per_sqft <= 15000 else
"suspiciously low" if price_per_sqft < 1500 else
"suspiciously high"
)
# Enhanced price analysis factors
price_factors = {}
risk_indicators = []
# Property age factor
try:
year_built = int(data.get('year_built', 0))
current_year = datetime.now().year
property_age = current_year - year_built
if property_age > 0:
depreciation_factor = max(0.5, 1 - (property_age * 0.01)) # 1% depreciation per year, min 50%
price_factors['age_factor'] = {
'property_age': property_age,
'depreciation_factor': depreciation_factor,
'impact': 'high' if property_age > 30 else 'medium' if property_age > 15 else 'low'
}
except:
price_factors['age_factor'] = {'error': 'Invalid year built'}
# Size factor
if sq_ft > 0:
size_factor = {
'size': sq_ft,
'price_per_sqft': price_per_sqft,
'efficiency': 'high' if 800 <= sq_ft <= 2000 else 'medium' if 500 <= sq_ft <= 3000 else 'low'
}
price_factors['size_factor'] = size_factor
# Add risk indicators based on size
if sq_ft < 300:
risk_indicators.append('Unusually small property size')
elif sq_ft > 10000:
risk_indicators.append('Unusually large property size')
# Amenities factor
if data.get('amenities'):
amenities_list = [a.strip() for a in data['amenities'].split(',')]
amenities_score = min(1.0, len(amenities_list) * 0.1) # 10% per amenity, max 100%
price_factors['amenities_factor'] = {
'count': len(amenities_list),
'score': amenities_score,
'impact': 'high' if amenities_score > 0.7 else 'medium' if amenities_score > 0.4 else 'low'
}
# Calculate overall confidence with weighted factors
confidence_weights = {
'primary_classification': 0.3,
'location_assessment': 0.25,
'age_factor': 0.2,
'size_factor': 0.15,
'amenities_factor': 0.1
}
confidence_scores = []
# Primary classification confidence
if top_classifications:
confidence_scores.append(price_result['scores'][0] * confidence_weights['primary_classification'])
# Location assessment confidence
location_confidence = 0.8 if location_assessment == "reasonable" else 0.4
confidence_scores.append(location_confidence * confidence_weights['location_assessment'])
# Age factor confidence
if 'age_factor' in price_factors and 'depreciation_factor' in price_factors['age_factor']:
age_confidence = price_factors['age_factor']['depreciation_factor']
confidence_scores.append(age_confidence * confidence_weights['age_factor'])
# Size factor confidence
if 'size_factor' in price_factors:
size_confidence = 0.8 if price_factors['size_factor']['efficiency'] == 'high' else 0.6
confidence_scores.append(size_confidence * confidence_weights['size_factor'])
# Amenities factor confidence
if 'amenities_factor' in price_factors:
amenities_confidence = price_factors['amenities_factor']['score']
confidence_scores.append(amenities_confidence * confidence_weights['amenities_factor'])
overall_confidence = sum(confidence_scores) / sum(confidence_weights.values())
return {
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not classify',
'confidence': float(overall_confidence),
'price': price,
'formatted_price': f"₹{price:,.0f}",
'price_per_sqft': price_per_sqft,
'formatted_price_per_sqft': f"₹{price_per_sqft:,.2f}",
'price_range': price_range,
'location_price_assessment': location_assessment,
'has_price': True,
'market_trends': market_trends,
'price_factors': price_factors,
'risk_indicators': risk_indicators,
'top_classifications': top_classifications
}
except Exception as e:
logger.error(f"Error analyzing price: {str(e)}")
return {
'assessment': 'error',
'confidence': 0.0,
'price': 0,
'formatted_price': '₹0',
'price_per_sqft': 0,
'formatted_price_per_sqft': '₹0',
'price_range': 'unknown',
'location_price_assessment': 'error',
'has_price': False,
'market_trends': {},
'price_factors': {},
'risk_indicators': [],
'top_classifications': []
}
def analyze_legal_details(legal_text):
try:
if not legal_text or len(legal_text.strip()) < 5:
return {
'assessment': 'insufficient',
'confidence': 0.0,
'summary': 'No legal details provided',
'completeness_score': 0,
'potential_issues': False,
'legal_metrics': {},
'reasoning': 'No legal details provided for analysis',
'top_classifications': []
}
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
# Enhanced legal categories with more specific indicators
categories = [
"comprehensive legal documentation",
"basic legal documentation",
"missing critical legal details",
"potential legal issues",
"standard property documentation",
"title verification documents",
"encumbrance certificates",
"property tax records",
"building permits",
"land use certificates",
"clear title documentation",
"property registration documents",
"ownership transfer documents",
"legal compliance certificates",
"property dispute records"
]
# Create a more detailed context for analysis
legal_context = f"""
Legal Documentation Analysis:
{legal_text[:1000]}
Key aspects to verify:
- Title and ownership documentation
- Property registration status
- Tax compliance
- Building permits and approvals
- Land use compliance
- Encumbrance status
- Dispute history
"""
# Analyze legal text with multiple aspects
legal_result = classifier(legal_context, categories, multi_label=True)
# Get top classifications with confidence scores
top_classifications = []
for label, score in zip(legal_result['labels'][:3], legal_result['scores'][:3]):
if score > 0.3: # Only include if confidence is above 30%
top_classifications.append({
'classification': label,
'confidence': float(score)
})
# Generate summary using BART
summary = summarize_text(legal_text[:1000])
# Calculate legal metrics with weighted scoring
legal_metrics = {
'completeness': sum(score for label, score in zip(legal_result['labels'], legal_result['scores'])
if label in ['comprehensive legal documentation', 'standard property documentation']),
'documentation_quality': sum(score for label, score in zip(legal_result['labels'], legal_result['scores'])
if label in ['title verification documents', 'encumbrance certificates', 'clear title documentation']),
'compliance': sum(score for label, score in zip(legal_result['labels'], legal_result['scores'])
if label in ['building permits', 'land use certificates', 'legal compliance certificates']),
'risk_level': sum(score for label, score in zip(legal_result['labels'], legal_result['scores'])
if label in ['missing critical legal details', 'potential legal issues', 'property dispute records'])
}
# Calculate completeness score with weighted components
completeness_score = (
legal_metrics['completeness'] * 0.4 +
legal_metrics['documentation_quality'] * 0.4 +
legal_metrics['compliance'] * 0.2
) * 100
# Determine if there are potential issues with threshold
potential_issues = legal_metrics['risk_level'] > 0.3
# Generate detailed reasoning with specific points
reasoning_parts = []
# Primary assessment
if top_classifications:
primary_class = top_classifications[0]['classification']
confidence = top_classifications[0]['confidence']
reasoning_parts.append(f"Primary assessment: {primary_class} (confidence: {confidence:.0%})")
# Documentation completeness
if legal_metrics['completeness'] > 0.7:
reasoning_parts.append("Comprehensive legal documentation present")
elif legal_metrics['completeness'] > 0.4:
reasoning_parts.append("Basic legal documentation present")
else:
reasoning_parts.append("Insufficient legal documentation")
# Documentation quality
if legal_metrics['documentation_quality'] > 0.6:
reasoning_parts.append("Quality documentation verified (title, encumbrance)")
elif legal_metrics['documentation_quality'] > 0.3:
reasoning_parts.append("Basic documentation quality verified")
# Compliance status
if legal_metrics['compliance'] > 0.6:
reasoning_parts.append("Full compliance documentation present")
elif legal_metrics['compliance'] > 0.3:
reasoning_parts.append("Partial compliance documentation present")
# Risk assessment
if potential_issues:
if legal_metrics['risk_level'] > 0.6:
reasoning_parts.append("High risk: Multiple potential legal issues detected")
else:
reasoning_parts.append("Moderate risk: Some potential legal issues detected")
else:
reasoning_parts.append("No significant legal issues detected")
# Calculate overall confidence
overall_confidence = min(1.0, (
legal_metrics['completeness'] * 0.4 +
legal_metrics['documentation_quality'] * 0.4 +
(1 - legal_metrics['risk_level']) * 0.2
))
return {
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess',
'confidence': float(overall_confidence),
'summary': summary,
'completeness_score': int(completeness_score),
'potential_issues': potential_issues,
'legal_metrics': legal_metrics,
'reasoning': '. '.join(reasoning_parts),
'top_classifications': top_classifications
}
except Exception as e:
logger.error(f"Error analyzing legal details: {str(e)}")
return {
'assessment': 'could not assess',
'confidence': 0.0,
'summary': 'Error analyzing legal details',
'completeness_score': 0,
'potential_issues': False,
'legal_metrics': {},
'reasoning': 'Technical error occurred during analysis',
'top_classifications': []
}
def verify_property_specs(data):
try:
specs_verification = {
'bedrooms_reasonable': True,
'bathrooms_reasonable': True,
'total_rooms_reasonable': True,
'parking_reasonable': True,
'sq_ft_reasonable': True,
'market_value_reasonable': True,
'issues': [],
'verification_score': 0.0
}
# Validate bedrooms
try:
bedrooms = int(float(data['bedrooms'])) if data['bedrooms'] else 0
if bedrooms > 20 or bedrooms < 0:
specs_verification['bedrooms_reasonable'] = False
specs_verification['issues'].append(f"Invalid number of bedrooms: {bedrooms}. Should be between 0 and 20.")
except ValueError:
specs_verification['bedrooms_reasonable'] = False
specs_verification['issues'].append("Invalid bedroom data: must be a number")
# Validate bathrooms
try:
bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0
if bathrooms > 15 or bathrooms < 0:
specs_verification['bathrooms_reasonable'] = False
specs_verification['issues'].append(f"Invalid number of bathrooms: {bathrooms}. Should be between 0 and 15.")
except ValueError:
specs_verification['bathrooms_reasonable'] = False
specs_verification['issues'].append("Invalid bathroom data: must be a number")
# Validate total rooms
total_rooms = 0
if data['total_rooms']:
try:
total_rooms = int(float(data['total_rooms']))
if total_rooms > 0: # Only validate if total_rooms is provided
min_required_rooms = bedrooms + math.ceil(bathrooms) # Round up for half bathrooms
if total_rooms < min_required_rooms:
specs_verification['total_rooms_reasonable'] = False
specs_verification['issues'].append(
f"Total rooms ({total_rooms}) must be at least equal to bedrooms ({bedrooms}) + bathrooms ({bathrooms} = {min_required_rooms})"
)
elif total_rooms > 50:
specs_verification['total_rooms_reasonable'] = False
specs_verification['issues'].append(f"Total rooms ({total_rooms}) seems unreasonably high")
except (ValueError, TypeError):
specs_verification['total_rooms_reasonable'] = False
specs_verification['issues'].append("Invalid total rooms data: must be a number")
# Validate parking
try:
parking = int(float(data['parking'])) if data['parking'] else 0
if parking > 20 or parking < 0:
specs_verification['parking_reasonable'] = False
specs_verification['issues'].append(f"Invalid parking spaces: {parking}. Should be between 0 and 20.")
except ValueError:
specs_verification['parking_reasonable'] = False
specs_verification['issues'].append("Invalid parking data: must be a number")
# Validate square feet
sq_ft = 0
if data['sq_ft']:
try:
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft']))
if sq_ft > 0: # Only validate if sq_ft is provided
if sq_ft > 100000:
specs_verification['sq_ft_reasonable'] = False
specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high")
elif sq_ft < 100:
specs_verification['sq_ft_reasonable'] = False
specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low")
# Validate sq ft per bedroom if both are provided
if bedrooms > 0:
sq_ft_per_bedroom = sq_ft / bedrooms
if sq_ft_per_bedroom < 50:
specs_verification['sq_ft_reasonable'] = False
specs_verification['issues'].append(f"Square footage per bedroom ({sq_ft_per_bedroom:.1f}) seems unreasonably low")
except (ValueError, TypeError):
specs_verification['sq_ft_reasonable'] = False
specs_verification['issues'].append("Invalid square footage data: must be a number")
# Validate market value
try:
market_value = float(re.sub(r'[^\d.]', '', data['market_value'])) if data['market_value'] else 0
if market_value > 0: # Only validate if market_value is provided
if market_value > 1000000000: # 100 crore limit
specs_verification['market_value_reasonable'] = False
specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high")
elif market_value < 100000: # 1 lakh minimum
specs_verification['market_value_reasonable'] = False
specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low")
# Validate price per sq ft if both are provided
if sq_ft > 0:
price_per_sqft = market_value / sq_ft
if price_per_sqft < 100: # Less than ₹100 per sq ft
specs_verification['market_value_reasonable'] = False
specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low")
elif price_per_sqft > 100000: # More than ₹1 lakh per sq ft
specs_verification['market_value_reasonable'] = False
specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high")
except ValueError:
specs_verification['market_value_reasonable'] = False
specs_verification['issues'].append("Invalid market value data: must be a number")
# Calculate verification score based on all checks
valid_checks = sum([
specs_verification[f] for f in [
'bedrooms_reasonable', 'bathrooms_reasonable',
'total_rooms_reasonable', 'parking_reasonable',
'sq_ft_reasonable', 'market_value_reasonable'
]
])
total_checks = 6 # Total number of checks
specs_verification['verification_score'] = (valid_checks / total_checks) * 100
return specs_verification
except Exception as e:
logger.error(f"Error verifying specs: {str(e)}")
return {
'bedrooms_reasonable': False,
'bathrooms_reasonable': False,
'total_rooms_reasonable': False,
'parking_reasonable': False,
'sq_ft_reasonable': False,
'market_value_reasonable': False,
'issues': [f"Error during verification: {str(e)}"],
'verification_score': 0.0
}
def analyze_market_value(data):
try:
# Extract basic property information
price = float(data['market_value'].replace('$', '').replace(',', '').strip()) if data.get('market_value') else 0
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data.get('sq_ft') else 0
year_built = int(data.get('year_built', 0)) if data.get('year_built') else 0
current_year = datetime.now().year
property_age = current_year - year_built if year_built else 0
# Initialize market value components
market_value_components = {
'base_value': 0,
'location_multiplier': 1.0,
'age_factor': 1.0,
'size_factor': 1.0,
'amenities_factor': 1.0,
'market_trend_factor': 1.0,
'condition_factor': 1.0
}
# Calculate base value per sq ft based on city tier
city_lower = data.get('city', '').lower()
metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"]
if any(city in city_lower for city in metro_cities):
base_price_per_sqft = 15000 # Metro city base price
market_value_components['location_multiplier'] = 1.5
else:
base_price_per_sqft = 7500 # Non-metro city base price
market_value_components['location_multiplier'] = 1.0
# Calculate base value
if sq_ft > 0:
market_value_components['base_value'] = base_price_per_sqft * sq_ft
# Age factor calculation with depreciation
if property_age > 0:
depreciation_rate = 0.01 # 1% depreciation per year
max_depreciation = 0.5 # Maximum 50% depreciation
age_factor = max(1 - max_depreciation, 1 - (property_age * depreciation_rate))
market_value_components['age_factor'] = age_factor
# Size factor calculation
if sq_ft > 0:
if 800 <= sq_ft <= 2000: # Optimal size range
market_value_components['size_factor'] = 1.2
elif 500 <= sq_ft <= 3000: # Acceptable size range
market_value_components['size_factor'] = 1.0
else: # Unusual size
market_value_components['size_factor'] = 0.8
# Amenities factor calculation
if data.get('amenities'):
amenities_list = [a.strip() for a in data['amenities'].split(',')]
amenities_count = len(amenities_list)
amenities_factor = min(1.5, 1 + (amenities_count * 0.1)) # 10% per amenity, max 50% bonus
market_value_components['amenities_factor'] = amenities_factor
# Market trend factor (based on property type and location)
property_type = data.get('property_type', '').lower()
if 'apartment' in property_type or 'flat' in property_type:
market_value_components['market_trend_factor'] = 1.1 # Apartments trending up
elif 'house' in property_type or 'villa' in property_type:
market_value_components['market_trend_factor'] = 1.15 # Houses trending up more
elif 'plot' in property_type or 'land' in property_type:
market_value_components['market_trend_factor'] = 1.2 # Land trending up most
# Condition factor (based on year built and amenities)
if property_age <= 5:
market_value_components['condition_factor'] = 1.2
elif property_age <= 15:
market_value_components['condition_factor'] = 1.1
elif property_age <= 30:
market_value_components['condition_factor'] = 1.0
else:
market_value_components['condition_factor'] = 0.9
# Calculate final market value
market_value = market_value_components['base_value']
for factor, value in market_value_components.items():
if factor != 'base_value':
market_value *= value
# Calculate price per sq ft for the estimated market value
estimated_price_per_sqft = market_value / sq_ft if sq_ft > 0 else 0
# Calculate value metrics
value_metrics = {
'price_to_value_ratio': price / market_value if market_value > 0 else 0,
'price_per_sqft_ratio': price / sq_ft if sq_ft > 0 else 0,
'estimated_price_per_sqft': estimated_price_per_sqft,
'value_appreciation': (market_value - price) / price * 100 if price > 0 else 0
}
# Generate market insights
market_insights = []
# Price vs Market Value insight
if value_metrics['price_to_value_ratio'] > 1.2:
market_insights.append("Property is overpriced compared to market value")
elif value_metrics['price_to_value_ratio'] < 0.8:
market_insights.append("Property is underpriced compared to market value")
# Size insight
if sq_ft < 300:
market_insights.append("Property size is unusually small for the market")
elif sq_ft > 10000:
market_insights.append("Property size is unusually large for the market")
# Age insight
if property_age > 30:
market_insights.append("Property age significantly impacts market value")
# Location insight
if market_value_components['location_multiplier'] > 1.0:
market_insights.append("Property is in a premium location")
# Market trend insight
if market_value_components['market_trend_factor'] > 1.1:
market_insights.append("Property type is trending upward in the market")
return {
'estimated_market_value': market_value,
'formatted_market_value': f"₹{market_value:,.0f}",
'price_per_sqft': estimated_price_per_sqft,
'formatted_price_per_sqft': f"₹{estimated_price_per_sqft:,.2f}",
'value_components': market_value_components,
'value_metrics': value_metrics,
'market_insights': market_insights,
'confidence_score': min(0.95, 0.7 + (len(market_insights) * 0.05)) # Base 0.7 + 0.05 per insight, max 0.95
}
except Exception as e:
logger.error(f"Error analyzing market value: {str(e)}")
return {
'estimated_market_value': 0,
'formatted_market_value': '₹0',
'price_per_sqft': 0,
'formatted_price_per_sqft': '₹0',
'value_components': {},
'value_metrics': {},
'market_insights': [],
'confidence_score': 0.0
}
def assess_image_quality(img):
try:
width, height = img.size
resolution = width * height
quality_score = min(100, resolution // 20000)
return {
'resolution': f"{width}x{height}",
'quality_score': quality_score
}
except Exception as e:
logger.error(f"Error assessing image quality: {str(e)}")
return {
'resolution': 'unknown',
'quality_score': 0
}
def check_if_property_related(text):
try:
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
result = classifier(text[:1000], ["property-related", "non-property-related"])
is_related = result['labels'][0] == "property-related"
return {
'is_related': is_related,
'confidence': float(result['scores'][0])
}
except Exception as e:
logger.error(f"Error checking property relation: {str(e)}")
return {
'is_related': False,
'confidence': 0.0
}
# Update the load_model function to use memory optimizations
@lru_cache(maxsize=3) # Limit cache size
def load_model(task, model_name):
try:
logger.info(f"Loading model: {model_name} for task: {task}")
# Use smaller, more efficient models
if task == "zero-shot-classification":
# Use smaller model for zero-shot classification
model_name = "facebook/bart-large-mnli" # ~1.6GB
return pipeline(task, model=model_name, device=-1)
elif task == "summarization":
# Use smaller model for summarization
model_name = "facebook/bart-large-cnn" # ~1.6GB
return pipeline(task, model=model_name, device=-1)
elif task == "text-classification":
# Use very small model for text classification
model_name = "distilbert-base-uncased" # ~260MB
return pipeline(task, model=model_name, device=-1)
elif task == "feature-extraction":
# Use small model for feature extraction
model_name = "sentence-transformers/all-MiniLM-L6-v2" # ~80MB
return pipeline(task, model=model_name, device=-1)
else:
# Default to small model for unknown tasks
model_name = "distilbert-base-uncased"
return pipeline(task, model=model_name, device=-1)
except Exception as e:
logger.error(f"Error loading model {model_name}: {str(e)}")
raise
# Add memory cleanup function
def clear_model_cache():
"""Clear model cache and free up memory"""
load_model.cache_clear()
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info("Model cache cleared and memory freed")
if __name__ == '__main__':
# Set up ngrok
http_tunnel = ngrok.connect(5000)
print(f' * Public URL: {http_tunnel.public_url}')
# Run Flask app in a separate thread
def run_flask():
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
flask_thread = threading.Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()
try:
# Keep the main thread running
while True:
time.sleep(1)
except KeyboardInterrupt:
print(" * Shutting down server...")
ngrok.disconnect(http_tunnel.public_url)
ngrok.kill()