from flask import Flask, render_template, request, jsonify from flask_cors import CORS import torch from transformers import pipeline, CLIPProcessor, CLIPModel import base64 import io import re import json import numpy as np from PIL import Image import fitz # PyMuPDF import os from datetime import datetime import uuid import requests from geopy.geocoders import Nominatim from sentence_transformers import SentenceTransformer, util import spacy import pytesseract from langdetect import detect from deep_translator import GoogleTranslator import logging from functools import lru_cache import time import math from pyngrok import ngrok import threading import asyncio import concurrent.futures app = Flask(__name__) CORS(app) # Enable CORS for frontend # Configure logging def setup_logging(): log_dir = os.environ.get('LOG_DIR', '/app/logs') try: os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, 'app.log') # Ensure log file exists and is writable if not os.path.exists(log_file): open(log_file, 'a').close() os.chmod(log_file, 0o666) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) return logging.getLogger(__name__) except Exception as e: # Fallback to console-only logging if file logging fails logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) return logging.getLogger(__name__) logger = setup_logging() # Initialize geocoder geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10) # Cache models @lru_cache(maxsize=10) def load_model(task, model_name): try: logger.info(f"Loading model: {model_name} for task: {task}") return pipeline(task, model=model_name, device=-1) except Exception as e: logger.error(f"Error loading model {model_name}: {str(e)}") raise # Initialize CLIP model try: clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") has_clip_model = True logger.info("CLIP model loaded successfully") except Exception as e: logger.error(f"Error loading CLIP model: {str(e)}") has_clip_model = False # Initialize sentence transformer try: sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2') logger.info("Sentence transformer loaded successfully") except Exception as e: logger.error(f"Error loading sentence transformer: {str(e)}") sentence_model = None # Initialize spaCy try: nlp = spacy.load('en_core_web_md') logger.info("spaCy model loaded successfully") except Exception as e: logger.error(f"Error loading spaCy model: {str(e)}") nlp = None def make_json_serializable(obj): try: if isinstance(obj, (bool, int, float, str, type(None))): return obj elif isinstance(obj, (list, tuple)): return [make_json_serializable(item) for item in obj] elif isinstance(obj, dict): return {str(key): make_json_serializable(value) for key, value in obj.items()} elif torch.is_tensor(obj): return obj.item() if obj.numel() == 1 else obj.tolist() elif np.isscalar(obj): return obj.item() if hasattr(obj, 'item') else float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() else: return str(obj) except Exception as e: logger.error(f"Error serializing object: {str(e)}") return str(obj) @app.route('/') def index(): return render_template('index.html') @app.route('/get-location', methods=['POST']) def get_location(): try: data = request.json or {} latitude = data.get('latitude') longitude = data.get('longitude') if not latitude or not longitude: logger.warning("Missing latitude or longitude") return jsonify({ 'status': 'error', 'message': 'Latitude and longitude are required' }), 400 # Validate coordinates are within India try: lat, lng = float(latitude), float(longitude) if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5): return jsonify({ 'status': 'error', 'message': 'Coordinates are outside India' }), 400 except ValueError: return jsonify({ 'status': 'error', 'message': 'Invalid coordinates format' }), 400 # Retry geocoding up to 3 times for attempt in range(3): try: location = geocoder.reverse((latitude, longitude), exactly_one=True) if location: address_components = location.raw.get('address', {}) # Extract Indian-specific address components city = address_components.get('city', '') if not city: city = address_components.get('town', '') if not city: city = address_components.get('village', '') if not city: city = address_components.get('suburb', '') state = address_components.get('state', '') if not state: state = address_components.get('state_district', '') # Get postal code and validate Indian format postal_code = address_components.get('postcode', '') if postal_code and not re.match(r'^\d{6}$', postal_code): postal_code = '' # Get road/street name road = address_components.get('road', '') if not road: road = address_components.get('street', '') # Get area/locality area = address_components.get('suburb', '') if not area: area = address_components.get('neighbourhood', '') return jsonify({ 'status': 'success', 'address': location.address, 'street': road, 'area': area, 'city': city, 'state': state, 'country': 'India', 'postal_code': postal_code, 'latitude': latitude, 'longitude': longitude, 'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}" }) logger.warning(f"Geocoding failed on attempt {attempt + 1}") time.sleep(1) # Wait before retry except Exception as e: logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") time.sleep(1) return jsonify({ 'status': 'error', 'message': 'Could not determine location after retries' }), 500 except Exception as e: logger.error(f"Error in get_location: {str(e)}") return jsonify({ 'status': 'error', 'message': str(e) }), 500 def calculate_final_verdict(results): """ Calculate a comprehensive final verdict based on all analysis results. This function combines all verification scores, fraud indicators, and quality assessments to determine if a property listing is legitimate, suspicious, or fraudulent. """ try: # Initialize verdict components verdict = { 'status': 'unknown', 'confidence': 0.0, 'score': 0.0, 'reasons': [], 'critical_issues': [], 'warnings': [], 'recommendations': [] } # Extract key components from results trust_score = results.get('trust_score', {}).get('score', 0) fraud_classification = results.get('fraud_classification', {}) quality_assessment = results.get('quality_assessment', {}) specs_verification = results.get('specs_verification', {}) cross_validation = results.get('cross_validation', []) location_analysis = results.get('location_analysis', {}) price_analysis = results.get('price_analysis', {}) legal_analysis = results.get('legal_analysis', {}) document_analysis = results.get('document_analysis', {}) image_analysis = results.get('image_analysis', {}) # Calculate component scores (0-100) component_scores = { 'trust': trust_score, 'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100), 'quality': quality_assessment.get('score', 0), 'specs': specs_verification.get('verification_score', 0), 'location': location_analysis.get('completeness_score', 0), 'price': price_analysis.get('confidence', 0) * 100 if price_analysis.get('has_price') else 0, 'legal': legal_analysis.get('completeness_score', 0), 'documents': min(100, (document_analysis.get('pdf_count', 0) / 3) * 100) if document_analysis.get('pdf_count') else 0, 'images': min(100, (image_analysis.get('image_count', 0) / 5) * 100) if image_analysis.get('image_count') else 0 } # Calculate weighted final score with adjusted weights weights = { 'trust': 0.20, 'fraud': 0.25, # Increased weight for fraud detection 'quality': 0.15, 'specs': 0.10, 'location': 0.10, 'price': 0.05, 'legal': 0.05, 'documents': 0.05, 'images': 0.05 } final_score = sum(score * weights.get(component, 0) for component, score in component_scores.items()) verdict['score'] = final_score # Determine verdict status based on multiple factors fraud_level = fraud_classification.get('alert_level', 'minimal') high_risk_indicators = len(fraud_classification.get('high_risk', [])) critical_issues = [] warnings = [] # Check for critical issues if fraud_level in ['critical', 'high']: critical_issues.append(f"High fraud risk detected: {fraud_level} alert level") if trust_score < 40: critical_issues.append(f"Very low trust score: {trust_score}%") if quality_assessment.get('score', 0) < 30: critical_issues.append(f"Very low content quality: {quality_assessment.get('score', 0)}%") if specs_verification.get('verification_score', 0) < 40: critical_issues.append(f"Property specifications verification failed: {specs_verification.get('verification_score', 0)}%") # Check for warnings if fraud_level == 'medium': warnings.append(f"Medium fraud risk detected: {fraud_level} alert level") if trust_score < 60: warnings.append(f"Low trust score: {trust_score}%") if quality_assessment.get('score', 0) < 60: warnings.append(f"Low content quality: {quality_assessment.get('score', 0)}%") if specs_verification.get('verification_score', 0) < 70: warnings.append(f"Property specifications have issues: {specs_verification.get('verification_score', 0)}%") # Check cross-validation results for check in cross_validation: if check.get('status') in ['inconsistent', 'invalid', 'suspicious', 'no_match']: warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}") # Check for missing critical information missing_critical = [] if not location_analysis.get('completeness_score', 0) > 70: missing_critical.append("Location information is incomplete") if not price_analysis.get('has_price', False): missing_critical.append("Price information is missing") if not legal_analysis.get('completeness_score', 0) > 70: missing_critical.append("Legal information is incomplete") if document_analysis.get('pdf_count', 0) == 0: missing_critical.append("No supporting documents provided") if image_analysis.get('image_count', 0) == 0: missing_critical.append("No property images provided") if missing_critical: warnings.append(f"Missing critical information: {', '.join(missing_critical)}") # Enhanced verdict determination with more strict criteria if critical_issues or (fraud_level in ['critical', 'high'] and trust_score < 50) or high_risk_indicators > 0: verdict['status'] = 'fraudulent' verdict['confidence'] = min(100, max(70, 100 - (trust_score * 0.5))) elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_verification.get('verification_score', 0) < 60: verdict['status'] = 'suspicious' verdict['confidence'] = min(100, max(50, trust_score * 0.8)) else: verdict['status'] = 'legitimate' verdict['confidence'] = min(100, max(70, trust_score * 0.9)) # Add reasons to verdict verdict['critical_issues'] = critical_issues verdict['warnings'] = warnings # Add recommendations based on issues if critical_issues: verdict['recommendations'].append("Do not proceed with this property listing") verdict['recommendations'].append("Report this listing to the platform") elif warnings: verdict['recommendations'].append("Proceed with extreme caution") verdict['recommendations'].append("Request additional verification documents") verdict['recommendations'].append("Verify all information with independent sources") else: verdict['recommendations'].append("Proceed with standard due diligence") verdict['recommendations'].append("Verify final details before transaction") # Add specific recommendations based on missing information for missing in missing_critical: verdict['recommendations'].append(f"Request {missing.lower()}") return verdict except Exception as e: logger.error(f"Error calculating final verdict: {str(e)}") return { 'status': 'error', 'confidence': 0.0, 'score': 0.0, 'reasons': [f"Error calculating verdict: {str(e)}"], 'critical_issues': [], 'warnings': [], 'recommendations': ["Unable to determine property status due to an error"] } @app.route('/verify', methods=['POST']) def verify_property(): try: if not request.form and not request.files: logger.warning("No form data or files provided") return jsonify({ 'error': 'No data provided', 'status': 'error' }), 400 # Extract form data data = { 'property_name': request.form.get('property_name', '').strip(), 'property_type': request.form.get('property_type', '').strip(), 'status': request.form.get('status', '').strip(), 'description': request.form.get('description', '').strip(), 'address': request.form.get('address', '').strip(), 'city': request.form.get('city', '').strip(), 'state': request.form.get('state', '').strip(), 'country': request.form.get('country', 'India').strip(), 'zip': request.form.get('zip', '').strip(), 'latitude': request.form.get('latitude', '').strip(), 'longitude': request.form.get('longitude', '').strip(), 'bedrooms': request.form.get('bedrooms', '').strip(), 'bathrooms': request.form.get('bathrooms', '').strip(), 'total_rooms': request.form.get('total_rooms', '').strip(), 'year_built': request.form.get('year_built', '').strip(), 'parking': request.form.get('parking', '').strip(), 'sq_ft': request.form.get('sq_ft', '').strip(), 'market_value': request.form.get('market_value', '').strip(), 'amenities': request.form.get('amenities', '').strip(), 'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(), 'legal_details': request.form.get('legal_details', '').strip() } # Validate required fields required_fields = ['property_name', 'property_type', 'address', 'city', 'state'] missing_fields = [field for field in required_fields if not data[field]] if missing_fields: logger.warning(f"Missing required fields: {', '.join(missing_fields)}") return jsonify({ 'error': f"Missing required fields: {', '.join(missing_fields)}", 'status': 'error' }), 400 # Process images images = [] image_analysis = [] if 'images' in request.files: # Get unique image files by filename to prevent duplicates image_files = {} for img_file in request.files.getlist('images'): if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')): image_files[img_file.filename] = img_file # Process unique images for img_file in image_files.values(): try: img = Image.open(img_file) buffered = io.BytesIO() img.save(buffered, format="JPEG") img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') images.append(img_str) image_analysis.append(analyze_image(img)) except Exception as e: logger.error(f"Error processing image {img_file.filename}: {str(e)}") image_analysis.append({'error': str(e), 'is_property_related': False}) # Process PDFs pdf_texts = [] pdf_analysis = [] if 'documents' in request.files: # Get unique PDF files by filename to prevent duplicates pdf_files = {} for pdf_file in request.files.getlist('documents'): if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'): pdf_files[pdf_file.filename] = pdf_file # Process unique PDFs for pdf_file in pdf_files.values(): try: pdf_text = extract_pdf_text(pdf_file) pdf_texts.append({ 'filename': pdf_file.filename, 'text': pdf_text }) pdf_analysis.append(analyze_pdf_content(pdf_text, data)) except Exception as e: logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}") pdf_analysis.append({'error': str(e)}) # Create consolidated text for analysis consolidated_text = f""" Property Name: {data['property_name']} Property Type: {data['property_type']} Status: {data['status']} Description: {data['description']} Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']} Coordinates: Lat {data['latitude']}, Long {data['longitude']} Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms Year Built: {data['year_built']} Parking: {data['parking']} Size: {data['sq_ft']} sq. ft. Market Value: ₹{data['market_value']} Amenities: {data['amenities']} Nearby Landmarks: {data['nearby_landmarks']} Legal Details: {data['legal_details']} """ # Process description translation if needed try: description = data['description'] if description and len(description) > 10: text_language = detect(description) if text_language != 'en': translated_description = GoogleTranslator(source=text_language, target='en').translate(description) data['description_translated'] = translated_description else: data['description_translated'] = description else: data['description_translated'] = description except Exception as e: logger.error(f"Error in language detection/translation: {str(e)}") data['description_translated'] = data['description'] # Run all analyses in parallel using asyncio async def run_analyses(): with concurrent.futures.ThreadPoolExecutor() as executor: loop = asyncio.get_event_loop() tasks = [ loop.run_in_executor(executor, generate_property_summary, data), loop.run_in_executor(executor, classify_fraud, consolidated_text, data), loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis), loop.run_in_executor(executor, generate_suggestions, consolidated_text, data), loop.run_in_executor(executor, assess_text_quality, data['description_translated']), loop.run_in_executor(executor, verify_address, data), loop.run_in_executor(executor, perform_cross_validation, data), loop.run_in_executor(executor, analyze_location, data), loop.run_in_executor(executor, analyze_price, data), loop.run_in_executor(executor, analyze_legal_details, data['legal_details']), loop.run_in_executor(executor, verify_property_specs, data), loop.run_in_executor(executor, analyze_market_value, data) ] results = await asyncio.gather(*tasks) return results # Run analyses and get results loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) analysis_results = loop.run_until_complete(run_analyses()) loop.close() # Unpack results summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \ address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \ specs_verification, market_analysis = analysis_results # Prepare response document_analysis = { 'pdf_count': len(pdf_texts), 'pdf_texts': pdf_texts, 'pdf_analysis': pdf_analysis } image_results = { 'image_count': len(images), 'image_analysis': image_analysis } report_id = str(uuid.uuid4()) # Create results dictionary results = { 'report_id': report_id, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'summary': summary, 'fraud_classification': fraud_classification, 'trust_score': { 'score': trust_score, 'reasoning': trust_reasoning }, 'suggestions': suggestions, 'quality_assessment': quality_assessment, 'address_verification': address_verification, 'cross_validation': cross_validation, 'location_analysis': location_analysis, 'price_analysis': price_analysis, 'legal_analysis': legal_analysis, 'document_analysis': document_analysis, 'image_analysis': image_results, 'specs_verification': specs_verification, 'market_analysis': market_analysis, 'images': images } # Calculate final verdict final_verdict = calculate_final_verdict(results) results['final_verdict'] = final_verdict return jsonify(make_json_serializable(results)) except Exception as e: logger.error(f"Error in verify_property: {str(e)}") return jsonify({ 'error': 'Server error occurred. Please try again later.', 'status': 'error', 'details': str(e) }), 500 def extract_pdf_text(pdf_file): try: pdf_document = fitz.Document(stream=pdf_file.read(), filetype="pdf") text = "" for page in pdf_document: text += page.get_text() pdf_document.close() return text except Exception as e: logger.error(f"Error extracting PDF text: {str(e)}") return "" def analyze_image(image): try: if has_clip_model: img_rgb = image.convert('RGB') inputs = clip_processor( text=[ "real estate property interior", "real estate property exterior", "non-property-related image", "office space", "landscape" ], images=img_rgb, return_tensors="pt", padding=True ) outputs = clip_model(**inputs) logits_per_image = outputs.logits_per_image probs = logits_per_image.softmax(dim=1).detach().numpy()[0] property_related_score = probs[0] + probs[1] is_property_related = property_related_score > 0.5 quality = assess_image_quality(image) is_ai_generated = detect_ai_generated_image(image) return { 'is_property_related': is_property_related, 'property_confidence': float(property_related_score), 'top_predictions': [ {'label': 'property interior', 'confidence': float(probs[0])}, {'label': 'property exterior', 'confidence': float(probs[1])}, {'label': 'non-property', 'confidence': float(probs[2])} ], 'image_quality': quality, 'is_ai_generated': is_ai_generated, 'authenticity_score': 0.95 if not is_ai_generated else 0.60 } else: logger.warning("CLIP model unavailable") return { 'is_property_related': False, 'property_confidence': 0.0, 'top_predictions': [], 'image_quality': assess_image_quality(image), 'is_ai_generated': False, 'authenticity_score': 0.5 } except Exception as e: logger.error(f"Error analyzing image: {str(e)}") return { 'is_property_related': False, 'property_confidence': 0.0, 'top_predictions': [], 'image_quality': {'resolution': 'unknown', 'quality_score': 0}, 'is_ai_generated': False, 'authenticity_score': 0.0, 'error': str(e) } def detect_ai_generated_image(image): try: img_array = np.array(image) if len(img_array.shape) == 3: gray = np.mean(img_array, axis=2) else: gray = img_array noise = gray - np.mean(gray) noise_std = np.std(noise) width, height = image.size perfect_dimensions = (width % 64 == 0 and height % 64 == 0) has_exif = hasattr(image, '_getexif') and image._getexif() is not None return noise_std < 0.05 or perfect_dimensions or not has_exif except Exception as e: logger.error(f"Error detecting AI-generated image: {str(e)}") return False def analyze_pdf_content(document_text, property_data): try: if not document_text: return { 'document_type': {'classification': 'unknown', 'confidence': 0.0}, 'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, 'key_info': {}, 'consistency_score': 0.0, 'is_property_related': False, 'summary': 'Empty document', 'has_signatures': False, 'has_dates': False, 'verification_score': 0.0 } # Use a more sophisticated model for document classification classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced document types with more specific categories doc_types = [ "property deed", "sales agreement", "mortgage document", "property tax record", "title document", "khata certificate", "encumbrance certificate", "lease agreement", "rental agreement", "property registration document", "building permit", "other document" ] # Analyze document type with context doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}" doc_result = classifier(doc_context, doc_types) doc_type = doc_result['labels'][0] doc_confidence = doc_result['scores'][0] # Enhanced authenticity check with multiple aspects authenticity_aspects = [ "authentic legal document", "questionable document", "forged document", "template document", "official document" ] authenticity_result = classifier(document_text[:1000], authenticity_aspects) authenticity = "likely authentic" if authenticity_result['labels'][0] == "authentic legal document" else "questionable" authenticity_confidence = authenticity_result['scores'][0] # Extract key information using NLP key_info = extract_document_key_info(document_text) # Enhanced consistency check consistency_score = check_document_consistency(document_text, property_data) # Property relation check with context property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}" is_property_related = check_if_property_related(property_context)['is_related'] # Generate summary using BART summary = summarize_text(document_text[:2000]) # Enhanced signature and date detection has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower())) has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text)) # Calculate verification score with weighted components verification_weights = { 'doc_type': 0.3, 'authenticity': 0.3, 'consistency': 0.2, 'property_relation': 0.1, 'signatures_dates': 0.1 } verification_score = ( doc_confidence * verification_weights['doc_type'] + authenticity_confidence * verification_weights['authenticity'] + consistency_score * verification_weights['consistency'] + float(is_property_related) * verification_weights['property_relation'] + float(has_signatures and has_dates) * verification_weights['signatures_dates'] ) return { 'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)}, 'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)}, 'key_info': key_info, 'consistency_score': float(consistency_score), 'is_property_related': is_property_related, 'summary': summary, 'has_signatures': has_signatures, 'has_dates': has_dates, 'verification_score': float(verification_score) } except Exception as e: logger.error(f"Error analyzing PDF content: {str(e)}") return { 'document_type': {'classification': 'unknown', 'confidence': 0.0}, 'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, 'key_info': {}, 'consistency_score': 0.0, 'is_property_related': False, 'summary': 'Could not analyze document', 'has_signatures': False, 'has_dates': False, 'verification_score': 0.0, 'error': str(e) } def check_document_consistency(document_text, property_data): try: if not sentence_model: logger.warning("Sentence model unavailable") return 0.5 property_text = ' '.join([ property_data.get(key, '') for key in [ 'property_name', 'property_type', 'address', 'city', 'state', 'market_value', 'sq_ft', 'bedrooms' ] ]) property_embedding = sentence_model.encode(property_text) document_embedding = sentence_model.encode(document_text[:1000]) similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item() return max(0.0, min(1.0, float(similarity))) except Exception as e: logger.error(f"Error checking document consistency: {str(e)}") return 0.0 def extract_document_key_info(text): try: info = {} patterns = { 'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)', 'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)', 'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})', 'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)', 'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)', 'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))', 'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)' } for key, pattern in patterns.items(): match = re.search(pattern, text, re.IGNORECASE) if match: info[key] = match.group(1).strip() return info except Exception as e: logger.error(f"Error extracting document key info: {str(e)}") return {} def generate_property_summary(data): try: # Create a detailed context for summary generation property_context = f""" Property Name: {data.get('property_name', '')} Type: {data.get('property_type', '')} Status: {data.get('status', '')} Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}, {data.get('country', '')} Size: {data.get('sq_ft', '')} sq. ft. Price: ₹{data.get('market_value', '0')} Bedrooms: {data.get('bedrooms', '')} Bathrooms: {data.get('bathrooms', '')} Year Built: {data.get('year_built', '')} Description: {data.get('description', '')} """ # Use BART for summary generation summarizer = load_model("summarization", "facebook/bart-large-cnn") # Generate initial summary summary_result = summarizer(property_context, max_length=150, min_length=50, do_sample=False) initial_summary = summary_result[0]['summary_text'] # Enhance summary with key features key_features = [] # Add property type and status if data.get('property_type') and data.get('status'): key_features.append(f"{data['property_type']} is {data['status'].lower()}") # Add location if available location_parts = [] if data.get('city'): location_parts.append(data['city']) if data.get('state'): location_parts.append(data['state']) if location_parts: key_features.append(f"Located in {', '.join(location_parts)}") # Add size and price if available if data.get('sq_ft'): key_features.append(f"Spans {data['sq_ft']} sq. ft.") if data.get('market_value'): key_features.append(f"Valued at ₹{data['market_value']}") # Add rooms information rooms_info = [] if data.get('bedrooms'): rooms_info.append(f"{data['bedrooms']} bedroom{'s' if data['bedrooms'] != '1' else ''}") if data.get('bathrooms'): rooms_info.append(f"{data['bathrooms']} bathroom{'s' if data['bathrooms'] != '1' else ''}") if rooms_info: key_features.append(f"Features {' and '.join(rooms_info)}") # Add amenities if available if data.get('amenities'): key_features.append(f"Amenities: {data['amenities']}") # Combine initial summary with key features enhanced_summary = initial_summary if key_features: enhanced_summary += " " + ". ".join(key_features) + "." # Clean up the summary enhanced_summary = enhanced_summary.replace(" ", " ").strip() return enhanced_summary except Exception as e: logger.error(f"Error generating property summary: {str(e)}") return "Could not generate summary." def summarize_text(text): try: if not text or len(text.strip()) < 10: return "No text to summarize." summarizer = load_model("summarization", "facebook/bart-large-cnn") input_length = len(text.split()) max_length = max(50, min(150, input_length // 2)) min_length = max(20, input_length // 4) summary = summarizer(text[:2000], max_length=max_length, min_length=min_length, do_sample=False) return summary[0]['summary_text'] except Exception as e: logger.error(f"Error summarizing text: {str(e)}") return text[:200] + "..." if len(text) > 200 else text def classify_fraud(property_details, description): """ Classify the risk of fraud in a property listing using zero-shot classification. This function analyzes property details and description to identify potential fraud indicators. """ try: # Initialize fraud classification result fraud_classification = { 'alert_level': 'minimal', 'alert_score': 0.0, 'high_risk': [], 'medium_risk': [], 'low_risk': [], 'confidence_scores': {} } # Combine property details and description for analysis text_to_analyze = f"{property_details}\n{description}" # Define risk categories for zero-shot classification risk_categories = [ "fraudulent listing", "misleading information", "fake property", "scam attempt", "legitimate listing" ] # Perform zero-shot classification classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli") result = classifier(text_to_analyze, risk_categories, multi_label=True) # Process classification results fraud_score = 0.0 for label, score in zip(result['labels'], result['scores']): if label != "legitimate listing": fraud_score += score fraud_classification['confidence_scores'][label] = score # Normalize fraud score to 0-1 range fraud_score = min(1.0, fraud_score / (len(risk_categories) - 1)) fraud_classification['alert_score'] = fraud_score # Define fraud indicators to check fraud_indicators = { 'high_risk': [ r'urgent|immediate|hurry|limited time|special offer', r'bank|transfer|wire|payment|money', r'fake|scam|fraud|illegal|unauthorized', r'guaranteed|promised|assured|certain', r'contact.*whatsapp|whatsapp.*contact', r'price.*negotiable|negotiable.*price', r'no.*documents|documents.*not.*required', r'cash.*only|only.*cash', r'off.*market|market.*off', r'under.*table|table.*under' ], 'medium_risk': [ r'unverified|unconfirmed|unchecked', r'partial|incomplete|missing', r'different.*location|location.*different', r'price.*increased|increased.*price', r'no.*photos|photos.*not.*available', r'contact.*email|email.*contact', r'agent.*not.*available|not.*available.*agent', r'property.*not.*viewable|not.*viewable.*property', r'price.*changed|changed.*price', r'details.*updated|updated.*details' ], 'low_risk': [ r'new.*listing|listing.*new', r'recent.*update|update.*recent', r'price.*reduced|reduced.*price', r'contact.*phone|phone.*contact', r'agent.*available|available.*agent', r'property.*viewable|viewable.*property', r'photos.*available|available.*photos', r'documents.*available|available.*documents', r'price.*fixed|fixed.*price', r'details.*complete|complete.*details' ] } # Check for fraud indicators in text for risk_level, patterns in fraud_indicators.items(): for pattern in patterns: matches = re.finditer(pattern, text_to_analyze, re.IGNORECASE) for match in matches: indicator = match.group(0) if indicator not in fraud_classification[risk_level]: fraud_classification[risk_level].append(indicator) # Determine alert level based on fraud score and indicators if fraud_score > 0.7 or len(fraud_classification['high_risk']) > 0: fraud_classification['alert_level'] = 'critical' elif fraud_score > 0.5 or len(fraud_classification['medium_risk']) > 2: fraud_classification['alert_level'] = 'high' elif fraud_score > 0.3 or len(fraud_classification['medium_risk']) > 0: fraud_classification['alert_level'] = 'medium' elif fraud_score > 0.1 or len(fraud_classification['low_risk']) > 0: fraud_classification['alert_level'] = 'low' else: fraud_classification['alert_level'] = 'minimal' # Additional checks for common fraud patterns if re.search(r'price.*too.*good|too.*good.*price', text_to_analyze, re.IGNORECASE): fraud_classification['high_risk'].append("Unrealistically low price") if re.search(r'no.*inspection|inspection.*not.*allowed', text_to_analyze, re.IGNORECASE): fraud_classification['high_risk'].append("No property inspection allowed") if re.search(r'owner.*abroad|abroad.*owner', text_to_analyze, re.IGNORECASE): fraud_classification['medium_risk'].append("Owner claims to be abroad") if re.search(r'agent.*unavailable|unavailable.*agent', text_to_analyze, re.IGNORECASE): fraud_classification['medium_risk'].append("Agent unavailable for verification") # Check for inconsistencies in property details if 'price' in property_details and 'market_value' in property_details: try: price = float(re.search(r'\d+(?:,\d+)*(?:\.\d+)?', property_details['price']).group().replace(',', '')) market_value = float(re.search(r'\d+(?:,\d+)*(?:\.\d+)?', property_details['market_value']).group().replace(',', '')) if price < market_value * 0.5: fraud_classification['high_risk'].append("Price significantly below market value") except (ValueError, AttributeError): pass return fraud_classification except Exception as e: logger.error(f"Error in fraud classification: {str(e)}") return { 'alert_level': 'error', 'alert_score': 1.0, 'high_risk': [f"Error in fraud classification: {str(e)}"], 'medium_risk': [], 'low_risk': [], 'confidence_scores': {} } def generate_trust_score(text, image_analysis, pdf_analysis): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") aspects = [ "complete information provided", "verified location", "consistent data", "authentic documents", "authentic images", "reasonable pricing", "verified ownership", "proper documentation" ] result = classifier(text[:1000], aspects, multi_label=True) # Much stricter weights with higher emphasis on critical aspects weights = { "complete information provided": 0.25, "verified location": 0.20, "consistent data": 0.15, "authentic documents": 0.15, "authentic images": 0.10, "reasonable pricing": 0.05, "verified ownership": 0.05, "proper documentation": 0.05 } score = 0 reasoning_parts = [] # Much stricter scoring for each aspect for label, confidence in zip(result['labels'], result['scores']): adjusted_confidence = confidence # Stricter document verification if label == "authentic documents": if not pdf_analysis or len(pdf_analysis) == 0: adjusted_confidence = 0.0 else: doc_scores = [p.get('verification_score', 0) for p in pdf_analysis] adjusted_confidence = sum(doc_scores) / max(1, len(doc_scores)) # Heavily penalize if any document has low verification score if any(score < 0.7 for score in doc_scores): adjusted_confidence *= 0.4 # Additional penalty for missing documents if len(doc_scores) < 2: adjusted_confidence *= 0.5 # Stricter image verification elif label == "authentic images": if not image_analysis or len(image_analysis) == 0: adjusted_confidence = 0.0 else: img_scores = [i.get('authenticity_score', 0) for i in image_analysis] adjusted_confidence = sum(img_scores) / max(1, len(img_scores)) # Heavily penalize if any image has low authenticity score if any(score < 0.8 for score in img_scores): adjusted_confidence *= 0.4 # Additional penalty for AI-generated images if any(i.get('is_ai_generated', False) for i in image_analysis): adjusted_confidence *= 0.5 # Additional penalty for non-property related images if any(not i.get('is_property_related', False) for i in image_analysis): adjusted_confidence *= 0.6 # Stricter consistency check elif label == "consistent data": # Check for inconsistencies in the data if "inconsistent" in text.lower() or "suspicious" in text.lower(): adjusted_confidence *= 0.3 # Check for impossible values if "impossible" in text.lower() or "invalid" in text.lower(): adjusted_confidence *= 0.2 # Check for missing critical information if "missing" in text.lower() or "not provided" in text.lower(): adjusted_confidence *= 0.4 # Stricter completeness check elif label == "complete information provided": # Check for missing critical information if len(text) < 300 or "not provided" in text.lower() or "missing" in text.lower(): adjusted_confidence *= 0.4 # Check for vague or generic descriptions if "generic" in text.lower() or "vague" in text.lower(): adjusted_confidence *= 0.5 # Check for suspiciously short descriptions if len(text) < 150: adjusted_confidence *= 0.3 score += adjusted_confidence * weights.get(label, 0.1) reasoning_parts.append(f"{label} ({adjusted_confidence:.0%})") # Apply additional penalties for suspicious patterns if "suspicious" in text.lower() or "fraudulent" in text.lower(): score *= 0.5 # Apply penalties for suspiciously low values if "suspiciously low" in text.lower() or "unusually small" in text.lower(): score *= 0.6 # Apply penalties for inconsistencies if "inconsistent" in text.lower() or "mismatch" in text.lower(): score *= 0.6 # Apply penalties for missing critical information if "missing critical" in text.lower() or "incomplete" in text.lower(): score *= 0.7 # Ensure score is between 0 and 100 score = min(100, max(0, int(score * 100))) reasoning = f"Based on: {', '.join(reasoning_parts)}" return score, reasoning except Exception as e: logger.error(f"Error generating trust score: {str(e)}") return 20, "Could not assess trust." def generate_suggestions(text, data=None): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Create comprehensive context for analysis suggestion_context = text if data: suggestion_context += f""" Additional Context: Property Type: {data.get('property_type', '')} Location: {data.get('city', '')}, {data.get('state', '')} Size: {data.get('sq_ft', '')} sq.ft. Year Built: {data.get('year_built', '')} """ # Enhanced suggestion categories based on property context base_suggestions = { 'documentation': { 'label': "add more documentation", 'categories': [ "complete documentation provided", "missing essential documents", "incomplete paperwork", "documentation needs verification" ], 'weight': 2.0, 'improvements': { 'missing essential documents': [ "Add property deed or title documents", "Include recent property tax records", "Attach property registration documents" ], 'incomplete paperwork': [ "Complete all required legal documents", "Add missing ownership proof", "Include property survey documents" ] } }, 'details': { 'label': "enhance property details", 'categories': [ "detailed property information", "basic information only", "missing key details", "comprehensive description" ], 'weight': 1.8, 'improvements': { 'basic information only': [ "Add more details about property features", "Include information about recent renovations", "Describe unique selling points" ], 'missing key details': [ "Specify exact built-up area", "Add floor plan details", "Include maintenance costs" ] } }, 'images': { 'label': "improve visual content", 'categories': [ "high quality images provided", "poor image quality", "insufficient images", "missing key area photos" ], 'weight': 1.5, 'improvements': { 'poor image quality': [ "Add high-resolution property photos", "Include better lighting in images", "Provide professional photography" ], 'insufficient images': [ "Add more interior photos", "Include exterior and surrounding area images", "Add photos of amenities" ] } }, 'pricing': { 'label': "pricing information", 'categories': [ "detailed pricing breakdown", "basic price only", "missing price details", "unclear pricing terms" ], 'weight': 1.7, 'improvements': { 'basic price only': [ "Add detailed price breakdown", "Include maintenance charges", "Specify additional costs" ], 'missing price details': [ "Add price per square foot", "Include tax implications", "Specify payment terms" ] } }, 'location': { 'label': "location details", 'categories': [ "comprehensive location info", "basic location only", "missing location details", "unclear accessibility info" ], 'weight': 1.6, 'improvements': { 'basic location only': [ "Add nearby landmarks and distances", "Include transportation options", "Specify neighborhood facilities" ], 'missing location details': [ "Add exact GPS coordinates", "Include area development plans", "Specify distance to key facilities" ] } } } suggestions = [] confidence_scores = [] for aspect, config in base_suggestions.items(): # Analyze each aspect with context result = classifier(suggestion_context[:1000], config['categories']) # Get the most relevant category top_category = result['labels'][0] confidence = float(result['scores'][0]) # If the category indicates improvement needed (confidence < 0.6) if confidence < 0.6 and top_category in config['improvements']: weighted_confidence = confidence * config['weight'] for improvement in config['improvements'][top_category]: suggestions.append({ 'aspect': aspect, 'category': top_category, 'suggestion': improvement, 'confidence': weighted_confidence }) confidence_scores.append(weighted_confidence) # Sort suggestions by confidence and priority suggestions.sort(key=lambda x: x['confidence'], reverse=True) # Property type specific suggestions if data and data.get('property_type'): property_type = data['property_type'].lower() type_specific_suggestions = { 'residential': [ "Add information about school districts", "Include details about neighborhood safety", "Specify parking arrangements" ], 'commercial': [ "Add foot traffic statistics", "Include zoning information", "Specify business licenses required" ], 'industrial': [ "Add power supply specifications", "Include environmental clearances", "Specify loading/unloading facilities" ], 'land': [ "Add soil testing reports", "Include development potential analysis", "Specify available utilities" ] } for type_key, type_suggestions in type_specific_suggestions.items(): if type_key in property_type: for suggestion in type_suggestions: suggestions.append({ 'aspect': 'property_type_specific', 'category': 'type_specific_requirements', 'suggestion': suggestion, 'confidence': 0.8 # High confidence for type-specific suggestions }) # Add market-based suggestions if data and data.get('market_value'): try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if market_value > 10000000: # High-value property premium_suggestions = [ "Add virtual tour of the property", "Include detailed investment analysis", "Provide historical price trends" ] for suggestion in premium_suggestions: suggestions.append({ 'aspect': 'premium_property', 'category': 'high_value_requirements', 'suggestion': suggestion, 'confidence': 0.9 }) except ValueError: pass # Calculate overall completeness score completeness_score = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0 completeness_score = min(100, max(0, completeness_score * 100)) return { 'suggestions': suggestions[:10], # Return top 10 suggestions 'completeness_score': completeness_score, 'priority_aspects': [s['aspect'] for s in suggestions[:3]], 'improvement_summary': f"Focus on improving {', '.join([s['aspect'] for s in suggestions[:3]])}", 'total_suggestions': len(suggestions) } except Exception as e: logger.error(f"Error generating suggestions: {str(e)}") return { 'suggestions': [ { 'aspect': 'general', 'category': 'basic_requirements', 'suggestion': 'Please provide more property details', 'confidence': 0.5 } ], 'completeness_score': 0, 'priority_aspects': ['general'], 'improvement_summary': "Add basic property information", 'total_suggestions': 1 } def assess_text_quality(text): try: if not text or len(text.strip()) < 20: return { 'assessment': 'insufficient', 'score': 0, 'reasoning': 'Text too short.', 'is_ai_generated': False, 'quality_metrics': {} } classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced quality categories with more specific indicators quality_categories = [ "detailed and informative", "adequately detailed", "basic information", "vague description", "misleading content", "professional listing", "amateur listing", "spam-like content", "template-based content", "authentic description" ] # Analyze text with multiple aspects quality_result = classifier(text[:1000], quality_categories, multi_label=True) # Get top classifications with confidence scores top_classifications = [] for label, score in zip(quality_result['labels'][:3], quality_result['scores'][:3]): if score > 0.3: # Only include if confidence is above 30% top_classifications.append({ 'classification': label, 'confidence': float(score) }) # AI generation detection with multiple models ai_check = classifier(text[:1000], ["human-written", "AI-generated", "template-based", "authentic"]) is_ai_generated = ( (ai_check['labels'][0] == "AI-generated" and ai_check['scores'][0] > 0.6) or (ai_check['labels'][0] == "template-based" and ai_check['scores'][0] > 0.7) ) # Calculate quality metrics quality_metrics = { 'detail_level': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['detailed and informative', 'adequately detailed']), 'professionalism': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['professional listing', 'authentic description']), 'clarity': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label not in ['vague description', 'misleading content', 'spam-like content']), 'authenticity': 1.0 - sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['template-based content', 'spam-like content']) } # Calculate overall score with weighted metrics weights = { 'detail_level': 0.3, 'professionalism': 0.25, 'clarity': 0.25, 'authenticity': 0.2 } score = sum(metric * weights[metric_name] for metric_name, metric in quality_metrics.items()) score = score * 100 # Convert to percentage # Adjust score for AI-generated content if is_ai_generated: score = score * 0.7 # Reduce score by 30% for AI-generated content # Generate detailed reasoning reasoning_parts = [] if top_classifications: primary_class = top_classifications[0]['classification'] reasoning_parts.append(f"Primary assessment: {primary_class}") if quality_metrics['detail_level'] > 0.7: reasoning_parts.append("Contains comprehensive details") elif quality_metrics['detail_level'] > 0.4: reasoning_parts.append("Contains adequate details") else: reasoning_parts.append("Lacks important details") if quality_metrics['professionalism'] > 0.7: reasoning_parts.append("Professional listing style") elif quality_metrics['professionalism'] < 0.4: reasoning_parts.append("Amateur listing style") if quality_metrics['clarity'] < 0.5: reasoning_parts.append("Content clarity issues detected") if is_ai_generated: reasoning_parts.append("Content appears to be AI-generated") return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', 'score': int(score), 'reasoning': '. '.join(reasoning_parts), 'is_ai_generated': is_ai_generated, 'quality_metrics': quality_metrics, 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error assessing text quality: {str(e)}") return { 'assessment': 'could not assess', 'score': 50, 'reasoning': 'Technical error.', 'is_ai_generated': False, 'quality_metrics': {}, 'top_classifications': [] } def verify_address(data): try: address_results = { 'address_exists': False, 'pincode_valid': False, 'city_state_match': False, 'coordinates_match': False, 'confidence': 0.0, 'issues': [], 'verification_score': 0.0 } if data['zip']: try: response = requests.get(f"https://api.postalpincode.in/pincode/{data['zip']}", timeout=5) if response.status_code == 200: pin_data = response.json() if pin_data[0]['Status'] == 'Success': address_results['pincode_valid'] = True post_offices = pin_data[0]['PostOffice'] cities = {po['Name'].lower() for po in post_offices} states = {po['State'].lower() for po in post_offices} if data['city'].lower() in cities or data['state'].lower() in states: address_results['city_state_match'] = True else: address_results['issues'].append("City/state may not match pincode") else: address_results['issues'].append(f"Invalid pincode: {data['zip']}") else: address_results['issues'].append("Pincode API error") except Exception as e: logger.error(f"Pincode API error: {str(e)}") address_results['issues'].append("Pincode validation failed") full_address = ', '.join(filter(None, [data['address'], data['city'], data['state'], data['country'], data['zip']])) for attempt in range(3): try: location = geocoder.geocode(full_address) if location: address_results['address_exists'] = True address_results['confidence'] = 0.9 if data['latitude'] and data['longitude']: try: provided_coords = (float(data['latitude']), float(data['longitude'])) geocoded_coords = (location.latitude, location.longitude) from geopy.distance import distance dist = distance(provided_coords, geocoded_coords).km address_results['coordinates_match'] = dist < 1.0 if not address_results['coordinates_match']: address_results['issues'].append(f"Coordinates {dist:.2f}km off") except: address_results['issues'].append("Invalid coordinates") break time.sleep(1) except Exception as e: logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") time.sleep(1) else: address_results['issues'].append("Address geocoding failed") verification_points = ( address_results['address_exists'] * 0.4 + address_results['pincode_valid'] * 0.3 + address_results['city_state_match'] * 0.2 + address_results['coordinates_match'] * 0.1 ) address_results['verification_score'] = verification_points return address_results except Exception as e: logger.error(f"Error verifying address: {str(e)}") address_results['issues'].append(str(e)) return address_results def perform_cross_validation(data): try: cross_checks = [] # Check bedroom count consistency try: bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 desc_bhk = re.findall(r'(\d+)\s*bhk', data['description'].lower()) if desc_bhk and int(desc_bhk[0]) != bedrooms: cross_checks.append({ 'check': 'bedroom_count', 'status': 'inconsistent', 'message': f"Description mentions {desc_bhk[0]} BHK, form says {bedrooms}" }) else: cross_checks.append({ 'check': 'bedroom_count', 'status': 'consistent', 'message': f"Bedrooms: {bedrooms}" }) except: cross_checks.append({ 'check': 'bedroom_count', 'status': 'invalid', 'message': 'Invalid bedroom data' }) # Check room count consistency try: bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0 total_rooms = int(data['total_rooms']) if data['total_rooms'] else 0 # More thorough room count validation if total_rooms > 0: if total_rooms < bedrooms + bathrooms: cross_checks.append({ 'check': 'room_count', 'status': 'inconsistent', 'message': f"Total rooms ({total_rooms}) less than bedrooms ({bedrooms}) + bathrooms ({bathrooms})" }) elif total_rooms > bedrooms + bathrooms + 5: # Allow for some extra rooms cross_checks.append({ 'check': 'room_count', 'status': 'suspicious', 'message': f"Total rooms ({total_rooms}) seems unusually high compared to bedrooms ({bedrooms}) + bathrooms ({bathrooms})" }) else: cross_checks.append({ 'check': 'room_count', 'status': 'consistent', 'message': f"Rooms consistent: {total_rooms} total, {bedrooms} bedrooms, {bathrooms} bathrooms" }) else: cross_checks.append({ 'check': 'room_count', 'status': 'missing', 'message': 'Total room count not provided' }) except: cross_checks.append({ 'check': 'room_count', 'status': 'invalid', 'message': 'Invalid room count data' }) # Check year built consistency try: year_built = int(data['year_built']) if data['year_built'] else 0 current_year = datetime.now().year if year_built > 0: if year_built > current_year: cross_checks.append({ 'check': 'year_built', 'status': 'invalid', 'message': f"Year built ({year_built}) is in the future" }) elif year_built < 1800: cross_checks.append({ 'check': 'year_built', 'status': 'suspicious', 'message': f"Year built ({year_built}) seems unusually old" }) elif current_year - year_built > 200: cross_checks.append({ 'check': 'year_built', 'status': 'suspicious', 'message': f"Property age ({current_year - year_built} years) seems unusually old" }) else: cross_checks.append({ 'check': 'year_built', 'status': 'reasonable', 'message': f"Year built reasonable: {year_built}" }) else: cross_checks.append({ 'check': 'year_built', 'status': 'missing', 'message': 'Year built not provided' }) except: cross_checks.append({ 'check': 'year_built', 'status': 'invalid', 'message': 'Invalid year built data' }) # Check square footage consistency try: sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 if sq_ft > 0 and bedrooms > 0: sq_ft_per_bedroom = sq_ft / bedrooms if sq_ft_per_bedroom < 50: # Unusually small per bedroom cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'suspicious', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually small" }) elif sq_ft_per_bedroom > 1000: # Unusually large per bedroom cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'suspicious', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually large" }) else: cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'reasonable', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) is reasonable" }) elif sq_ft > 0: cross_checks.append({ 'check': 'sq_ft', 'status': 'incomplete', 'message': f"Square footage provided: {sq_ft} sq.ft., but bedroom count missing" }) elif bedrooms > 0: cross_checks.append({ 'check': 'sq_ft', 'status': 'missing', 'message': f"Square footage not provided, but {bedrooms} bedrooms listed" }) else: cross_checks.append({ 'check': 'sq_ft', 'status': 'missing', 'message': 'Square footage not provided' }) except: cross_checks.append({ 'check': 'sq_ft', 'status': 'invalid', 'message': 'Invalid square footage data' }) # Check price per square foot try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 if market_value > 0 and sq_ft > 0: price_per_sqft = market_value / sq_ft # Check for suspiciously low price per sq ft if price_per_sqft < 100: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'suspiciously low', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously low" }) # Check for suspiciously high price per sq ft elif price_per_sqft > 50000: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'suspiciously high', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously high" }) else: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'reasonable', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is reasonable" }) elif market_value > 0: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'incomplete', 'message': f"Market value provided: ₹{market_value:,.2f}, but square footage missing" }) elif sq_ft > 0: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'incomplete', 'message': f"Square footage provided: {sq_ft} sq.ft., but market value missing" }) else: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'missing', 'message': 'Price per sq.ft. cannot be calculated (missing data)' }) except: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'invalid', 'message': 'Invalid price per sq.ft. data' }) # Check location consistency try: latitude = float(data['latitude']) if data['latitude'] else 0 longitude = float(data['longitude']) if data['longitude'] else 0 address = data['address'].lower() if data['address'] else '' city = data['city'].lower() if data['city'] else '' state = data['state'].lower() if data['state'] else '' country = data['country'].lower() if data['country'] else 'india' # Check if coordinates are within India if latitude != 0 and longitude != 0: if 6.5 <= latitude <= 35.5 and 68.1 <= longitude <= 97.4: cross_checks.append({ 'check': 'coordinates', 'status': 'valid', 'message': 'Coordinates within India' }) else: cross_checks.append({ 'check': 'coordinates', 'status': 'invalid', 'message': 'Coordinates outside India' }) else: cross_checks.append({ 'check': 'coordinates', 'status': 'missing', 'message': 'Coordinates not provided' }) # Check if address contains city and state if address and city and state: if city in address and state in address: cross_checks.append({ 'check': 'address_consistency', 'status': 'consistent', 'message': 'Address contains city and state' }) else: cross_checks.append({ 'check': 'address_consistency', 'status': 'inconsistent', 'message': 'Address does not contain city or state' }) else: cross_checks.append({ 'check': 'address_consistency', 'status': 'incomplete', 'message': 'Address consistency check incomplete (missing data)' }) except: cross_checks.append({ 'check': 'location', 'status': 'invalid', 'message': 'Invalid location data' }) # Check property type consistency try: property_type = data['property_type'].lower() if data['property_type'] else '' description = data['description'].lower() if data['description'] else '' if property_type and description: property_types = ['apartment', 'house', 'condo', 'townhouse', 'villa', 'land', 'commercial'] found_types = [pt for pt in property_types if pt in description] if found_types and property_type not in found_types: cross_checks.append({ 'check': 'property_type', 'status': 'inconsistent', 'message': f"Description mentions {', '.join(found_types)}, but property type is {property_type}" }) else: cross_checks.append({ 'check': 'property_type', 'status': 'consistent', 'message': f"Property type consistent: {property_type}" }) else: cross_checks.append({ 'check': 'property_type', 'status': 'incomplete', 'message': 'Property type consistency check incomplete (missing data)' }) except: cross_checks.append({ 'check': 'property_type', 'status': 'invalid', 'message': 'Invalid property type data' }) # Check for suspiciously low market value try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 property_type = data['property_type'].lower() if data['property_type'] else '' if market_value > 0 and property_type: # Define minimum reasonable values for different property types min_values = { 'apartment': 500000, 'house': 1000000, 'condo': 800000, 'townhouse': 900000, 'villa': 2000000, 'land': 300000, 'commercial': 2000000 } min_value = min_values.get(property_type, 500000) if market_value < min_value: cross_checks.append({ 'check': 'market_value', 'status': 'suspiciously low', 'message': f"Market value (₹{market_value:,.2f}) seems suspiciously low for a {property_type}" }) else: cross_checks.append({ 'check': 'market_value', 'status': 'reasonable', 'message': f"Market value (₹{market_value:,.2f}) is reasonable for a {property_type}" }) elif market_value > 0: cross_checks.append({ 'check': 'market_value', 'status': 'incomplete', 'message': f"Market value provided: ₹{market_value:,.2f}, but property type missing" }) else: cross_checks.append({ 'check': 'market_value', 'status': 'missing', 'message': 'Market value not provided' }) except: cross_checks.append({ 'check': 'market_value', 'status': 'invalid', 'message': 'Invalid market value data' }) return cross_checks except Exception as e: logger.error(f"Error performing cross validation: {str(e)}") return [{ 'check': 'cross_validation', 'status': 'error', 'message': f'Error performing cross validation: {str(e)}' }] def analyze_location(data): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Create a detailed location text for analysis location_text = ' '.join(filter(None, [ data['address'], data['city'], data['state'], data['country'], data['zip'], f"Lat: {data['latitude']}", f"Long: {data['longitude']}", data['nearby_landmarks'] ])) # Classify location completeness categories = ["complete", "partial", "minimal", "missing"] result = classifier(location_text, categories) # Verify location quality location_quality = "unknown" if data['city'] and data['state']: for attempt in range(3): try: location = geocoder.geocode(f"{data['city']}, {data['state']}, India") if location: location_quality = "verified" break time.sleep(1) except: time.sleep(1) else: location_quality = "unverified" # Check coordinates coord_check = "missing" if data['latitude'] and data['longitude']: try: lat, lng = float(data['latitude']), float(data['longitude']) if 6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5: coord_check = "in_india" # Further validate coordinates against known Indian cities if any(city in data['city'].lower() for city in ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"]): coord_check = "in_metro_city" else: coord_check = "outside_india" except: coord_check = "invalid" # Calculate location completeness with weighted scoring completeness = calculate_location_completeness(data) # Analyze landmarks landmarks_analysis = { 'provided': bool(data['nearby_landmarks']), 'count': len(data['nearby_landmarks'].split(',')) if data['nearby_landmarks'] else 0, 'types': [] } if data['nearby_landmarks']: landmark_types = { 'transport': ['station', 'metro', 'bus', 'railway', 'airport'], 'education': ['school', 'college', 'university', 'institute'], 'healthcare': ['hospital', 'clinic', 'medical'], 'shopping': ['mall', 'market', 'shop', 'store'], 'entertainment': ['park', 'garden', 'theater', 'cinema'], 'business': ['office', 'business', 'corporate'] } landmarks = data['nearby_landmarks'].lower().split(',') for landmark in landmarks: for type_name, keywords in landmark_types.items(): if any(keyword in landmark for keyword in keywords): if type_name not in landmarks_analysis['types']: landmarks_analysis['types'].append(type_name) # Determine location assessment assessment = "complete" if completeness >= 80 else "partial" if completeness >= 50 else "minimal" # Add city tier information city_tier = "unknown" if data['city']: city_lower = data['city'].lower() if any(city in city_lower for city in ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"]): city_tier = "metro" elif any(city in city_lower for city in ["ahmedabad", "jaipur", "surat", "lucknow", "kanpur", "nagpur", "indore", "thane", "bhopal", "visakhapatnam"]): city_tier = "tier2" else: city_tier = "tier3" return { 'assessment': assessment, 'confidence': float(result['scores'][0]), 'coordinates_check': coord_check, 'landmarks_analysis': landmarks_analysis, 'completeness_score': completeness, 'location_quality': location_quality, 'city_tier': city_tier, 'formatted_address': f"{data['address']}, {data['city']}, {data['state']}, India - {data['zip']}", 'verification_status': "verified" if location_quality == "verified" and coord_check in ["in_india", "in_metro_city"] else "unverified" } except Exception as e: logger.error(f"Error analyzing location: {str(e)}") return { 'assessment': 'error', 'confidence': 0.0, 'coordinates_check': 'error', 'landmarks_analysis': {'provided': False, 'count': 0, 'types': []}, 'completeness_score': 0, 'location_quality': 'error', 'city_tier': 'unknown', 'formatted_address': '', 'verification_status': 'error' } def calculate_location_completeness(data): # Define weights for different fields weights = { 'address': 0.25, 'city': 0.20, 'state': 0.15, 'country': 0.05, 'zip': 0.10, 'latitude': 0.10, 'longitude': 0.10, 'nearby_landmarks': 0.05 } # Calculate weighted score score = 0 for field, weight in weights.items(): if data[field]: score += weight return int(score * 100) def analyze_price(data): try: price_str = data['market_value'].replace('$', '').replace(',', '').strip() price = float(price_str) if price_str else 0 sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 price_per_sqft = price / sq_ft if sq_ft else 0 if not price: return { 'assessment': 'no price', 'confidence': 0.0, 'price': 0, 'formatted_price': '₹0', 'price_per_sqft': 0, 'formatted_price_per_sqft': '₹0', 'price_range': 'unknown', 'location_price_assessment': 'cannot assess', 'has_price': False, 'market_trends': {}, 'price_factors': {}, 'risk_indicators': [] } # Use a more sophisticated model for price analysis classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Create a detailed context for price analysis price_context = f""" Property Type: {data.get('property_type', '')} Location: {data.get('city', '')}, {data.get('state', '')} Size: {sq_ft} sq.ft. Price: ₹{price:,.2f} Price per sq.ft.: ₹{price_per_sqft:,.2f} Property Status: {data.get('status', '')} Year Built: {data.get('year_built', '')} Bedrooms: {data.get('bedrooms', '')} Bathrooms: {data.get('bathrooms', '')} Amenities: {data.get('amenities', '')} """ # Enhanced price categories with more specific indicators price_categories = [ "reasonable market price", "suspiciously low price", "suspiciously high price", "average market price", "luxury property price", "budget property price", "premium property price", "mid-range property price", "overpriced for location", "underpriced for location", "price matches amenities", "price matches property age", "price matches location value", "price matches property condition", "price matches market trends" ] # Analyze price with multiple aspects price_result = classifier(price_context, price_categories, multi_label=True) # Get top classifications with enhanced confidence calculation top_classifications = [] for label, score in zip(price_result['labels'][:5], price_result['scores'][:5]): if score > 0.25: # Lower threshold for better sensitivity top_classifications.append({ 'classification': label, 'confidence': float(score) }) # Determine price range based on AI classification and market data price_range = 'unknown' if top_classifications: primary_class = top_classifications[0]['classification'] if 'luxury' in primary_class: price_range = 'luxury' elif 'premium' in primary_class: price_range = 'premium' elif 'mid-range' in primary_class: price_range = 'mid_range' elif 'budget' in primary_class: price_range = 'budget' # Enhanced location-specific price assessment location_assessment = "unknown" market_trends = {} if data.get('city') and price_per_sqft: city_lower = data['city'].lower() metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"] # Define price ranges for different city tiers if any(city in city_lower for city in metro_cities): market_trends = { 'city_tier': 'metro', 'avg_price_range': { 'min': 5000, 'max': 30000, 'trend': 'stable' }, 'price_per_sqft': { 'current': price_per_sqft, 'market_avg': 15000, 'deviation': abs(price_per_sqft - 15000) / 15000 * 100 } } location_assessment = ( "reasonable" if 5000 <= price_per_sqft <= 30000 else "suspiciously low" if price_per_sqft < 5000 else "suspiciously high" ) else: market_trends = { 'city_tier': 'non-metro', 'avg_price_range': { 'min': 1500, 'max': 15000, 'trend': 'stable' }, 'price_per_sqft': { 'current': price_per_sqft, 'market_avg': 7500, 'deviation': abs(price_per_sqft - 7500) / 7500 * 100 } } location_assessment = ( "reasonable" if 1500 <= price_per_sqft <= 15000 else "suspiciously low" if price_per_sqft < 1500 else "suspiciously high" ) # Enhanced price analysis factors price_factors = {} risk_indicators = [] # Property age factor try: year_built = int(data.get('year_built', 0)) current_year = datetime.now().year property_age = current_year - year_built if property_age > 0: depreciation_factor = max(0.5, 1 - (property_age * 0.01)) # 1% depreciation per year, min 50% price_factors['age_factor'] = { 'property_age': property_age, 'depreciation_factor': depreciation_factor, 'impact': 'high' if property_age > 30 else 'medium' if property_age > 15 else 'low' } except: price_factors['age_factor'] = {'error': 'Invalid year built'} # Size factor if sq_ft > 0: size_factor = { 'size': sq_ft, 'price_per_sqft': price_per_sqft, 'efficiency': 'high' if 800 <= sq_ft <= 2000 else 'medium' if 500 <= sq_ft <= 3000 else 'low' } price_factors['size_factor'] = size_factor # Add risk indicators based on size if sq_ft < 300: risk_indicators.append('Unusually small property size') elif sq_ft > 10000: risk_indicators.append('Unusually large property size') # Amenities factor if data.get('amenities'): amenities_list = [a.strip() for a in data['amenities'].split(',')] amenities_score = min(1.0, len(amenities_list) * 0.1) # 10% per amenity, max 100% price_factors['amenities_factor'] = { 'count': len(amenities_list), 'score': amenities_score, 'impact': 'high' if amenities_score > 0.7 else 'medium' if amenities_score > 0.4 else 'low' } # Calculate overall confidence with weighted factors confidence_weights = { 'primary_classification': 0.3, 'location_assessment': 0.25, 'age_factor': 0.2, 'size_factor': 0.15, 'amenities_factor': 0.1 } confidence_scores = [] # Primary classification confidence if top_classifications: confidence_scores.append(price_result['scores'][0] * confidence_weights['primary_classification']) # Location assessment confidence location_confidence = 0.8 if location_assessment == "reasonable" else 0.4 confidence_scores.append(location_confidence * confidence_weights['location_assessment']) # Age factor confidence if 'age_factor' in price_factors and 'depreciation_factor' in price_factors['age_factor']: age_confidence = price_factors['age_factor']['depreciation_factor'] confidence_scores.append(age_confidence * confidence_weights['age_factor']) # Size factor confidence if 'size_factor' in price_factors: size_confidence = 0.8 if price_factors['size_factor']['efficiency'] == 'high' else 0.6 confidence_scores.append(size_confidence * confidence_weights['size_factor']) # Amenities factor confidence if 'amenities_factor' in price_factors: amenities_confidence = price_factors['amenities_factor']['score'] confidence_scores.append(amenities_confidence * confidence_weights['amenities_factor']) overall_confidence = sum(confidence_scores) / sum(confidence_weights.values()) return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not classify', 'confidence': float(overall_confidence), 'price': price, 'formatted_price': f"₹{price:,.0f}", 'price_per_sqft': price_per_sqft, 'formatted_price_per_sqft': f"₹{price_per_sqft:,.2f}", 'price_range': price_range, 'location_price_assessment': location_assessment, 'has_price': True, 'market_trends': market_trends, 'price_factors': price_factors, 'risk_indicators': risk_indicators, 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error analyzing price: {str(e)}") return { 'assessment': 'error', 'confidence': 0.0, 'price': 0, 'formatted_price': '₹0', 'price_per_sqft': 0, 'formatted_price_per_sqft': '₹0', 'price_range': 'unknown', 'location_price_assessment': 'error', 'has_price': False, 'market_trends': {}, 'price_factors': {}, 'risk_indicators': [], 'top_classifications': [] } def analyze_legal_details(legal_text): try: if not legal_text or len(legal_text.strip()) < 5: return { 'assessment': 'insufficient', 'confidence': 0.0, 'summary': 'No legal details provided', 'completeness_score': 0, 'potential_issues': False, 'legal_metrics': {}, 'reasoning': 'No legal details provided for analysis', 'top_classifications': [] } classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced legal categories with more specific indicators categories = [ "comprehensive legal documentation", "basic legal documentation", "missing critical legal details", "potential legal issues", "standard property documentation", "title verification documents", "encumbrance certificates", "property tax records", "building permits", "land use certificates", "clear title documentation", "property registration documents", "ownership transfer documents", "legal compliance certificates", "property dispute records" ] # Create a more detailed context for analysis legal_context = f""" Legal Documentation Analysis: {legal_text[:1000]} Key aspects to verify: - Title and ownership documentation - Property registration status - Tax compliance - Building permits and approvals - Land use compliance - Encumbrance status - Dispute history """ # Analyze legal text with multiple aspects legal_result = classifier(legal_context, categories, multi_label=True) # Get top classifications with confidence scores top_classifications = [] for label, score in zip(legal_result['labels'][:3], legal_result['scores'][:3]): if score > 0.3: # Only include if confidence is above 30% top_classifications.append({ 'classification': label, 'confidence': float(score) }) # Generate summary using BART summary = summarize_text(legal_text[:1000]) # Calculate legal metrics with weighted scoring legal_metrics = { 'completeness': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['comprehensive legal documentation', 'standard property documentation']), 'documentation_quality': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['title verification documents', 'encumbrance certificates', 'clear title documentation']), 'compliance': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['building permits', 'land use certificates', 'legal compliance certificates']), 'risk_level': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['missing critical legal details', 'potential legal issues', 'property dispute records']) } # Calculate completeness score with weighted components completeness_score = ( legal_metrics['completeness'] * 0.4 + legal_metrics['documentation_quality'] * 0.4 + legal_metrics['compliance'] * 0.2 ) * 100 # Determine if there are potential issues with threshold potential_issues = legal_metrics['risk_level'] > 0.3 # Generate detailed reasoning with specific points reasoning_parts = [] # Primary assessment if top_classifications: primary_class = top_classifications[0]['classification'] confidence = top_classifications[0]['confidence'] reasoning_parts.append(f"Primary assessment: {primary_class} (confidence: {confidence:.0%})") # Documentation completeness if legal_metrics['completeness'] > 0.7: reasoning_parts.append("Comprehensive legal documentation present") elif legal_metrics['completeness'] > 0.4: reasoning_parts.append("Basic legal documentation present") else: reasoning_parts.append("Insufficient legal documentation") # Documentation quality if legal_metrics['documentation_quality'] > 0.6: reasoning_parts.append("Quality documentation verified (title, encumbrance)") elif legal_metrics['documentation_quality'] > 0.3: reasoning_parts.append("Basic documentation quality verified") # Compliance status if legal_metrics['compliance'] > 0.6: reasoning_parts.append("Full compliance documentation present") elif legal_metrics['compliance'] > 0.3: reasoning_parts.append("Partial compliance documentation present") # Risk assessment if potential_issues: if legal_metrics['risk_level'] > 0.6: reasoning_parts.append("High risk: Multiple potential legal issues detected") else: reasoning_parts.append("Moderate risk: Some potential legal issues detected") else: reasoning_parts.append("No significant legal issues detected") # Calculate overall confidence overall_confidence = min(1.0, ( legal_metrics['completeness'] * 0.4 + legal_metrics['documentation_quality'] * 0.4 + (1 - legal_metrics['risk_level']) * 0.2 )) return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', 'confidence': float(overall_confidence), 'summary': summary, 'completeness_score': int(completeness_score), 'potential_issues': potential_issues, 'legal_metrics': legal_metrics, 'reasoning': '. '.join(reasoning_parts), 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error analyzing legal details: {str(e)}") return { 'assessment': 'could not assess', 'confidence': 0.0, 'summary': 'Error analyzing legal details', 'completeness_score': 0, 'potential_issues': False, 'legal_metrics': {}, 'reasoning': 'Technical error occurred during analysis', 'top_classifications': [] } def verify_property_specs(data): """ Verify property specifications for reasonableness and consistency. This function checks if the provided property details are within reasonable ranges for the Indian real estate market. """ specs_verification = { 'is_valid': True, 'bedrooms_reasonable': True, 'bathrooms_reasonable': True, 'total_rooms_reasonable': True, 'year_built_reasonable': True, 'parking_reasonable': True, 'sq_ft_reasonable': True, 'market_value_reasonable': True, 'issues': [] } try: # Validate property type valid_property_types = [ 'Apartment', 'House', 'Villa', 'Independent House', 'Independent Villa', 'Studio', 'Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial' ] if 'property_type' not in data or data['property_type'] not in valid_property_types: specs_verification['is_valid'] = False specs_verification['issues'].append(f"Invalid property type: {data.get('property_type', 'Not specified')}") # Validate bedrooms if 'bedrooms' in data: try: bedrooms = int(data['bedrooms']) if data['property_type'] in ['Apartment', 'Studio']: if bedrooms > 5 or bedrooms < 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bedrooms for {data['property_type']}: {bedrooms}. Should be between 0 and 5.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if bedrooms > 8 or bedrooms < 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bedrooms for {data['property_type']}: {bedrooms}. Should be between 0 and 8.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if bedrooms > 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Commercial properties typically don't have bedrooms: {bedrooms}") except ValueError: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append("Invalid bedrooms data: must be a number") # Validate bathrooms if 'bathrooms' in data: try: bathrooms = float(data['bathrooms']) if data['property_type'] in ['Apartment', 'Studio']: if bathrooms > 4 or bathrooms < 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bathrooms for {data['property_type']}: {bathrooms}. Should be between 0 and 4.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if bathrooms > 6 or bathrooms < 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bathrooms for {data['property_type']}: {bathrooms}. Should be between 0 and 6.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if bathrooms > 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Commercial properties typically don't have bathrooms: {bathrooms}") except ValueError: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append("Invalid bathrooms data: must be a number") # Validate total rooms if 'total_rooms' in data: try: total_rooms = int(data['total_rooms']) if total_rooms < 0: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append(f"Invalid total rooms: {total_rooms}. Cannot be negative.") elif 'bedrooms' in data and 'bathrooms' in data: try: bedrooms = int(data['bedrooms']) bathrooms = int(float(data['bathrooms'])) if total_rooms < (bedrooms + bathrooms): specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append(f"Total rooms ({total_rooms}) is less than bedrooms + bathrooms ({bedrooms + bathrooms})") except ValueError: pass except ValueError: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append("Invalid total rooms data: must be a number") # Validate year built if 'year_built' in data: try: year_built = int(data['year_built']) current_year = datetime.now().year if year_built > current_year: specs_verification['year_built_reasonable'] = False specs_verification['issues'].append(f"Year built ({year_built}) is in the future") elif year_built < 1800: specs_verification['year_built_reasonable'] = False specs_verification['issues'].append(f"Year built ({year_built}) seems unreasonably old") except ValueError: specs_verification['year_built_reasonable'] = False specs_verification['issues'].append("Invalid year built data: must be a number") # Validate parking if 'parking' in data: try: parking = int(data['parking']) if data['property_type'] in ['Apartment', 'Studio']: if parking > 2 or parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces for {data['property_type']}: {parking}. Should be between 0 and 2.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if parking > 4 or parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces for {data['property_type']}: {parking}. Should be between 0 and 4.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces: {parking}. Cannot be negative.") except ValueError: specs_verification['parking_reasonable'] = False specs_verification['issues'].append("Invalid parking data: must be a number") # Validate square footage if 'sq_ft' in data: try: sq_ft = float(data['sq_ft'].replace(',', '')) if sq_ft <= 0: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Invalid square footage: {sq_ft}. Must be greater than 0.") else: if data['property_type'] in ['Apartment', 'Studio']: if sq_ft > 5000: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high for {data['property_type']}") elif sq_ft < 200: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if sq_ft > 10000: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high for {data['property_type']}") elif sq_ft < 500: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low for {data['property_type']}") except ValueError: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append("Invalid square footage data: must be a number") # Validate market value if 'market_value' in data: try: market_value = float(data['market_value'].replace(',', '').replace('₹', '').strip()) if market_value <= 0: specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Invalid market value: {market_value}. Must be greater than 0.") else: if data['property_type'] in ['Apartment', 'Studio']: if market_value > 500000000: # 5 crore limit for apartments specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high for {data['property_type']}") elif market_value < 500000: # 5 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if market_value > 2000000000: # 20 crore limit for houses specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high for {data['property_type']}") elif market_value < 1000000: # 10 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['Commercial', 'Office', 'Shop']: if market_value < 2000000: # 20 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['Warehouse', 'Industrial']: if market_value < 5000000: # 50 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") # Check price per square foot if 'sq_ft' in data and float(data['sq_ft'].replace(',', '')) > 0: try: sq_ft = float(data['sq_ft'].replace(',', '')) price_per_sqft = market_value / sq_ft if data['property_type'] in ['Apartment', 'Studio']: if price_per_sqft < 1000: # Less than ₹1000 per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low for {data['property_type']}") elif price_per_sqft > 50000: # More than ₹50k per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if price_per_sqft < 500: # Less than ₹500 per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low for {data['property_type']}") elif price_per_sqft > 100000: # More than ₹1 lakh per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high for {data['property_type']}") except ValueError: pass except ValueError: specs_verification['market_value_reasonable'] = False specs_verification['issues'].append("Invalid market value data: must be a number") # Calculate verification score valid_checks = sum([ specs_verification['bedrooms_reasonable'], specs_verification['bathrooms_reasonable'], specs_verification['total_rooms_reasonable'], specs_verification['year_built_reasonable'], specs_verification['parking_reasonable'], specs_verification['sq_ft_reasonable'], specs_verification['market_value_reasonable'] ]) total_checks = 7 specs_verification['verification_score'] = (valid_checks / total_checks) * 100 # Overall validity specs_verification['is_valid'] = all([ specs_verification['bedrooms_reasonable'], specs_verification['bathrooms_reasonable'], specs_verification['total_rooms_reasonable'], specs_verification['year_built_reasonable'], specs_verification['parking_reasonable'], specs_verification['sq_ft_reasonable'], specs_verification['market_value_reasonable'] ]) except Exception as e: logger.error(f"Error in property specs verification: {str(e)}") specs_verification['is_valid'] = False specs_verification['issues'].append(f"Error in verification: {str(e)}") return specs_verification def analyze_market_value(data): """ Analyzes the market value of a property based on its specifications and location for the Indian real estate market. """ specs_verification = { 'is_valid': True, 'bedrooms_reasonable': True, 'bathrooms_reasonable': True, 'total_rooms_reasonable': True, 'parking_reasonable': True, 'sq_ft_reasonable': True, 'market_value_reasonable': True, 'year_built_reasonable': True, # Added missing field 'issues': [] } try: # Validate property type valid_property_types = [ 'Apartment', 'House', 'Villa', 'Independent House', 'Independent Villa', 'Studio', 'Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial' ] if 'property_type' not in data or data['property_type'] not in valid_property_types: specs_verification['is_valid'] = False specs_verification['issues'].append(f"Invalid property type: {data.get('property_type', 'Not specified')}") # Validate bedrooms if 'bedrooms' in data: try: bedrooms = int(data['bedrooms']) if data['property_type'] in ['Apartment', 'Studio']: if bedrooms > 5 or bedrooms < 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bedrooms for {data['property_type']}: {bedrooms}. Should be between 0 and 5.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if bedrooms > 8 or bedrooms < 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bedrooms for {data['property_type']}: {bedrooms}. Should be between 0 and 8.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if bedrooms > 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Commercial properties typically don't have bedrooms: {bedrooms}") except ValueError: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append("Invalid bedrooms data: must be a number") # Validate bathrooms if 'bathrooms' in data: try: bathrooms = float(data['bathrooms']) if data['property_type'] in ['Apartment', 'Studio']: if bathrooms > 4 or bathrooms < 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bathrooms for {data['property_type']}: {bathrooms}. Should be between 0 and 4.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if bathrooms > 6 or bathrooms < 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bathrooms for {data['property_type']}: {bathrooms}. Should be between 0 and 6.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if bathrooms > 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Commercial properties typically don't have bathrooms: {bathrooms}") except ValueError: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append("Invalid bathrooms data: must be a number") # Validate total rooms if 'total_rooms' in data: try: total_rooms = int(data['total_rooms']) if total_rooms < 0: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append(f"Invalid total rooms: {total_rooms}. Cannot be negative.") elif 'bedrooms' in data and 'bathrooms' in data: try: bedrooms = int(data['bedrooms']) bathrooms = int(float(data['bathrooms'])) if total_rooms < (bedrooms + bathrooms): specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append(f"Total rooms ({total_rooms}) is less than bedrooms + bathrooms ({bedrooms + bathrooms})") except ValueError: pass except ValueError: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append("Invalid total rooms data: must be a number") # Validate parking if 'parking' in data: try: parking = int(data['parking']) if data['property_type'] in ['Apartment', 'Studio']: if parking > 2 or parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces for {data['property_type']}: {parking}. Should be between 0 and 2.") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if parking > 4 or parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces for {data['property_type']}: {parking}. Should be between 0 and 4.") elif data['property_type'] in ['Commercial', 'Office', 'Shop', 'Warehouse', 'Industrial']: if parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces: {parking}. Cannot be negative.") except ValueError: specs_verification['parking_reasonable'] = False specs_verification['issues'].append("Invalid parking data: must be a number") # Validate square footage if 'sq_ft' in data: try: sq_ft = float(data['sq_ft'].replace(',', '')) if sq_ft <= 0: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Invalid square footage: {sq_ft}. Must be greater than 0.") else: if data['property_type'] in ['Apartment', 'Studio']: if sq_ft > 5000: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high for {data['property_type']}") elif sq_ft < 200: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if sq_ft > 10000: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high for {data['property_type']}") elif sq_ft < 500: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low for {data['property_type']}") except ValueError: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append("Invalid square footage data: must be a number") # Validate market value if 'market_value' in data: try: market_value = float(data['market_value'].replace(',', '').replace('₹', '').strip()) if market_value <= 0: specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Invalid market value: {market_value}. Must be greater than 0.") else: if data['property_type'] in ['Apartment', 'Studio']: if market_value > 500000000: # 5 crore limit for apartments specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high for {data['property_type']}") elif market_value < 500000: # 5 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if market_value > 2000000000: # 20 crore limit for houses specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high for {data['property_type']}") elif market_value < 1000000: # 10 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['Commercial', 'Office', 'Shop']: if market_value < 2000000: # 20 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") elif data['property_type'] in ['Warehouse', 'Industrial']: if market_value < 5000000: # 50 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low for {data['property_type']}") # Check price per square foot if 'sq_ft' in data and float(data['sq_ft'].replace(',', '')) > 0: try: sq_ft = float(data['sq_ft'].replace(',', '')) price_per_sqft = market_value / sq_ft if data['property_type'] in ['Apartment', 'Studio']: if price_per_sqft < 1000: # Less than ₹1000 per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low for {data['property_type']}") elif price_per_sqft > 50000: # More than ₹50k per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high for {data['property_type']}") elif data['property_type'] in ['House', 'Villa', 'Independent House', 'Independent Villa']: if price_per_sqft < 500: # Less than ₹500 per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low for {data['property_type']}") elif price_per_sqft > 100000: # More than ₹1 lakh per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high for {data['property_type']}") except ValueError: pass except ValueError: specs_verification['market_value_reasonable'] = False specs_verification['issues'].append("Invalid market value data: must be a number") # Calculate verification score valid_checks = sum([ specs_verification['bedrooms_reasonable'], specs_verification['bathrooms_reasonable'], specs_verification['total_rooms_reasonable'], specs_verification['year_built_reasonable'], specs_verification['parking_reasonable'], specs_verification['sq_ft_reasonable'], specs_verification['market_value_reasonable'] ]) total_checks = 7 specs_verification['verification_score'] = (valid_checks / total_checks) * 100 # Overall validity specs_verification['is_valid'] = all([ specs_verification['bedrooms_reasonable'], specs_verification['bathrooms_reasonable'], specs_verification['total_rooms_reasonable'], specs_verification['year_built_reasonable'], specs_verification['parking_reasonable'], specs_verification['sq_ft_reasonable'], specs_verification['market_value_reasonable'] ]) except Exception as e: logger.error(f"Error in property specs verification: {str(e)}") specs_verification['is_valid'] = False specs_verification['issues'].append(f"Error in verification: {str(e)}") return specs_verification def assess_image_quality(img): try: width, height = img.size resolution = width * height quality_score = min(100, resolution // 20000) return { 'resolution': f"{width}x{height}", 'quality_score': quality_score } except Exception as e: logger.error(f"Error assessing image quality: {str(e)}") return { 'resolution': 'unknown', 'quality_score': 0 } def check_if_property_related(text): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") result = classifier(text[:1000], ["property-related", "non-property-related"]) is_related = result['labels'][0] == "property-related" return { 'is_related': is_related, 'confidence': float(result['scores'][0]) } except Exception as e: logger.error(f"Error checking property relation: {str(e)}") return { 'is_related': False, 'confidence': 0.0 } if __name__ == '__main__': # Run Flask app app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)