|
from flask import Flask, render_template, request, jsonify |
|
from flask_cors import CORS |
|
import torch |
|
from transformers import pipeline, CLIPProcessor, CLIPModel, BitsAndBytesConfig |
|
import base64 |
|
import io |
|
import re |
|
import json |
|
import numpy as np |
|
from PIL import Image |
|
import fitz |
|
import os |
|
from datetime import datetime |
|
import uuid |
|
import requests |
|
from geopy.geocoders import Nominatim |
|
from sentence_transformers import SentenceTransformer, util |
|
import spacy |
|
import pytesseract |
|
from langdetect import detect |
|
from deep_translator import GoogleTranslator |
|
import logging |
|
from functools import lru_cache |
|
import time |
|
import math |
|
from pyngrok import ngrok |
|
import threading |
|
import gc |
|
import psutil |
|
|
|
app = Flask(__name__) |
|
CORS(app) |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.FileHandler('app.log'), |
|
logging.StreamHandler() |
|
] |
|
) |
|
logger = logging.getLogger(__name__) |
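
# NOTE: load_model() and clear_model_cache() are called throughout this module but are
# not defined in this section; they are presumably defined elsewhere in the file. A
# minimal sketch of the assumed helpers follows -- an lru_cache-backed pipeline loader
# and a cache-clearing hook for the memory watchdog -- so this section is self-contained.
@lru_cache(maxsize=4)
def load_model(task, model_name):
    """Load and memoize a Hugging Face pipeline for the given task and model."""
    logger.info(f"Loading pipeline: {task} / {model_name}")
    return pipeline(task, model=model_name)


def clear_model_cache():
    """Drop memoized pipelines and release memory (assumed implementation)."""
    load_model.cache_clear()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()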
|
|
|
|
|
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10) |
|
|
|
|
|
def monitor_memory(): |
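    """Log process RSS every five minutes and clear the model cache above ~2 GB."""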
|
while True: |
|
process = psutil.Process() |
|
memory_info = process.memory_info() |
|
logger.info(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB") |
|
if memory_info.rss > 2 * 1024 * 1024 * 1024: |
|
logger.warning("High memory usage detected, clearing cache") |
|
clear_model_cache() |
|
time.sleep(300) |
|
|
|
|
|
memory_monitor_thread = threading.Thread(target=monitor_memory, daemon=True) |
|
memory_monitor_thread.start() |
|
|
|
|
|
try: |
|
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") |
|
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") |
|
has_clip_model = True |
|
logger.info("CLIP model loaded successfully") |
|
except Exception as e: |
|
logger.error(f"Error loading CLIP model: {str(e)}") |
|
has_clip_model = False |
|
|
|
|
|
try: |
|
sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2') |
|
logger.info("Sentence transformer loaded successfully") |
|
except Exception as e: |
|
logger.error(f"Error loading sentence transformer: {str(e)}") |
|
sentence_model = None |
|
|
|
|
|
try: |
|
nlp = spacy.load('en_core_web_md') |
|
logger.info("spaCy model loaded successfully") |
|
except Exception as e: |
|
logger.error(f"Error loading spaCy model: {str(e)}") |
|
nlp = None |
|
|
|
def make_json_serializable(obj): |
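    """Recursively convert tensors, NumPy types, and containers into JSON-safe values."""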
|
try: |
|
if isinstance(obj, (bool, int, float, str, type(None))): |
|
return obj |
|
elif isinstance(obj, (list, tuple)): |
|
return [make_json_serializable(item) for item in obj] |
|
elif isinstance(obj, dict): |
|
return {str(key): make_json_serializable(value) for key, value in obj.items()} |
|
elif torch.is_tensor(obj): |
|
return obj.item() if obj.numel() == 1 else obj.tolist() |
|
elif np.isscalar(obj): |
|
return obj.item() if hasattr(obj, 'item') else float(obj) |
|
elif isinstance(obj, np.ndarray): |
|
return obj.tolist() |
|
else: |
|
return str(obj) |
|
except Exception as e: |
|
logger.error(f"Error serializing object: {str(e)}") |
|
return str(obj) |
|
|
|
@app.route('/') |
|
def index(): |
|
return render_template('newindex.html') |
|
|
|
@app.route('/get-location', methods=['POST']) |
|
def get_location(): |
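    """Reverse-geocode the posted latitude/longitude (up to three attempts) into address components."""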
|
try: |
|
        data = request.get_json(silent=True) or {}  # tolerate missing or non-JSON bodies
|
latitude = data.get('latitude') |
|
longitude = data.get('longitude') |
|
|
|
        if latitude is None or longitude is None:  # allow 0.0 as a legitimate coordinate
|
logger.warning("Missing latitude or longitude") |
|
return jsonify({ |
|
'status': 'error', |
|
'message': 'Latitude and longitude are required' |
|
}), 400 |
|
|
|
|
|
for attempt in range(3): |
|
try: |
|
location = geocoder.reverse((latitude, longitude), exactly_one=True) |
|
if location: |
|
address_components = location.raw.get('address', {}) |
|
return jsonify({ |
|
'status': 'success', |
|
'address': location.address, |
|
'street': address_components.get('road', ''), |
|
'city': address_components.get('city', address_components.get('town', address_components.get('village', ''))), |
|
'state': address_components.get('state', ''), |
|
'country': address_components.get('country', 'India'), |
|
'postal_code': address_components.get('postcode', ''), |
|
'latitude': latitude, |
|
'longitude': longitude |
|
}) |
|
logger.warning(f"Geocoding failed on attempt {attempt + 1}") |
|
time.sleep(1) |
|
except Exception as e: |
|
logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") |
|
time.sleep(1) |
|
|
|
return jsonify({ |
|
'status': 'error', |
|
'message': 'Could not determine location after retries' |
|
}), 500 |
|
|
|
except Exception as e: |
|
logger.error(f"Error in get_location: {str(e)}") |
|
return jsonify({ |
|
'status': 'error', |
|
'message': str(e) |
|
}), 500 |
|
|
|
@app.route('/verify', methods=['POST']) |
|
def verify_property(): |
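    """Run the full verification pipeline over the submitted form fields, images, and PDF documents."""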
|
try: |
|
if not request.form and not request.files: |
|
logger.warning("No form data or files provided") |
|
return jsonify({ |
|
'error': 'No data provided', |
|
'status': 'error' |
|
}), 400 |
|
|
|
data = { |
|
'property_name': request.form.get('property_name', '').strip(), |
|
'property_type': request.form.get('property_type', '').strip(), |
|
'status': request.form.get('status', '').strip(), |
|
'description': request.form.get('description', '').strip(), |
|
'address': request.form.get('address', '').strip(), |
|
'city': request.form.get('city', '').strip(), |
|
'state': request.form.get('state', '').strip(), |
|
'country': request.form.get('country', 'India').strip(), |
|
'zip': request.form.get('zip', '').strip(), |
|
'latitude': request.form.get('latitude', '').strip(), |
|
'longitude': request.form.get('longitude', '').strip(), |
|
'bedrooms': request.form.get('bedrooms', '').strip(), |
|
'bathrooms': request.form.get('bathrooms', '').strip(), |
|
'total_rooms': request.form.get('total_rooms', '').strip(), |
|
'year_built': request.form.get('year_built', '').strip(), |
|
'parking': request.form.get('parking', '').strip(), |
|
'sq_ft': request.form.get('sq_ft', '').strip(), |
|
'market_value': request.form.get('market_value', '').strip(), |
|
'amenities': request.form.get('amenities', '').strip(), |
|
'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(), |
|
'legal_details': request.form.get('legal_details', '').strip() |
|
} |
|
|
|
required_fields = ['property_name', 'property_type', 'address', 'city', 'state'] |
|
missing_fields = [field for field in required_fields if not data[field]] |
|
if missing_fields: |
|
logger.warning(f"Missing required fields: {', '.join(missing_fields)}") |
|
return jsonify({ |
|
'error': f"Missing required fields: {', '.join(missing_fields)}", |
|
'status': 'error' |
|
}), 400 |
|
|
|
images = [] |
|
image_analysis = [] |
|
if 'images' in request.files: |
|
image_files = request.files.getlist('images') |
|
for img_file in image_files: |
|
if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')): |
|
try: |
|
                        img = Image.open(img_file).convert('RGB')  # JPEG cannot store alpha/palette modes
                        buffered = io.BytesIO()
                        img.save(buffered, format="JPEG")
|
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
images.append(img_str) |
|
image_analysis.append(analyze_image(img)) |
|
except Exception as e: |
|
logger.error(f"Error processing image {img_file.filename}: {str(e)}") |
|
image_analysis.append({'error': str(e), 'is_property_related': False}) |
|
|
|
pdf_texts = [] |
|
pdf_analysis = [] |
|
if 'documents' in request.files: |
|
pdf_files = request.files.getlist('documents') |
|
for pdf_file in pdf_files: |
|
if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'): |
|
try: |
|
pdf_text = extract_pdf_text(pdf_file) |
|
pdf_texts.append({ |
|
'filename': pdf_file.filename, |
|
'text': pdf_text |
|
}) |
|
pdf_analysis.append(analyze_pdf_content(pdf_text, data)) |
|
except Exception as e: |
|
logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}") |
|
pdf_analysis.append({'error': str(e)}) |
|
|
|
consolidated_text = f""" |
|
Property Name: {data['property_name']} |
|
Property Type: {data['property_type']} |
|
Status: {data['status']} |
|
Description: {data['description']} |
|
Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']} |
|
Coordinates: Lat {data['latitude']}, Long {data['longitude']} |
|
Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms |
|
Year Built: {data['year_built']} |
|
Parking: {data['parking']} |
|
Size: {data['sq_ft']} sq. ft. |
|
Market Value: ₹{data['market_value']} |
|
Amenities: {data['amenities']} |
|
Nearby Landmarks: {data['nearby_landmarks']} |
|
Legal Details: {data['legal_details']} |
|
""" |
|
|
|
try: |
|
description = data['description'] |
|
if description and len(description) > 10: |
|
text_language = detect(description) |
|
if text_language != 'en': |
|
translated_description = GoogleTranslator(source=text_language, target='en').translate(description) |
|
data['description_translated'] = translated_description |
|
else: |
|
data['description_translated'] = description |
|
else: |
|
data['description_translated'] = description |
|
except Exception as e: |
|
logger.error(f"Error in language detection/translation: {str(e)}") |
|
data['description_translated'] = data['description'] |
|
|
|
summary = generate_property_summary(data) |
|
fraud_classification = classify_fraud(consolidated_text, data) |
|
trust_score, trust_reasoning = generate_trust_score(consolidated_text, image_analysis, pdf_analysis) |
|
suggestions = generate_suggestions(consolidated_text, data) |
|
quality_assessment = assess_text_quality(data['description_translated']) |
|
address_verification = verify_address(data) |
|
cross_validation = perform_cross_validation(data) |
|
location_analysis = analyze_location(data) |
|
price_analysis = analyze_price(data) |
|
legal_analysis = analyze_legal_details(data['legal_details']) |
|
specs_verification = verify_property_specs(data) |
|
market_analysis = analyze_market_value(data) |
|
|
|
document_analysis = { |
|
'pdf_count': len(pdf_texts), |
|
'pdf_texts': pdf_texts, |
|
'pdf_analysis': pdf_analysis |
|
} |
|
image_results = { |
|
'image_count': len(images), |
|
'image_analysis': image_analysis |
|
} |
|
|
|
report_id = str(uuid.uuid4()) |
|
|
|
results = { |
|
'report_id': report_id, |
|
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
|
'summary': summary, |
|
'fraud_classification': fraud_classification, |
|
'trust_score': { |
|
'score': trust_score, |
|
'reasoning': trust_reasoning |
|
}, |
|
'suggestions': suggestions, |
|
'quality_assessment': quality_assessment, |
|
'address_verification': address_verification, |
|
'cross_validation': cross_validation, |
|
'location_analysis': location_analysis, |
|
'price_analysis': price_analysis, |
|
'legal_analysis': legal_analysis, |
|
'document_analysis': document_analysis, |
|
'image_analysis': image_results, |
|
'specs_verification': specs_verification, |
|
'market_analysis': market_analysis, |
|
'images': images |
|
} |
|
|
|
return jsonify(make_json_serializable(results)) |
|
|
|
except Exception as e: |
|
logger.error(f"Error in verify_property: {str(e)}") |
|
return jsonify({ |
|
'error': 'Server error occurred. Please try again later.', |
|
'status': 'error', |
|
'details': str(e) |
|
}), 500 |
|
|
|
def extract_pdf_text(pdf_file): |
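    """Return the concatenated text of all pages in an uploaded PDF via PyMuPDF."""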
|
try: |
|
pdf_document = fitz.Document(stream=pdf_file.read(), filetype="pdf") |
|
text = "" |
|
for page in pdf_document: |
|
text += page.get_text() |
|
pdf_document.close() |
|
return text |
|
except Exception as e: |
|
logger.error(f"Error extracting PDF text: {str(e)}") |
|
return "" |
|
|
|
def analyze_image(image): |
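    """Score an image with CLIP for property relevance, then assess quality and AI-generation cues."""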
|
try: |
|
if has_clip_model: |
|
img_rgb = image.convert('RGB') |
|
inputs = clip_processor( |
|
text=[ |
|
"real estate property interior", |
|
"real estate property exterior", |
|
"non-property-related image", |
|
"office space", |
|
"landscape" |
|
], |
|
images=img_rgb, |
|
return_tensors="pt", |
|
padding=True |
|
) |
|
            with torch.no_grad():  # inference only; no autograd graph needed
                outputs = clip_model(**inputs)
            logits_per_image = outputs.logits_per_image
            probs = logits_per_image.softmax(dim=1).numpy()[0]
|
|
|
property_related_score = probs[0] + probs[1] |
|
is_property_related = property_related_score > 0.5 |
|
|
|
quality = assess_image_quality(image) |
|
is_ai_generated = detect_ai_generated_image(image) |
|
|
|
return { |
|
'is_property_related': is_property_related, |
|
'property_confidence': float(property_related_score), |
|
'top_predictions': [ |
|
{'label': 'property interior', 'confidence': float(probs[0])}, |
|
{'label': 'property exterior', 'confidence': float(probs[1])}, |
|
{'label': 'non-property', 'confidence': float(probs[2])} |
|
], |
|
'image_quality': quality, |
|
'is_ai_generated': is_ai_generated, |
|
'authenticity_score': 0.95 if not is_ai_generated else 0.60 |
|
} |
|
else: |
|
logger.warning("CLIP model unavailable") |
|
return { |
|
'is_property_related': False, |
|
'property_confidence': 0.0, |
|
'top_predictions': [], |
|
'image_quality': assess_image_quality(image), |
|
'is_ai_generated': False, |
|
'authenticity_score': 0.5 |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing image: {str(e)}") |
|
return { |
|
'is_property_related': False, |
|
'property_confidence': 0.0, |
|
'top_predictions': [], |
|
'image_quality': {'resolution': 'unknown', 'quality_score': 0}, |
|
'is_ai_generated': False, |
|
'authenticity_score': 0.0, |
|
'error': str(e) |
|
} |
|
|
|
def detect_ai_generated_image(image): |
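    """Heuristic AI-image check: very low noise, dimensions divisible by 64, or missing EXIF data."""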
|
try: |
|
img_array = np.array(image) |
|
if len(img_array.shape) == 3: |
|
gray = np.mean(img_array, axis=2) |
|
else: |
|
gray = img_array |
|
        # Normalize to [0, 1] so the noise threshold is independent of bit depth;
        # the raw 0-255 scale would make the 0.05 threshold effectively unreachable.
        noise_std = np.std(gray.astype(np.float64) / 255.0)
        width, height = image.size
        perfect_dimensions = (width % 64 == 0 and height % 64 == 0)
        has_exif = bool(image.getexif()) if hasattr(image, 'getexif') else False
        return bool(noise_std < 0.05 or perfect_dimensions or not has_exif)
|
except Exception as e: |
|
logger.error(f"Error detecting AI-generated image: {str(e)}") |
|
return False |
|
|
|
def analyze_pdf_content(document_text, property_data): |
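    """Zero-shot classify a document's type and authenticity, then score it against the listing data."""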
|
try: |
|
if not document_text: |
|
return { |
|
'document_type': {'classification': 'unknown', 'confidence': 0.0}, |
|
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, |
|
'key_info': {}, |
|
'consistency_score': 0.0, |
|
'is_property_related': False, |
|
'summary': 'Empty document', |
|
'has_signatures': False, |
|
'has_dates': False, |
|
'verification_score': 0.0 |
|
} |
|
|
|
|
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
|
|
|
|
doc_types = [ |
|
"property deed", "sales agreement", "mortgage document", |
|
"property tax record", "title document", "khata certificate", |
|
"encumbrance certificate", "lease agreement", "rental agreement", |
|
"property registration document", "building permit", "other document" |
|
] |
|
|
|
|
|
doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}" |
|
doc_result = classifier(doc_context, doc_types) |
|
doc_type = doc_result['labels'][0] |
|
doc_confidence = doc_result['scores'][0] |
|
|
|
|
|
authenticity_aspects = [ |
|
"authentic legal document", |
|
"questionable document", |
|
"forged document", |
|
"template document", |
|
"official document" |
|
] |
|
authenticity_result = classifier(document_text[:1000], authenticity_aspects) |
|
authenticity = "likely authentic" if authenticity_result['labels'][0] == "authentic legal document" else "questionable" |
|
authenticity_confidence = authenticity_result['scores'][0] |
|
|
|
|
|
key_info = extract_document_key_info(document_text) |
|
|
|
|
|
consistency_score = check_document_consistency(document_text, property_data) |
|
|
|
|
|
property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}" |
|
is_property_related = check_if_property_related(property_context)['is_related'] |
|
|
|
|
|
summary = summarize_text(document_text[:2000]) |
|
|
|
|
|
has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower())) |
|
has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text)) |
|
|
|
|
|
verification_weights = { |
|
'doc_type': 0.3, |
|
'authenticity': 0.3, |
|
'consistency': 0.2, |
|
'property_relation': 0.1, |
|
'signatures_dates': 0.1 |
|
} |
|
|
|
verification_score = ( |
|
doc_confidence * verification_weights['doc_type'] + |
|
authenticity_confidence * verification_weights['authenticity'] + |
|
consistency_score * verification_weights['consistency'] + |
|
float(is_property_related) * verification_weights['property_relation'] + |
|
float(has_signatures and has_dates) * verification_weights['signatures_dates'] |
|
) |
|
|
|
return { |
|
'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)}, |
|
'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)}, |
|
'key_info': key_info, |
|
'consistency_score': float(consistency_score), |
|
'is_property_related': is_property_related, |
|
'summary': summary, |
|
'has_signatures': has_signatures, |
|
'has_dates': has_dates, |
|
'verification_score': float(verification_score) |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing PDF content: {str(e)}") |
|
return { |
|
'document_type': {'classification': 'unknown', 'confidence': 0.0}, |
|
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, |
|
'key_info': {}, |
|
'consistency_score': 0.0, |
|
'is_property_related': False, |
|
'summary': 'Could not analyze document', |
|
'has_signatures': False, |
|
'has_dates': False, |
|
'verification_score': 0.0, |
|
'error': str(e) |
|
} |
|
|
|
def check_document_consistency(document_text, property_data): |
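    """Cosine similarity between key listing fields and the document text, clamped to [0, 1]."""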
|
try: |
|
if not sentence_model: |
|
logger.warning("Sentence model unavailable") |
|
return 0.5 |
|
property_text = ' '.join([ |
|
property_data.get(key, '') for key in [ |
|
'property_name', 'property_type', 'address', 'city', |
|
'state', 'market_value', 'sq_ft', 'bedrooms' |
|
] |
|
]) |
|
property_embedding = sentence_model.encode(property_text) |
|
document_embedding = sentence_model.encode(document_text[:1000]) |
|
similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item() |
|
return max(0.0, min(1.0, float(similarity))) |
|
except Exception as e: |
|
logger.error(f"Error checking document consistency: {str(e)}") |
|
return 0.0 |
|
|
|
def extract_document_key_info(text): |
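    """Regex-extract key fields (address, price, parties, size, registration number) from document text."""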
|
try: |
|
info = {} |
|
patterns = { |
|
'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)', |
|
'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)', |
|
'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})', |
|
'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)', |
|
'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)', |
|
'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))', |
|
'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)' |
|
} |
|
for key, pattern in patterns.items(): |
|
match = re.search(pattern, text, re.IGNORECASE) |
|
if match: |
|
info[key] = match.group(1).strip() |
|
return info |
|
except Exception as e: |
|
logger.error(f"Error extracting document key info: {str(e)}") |
|
return {} |
|
|
|
def generate_property_summary(data): |
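    """Summarize the listing with BART, then append the key structured features as sentences."""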
|
try: |
|
|
|
property_context = f""" |
|
Property Name: {data.get('property_name', '')} |
|
Type: {data.get('property_type', '')} |
|
Status: {data.get('status', '')} |
|
Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}, {data.get('country', '')} |
|
Size: {data.get('sq_ft', '')} sq. ft. |
|
Price: ₹{data.get('market_value', '0')} |
|
Bedrooms: {data.get('bedrooms', '')} |
|
Bathrooms: {data.get('bathrooms', '')} |
|
Year Built: {data.get('year_built', '')} |
|
Description: {data.get('description', '')} |
|
""" |
|
|
|
|
|
summarizer = load_model("summarization", "facebook/bart-large-cnn") |
|
|
|
|
|
        input_length = len(property_context.split())
        max_len = max(50, min(150, input_length))  # keep max_length <= input length to avoid degenerate summaries
        summary_result = summarizer(property_context, max_length=max_len, min_length=min(50, max_len // 2), do_sample=False)
        initial_summary = summary_result[0]['summary_text']
|
|
|
|
|
key_features = [] |
|
|
|
|
|
if data.get('property_type') and data.get('status'): |
|
key_features.append(f"{data['property_type']} is {data['status'].lower()}") |
|
|
|
|
|
location_parts = [] |
|
if data.get('city'): |
|
location_parts.append(data['city']) |
|
if data.get('state'): |
|
location_parts.append(data['state']) |
|
if location_parts: |
|
key_features.append(f"Located in {', '.join(location_parts)}") |
|
|
|
|
|
if data.get('sq_ft'): |
|
key_features.append(f"Spans {data['sq_ft']} sq. ft.") |
|
if data.get('market_value'): |
|
key_features.append(f"Valued at ₹{data['market_value']}") |
|
|
|
|
|
rooms_info = [] |
|
if data.get('bedrooms'): |
|
rooms_info.append(f"{data['bedrooms']} bedroom{'s' if data['bedrooms'] != '1' else ''}") |
|
if data.get('bathrooms'): |
|
rooms_info.append(f"{data['bathrooms']} bathroom{'s' if data['bathrooms'] != '1' else ''}") |
|
if rooms_info: |
|
key_features.append(f"Features {' and '.join(rooms_info)}") |
|
|
|
|
|
if data.get('amenities'): |
|
key_features.append(f"Amenities: {data['amenities']}") |
|
|
|
|
|
enhanced_summary = initial_summary |
|
if key_features: |
|
enhanced_summary += " " + ". ".join(key_features) + "." |
|
|
|
|
|
        enhanced_summary = re.sub(r'\s{2,}', ' ', enhanced_summary).strip()  # collapse repeated whitespace
|
|
|
return enhanced_summary |
|
except Exception as e: |
|
logger.error(f"Error generating property summary: {str(e)}") |
|
return "Could not generate summary." |
|
|
|
def summarize_text(text): |
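    """Summarize free text with BART, scaling the length limits to the input size."""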
|
try: |
|
if not text or len(text.strip()) < 10: |
|
return "No text to summarize." |
|
summarizer = load_model("summarization", "facebook/bart-large-cnn") |
|
input_length = len(text.split()) |
|
max_length = max(50, min(150, input_length // 2)) |
|
min_length = max(20, input_length // 4) |
|
summary = summarizer(text[:2000], max_length=max_length, min_length=min_length, do_sample=False) |
|
return summary[0]['summary_text'] |
|
except Exception as e: |
|
logger.error(f"Error summarizing text: {str(e)}") |
|
return text[:200] + "..." if len(text) > 200 else text |
|
|
|
def classify_fraud(text, data=None): |
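    """Zero-shot fraud scoring over the listing text plus regex and sanity checks on the form data."""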
|
try: |
|
        data = data or {}  # the context template below assumes a dict
        classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli")
|
categories = [ |
|
"suspicious pricing pattern", |
|
"potentially fraudulent listing", |
|
"ownership verification issues", |
|
"location verification issues", |
|
"document authenticity issues", |
|
"image authenticity issues", |
|
"urgent pressure tactics", |
|
"inconsistent information", |
|
"missing critical details", |
|
"suspicious contact information" |
|
] |
|
|
|
|
|
context = f""" |
|
Property Details: |
|
- Name: {data.get('property_name', 'Not provided')} |
|
- Type: {data.get('property_type', 'Not provided')} |
|
        - Status: {data.get('status', 'Not provided')}
|
- Price: {data.get('market_value', 'Not provided')} |
|
        - Square Footage: {data.get('sq_ft', 'Not provided')}
|
- Year Built: {data.get('year_built', 'Not provided')} |
|
- Location: {data.get('address', 'Not provided')} |
|
- Description: {text} |
|
""" |
|
|
|
result = classifier(context, categories, multi_label=True) |
|
|
|
|
|
threshold = 0.2 |
|
|
|
|
|
high_risk = [] |
|
medium_risk = [] |
|
low_risk = [] |
|
|
|
for label, score in zip(result['labels'], result['scores']): |
|
if score > threshold: |
|
if score > 0.7: |
|
high_risk.append((label, score)) |
|
elif score > 0.5: |
|
medium_risk.append((label, score)) |
|
else: |
|
low_risk.append((label, score)) |
|
|
|
|
|
alert_score = ( |
|
sum(score * 1.0 for _, score in high_risk) + |
|
sum(score * 0.7 for _, score in medium_risk) + |
|
sum(score * 0.4 for _, score in low_risk) |
|
) / max(1, len(result['scores'])) |
|
|
|
|
|
if alert_score > 0.8: |
|
alert_level = 'critical' |
|
elif alert_score > 0.6: |
|
alert_level = 'high' |
|
elif alert_score > 0.4: |
|
alert_level = 'medium' |
|
elif alert_score > 0.2: |
|
alert_level = 'low' |
|
else: |
|
alert_level = 'minimal' |
|
|
|
|
|
fraud_indicators = [] |
|
|
|
|
|
price_patterns = [ |
|
(r'suspiciously low price', 0.8), |
|
(r'unusually high price', 0.7), |
|
(r'price too good to be true', 0.9), |
|
(r'urgent sale', 0.6), |
|
(r'must sell quickly', 0.7) |
|
] |
|
|
|
|
|
location_patterns = [ |
|
(r'location mismatch', 0.8), |
|
(r'address inconsistency', 0.7), |
|
(r'wrong neighborhood', 0.6), |
|
(r'incorrect zip code', 0.7) |
|
] |
|
|
|
|
|
document_patterns = [ |
|
(r'missing documents', 0.8), |
|
(r'unverified documents', 0.7), |
|
(r'fake documents', 0.9), |
|
(r'photoshopped documents', 0.8) |
|
] |
|
|
|
|
|
urgency_patterns = [ |
|
(r'act now', 0.6), |
|
(r'limited time offer', 0.5), |
|
(r'first come first served', 0.4), |
|
(r'won\'t last long', 0.5) |
|
] |
|
|
|
|
|
all_patterns = price_patterns + location_patterns + document_patterns + urgency_patterns |
|
for pattern, weight in all_patterns: |
|
if re.search(pattern, text.lower()): |
|
fraud_indicators.append({ |
|
'pattern': pattern, |
|
'weight': weight, |
|
'context': text[max(0, text.lower().find(pattern)-50):min(len(text), text.lower().find(pattern)+50)] |
|
}) |
|
|
|
|
|
if data: |
|
|
|
            try:
                # The form supplies 'market_value' and 'sq_ft' (not 'square_footage'),
                # possibly with currency symbols and thousands separators.
                price = float(str(data.get('market_value', '0')).replace('₹', '').replace(',', '') or 0)
                sqft = float(re.sub(r'[^\d.]', '', str(data.get('sq_ft', ''))) or 0)
                price_per_sqft = price / sqft
                if price_per_sqft < 50:
                    fraud_indicators.append({
                        'pattern': 'suspiciously low price per square foot',
                        'weight': 0.8,
                        'context': f'Price per square foot: ₹{price_per_sqft:.2f}'
                    })
            except (ValueError, ZeroDivisionError):
                pass
|
|
|
|
|
try: |
|
                year_built = int(data.get('year_built', 0) or 0)
                if year_built < 1800 or year_built > datetime.now().year:
|
fraud_indicators.append({ |
|
'pattern': 'impossible year built', |
|
'weight': 0.9, |
|
'context': f'Year built: {year_built}' |
|
}) |
|
except ValueError: |
|
pass |
|
|
|
|
|
            critical_fields = ['property_name', 'property_type', 'address', 'market_value', 'sq_ft']
|
missing_fields = [field for field in critical_fields if not data.get(field)] |
|
if missing_fields: |
|
fraud_indicators.append({ |
|
'pattern': 'missing critical information', |
|
'weight': 0.7, |
|
'context': f'Missing fields: {", ".join(missing_fields)}' |
|
}) |
|
|
|
return { |
|
'alert_level': alert_level, |
|
'alert_score': alert_score, |
|
'high_risk': high_risk, |
|
'medium_risk': medium_risk, |
|
'low_risk': low_risk, |
|
'fraud_indicators': fraud_indicators |
|
} |
|
except Exception as e: |
|
logger.error(f"Error in fraud classification: {str(e)}") |
|
return { |
|
'alert_level': 'error', |
|
'alert_score': 1.0, |
|
'high_risk': [], |
|
'medium_risk': [], |
|
'low_risk': [], |
|
'fraud_indicators': [] |
|
} |
|
|
|
def generate_trust_score(text, image_analysis, pdf_analysis): |
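    """Compute a weighted zero-shot trust score (0-100), adjusted by image and document evidence."""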
|
try: |
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
aspects = [ |
|
"complete information provided", |
|
"verified location", |
|
"consistent data", |
|
"authentic documents", |
|
"authentic images", |
|
"reasonable pricing", |
|
"verified ownership", |
|
"proper documentation" |
|
] |
|
result = classifier(text[:1000], aspects, multi_label=True) |
|
|
|
|
|
weights = { |
|
"complete information provided": 0.25, |
|
"verified location": 0.20, |
|
"consistent data": 0.15, |
|
"authentic documents": 0.15, |
|
"authentic images": 0.10, |
|
"reasonable pricing": 0.05, |
|
"verified ownership": 0.05, |
|
"proper documentation": 0.05 |
|
} |
|
|
|
score = 0 |
|
reasoning_parts = [] |
|
|
|
|
|
for label, confidence in zip(result['labels'], result['scores']): |
|
adjusted_confidence = confidence |
|
|
|
|
|
if label == "authentic documents": |
|
if not pdf_analysis or len(pdf_analysis) == 0: |
|
adjusted_confidence = 0.0 |
|
else: |
|
doc_scores = [p.get('verification_score', 0) for p in pdf_analysis] |
|
adjusted_confidence = sum(doc_scores) / max(1, len(doc_scores)) |
|
|
|
if any(score < 0.7 for score in doc_scores): |
|
adjusted_confidence *= 0.4 |
|
|
|
if len(doc_scores) < 2: |
|
adjusted_confidence *= 0.5 |
|
|
|
|
|
elif label == "authentic images": |
|
if not image_analysis or len(image_analysis) == 0: |
|
adjusted_confidence = 0.0 |
|
else: |
|
img_scores = [i.get('authenticity_score', 0) for i in image_analysis] |
|
adjusted_confidence = sum(img_scores) / max(1, len(img_scores)) |
|
|
|
if any(score < 0.8 for score in img_scores): |
|
adjusted_confidence *= 0.4 |
|
|
|
if any(i.get('is_ai_generated', False) for i in image_analysis): |
|
adjusted_confidence *= 0.5 |
|
|
|
if any(not i.get('is_property_related', False) for i in image_analysis): |
|
adjusted_confidence *= 0.6 |
|
|
|
|
|
elif label == "consistent data": |
|
|
|
if "inconsistent" in text.lower() or "suspicious" in text.lower(): |
|
adjusted_confidence *= 0.3 |
|
|
|
if "impossible" in text.lower() or "invalid" in text.lower(): |
|
adjusted_confidence *= 0.2 |
|
|
|
if "missing" in text.lower() or "not provided" in text.lower(): |
|
adjusted_confidence *= 0.4 |
|
|
|
|
|
elif label == "complete information provided": |
|
|
|
if len(text) < 300 or "not provided" in text.lower() or "missing" in text.lower(): |
|
adjusted_confidence *= 0.4 |
|
|
|
if "generic" in text.lower() or "vague" in text.lower(): |
|
adjusted_confidence *= 0.5 |
|
|
|
if len(text) < 150: |
|
adjusted_confidence *= 0.3 |
|
|
|
score += adjusted_confidence * weights.get(label, 0.1) |
|
reasoning_parts.append(f"{label} ({adjusted_confidence:.0%})") |
|
|
|
|
|
if "suspicious" in text.lower() or "fraudulent" in text.lower(): |
|
score *= 0.5 |
|
|
|
|
|
if "suspiciously low" in text.lower() or "unusually small" in text.lower(): |
|
score *= 0.6 |
|
|
|
|
|
if "inconsistent" in text.lower() or "mismatch" in text.lower(): |
|
score *= 0.6 |
|
|
|
|
|
if "missing critical" in text.lower() or "incomplete" in text.lower(): |
|
score *= 0.7 |
|
|
|
|
|
score = min(100, max(0, int(score * 100))) |
|
reasoning = f"Based on: {', '.join(reasoning_parts)}" |
|
return score, reasoning |
|
except Exception as e: |
|
logger.error(f"Error generating trust score: {str(e)}") |
|
return 20, "Could not assess trust." |
|
|
|
def generate_suggestions(text, data=None): |
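    """Zero-shot gap analysis that produces prioritized, property-type-aware improvement suggestions."""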
|
try: |
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
|
|
|
|
suggestion_context = text |
|
if data: |
|
suggestion_context += f""" |
|
Additional Context: |
|
Property Type: {data.get('property_type', '')} |
|
Location: {data.get('city', '')}, {data.get('state', '')} |
|
Size: {data.get('sq_ft', '')} sq.ft. |
|
Year Built: {data.get('year_built', '')} |
|
""" |
|
|
|
|
|
base_suggestions = { |
|
'documentation': { |
|
'label': "add more documentation", |
|
'categories': [ |
|
"complete documentation provided", |
|
"missing essential documents", |
|
"incomplete paperwork", |
|
"documentation needs verification" |
|
], |
|
'weight': 2.0, |
|
'improvements': { |
|
'missing essential documents': [ |
|
"Add property deed or title documents", |
|
"Include recent property tax records", |
|
"Attach property registration documents" |
|
], |
|
'incomplete paperwork': [ |
|
"Complete all required legal documents", |
|
"Add missing ownership proof", |
|
"Include property survey documents" |
|
] |
|
} |
|
}, |
|
'details': { |
|
'label': "enhance property details", |
|
'categories': [ |
|
"detailed property information", |
|
"basic information only", |
|
"missing key details", |
|
"comprehensive description" |
|
], |
|
'weight': 1.8, |
|
'improvements': { |
|
'basic information only': [ |
|
"Add more details about property features", |
|
"Include information about recent renovations", |
|
"Describe unique selling points" |
|
], |
|
'missing key details': [ |
|
"Specify exact built-up area", |
|
"Add floor plan details", |
|
"Include maintenance costs" |
|
] |
|
} |
|
}, |
|
'images': { |
|
'label': "improve visual content", |
|
'categories': [ |
|
"high quality images provided", |
|
"poor image quality", |
|
"insufficient images", |
|
"missing key area photos" |
|
], |
|
'weight': 1.5, |
|
'improvements': { |
|
'poor image quality': [ |
|
"Add high-resolution property photos", |
|
"Include better lighting in images", |
|
"Provide professional photography" |
|
], |
|
'insufficient images': [ |
|
"Add more interior photos", |
|
"Include exterior and surrounding area images", |
|
"Add photos of amenities" |
|
] |
|
} |
|
}, |
|
'pricing': { |
|
'label': "pricing information", |
|
'categories': [ |
|
"detailed pricing breakdown", |
|
"basic price only", |
|
"missing price details", |
|
"unclear pricing terms" |
|
], |
|
'weight': 1.7, |
|
'improvements': { |
|
'basic price only': [ |
|
"Add detailed price breakdown", |
|
"Include maintenance charges", |
|
"Specify additional costs" |
|
], |
|
'missing price details': [ |
|
"Add price per square foot", |
|
"Include tax implications", |
|
"Specify payment terms" |
|
] |
|
} |
|
}, |
|
'location': { |
|
'label': "location details", |
|
'categories': [ |
|
"comprehensive location info", |
|
"basic location only", |
|
"missing location details", |
|
"unclear accessibility info" |
|
], |
|
'weight': 1.6, |
|
'improvements': { |
|
'basic location only': [ |
|
"Add nearby landmarks and distances", |
|
"Include transportation options", |
|
"Specify neighborhood facilities" |
|
], |
|
'missing location details': [ |
|
"Add exact GPS coordinates", |
|
"Include area development plans", |
|
"Specify distance to key facilities" |
|
] |
|
} |
|
} |
|
} |
|
|
|
suggestions = [] |
|
confidence_scores = [] |
|
|
|
for aspect, config in base_suggestions.items(): |
|
|
|
result = classifier(suggestion_context[:1000], config['categories']) |
|
|
|
|
|
top_category = result['labels'][0] |
|
confidence = float(result['scores'][0]) |
|
|
|
|
|
if confidence < 0.6 and top_category in config['improvements']: |
|
weighted_confidence = confidence * config['weight'] |
|
for improvement in config['improvements'][top_category]: |
|
suggestions.append({ |
|
'aspect': aspect, |
|
'category': top_category, |
|
'suggestion': improvement, |
|
'confidence': weighted_confidence |
|
}) |
|
confidence_scores.append(weighted_confidence) |
|
|
|
|
|
suggestions.sort(key=lambda x: x['confidence'], reverse=True) |
|
|
|
|
|
if data and data.get('property_type'): |
|
property_type = data['property_type'].lower() |
|
type_specific_suggestions = { |
|
'residential': [ |
|
"Add information about school districts", |
|
"Include details about neighborhood safety", |
|
"Specify parking arrangements" |
|
], |
|
'commercial': [ |
|
"Add foot traffic statistics", |
|
"Include zoning information", |
|
"Specify business licenses required" |
|
], |
|
'industrial': [ |
|
"Add power supply specifications", |
|
"Include environmental clearances", |
|
"Specify loading/unloading facilities" |
|
], |
|
'land': [ |
|
"Add soil testing reports", |
|
"Include development potential analysis", |
|
"Specify available utilities" |
|
] |
|
} |
|
|
|
for type_key, type_suggestions in type_specific_suggestions.items(): |
|
if type_key in property_type: |
|
for suggestion in type_suggestions: |
|
suggestions.append({ |
|
'aspect': 'property_type_specific', |
|
'category': 'type_specific_requirements', |
|
'suggestion': suggestion, |
|
'confidence': 0.8 |
|
}) |
|
|
|
|
|
if data and data.get('market_value'): |
|
try: |
|
market_value = float(data['market_value'].replace('₹', '').replace(',', '')) |
|
if market_value > 10000000: |
|
premium_suggestions = [ |
|
"Add virtual tour of the property", |
|
"Include detailed investment analysis", |
|
"Provide historical price trends" |
|
] |
|
for suggestion in premium_suggestions: |
|
suggestions.append({ |
|
'aspect': 'premium_property', |
|
'category': 'high_value_requirements', |
|
'suggestion': suggestion, |
|
'confidence': 0.9 |
|
}) |
|
except ValueError: |
|
pass |
|
|
|
|
|
completeness_score = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0 |
|
completeness_score = min(100, max(0, completeness_score * 100)) |
|
|
|
return { |
|
'suggestions': suggestions[:10], |
|
'completeness_score': completeness_score, |
|
'priority_aspects': [s['aspect'] for s in suggestions[:3]], |
|
'improvement_summary': f"Focus on improving {', '.join([s['aspect'] for s in suggestions[:3]])}", |
|
'total_suggestions': len(suggestions) |
|
} |
|
except Exception as e: |
|
logger.error(f"Error generating suggestions: {str(e)}") |
|
return { |
|
'suggestions': [ |
|
{ |
|
'aspect': 'general', |
|
'category': 'basic_requirements', |
|
'suggestion': 'Please provide more property details', |
|
'confidence': 0.5 |
|
} |
|
], |
|
'completeness_score': 0, |
|
'priority_aspects': ['general'], |
|
'improvement_summary': "Add basic property information", |
|
'total_suggestions': 1 |
|
} |
|
|
|
def assess_text_quality(text): |
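    """Zero-shot quality scoring of the description, including an AI-generated-content check."""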
|
try: |
|
if not text or len(text.strip()) < 20: |
|
return { |
|
'assessment': 'insufficient', |
|
'score': 0, |
|
'reasoning': 'Text too short.', |
|
'is_ai_generated': False, |
|
'quality_metrics': {} |
|
} |
|
|
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
|
|
|
|
quality_categories = [ |
|
"detailed and informative", |
|
"adequately detailed", |
|
"basic information", |
|
"vague description", |
|
"misleading content", |
|
"professional listing", |
|
"amateur listing", |
|
"spam-like content", |
|
"template-based content", |
|
"authentic description" |
|
] |
|
|
|
|
|
quality_result = classifier(text[:1000], quality_categories, multi_label=True) |
|
|
|
|
|
top_classifications = [] |
|
for label, score in zip(quality_result['labels'][:3], quality_result['scores'][:3]): |
|
if score > 0.3: |
|
top_classifications.append({ |
|
'classification': label, |
|
'confidence': float(score) |
|
}) |
|
|
|
|
|
ai_check = classifier(text[:1000], ["human-written", "AI-generated", "template-based", "authentic"]) |
|
is_ai_generated = ( |
|
(ai_check['labels'][0] == "AI-generated" and ai_check['scores'][0] > 0.6) or |
|
(ai_check['labels'][0] == "template-based" and ai_check['scores'][0] > 0.7) |
|
) |
|
|
|
|
|
quality_metrics = { |
|
'detail_level': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) |
|
if label in ['detailed and informative', 'adequately detailed']), |
|
'professionalism': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) |
|
if label in ['professional listing', 'authentic description']), |
|
'clarity': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) |
|
if label not in ['vague description', 'misleading content', 'spam-like content']), |
|
'authenticity': 1.0 - sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) |
|
if label in ['template-based content', 'spam-like content']) |
|
} |
|
|
|
|
|
weights = { |
|
'detail_level': 0.3, |
|
'professionalism': 0.25, |
|
'clarity': 0.25, |
|
'authenticity': 0.2 |
|
} |
|
|
|
        # Each metric sums several independent multi-label scores and can exceed 1.0,
        # so clamp to [0, 1] before weighting to keep the final score within 0-100.
        score = sum(min(1.0, max(0.0, metric)) * weights[metric_name]
                    for metric_name, metric in quality_metrics.items())
        score = min(100.0, max(0.0, score * 100))

        if is_ai_generated:
            score = score * 0.7
|
|
|
|
|
reasoning_parts = [] |
|
if top_classifications: |
|
primary_class = top_classifications[0]['classification'] |
|
reasoning_parts.append(f"Primary assessment: {primary_class}") |
|
|
|
if quality_metrics['detail_level'] > 0.7: |
|
reasoning_parts.append("Contains comprehensive details") |
|
elif quality_metrics['detail_level'] > 0.4: |
|
reasoning_parts.append("Contains adequate details") |
|
else: |
|
reasoning_parts.append("Lacks important details") |
|
|
|
if quality_metrics['professionalism'] > 0.7: |
|
reasoning_parts.append("Professional listing style") |
|
elif quality_metrics['professionalism'] < 0.4: |
|
reasoning_parts.append("Amateur listing style") |
|
|
|
if quality_metrics['clarity'] < 0.5: |
|
reasoning_parts.append("Content clarity issues detected") |
|
|
|
if is_ai_generated: |
|
reasoning_parts.append("Content appears to be AI-generated") |
|
|
|
return { |
|
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', |
|
'score': int(score), |
|
'reasoning': '. '.join(reasoning_parts), |
|
'is_ai_generated': is_ai_generated, |
|
'quality_metrics': quality_metrics, |
|
'top_classifications': top_classifications |
|
} |
|
except Exception as e: |
|
logger.error(f"Error assessing text quality: {str(e)}") |
|
return { |
|
'assessment': 'could not assess', |
|
'score': 50, |
|
'reasoning': 'Technical error.', |
|
'is_ai_generated': False, |
|
'quality_metrics': {}, |
|
'top_classifications': [] |
|
} |
|
|
|
def verify_address(data): |
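    """Validate the pincode via api.postalpincode.in and geocode the full address for consistency."""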
|
    # Defined before the try block so the except handler can always append to it.
    address_results = {
        'address_exists': False,
        'pincode_valid': False,
        'city_state_match': False,
        'coordinates_match': False,
        'confidence': 0.0,
        'issues': [],
        'verification_score': 0.0
    }
    try:
|
|
|
if data['zip']: |
|
try: |
|
response = requests.get(f"https://api.postalpincode.in/pincode/{data['zip']}", timeout=5) |
|
if response.status_code == 200: |
|
pin_data = response.json() |
|
if pin_data[0]['Status'] == 'Success': |
|
address_results['pincode_valid'] = True |
|
post_offices = pin_data[0]['PostOffice'] |
|
cities = {po['Name'].lower() for po in post_offices} |
|
states = {po['State'].lower() for po in post_offices} |
|
if data['city'].lower() in cities or data['state'].lower() in states: |
|
address_results['city_state_match'] = True |
|
else: |
|
address_results['issues'].append("City/state may not match pincode") |
|
else: |
|
address_results['issues'].append(f"Invalid pincode: {data['zip']}") |
|
else: |
|
address_results['issues'].append("Pincode API error") |
|
except Exception as e: |
|
logger.error(f"Pincode API error: {str(e)}") |
|
address_results['issues'].append("Pincode validation failed") |
|
|
|
full_address = ', '.join(filter(None, [data['address'], data['city'], data['state'], data['country'], data['zip']])) |
|
for attempt in range(3): |
|
try: |
|
location = geocoder.geocode(full_address) |
|
if location: |
|
address_results['address_exists'] = True |
|
address_results['confidence'] = 0.9 |
|
if data['latitude'] and data['longitude']: |
|
try: |
|
provided_coords = (float(data['latitude']), float(data['longitude'])) |
|
geocoded_coords = (location.latitude, location.longitude) |
|
from geopy.distance import distance |
|
dist = distance(provided_coords, geocoded_coords).km |
|
address_results['coordinates_match'] = dist < 1.0 |
|
if not address_results['coordinates_match']: |
|
address_results['issues'].append(f"Coordinates {dist:.2f}km off") |
|
                        except Exception:
|
address_results['issues'].append("Invalid coordinates") |
|
break |
|
time.sleep(1) |
|
except Exception as e: |
|
logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") |
|
time.sleep(1) |
|
        else:  # for-else: runs only if every geocode attempt failed
|
address_results['issues'].append("Address geocoding failed") |
|
|
|
verification_points = ( |
|
address_results['address_exists'] * 0.4 + |
|
address_results['pincode_valid'] * 0.3 + |
|
address_results['city_state_match'] * 0.2 + |
|
address_results['coordinates_match'] * 0.1 |
|
) |
|
address_results['verification_score'] = verification_points |
|
|
|
return address_results |
|
except Exception as e: |
|
logger.error(f"Error verifying address: {str(e)}") |
|
address_results['issues'].append(str(e)) |
|
return address_results |
|
|
|
def perform_cross_validation(data): |
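    """Cross-check form fields against each other: rooms, year built, size, price, and location."""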
|
try: |
|
cross_checks = [] |
|
|
|
|
|
try: |
|
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 |
|
desc_bhk = re.findall(r'(\d+)\s*bhk', data['description'].lower()) |
|
if desc_bhk and int(desc_bhk[0]) != bedrooms: |
|
cross_checks.append({ |
|
'check': 'bedroom_count', |
|
'status': 'inconsistent', |
|
'message': f"Description mentions {desc_bhk[0]} BHK, form says {bedrooms}" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'bedroom_count', |
|
'status': 'consistent', |
|
'message': f"Bedrooms: {bedrooms}" |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'bedroom_count', |
|
'status': 'invalid', |
|
'message': 'Invalid bedroom data' |
|
}) |
|
|
|
|
|
try: |
|
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 |
|
bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0 |
|
total_rooms = int(data['total_rooms']) if data['total_rooms'] else 0 |
|
|
|
|
|
if total_rooms > 0: |
|
if total_rooms < bedrooms + bathrooms: |
|
cross_checks.append({ |
|
'check': 'room_count', |
|
'status': 'inconsistent', |
|
'message': f"Total rooms ({total_rooms}) less than bedrooms ({bedrooms}) + bathrooms ({bathrooms})" |
|
}) |
|
elif total_rooms > bedrooms + bathrooms + 5: |
|
cross_checks.append({ |
|
'check': 'room_count', |
|
'status': 'suspicious', |
|
'message': f"Total rooms ({total_rooms}) seems unusually high compared to bedrooms ({bedrooms}) + bathrooms ({bathrooms})" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'room_count', |
|
'status': 'consistent', |
|
'message': f"Rooms consistent: {total_rooms} total, {bedrooms} bedrooms, {bathrooms} bathrooms" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'room_count', |
|
'status': 'missing', |
|
'message': 'Total room count not provided' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'room_count', |
|
'status': 'invalid', |
|
'message': 'Invalid room count data' |
|
}) |
|
|
|
|
|
try: |
|
year_built = int(data['year_built']) if data['year_built'] else 0 |
|
current_year = datetime.now().year |
|
|
|
if year_built > 0: |
|
if year_built > current_year: |
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'invalid', |
|
'message': f"Year built ({year_built}) is in the future" |
|
}) |
|
elif year_built < 1800: |
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'suspicious', |
|
'message': f"Year built ({year_built}) seems unusually old" |
|
}) |
|
elif current_year - year_built > 200: |
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'suspicious', |
|
'message': f"Property age ({current_year - year_built} years) seems unusually old" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'reasonable', |
|
'message': f"Year built reasonable: {year_built}" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'missing', |
|
'message': 'Year built not provided' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'year_built', |
|
'status': 'invalid', |
|
'message': 'Invalid year built data' |
|
}) |
|
|
|
|
|
try: |
|
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 |
|
bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 |
|
|
|
if sq_ft > 0 and bedrooms > 0: |
|
sq_ft_per_bedroom = sq_ft / bedrooms |
|
|
|
if sq_ft_per_bedroom < 50: |
|
cross_checks.append({ |
|
'check': 'sq_ft_per_bedroom', |
|
'status': 'suspicious', |
|
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually small" |
|
}) |
|
elif sq_ft_per_bedroom > 1000: |
|
cross_checks.append({ |
|
'check': 'sq_ft_per_bedroom', |
|
'status': 'suspicious', |
|
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually large" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'sq_ft_per_bedroom', |
|
'status': 'reasonable', |
|
'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) is reasonable" |
|
}) |
|
elif sq_ft > 0: |
|
cross_checks.append({ |
|
'check': 'sq_ft', |
|
'status': 'incomplete', |
|
'message': f"Square footage provided: {sq_ft} sq.ft., but bedroom count missing" |
|
}) |
|
elif bedrooms > 0: |
|
cross_checks.append({ |
|
'check': 'sq_ft', |
|
'status': 'missing', |
|
'message': f"Square footage not provided, but {bedrooms} bedrooms listed" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'sq_ft', |
|
'status': 'missing', |
|
'message': 'Square footage not provided' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'sq_ft', |
|
'status': 'invalid', |
|
'message': 'Invalid square footage data' |
|
}) |
|
|
|
|
|
try: |
|
market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 |
|
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 |
|
|
|
if market_value > 0 and sq_ft > 0: |
|
price_per_sqft = market_value / sq_ft |
|
|
|
|
|
if price_per_sqft < 100: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'suspiciously low', |
|
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously low" |
|
}) |
|
|
|
elif price_per_sqft > 50000: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'suspiciously high', |
|
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously high" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'reasonable', |
|
'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is reasonable" |
|
}) |
|
elif market_value > 0: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'incomplete', |
|
'message': f"Market value provided: ₹{market_value:,.2f}, but square footage missing" |
|
}) |
|
elif sq_ft > 0: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'incomplete', |
|
'message': f"Square footage provided: {sq_ft} sq.ft., but market value missing" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'missing', |
|
'message': 'Price per sq.ft. cannot be calculated (missing data)' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'price_per_sqft', |
|
'status': 'invalid', |
|
'message': 'Invalid price per sq.ft. data' |
|
}) |
|
|
|
|
|
try: |
|
latitude = float(data['latitude']) if data['latitude'] else 0 |
|
longitude = float(data['longitude']) if data['longitude'] else 0 |
|
address = data['address'].lower() if data['address'] else '' |
|
city = data['city'].lower() if data['city'] else '' |
|
state = data['state'].lower() if data['state'] else '' |
|
country = data['country'].lower() if data['country'] else 'india' |
|
|
|
|
|
if latitude != 0 and longitude != 0: |
|
                if 6.5 <= latitude <= 37.5 and 68.0 <= longitude <= 97.5:  # same bounds as analyze_location
|
cross_checks.append({ |
|
'check': 'coordinates', |
|
'status': 'valid', |
|
'message': 'Coordinates within India' |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'coordinates', |
|
'status': 'invalid', |
|
'message': 'Coordinates outside India' |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'coordinates', |
|
'status': 'missing', |
|
'message': 'Coordinates not provided' |
|
}) |
|
|
|
|
|
if address and city and state: |
|
if city in address and state in address: |
|
cross_checks.append({ |
|
'check': 'address_consistency', |
|
'status': 'consistent', |
|
'message': 'Address contains city and state' |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'address_consistency', |
|
'status': 'inconsistent', |
|
'message': 'Address does not contain city or state' |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'address_consistency', |
|
'status': 'incomplete', |
|
'message': 'Address consistency check incomplete (missing data)' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'location', |
|
'status': 'invalid', |
|
'message': 'Invalid location data' |
|
}) |
|
|
|
|
|
try: |
|
property_type = data['property_type'].lower() if data['property_type'] else '' |
|
description = data['description'].lower() if data['description'] else '' |
|
|
|
if property_type and description: |
|
property_types = ['apartment', 'house', 'condo', 'townhouse', 'villa', 'land', 'commercial'] |
|
found_types = [pt for pt in property_types if pt in description] |
|
|
|
if found_types and property_type not in found_types: |
|
cross_checks.append({ |
|
'check': 'property_type', |
|
'status': 'inconsistent', |
|
'message': f"Description mentions {', '.join(found_types)}, but property type is {property_type}" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'property_type', |
|
'status': 'consistent', |
|
'message': f"Property type consistent: {property_type}" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'property_type', |
|
'status': 'incomplete', |
|
'message': 'Property type consistency check incomplete (missing data)' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'property_type', |
|
'status': 'invalid', |
|
'message': 'Invalid property type data' |
|
}) |
|
|
|
|
|
try: |
|
market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 |
|
property_type = data['property_type'].lower() if data['property_type'] else '' |
|
|
|
if market_value > 0 and property_type: |
|
|
|
min_values = { |
|
'apartment': 500000, |
|
'house': 1000000, |
|
'condo': 800000, |
|
'townhouse': 900000, |
|
'villa': 2000000, |
|
'land': 300000, |
|
'commercial': 2000000 |
|
} |
|
|
|
min_value = min_values.get(property_type, 500000) |
|
|
|
if market_value < min_value: |
|
cross_checks.append({ |
|
'check': 'market_value', |
|
'status': 'suspiciously low', |
|
'message': f"Market value (₹{market_value:,.2f}) seems suspiciously low for a {property_type}" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'market_value', |
|
'status': 'reasonable', |
|
'message': f"Market value (₹{market_value:,.2f}) is reasonable for a {property_type}" |
|
}) |
|
elif market_value > 0: |
|
cross_checks.append({ |
|
'check': 'market_value', |
|
'status': 'incomplete', |
|
'message': f"Market value provided: ₹{market_value:,.2f}, but property type missing" |
|
}) |
|
else: |
|
cross_checks.append({ |
|
'check': 'market_value', |
|
'status': 'missing', |
|
'message': 'Market value not provided' |
|
}) |
|
        except Exception:
|
cross_checks.append({ |
|
'check': 'market_value', |
|
'status': 'invalid', |
|
'message': 'Invalid market value data' |
|
}) |
|
|
|
return cross_checks |
|
except Exception as e: |
|
logger.error(f"Error performing cross validation: {str(e)}") |
|
return [{ |
|
'check': 'cross_validation', |
|
'status': 'error', |
|
'message': f'Error performing cross validation: {str(e)}' |
|
}] |
|
|
|
def analyze_location(data): |
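    """Assess completeness and verifiability of the provided location details."""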
|
try: |
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
location_text = ' '.join(filter(None, [ |
|
data['address'], data['city'], data['state'], data['country'], |
|
data['zip'], f"Lat: {data['latitude']}", f"Long: {data['longitude']}", |
|
data['nearby_landmarks'] |
|
])) |
|
categories = ["complete", "partial", "minimal", "missing"] |
|
result = classifier(location_text, categories) |
|
|
|
location_quality = "unknown" |
|
if data['city'] and data['state']: |
|
for attempt in range(3): |
|
try: |
|
location = geocoder.geocode(f"{data['city']}, {data['state']}, India") |
|
if location: |
|
location_quality = "verified" |
|
break |
|
time.sleep(1) |
|
                except Exception:
                    time.sleep(1)
            else:  # for-else: runs only when all three geocode attempts fail
                location_quality = "unverified"
|
|
|
coord_check = "missing" |
|
if data['latitude'] and data['longitude']: |
|
try: |
|
lat, lng = float(data['latitude']), float(data['longitude']) |
|
coord_check = "in_india" if 6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5 else "outside_india" |
|
            except (ValueError, TypeError):
|
coord_check = "invalid" |
|
|
|
completeness = calculate_location_completeness(data) |
|
assessment = "complete" if completeness >= 80 else "partial" if completeness >= 50 else "minimal" |
|
|
|
return { |
|
'assessment': assessment, |
|
'confidence': float(result['scores'][0]), |
|
'coordinates_check': coord_check, |
|
'landmarks_provided': bool(data['nearby_landmarks']), |
|
'completeness_score': completeness, |
|
'location_quality': location_quality |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing location: {str(e)}") |
|
return { |
|
'assessment': 'error', |
|
'confidence': 0.0, |
|
'coordinates_check': 'error', |
|
'landmarks_provided': False, |
|
'completeness_score': 0, |
|
'location_quality': 'error' |
|
} |
|
|
|
def calculate_location_completeness(data): |
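    """Return the percentage of the eight location fields that are non-empty."""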
|
fields = ['address', 'city', 'state', 'country', 'zip', 'latitude', 'longitude', 'nearby_landmarks'] |
|
return int((sum(1 for f in fields if data[f]) / len(fields)) * 100) |
|
|
|
def analyze_price(data): |
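    """Zero-shot price assessment with metro/non-metro per-square-foot benchmarks."""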
|
try: |
|
        price_str = data['market_value'].replace('₹', '').replace('$', '').replace(',', '').strip()
|
price = float(price_str) if price_str else 0 |
|
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 |
|
price_per_sqft = price / sq_ft if sq_ft else 0 |
|
|
|
if not price: |
|
return { |
|
'assessment': 'no price', |
|
'confidence': 0.0, |
|
'price': 0, |
|
'formatted_price': '₹0', |
|
'price_per_sqft': 0, |
|
'formatted_price_per_sqft': '₹0', |
|
'price_range': 'unknown', |
|
'location_price_assessment': 'cannot assess', |
|
'has_price': False, |
|
'market_trends': {}, |
|
'price_factors': {}, |
|
'risk_indicators': [] |
|
} |
|
|
|
|
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
|
|
|
|
price_context = f""" |
|
Property Type: {data.get('property_type', '')} |
|
Location: {data.get('city', '')}, {data.get('state', '')} |
|
Size: {sq_ft} sq.ft. |
|
Price: ₹{price:,.2f} |
|
Price per sq.ft.: ₹{price_per_sqft:,.2f} |
|
Property Status: {data.get('status', '')} |
|
Year Built: {data.get('year_built', '')} |
|
Bedrooms: {data.get('bedrooms', '')} |
|
Bathrooms: {data.get('bathrooms', '')} |
|
Amenities: {data.get('amenities', '')} |
|
""" |
|
|
|
|
|
price_categories = [ |
|
"reasonable market price", |
|
"suspiciously low price", |
|
"suspiciously high price", |
|
"average market price", |
|
"luxury property price", |
|
"budget property price", |
|
"premium property price", |
|
"mid-range property price", |
|
"overpriced for location", |
|
"underpriced for location", |
|
"price matches amenities", |
|
"price matches property age", |
|
"price matches location value", |
|
"price matches property condition", |
|
"price matches market trends" |
|
] |
|
|
|
|
|
price_result = classifier(price_context, price_categories, multi_label=True) |
|
|
|
|
|
top_classifications = [] |
|
for label, score in zip(price_result['labels'][:5], price_result['scores'][:5]): |
|
if score > 0.25: |
|
top_classifications.append({ |
|
'classification': label, |
|
'confidence': float(score) |
|
}) |
|
|
|
|
|
price_range = 'unknown' |
|
if top_classifications: |
|
primary_class = top_classifications[0]['classification'] |
|
if 'luxury' in primary_class: |
|
price_range = 'luxury' |
|
elif 'premium' in primary_class: |
|
price_range = 'premium' |
|
elif 'mid-range' in primary_class: |
|
price_range = 'mid_range' |
|
elif 'budget' in primary_class: |
|
price_range = 'budget' |
|
|
|
|
|
location_assessment = "unknown" |
|
market_trends = {} |
|
if data.get('city') and price_per_sqft: |
|
city_lower = data['city'].lower() |
|
metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"] |
|
|
|
|
|
if any(city in city_lower for city in metro_cities): |
|
market_trends = { |
|
'city_tier': 'metro', |
|
'avg_price_range': { |
|
'min': 5000, |
|
'max': 30000, |
|
'trend': 'stable' |
|
}, |
|
'price_per_sqft': { |
|
'current': price_per_sqft, |
|
'market_avg': 15000, |
|
'deviation': abs(price_per_sqft - 15000) / 15000 * 100 |
|
} |
|
} |
|
location_assessment = ( |
|
"reasonable" if 5000 <= price_per_sqft <= 30000 else |
|
"suspiciously low" if price_per_sqft < 5000 else |
|
"suspiciously high" |
|
) |
|
else: |
|
market_trends = { |
|
'city_tier': 'non-metro', |
|
'avg_price_range': { |
|
'min': 1500, |
|
'max': 15000, |
|
'trend': 'stable' |
|
}, |
|
'price_per_sqft': { |
|
'current': price_per_sqft, |
|
'market_avg': 7500, |
|
'deviation': abs(price_per_sqft - 7500) / 7500 * 100 |
|
} |
|
} |
|
location_assessment = ( |
|
"reasonable" if 1500 <= price_per_sqft <= 15000 else |
|
"suspiciously low" if price_per_sqft < 1500 else |
|
"suspiciously high" |
|
) |
|
|
|
|
|
price_factors = {} |
|
risk_indicators = [] |
|
|
|
|
|
try: |
|
year_built = int(data.get('year_built') or 0)

current_year = datetime.now().year

property_age = current_year - year_built if 0 < year_built <= current_year else 0
|
|
|
if property_age > 0: |
|
depreciation_factor = max(0.5, 1 - (property_age * 0.01)) |
|
price_factors['age_factor'] = { |
|
'property_age': property_age, |
|
'depreciation_factor': depreciation_factor, |
|
'impact': 'high' if property_age > 30 else 'medium' if property_age > 15 else 'low' |
|
} |
|
except (ValueError, TypeError):
|
price_factors['age_factor'] = {'error': 'Invalid year built'} |
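
# Worked example for the age factor above: a property built 20 years ago gets
# depreciation_factor = max(0.5, 1 - 20 * 0.01) = 0.8, i.e. a linear 1% per
# year, floored at 0.5 so very old properties are never discounted below half.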
|
|
|
|
|
if sq_ft > 0: |
|
size_factor = { |
|
'size': sq_ft, |
|
'price_per_sqft': price_per_sqft, |
|
'efficiency': 'high' if 800 <= sq_ft <= 2000 else 'medium' if 500 <= sq_ft <= 3000 else 'low' |
|
} |
|
price_factors['size_factor'] = size_factor |
|
|
|
|
|
if sq_ft < 300: |
|
risk_indicators.append('Unusually small property size') |
|
elif sq_ft > 10000: |
|
risk_indicators.append('Unusually large property size') |
|
|
|
|
|
if data.get('amenities'): |
|
amenities_list = [a.strip() for a in data['amenities'].split(',')] |
|
amenities_score = min(1.0, len(amenities_list) * 0.1) |
|
price_factors['amenities_factor'] = { |
|
'count': len(amenities_list), |
|
'score': amenities_score, |
|
'impact': 'high' if amenities_score > 0.7 else 'medium' if amenities_score > 0.4 else 'low' |
|
} |
|
|
|
|
|
confidence_weights = { |
|
'primary_classification': 0.3, |
|
'location_assessment': 0.25, |
|
'age_factor': 0.2, |
|
'size_factor': 0.15, |
|
'amenities_factor': 0.1 |
|
} |
|
|
|
confidence_scores = [] |
|
|
|
|
|
if top_classifications: |
|
confidence_scores.append(price_result['scores'][0] * confidence_weights['primary_classification']) |
|
|
|
|
|
location_confidence = 0.8 if location_assessment == "reasonable" else 0.4 |
|
confidence_scores.append(location_confidence * confidence_weights['location_assessment']) |
|
|
|
|
|
if 'age_factor' in price_factors and 'depreciation_factor' in price_factors['age_factor']: |
|
age_confidence = price_factors['age_factor']['depreciation_factor'] |
|
confidence_scores.append(age_confidence * confidence_weights['age_factor']) |
|
|
|
|
|
if 'size_factor' in price_factors: |
|
size_confidence = 0.8 if price_factors['size_factor']['efficiency'] == 'high' else 0.6 |
|
confidence_scores.append(size_confidence * confidence_weights['size_factor']) |
|
|
|
|
|
if 'amenities_factor' in price_factors: |
|
amenities_confidence = price_factors['amenities_factor']['score'] |
|
confidence_scores.append(amenities_confidence * confidence_weights['amenities_factor']) |
|
|
|
overall_confidence = sum(confidence_scores) / sum(confidence_weights.values()) |
|
|
|
return { |
|
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not classify', |
|
'confidence': float(overall_confidence), |
|
'price': price, |
|
'formatted_price': f"₹{price:,.0f}", |
|
'price_per_sqft': price_per_sqft, |
|
'formatted_price_per_sqft': f"₹{price_per_sqft:,.2f}", |
|
'price_range': price_range, |
|
'location_price_assessment': location_assessment, |
|
'has_price': True, |
|
'market_trends': market_trends, |
|
'price_factors': price_factors, |
|
'risk_indicators': risk_indicators, |
|
'top_classifications': top_classifications |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing price: {str(e)}") |
|
return { |
|
'assessment': 'error', |
|
'confidence': 0.0, |
|
'price': 0, |
|
'formatted_price': '₹0', |
|
'price_per_sqft': 0, |
|
'formatted_price_per_sqft': '₹0', |
|
'price_range': 'unknown', |
|
'location_price_assessment': 'error', |
|
'has_price': False, |
|
'market_trends': {}, |
|
'price_factors': {}, |
|
'risk_indicators': [], |
|
'top_classifications': [] |
|
} |
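
# Worked example for the deviation metric used in analyze_price: in a metro
# city the assumed market average is ₹15,000/sq.ft., so a listing at
# ₹9,000/sq.ft. yields deviation = abs(9000 - 15000) / 15000 * 100 = 40.0 and
# still sits inside the 5,000-30,000 band, giving a "reasonable" location
# assessment.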
|
|
|
def analyze_legal_details(legal_text): |
|
try: |
|
if not legal_text or len(legal_text.strip()) < 5: |
|
return { |
|
'assessment': 'insufficient', |
|
'confidence': 0.0, |
|
'summary': 'No legal details provided', |
|
'completeness_score': 0, |
|
'potential_issues': False, |
|
'legal_metrics': {}, |
|
'reasoning': 'No legal details provided for analysis', |
|
'top_classifications': [] |
|
} |
|
|
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
|
|
|
|
categories = [ |
|
"comprehensive legal documentation", |
|
"basic legal documentation", |
|
"missing critical legal details", |
|
"potential legal issues", |
|
"standard property documentation", |
|
"title verification documents", |
|
"encumbrance certificates", |
|
"property tax records", |
|
"building permits", |
|
"land use certificates", |
|
"clear title documentation", |
|
"property registration documents", |
|
"ownership transfer documents", |
|
"legal compliance certificates", |
|
"property dispute records" |
|
] |
|
|
|
|
|
legal_context = f""" |
|
Legal Documentation Analysis: |
|
{legal_text[:1000]} |
|
|
|
Key aspects to verify: |
|
- Title and ownership documentation |
|
- Property registration status |
|
- Tax compliance |
|
- Building permits and approvals |
|
- Land use compliance |
|
- Encumbrance status |
|
- Dispute history |
|
""" |
|
|
|
|
|
legal_result = classifier(legal_context, categories, multi_label=True) |
|
|
|
|
|
top_classifications = [] |
|
for label, score in zip(legal_result['labels'][:3], legal_result['scores'][:3]): |
|
if score > 0.3: |
|
top_classifications.append({ |
|
'classification': label, |
|
'confidence': float(score) |
|
}) |
|
|
|
|
|
summary = summarize_text(legal_text[:1000]) |
|
|
|
|
|
legal_metrics = { |
|
'completeness': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) |
|
if label in ['comprehensive legal documentation', 'standard property documentation']), |
|
'documentation_quality': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) |
|
if label in ['title verification documents', 'encumbrance certificates', 'clear title documentation']), |
|
'compliance': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) |
|
if label in ['building permits', 'land use certificates', 'legal compliance certificates']), |
|
'risk_level': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) |
|
if label in ['missing critical legal details', 'potential legal issues', 'property dispute records']) |
|
} |
|
|
|
|
|
completeness_score = min(100, (

legal_metrics['completeness'] * 0.4 +

legal_metrics['documentation_quality'] * 0.4 +

legal_metrics['compliance'] * 0.2

) * 100)
|
|
|
|
|
potential_issues = legal_metrics['risk_level'] > 0.3 |
|
|
|
|
|
reasoning_parts = [] |
|
|
|
|
|
if top_classifications: |
|
primary_class = top_classifications[0]['classification'] |
|
confidence = top_classifications[0]['confidence'] |
|
reasoning_parts.append(f"Primary assessment: {primary_class} (confidence: {confidence:.0%})") |
|
|
|
|
|
if legal_metrics['completeness'] > 0.7: |
|
reasoning_parts.append("Comprehensive legal documentation present") |
|
elif legal_metrics['completeness'] > 0.4: |
|
reasoning_parts.append("Basic legal documentation present") |
|
else: |
|
reasoning_parts.append("Insufficient legal documentation") |
|
|
|
|
|
if legal_metrics['documentation_quality'] > 0.6: |
|
reasoning_parts.append("Quality documentation verified (title, encumbrance)") |
|
elif legal_metrics['documentation_quality'] > 0.3: |
|
reasoning_parts.append("Basic documentation quality verified") |
|
|
|
|
|
if legal_metrics['compliance'] > 0.6: |
|
reasoning_parts.append("Full compliance documentation present") |
|
elif legal_metrics['compliance'] > 0.3: |
|
reasoning_parts.append("Partial compliance documentation present") |
|
|
|
|
|
if potential_issues: |
|
if legal_metrics['risk_level'] > 0.6: |
|
reasoning_parts.append("High risk: Multiple potential legal issues detected") |
|
else: |
|
reasoning_parts.append("Moderate risk: Some potential legal issues detected") |
|
else: |
|
reasoning_parts.append("No significant legal issues detected") |
|
|
|
|
|
overall_confidence = max(0.0, min(1.0, (

legal_metrics['completeness'] * 0.4 +

legal_metrics['documentation_quality'] * 0.4 +

(1 - legal_metrics['risk_level']) * 0.2

)))
|
|
|
return { |
|
'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', |
|
'confidence': float(overall_confidence), |
|
'summary': summary, |
|
'completeness_score': int(completeness_score), |
|
'potential_issues': potential_issues, |
|
'legal_metrics': legal_metrics, |
|
'reasoning': '. '.join(reasoning_parts), |
|
'top_classifications': top_classifications |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing legal details: {str(e)}") |
|
return { |
|
'assessment': 'could not assess', |
|
'confidence': 0.0, |
|
'summary': 'Error analyzing legal details', |
|
'completeness_score': 0, |
|
'potential_issues': False, |
|
'legal_metrics': {}, |
|
'reasoning': 'Technical error occurred during analysis', |
|
'top_classifications': [] |
|
} |
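
# Worked example for the weighting in analyze_legal_details: with
# completeness = 0.8, documentation_quality = 0.6 and compliance = 0.5,
# completeness_score = min(100, (0.8 * 0.4 + 0.6 * 0.4 + 0.5 * 0.2) * 100) = 66,
# and potential_issues flips to True only once risk_level exceeds 0.3.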
|
|
|
def verify_property_specs(data): |
|
try: |
|
specs_verification = { |
|
'bedrooms_reasonable': True, |
|
'bathrooms_reasonable': True, |
|
'total_rooms_reasonable': True, |
|
'parking_reasonable': True, |
|
'sq_ft_reasonable': True, |
|
'market_value_reasonable': True, |
|
'issues': [], |
|
'verification_score': 0.0 |
|
} |
|
|
|
|
|
try: |
|
bedrooms = int(float(data['bedrooms'])) if data['bedrooms'] else 0 |
|
if bedrooms > 20 or bedrooms < 0: |
|
specs_verification['bedrooms_reasonable'] = False |
|
specs_verification['issues'].append(f"Invalid number of bedrooms: {bedrooms}. Should be between 0 and 20.") |
|
except (ValueError, TypeError):
|
specs_verification['bedrooms_reasonable'] = False |
|
specs_verification['issues'].append("Invalid bedroom data: must be a number") |
|
|
|
|
|
try: |
|
bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0 |
|
if bathrooms > 15 or bathrooms < 0: |
|
specs_verification['bathrooms_reasonable'] = False |
|
specs_verification['issues'].append(f"Invalid number of bathrooms: {bathrooms}. Should be between 0 and 15.") |
|
except (ValueError, TypeError):
|
specs_verification['bathrooms_reasonable'] = False |
|
specs_verification['issues'].append("Invalid bathroom data: must be a number") |
|
|
|
|
|
total_rooms = 0 |
|
if data['total_rooms']: |
|
try: |
|
total_rooms = int(float(data['total_rooms'])) |
|
if total_rooms > 0: |
|
min_required_rooms = bedrooms + math.ceil(bathrooms) |
|
if total_rooms < min_required_rooms: |
|
specs_verification['total_rooms_reasonable'] = False |
|
specs_verification['issues'].append( |
|
f"Total rooms ({total_rooms}) must be at least equal to bedrooms ({bedrooms}) + bathrooms ({bathrooms} = {min_required_rooms})" |
|
) |
|
elif total_rooms > 50: |
|
specs_verification['total_rooms_reasonable'] = False |
|
specs_verification['issues'].append(f"Total rooms ({total_rooms}) seems unreasonably high") |
|
except (ValueError, TypeError): |
|
specs_verification['total_rooms_reasonable'] = False |
|
specs_verification['issues'].append("Invalid total rooms data: must be a number") |
|
|
|
|
|
try: |
|
parking = int(float(data['parking'])) if data['parking'] else 0 |
|
if parking > 20 or parking < 0: |
|
specs_verification['parking_reasonable'] = False |
|
specs_verification['issues'].append(f"Invalid parking spaces: {parking}. Should be between 0 and 20.") |
|
except (ValueError, TypeError):
|
specs_verification['parking_reasonable'] = False |
|
specs_verification['issues'].append("Invalid parking data: must be a number") |
|
|
|
|
|
sq_ft = 0 |
|
if data['sq_ft']: |
|
try: |
|
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) |
|
if sq_ft > 0: |
|
if sq_ft > 100000: |
|
specs_verification['sq_ft_reasonable'] = False |
|
specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high") |
|
elif sq_ft < 100: |
|
specs_verification['sq_ft_reasonable'] = False |
|
specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low") |
|
|
|
|
|
if bedrooms > 0: |
|
sq_ft_per_bedroom = sq_ft / bedrooms |
|
if sq_ft_per_bedroom < 50: |
|
specs_verification['sq_ft_reasonable'] = False |
|
specs_verification['issues'].append(f"Square footage per bedroom ({sq_ft_per_bedroom:.1f}) seems unreasonably low") |
|
except (ValueError, TypeError): |
|
specs_verification['sq_ft_reasonable'] = False |
|
specs_verification['issues'].append("Invalid square footage data: must be a number") |
|
|
|
|
|
try: |
|
market_value = float(re.sub(r'[^\d.]', '', data['market_value'])) if data['market_value'] else 0 |
|
if market_value > 0: |
|
if market_value > 1000000000: |
|
specs_verification['market_value_reasonable'] = False |
|
specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high") |
|
elif market_value < 100000: |
|
specs_verification['market_value_reasonable'] = False |
|
specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low") |
|
|
|
|
|
if sq_ft > 0: |
|
price_per_sqft = market_value / sq_ft |
|
if price_per_sqft < 100: |
|
specs_verification['market_value_reasonable'] = False |
|
specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low") |
|
elif price_per_sqft > 100000: |
|
specs_verification['market_value_reasonable'] = False |
|
specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high") |
|
except (ValueError, TypeError):
|
specs_verification['market_value_reasonable'] = False |
|
specs_verification['issues'].append("Invalid market value data: must be a number") |
|
|
|
|
|
valid_checks = sum([ |
|
specs_verification[f] for f in [ |
|
'bedrooms_reasonable', 'bathrooms_reasonable', |
|
'total_rooms_reasonable', 'parking_reasonable', |
|
'sq_ft_reasonable', 'market_value_reasonable' |
|
] |
|
]) |
|
total_checks = 6 |
|
specs_verification['verification_score'] = (valid_checks / total_checks) * 100 |
|
|
|
return specs_verification |
|
except Exception as e: |
|
logger.error(f"Error verifying specs: {str(e)}") |
|
return { |
|
'bedrooms_reasonable': False, |
|
'bathrooms_reasonable': False, |
|
'total_rooms_reasonable': False, |
|
'parking_reasonable': False, |
|
'sq_ft_reasonable': False, |
|
'market_value_reasonable': False, |
|
'issues': [f"Error during verification: {str(e)}"], |
|
'verification_score': 0.0 |
|
} |
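
# Example: if two of the six boolean checks above fail, verification_score is
# (4 / 6) * 100 = 66.66..., i.e. the score is a float percentage of passed
# checks, not an integer.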
|
|
|
def analyze_market_value(data): |
|
try: |
|
|
|
price = float(data['market_value'].replace('₹', '').replace('$', '').replace(',', '').strip()) if data.get('market_value') else 0
|
sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data.get('sq_ft') else 0 |
|
year_built = int(data.get('year_built', 0)) if data.get('year_built') else 0 |
|
current_year = datetime.now().year |
|
property_age = current_year - year_built if year_built else 0 |
|
|
|
|
|
market_value_components = { |
|
'base_value': 0, |
|
'location_multiplier': 1.0, |
|
'age_factor': 1.0, |
|
'size_factor': 1.0, |
|
'amenities_factor': 1.0, |
|
'market_trend_factor': 1.0, |
|
'condition_factor': 1.0 |
|
} |
|
|
|
|
|
city_lower = data.get('city', '').lower() |
|
metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"] |
|
|
|
if any(city in city_lower for city in metro_cities): |
|
base_price_per_sqft = 15000 |
|
market_value_components['location_multiplier'] = 1.5 |
|
else: |
|
base_price_per_sqft = 7500 |
|
market_value_components['location_multiplier'] = 1.0 |
|
|
|
|
|
if sq_ft > 0: |
|
market_value_components['base_value'] = base_price_per_sqft * sq_ft |
|
|
|
|
|
if property_age > 0: |
|
depreciation_rate = 0.01 |
|
max_depreciation = 0.5 |
|
age_factor = max(1 - max_depreciation, 1 - (property_age * depreciation_rate)) |
|
market_value_components['age_factor'] = age_factor |
|
|
|
|
|
if sq_ft > 0: |
|
if 800 <= sq_ft <= 2000: |
|
market_value_components['size_factor'] = 1.2 |
|
elif 500 <= sq_ft <= 3000: |
|
market_value_components['size_factor'] = 1.0 |
|
else: |
|
market_value_components['size_factor'] = 0.8 |
|
|
|
|
|
if data.get('amenities'): |
|
amenities_list = [a.strip() for a in data['amenities'].split(',')] |
|
amenities_count = len(amenities_list) |
|
amenities_factor = min(1.5, 1 + (amenities_count * 0.1)) |
|
market_value_components['amenities_factor'] = amenities_factor |
|
|
|
|
|
property_type = data.get('property_type', '').lower() |
|
if 'apartment' in property_type or 'flat' in property_type: |
|
market_value_components['market_trend_factor'] = 1.1 |
|
elif 'house' in property_type or 'villa' in property_type: |
|
market_value_components['market_trend_factor'] = 1.15 |
|
elif 'plot' in property_type or 'land' in property_type: |
|
market_value_components['market_trend_factor'] = 1.2 |
|
|
|
|
|
if property_age <= 5: |
|
market_value_components['condition_factor'] = 1.2 |
|
elif property_age <= 15: |
|
market_value_components['condition_factor'] = 1.1 |
|
elif property_age <= 30: |
|
market_value_components['condition_factor'] = 1.0 |
|
else: |
|
market_value_components['condition_factor'] = 0.9 |
|
|
|
|
|
market_value = market_value_components['base_value'] |
|
for factor, value in market_value_components.items(): |
|
if factor != 'base_value': |
|
market_value *= value |
|
|
|
|
|
estimated_price_per_sqft = market_value / sq_ft if sq_ft > 0 else 0 |
|
|
|
|
|
value_metrics = { |
|
'price_to_value_ratio': price / market_value if market_value > 0 else 0, |
|
'price_per_sqft_ratio': price / sq_ft if sq_ft > 0 else 0, |
|
'estimated_price_per_sqft': estimated_price_per_sqft, |
|
'value_appreciation': (market_value - price) / price * 100 if price > 0 else 0 |
|
} |
|
|
|
|
|
market_insights = [] |
|
|
|
|
|
if value_metrics['price_to_value_ratio'] > 1.2: |
|
market_insights.append("Property is overpriced compared to market value") |
|
elif value_metrics['price_to_value_ratio'] < 0.8: |
|
market_insights.append("Property is underpriced compared to market value") |
|
|
|
|
|
if sq_ft < 300: |
|
market_insights.append("Property size is unusually small for the market") |
|
elif sq_ft > 10000: |
|
market_insights.append("Property size is unusually large for the market") |
|
|
|
|
|
if property_age > 30: |
|
market_insights.append("Property age significantly impacts market value") |
|
|
|
|
|
if market_value_components['location_multiplier'] > 1.0: |
|
market_insights.append("Property is in a premium location") |
|
|
|
|
|
if market_value_components['market_trend_factor'] > 1.1: |
|
market_insights.append("Property type is trending upward in the market") |
|
|
|
return { |
|
'estimated_market_value': market_value, |
|
'formatted_market_value': f"₹{market_value:,.0f}", |
|
'price_per_sqft': estimated_price_per_sqft, |
|
'formatted_price_per_sqft': f"₹{estimated_price_per_sqft:,.2f}", |
|
'value_components': market_value_components, |
|
'value_metrics': value_metrics, |
|
'market_insights': market_insights, |
|
'confidence_score': min(0.95, 0.7 + (len(market_insights) * 0.05)) |
|
} |
|
except Exception as e: |
|
logger.error(f"Error analyzing market value: {str(e)}") |
|
return { |
|
'estimated_market_value': 0, |
|
'formatted_market_value': '₹0', |
|
'price_per_sqft': 0, |
|
'formatted_price_per_sqft': '₹0', |
|
'value_components': {}, |
|
'value_metrics': {}, |
|
'market_insights': [], |
|
'confidence_score': 0.0 |
|
} |
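
# Worked example of the multiplicative model above, using an assumed input of
# a 1,200 sq.ft., 10-year-old metro apartment with no listed amenities:
#   base_value = 15000 * 1200 = 18,000,000
#   estimate   = 18,000,000 * 1.5 (location) * 0.9 (age) * 1.2 (size)
#                * 1.1 (trend) * 1.1 (condition) = 35,283,600
# The factors compound rather than average, and the metro premium enters twice:
# once via the higher base price per sq.ft. and again via location_multiplier.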
|
|
|
def assess_image_quality(img): |
|
try: |
|
width, height = img.size |
|
resolution = width * height |
|
quality_score = min(100, resolution // 20000) |
|
return { |
|
'resolution': f"{width}x{height}", |
|
'quality_score': quality_score |
|
} |
|
except Exception as e: |
|
logger.error(f"Error assessing image quality: {str(e)}") |
|
return { |
|
'resolution': 'unknown', |
|
'quality_score': 0 |
|
} |
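
# Example: a 1920x1080 image has 2,073,600 pixels, so
# quality_score = min(100, 2073600 // 20000) = min(100, 103) = 100;
# roughly 2 MP or above saturates the score.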
|
|
|
def check_if_property_related(text): |
|
try: |
|
classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") |
|
result = classifier(text[:1000], ["property-related", "non-property-related"]) |
|
is_related = result['labels'][0] == "property-related" |
|
return { |
|
'is_related': is_related, |
|
'confidence': float(result['scores'][0]) |
|
} |
|
except Exception as e: |
|
logger.error(f"Error checking property relation: {str(e)}") |
|
return { |
|
'is_related': False, |
|
'confidence': 0.0 |
|
} |
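
# Usage sketch (hypothetical input and output, not executed):
#   check_if_property_related("3BHK flat for sale in Pune with covered parking")
#   -> {'is_related': True, 'confidence': 0.93}
# Note the returned confidence is the score of whichever label ranked first,
# whether or not that label was "property-related".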
|
|
|
|
|
@lru_cache(maxsize=3) |
|
def load_model(task, model_name): |
|
try: |
|
logger.info(f"Loading model: {model_name} for task: {task}") |
|
|
|
|
|
if task == "zero-shot-classification": |
|
|
|
model_name = "facebook/bart-large-mnli" |
|
return pipeline(task, model=model_name, device=-1) |
|
elif task == "summarization": |
|
|
|
model_name = "facebook/bart-large-cnn" |
|
return pipeline(task, model=model_name, device=-1) |
|
elif task == "text-classification": |
|
|
|
model_name = "distilbert-base-uncased" |
|
return pipeline(task, model=model_name, device=-1) |
|
elif task == "feature-extraction": |
|
|
|
model_name = "sentence-transformers/all-MiniLM-L6-v2" |
|
return pipeline(task, model=model_name, device=-1) |
|
else: |
|
|
|
model_name = "distilbert-base-uncased" |
|
return pipeline(task, model=model_name, device=-1) |
|
except Exception as e: |
|
logger.error(f"Error loading model {model_name}: {str(e)}") |
|
raise |
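
# Note: load_model pins a known-good checkpoint per task and ignores the
# caller-supplied model_name for recognised tasks, so while the lru_cache key
# is (task, requested_name), the pipeline actually constructed depends on the
# task alone. device=-1 forces CPU inference, and maxsize=3 means at most
# three pipelines stay resident at once.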
|
|
|
|
|
def clear_model_cache(): |
|
"""Clear model cache and free up memory""" |
|
load_model.cache_clear() |
|
gc.collect() |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
logger.info("Model cache cleared and memory freed") |
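
# Usage note: clear_model_cache() drops every pipeline cached by load_model,
# so the next load_model call pays the full model-load cost again; it trades
# latency for memory when usage spikes.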
|
|
|
if __name__ == '__main__': |
|
|
|
http_tunnel = ngrok.connect(5000) |
|
print(f' * Public URL: {http_tunnel.public_url}') |
|
|
|
|
|
def run_flask(): |
|
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) |
|
|
|
flask_thread = threading.Thread(target=run_flask) |
|
flask_thread.daemon = True |
|
flask_thread.start() |
|
|
|
try: |
|
|
|
while True: |
|
time.sleep(1) |
|
except KeyboardInterrupt: |
|
print(" * Shutting down server...") |
|
ngrok.disconnect(http_tunnel.public_url) |
|
ngrok.kill() |