from flask import Flask, render_template, request, jsonify from flask_cors import CORS import torch from transformers import pipeline, CLIPProcessor, CLIPModel, BitsAndBytesConfig import base64 import io import re import json import numpy as np from PIL import Image import fitz # PyMuPDF import os from datetime import datetime import uuid import requests from geopy.geocoders import Nominatim from sentence_transformers import SentenceTransformer, util import spacy import pytesseract from langdetect import detect from deep_translator import GoogleTranslator import logging from functools import lru_cache import time import math from pyngrok import ngrok import threading import gc import psutil app = Flask(__name__) CORS(app) # Enable CORS for frontend # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('app.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize geocoder geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10) # Add memory monitoring function def monitor_memory(): while True: process = psutil.Process() memory_info = process.memory_info() logger.info(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB") if memory_info.rss > 2 * 1024 * 1024 * 1024: # If using more than 2GB logger.warning("High memory usage detected, clearing cache") clear_model_cache() time.sleep(300) # Check every 5 minutes # Start memory monitoring in a separate thread memory_monitor_thread = threading.Thread(target=monitor_memory, daemon=True) memory_monitor_thread.start() # Initialize CLIP model try: clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") has_clip_model = True logger.info("CLIP model loaded successfully") except Exception as e: logger.error(f"Error loading CLIP model: {str(e)}") has_clip_model = False # Initialize sentence transformer try: sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2') logger.info("Sentence transformer loaded successfully") except Exception as e: logger.error(f"Error loading sentence transformer: {str(e)}") sentence_model = None # Initialize spaCy try: nlp = spacy.load('en_core_web_md') logger.info("spaCy model loaded successfully") except Exception as e: logger.error(f"Error loading spaCy model: {str(e)}") nlp = None def make_json_serializable(obj): try: if isinstance(obj, (bool, int, float, str, type(None))): return obj elif isinstance(obj, (list, tuple)): return [make_json_serializable(item) for item in obj] elif isinstance(obj, dict): return {str(key): make_json_serializable(value) for key, value in obj.items()} elif torch.is_tensor(obj): return obj.item() if obj.numel() == 1 else obj.tolist() elif np.isscalar(obj): return obj.item() if hasattr(obj, 'item') else float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() else: return str(obj) except Exception as e: logger.error(f"Error serializing object: {str(e)}") return str(obj) @app.route('/') def index(): return render_template('newindex.html') @app.route('/get-location', methods=['POST']) def get_location(): try: data = request.json or {} latitude = data.get('latitude') longitude = data.get('longitude') if not latitude or not longitude: logger.warning("Missing latitude or longitude") return jsonify({ 'status': 'error', 'message': 'Latitude and longitude are required' }), 400 # Retry geocoding up to 3 times for attempt in range(3): try: location = geocoder.reverse((latitude, longitude), exactly_one=True) if location: address_components = location.raw.get('address', {}) return jsonify({ 'status': 'success', 'address': location.address, 'street': address_components.get('road', ''), 'city': address_components.get('city', address_components.get('town', address_components.get('village', ''))), 'state': address_components.get('state', ''), 'country': address_components.get('country', 'India'), 'postal_code': address_components.get('postcode', ''), 'latitude': latitude, 'longitude': longitude }) logger.warning(f"Geocoding failed on attempt {attempt + 1}") time.sleep(1) # Wait before retry except Exception as e: logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") time.sleep(1) return jsonify({ 'status': 'error', 'message': 'Could not determine location after retries' }), 500 except Exception as e: logger.error(f"Error in get_location: {str(e)}") return jsonify({ 'status': 'error', 'message': str(e) }), 500 @app.route('/verify', methods=['POST']) def verify_property(): try: if not request.form and not request.files: logger.warning("No form data or files provided") return jsonify({ 'error': 'No data provided', 'status': 'error' }), 400 data = { 'property_name': request.form.get('property_name', '').strip(), 'property_type': request.form.get('property_type', '').strip(), 'status': request.form.get('status', '').strip(), 'description': request.form.get('description', '').strip(), 'address': request.form.get('address', '').strip(), 'city': request.form.get('city', '').strip(), 'state': request.form.get('state', '').strip(), 'country': request.form.get('country', 'India').strip(), 'zip': request.form.get('zip', '').strip(), 'latitude': request.form.get('latitude', '').strip(), 'longitude': request.form.get('longitude', '').strip(), 'bedrooms': request.form.get('bedrooms', '').strip(), 'bathrooms': request.form.get('bathrooms', '').strip(), 'total_rooms': request.form.get('total_rooms', '').strip(), 'year_built': request.form.get('year_built', '').strip(), 'parking': request.form.get('parking', '').strip(), 'sq_ft': request.form.get('sq_ft', '').strip(), 'market_value': request.form.get('market_value', '').strip(), 'amenities': request.form.get('amenities', '').strip(), 'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(), 'legal_details': request.form.get('legal_details', '').strip() } required_fields = ['property_name', 'property_type', 'address', 'city', 'state'] missing_fields = [field for field in required_fields if not data[field]] if missing_fields: logger.warning(f"Missing required fields: {', '.join(missing_fields)}") return jsonify({ 'error': f"Missing required fields: {', '.join(missing_fields)}", 'status': 'error' }), 400 images = [] image_analysis = [] if 'images' in request.files: image_files = request.files.getlist('images') for img_file in image_files: if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')): try: img = Image.open(img_file) buffered = io.BytesIO() img.save(buffered, format="JPEG") img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') images.append(img_str) image_analysis.append(analyze_image(img)) except Exception as e: logger.error(f"Error processing image {img_file.filename}: {str(e)}") image_analysis.append({'error': str(e), 'is_property_related': False}) pdf_texts = [] pdf_analysis = [] if 'documents' in request.files: pdf_files = request.files.getlist('documents') for pdf_file in pdf_files: if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'): try: pdf_text = extract_pdf_text(pdf_file) pdf_texts.append({ 'filename': pdf_file.filename, 'text': pdf_text }) pdf_analysis.append(analyze_pdf_content(pdf_text, data)) except Exception as e: logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}") pdf_analysis.append({'error': str(e)}) consolidated_text = f""" Property Name: {data['property_name']} Property Type: {data['property_type']} Status: {data['status']} Description: {data['description']} Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']} Coordinates: Lat {data['latitude']}, Long {data['longitude']} Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms Year Built: {data['year_built']} Parking: {data['parking']} Size: {data['sq_ft']} sq. ft. Market Value: ₹{data['market_value']} Amenities: {data['amenities']} Nearby Landmarks: {data['nearby_landmarks']} Legal Details: {data['legal_details']} """ try: description = data['description'] if description and len(description) > 10: text_language = detect(description) if text_language != 'en': translated_description = GoogleTranslator(source=text_language, target='en').translate(description) data['description_translated'] = translated_description else: data['description_translated'] = description else: data['description_translated'] = description except Exception as e: logger.error(f"Error in language detection/translation: {str(e)}") data['description_translated'] = data['description'] summary = generate_property_summary(data) fraud_classification = classify_fraud(consolidated_text, data) trust_score, trust_reasoning = generate_trust_score(consolidated_text, image_analysis, pdf_analysis) suggestions = generate_suggestions(consolidated_text, data) quality_assessment = assess_text_quality(data['description_translated']) address_verification = verify_address(data) cross_validation = perform_cross_validation(data) location_analysis = analyze_location(data) price_analysis = analyze_price(data) legal_analysis = analyze_legal_details(data['legal_details']) specs_verification = verify_property_specs(data) market_analysis = analyze_market_value(data) document_analysis = { 'pdf_count': len(pdf_texts), 'pdf_texts': pdf_texts, 'pdf_analysis': pdf_analysis } image_results = { 'image_count': len(images), 'image_analysis': image_analysis } report_id = str(uuid.uuid4()) results = { 'report_id': report_id, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'summary': summary, 'fraud_classification': fraud_classification, 'trust_score': { 'score': trust_score, 'reasoning': trust_reasoning }, 'suggestions': suggestions, 'quality_assessment': quality_assessment, 'address_verification': address_verification, 'cross_validation': cross_validation, 'location_analysis': location_analysis, 'price_analysis': price_analysis, 'legal_analysis': legal_analysis, 'document_analysis': document_analysis, 'image_analysis': image_results, 'specs_verification': specs_verification, 'market_analysis': market_analysis, 'images': images } return jsonify(make_json_serializable(results)) except Exception as e: logger.error(f"Error in verify_property: {str(e)}") return jsonify({ 'error': 'Server error occurred. Please try again later.', 'status': 'error', 'details': str(e) }), 500 def extract_pdf_text(pdf_file): try: pdf_document = fitz.Document(stream=pdf_file.read(), filetype="pdf") text = "" for page in pdf_document: text += page.get_text() pdf_document.close() return text except Exception as e: logger.error(f"Error extracting PDF text: {str(e)}") return "" def analyze_image(image): try: if has_clip_model: img_rgb = image.convert('RGB') inputs = clip_processor( text=[ "real estate property interior", "real estate property exterior", "non-property-related image", "office space", "landscape" ], images=img_rgb, return_tensors="pt", padding=True ) outputs = clip_model(**inputs) logits_per_image = outputs.logits_per_image probs = logits_per_image.softmax(dim=1).detach().numpy()[0] property_related_score = probs[0] + probs[1] is_property_related = property_related_score > 0.5 quality = assess_image_quality(image) is_ai_generated = detect_ai_generated_image(image) return { 'is_property_related': is_property_related, 'property_confidence': float(property_related_score), 'top_predictions': [ {'label': 'property interior', 'confidence': float(probs[0])}, {'label': 'property exterior', 'confidence': float(probs[1])}, {'label': 'non-property', 'confidence': float(probs[2])} ], 'image_quality': quality, 'is_ai_generated': is_ai_generated, 'authenticity_score': 0.95 if not is_ai_generated else 0.60 } else: logger.warning("CLIP model unavailable") return { 'is_property_related': False, 'property_confidence': 0.0, 'top_predictions': [], 'image_quality': assess_image_quality(image), 'is_ai_generated': False, 'authenticity_score': 0.5 } except Exception as e: logger.error(f"Error analyzing image: {str(e)}") return { 'is_property_related': False, 'property_confidence': 0.0, 'top_predictions': [], 'image_quality': {'resolution': 'unknown', 'quality_score': 0}, 'is_ai_generated': False, 'authenticity_score': 0.0, 'error': str(e) } def detect_ai_generated_image(image): try: img_array = np.array(image) if len(img_array.shape) == 3: gray = np.mean(img_array, axis=2) else: gray = img_array noise = gray - np.mean(gray) noise_std = np.std(noise) width, height = image.size perfect_dimensions = (width % 64 == 0 and height % 64 == 0) has_exif = hasattr(image, '_getexif') and image._getexif() is not None return noise_std < 0.05 or perfect_dimensions or not has_exif except Exception as e: logger.error(f"Error detecting AI-generated image: {str(e)}") return False def analyze_pdf_content(document_text, property_data): try: if not document_text: return { 'document_type': {'classification': 'unknown', 'confidence': 0.0}, 'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, 'key_info': {}, 'consistency_score': 0.0, 'is_property_related': False, 'summary': 'Empty document', 'has_signatures': False, 'has_dates': False, 'verification_score': 0.0 } # Use a more sophisticated model for document classification classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced document types with more specific categories doc_types = [ "property deed", "sales agreement", "mortgage document", "property tax record", "title document", "khata certificate", "encumbrance certificate", "lease agreement", "rental agreement", "property registration document", "building permit", "other document" ] # Analyze document type with context doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}" doc_result = classifier(doc_context, doc_types) doc_type = doc_result['labels'][0] doc_confidence = doc_result['scores'][0] # Enhanced authenticity check with multiple aspects authenticity_aspects = [ "authentic legal document", "questionable document", "forged document", "template document", "official document" ] authenticity_result = classifier(document_text[:1000], authenticity_aspects) authenticity = "likely authentic" if authenticity_result['labels'][0] == "authentic legal document" else "questionable" authenticity_confidence = authenticity_result['scores'][0] # Extract key information using NLP key_info = extract_document_key_info(document_text) # Enhanced consistency check consistency_score = check_document_consistency(document_text, property_data) # Property relation check with context property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}" is_property_related = check_if_property_related(property_context)['is_related'] # Generate summary using BART summary = summarize_text(document_text[:2000]) # Enhanced signature and date detection has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower())) has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text)) # Calculate verification score with weighted components verification_weights = { 'doc_type': 0.3, 'authenticity': 0.3, 'consistency': 0.2, 'property_relation': 0.1, 'signatures_dates': 0.1 } verification_score = ( doc_confidence * verification_weights['doc_type'] + authenticity_confidence * verification_weights['authenticity'] + consistency_score * verification_weights['consistency'] + float(is_property_related) * verification_weights['property_relation'] + float(has_signatures and has_dates) * verification_weights['signatures_dates'] ) return { 'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)}, 'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)}, 'key_info': key_info, 'consistency_score': float(consistency_score), 'is_property_related': is_property_related, 'summary': summary, 'has_signatures': has_signatures, 'has_dates': has_dates, 'verification_score': float(verification_score) } except Exception as e: logger.error(f"Error analyzing PDF content: {str(e)}") return { 'document_type': {'classification': 'unknown', 'confidence': 0.0}, 'authenticity': {'assessment': 'could not verify', 'confidence': 0.0}, 'key_info': {}, 'consistency_score': 0.0, 'is_property_related': False, 'summary': 'Could not analyze document', 'has_signatures': False, 'has_dates': False, 'verification_score': 0.0, 'error': str(e) } def check_document_consistency(document_text, property_data): try: if not sentence_model: logger.warning("Sentence model unavailable") return 0.5 property_text = ' '.join([ property_data.get(key, '') for key in [ 'property_name', 'property_type', 'address', 'city', 'state', 'market_value', 'sq_ft', 'bedrooms' ] ]) property_embedding = sentence_model.encode(property_text) document_embedding = sentence_model.encode(document_text[:1000]) similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item() return max(0.0, min(1.0, float(similarity))) except Exception as e: logger.error(f"Error checking document consistency: {str(e)}") return 0.0 def extract_document_key_info(text): try: info = {} patterns = { 'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)', 'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)', 'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})', 'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)', 'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)', 'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))', 'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)' } for key, pattern in patterns.items(): match = re.search(pattern, text, re.IGNORECASE) if match: info[key] = match.group(1).strip() return info except Exception as e: logger.error(f"Error extracting document key info: {str(e)}") return {} def generate_property_summary(data): try: # Create a detailed context for summary generation property_context = f""" Property Name: {data.get('property_name', '')} Type: {data.get('property_type', '')} Status: {data.get('status', '')} Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}, {data.get('country', '')} Size: {data.get('sq_ft', '')} sq. ft. Price: ₹{data.get('market_value', '0')} Bedrooms: {data.get('bedrooms', '')} Bathrooms: {data.get('bathrooms', '')} Year Built: {data.get('year_built', '')} Description: {data.get('description', '')} """ # Use BART for summary generation summarizer = load_model("summarization", "facebook/bart-large-cnn") # Generate initial summary summary_result = summarizer(property_context, max_length=150, min_length=50, do_sample=False) initial_summary = summary_result[0]['summary_text'] # Enhance summary with key features key_features = [] # Add property type and status if data.get('property_type') and data.get('status'): key_features.append(f"{data['property_type']} is {data['status'].lower()}") # Add location if available location_parts = [] if data.get('city'): location_parts.append(data['city']) if data.get('state'): location_parts.append(data['state']) if location_parts: key_features.append(f"Located in {', '.join(location_parts)}") # Add size and price if available if data.get('sq_ft'): key_features.append(f"Spans {data['sq_ft']} sq. ft.") if data.get('market_value'): key_features.append(f"Valued at ₹{data['market_value']}") # Add rooms information rooms_info = [] if data.get('bedrooms'): rooms_info.append(f"{data['bedrooms']} bedroom{'s' if data['bedrooms'] != '1' else ''}") if data.get('bathrooms'): rooms_info.append(f"{data['bathrooms']} bathroom{'s' if data['bathrooms'] != '1' else ''}") if rooms_info: key_features.append(f"Features {' and '.join(rooms_info)}") # Add amenities if available if data.get('amenities'): key_features.append(f"Amenities: {data['amenities']}") # Combine initial summary with key features enhanced_summary = initial_summary if key_features: enhanced_summary += " " + ". ".join(key_features) + "." # Clean up the summary enhanced_summary = enhanced_summary.replace(" ", " ").strip() return enhanced_summary except Exception as e: logger.error(f"Error generating property summary: {str(e)}") return "Could not generate summary." def summarize_text(text): try: if not text or len(text.strip()) < 10: return "No text to summarize." summarizer = load_model("summarization", "facebook/bart-large-cnn") input_length = len(text.split()) max_length = max(50, min(150, input_length // 2)) min_length = max(20, input_length // 4) summary = summarizer(text[:2000], max_length=max_length, min_length=min_length, do_sample=False) return summary[0]['summary_text'] except Exception as e: logger.error(f"Error summarizing text: {str(e)}") return text[:200] + "..." if len(text) > 200 else text def classify_fraud(text, data=None): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") categories = [ "suspicious pricing pattern", "potentially fraudulent listing", "ownership verification issues", "location verification issues", "document authenticity issues", "image authenticity issues", "urgent pressure tactics", "inconsistent information", "missing critical details", "suspicious contact information" ] # Create a comprehensive context for analysis context = f""" Property Details: - Name: {data.get('property_name', 'Not provided')} - Type: {data.get('property_type', 'Not provided')} - Status: {data.get('property_status', 'Not provided')} - Price: {data.get('market_value', 'Not provided')} - Square Footage: {data.get('square_footage', 'Not provided')} - Year Built: {data.get('year_built', 'Not provided')} - Location: {data.get('address', 'Not provided')} - Description: {text} """ result = classifier(context, categories, multi_label=True) # Lower threshold to catch more potential issues threshold = 0.2 # Categorize risks with more granular levels high_risk = [] medium_risk = [] low_risk = [] for label, score in zip(result['labels'], result['scores']): if score > threshold: if score > 0.7: high_risk.append((label, score)) elif score > 0.5: medium_risk.append((label, score)) else: low_risk.append((label, score)) # Calculate alert score with adjusted weights alert_score = ( sum(score * 1.0 for _, score in high_risk) + sum(score * 0.7 for _, score in medium_risk) + sum(score * 0.4 for _, score in low_risk) ) / max(1, len(result['scores'])) # More granular alert levels if alert_score > 0.8: alert_level = 'critical' elif alert_score > 0.6: alert_level = 'high' elif alert_score > 0.4: alert_level = 'medium' elif alert_score > 0.2: alert_level = 'low' else: alert_level = 'minimal' # Enhanced fraud indicators with more specific patterns fraud_indicators = [] # Price-related patterns price_patterns = [ (r'suspiciously low price', 0.8), (r'unusually high price', 0.7), (r'price too good to be true', 0.9), (r'urgent sale', 0.6), (r'must sell quickly', 0.7) ] # Location-related patterns location_patterns = [ (r'location mismatch', 0.8), (r'address inconsistency', 0.7), (r'wrong neighborhood', 0.6), (r'incorrect zip code', 0.7) ] # Document-related patterns document_patterns = [ (r'missing documents', 0.8), (r'unverified documents', 0.7), (r'fake documents', 0.9), (r'photoshopped documents', 0.8) ] # Urgency-related patterns urgency_patterns = [ (r'act now', 0.6), (r'limited time offer', 0.5), (r'first come first served', 0.4), (r'won\'t last long', 0.5) ] # Check all patterns all_patterns = price_patterns + location_patterns + document_patterns + urgency_patterns for pattern, weight in all_patterns: if re.search(pattern, text.lower()): fraud_indicators.append({ 'pattern': pattern, 'weight': weight, 'context': text[max(0, text.lower().find(pattern)-50):min(len(text), text.lower().find(pattern)+50)] }) # Additional checks for data inconsistencies if data: # Check for suspiciously low price per square foot try: price = float(data.get('market_value', 0)) sqft = float(data.get('square_footage', 1)) price_per_sqft = price / sqft if price_per_sqft < 50: # Unusually low price per square foot fraud_indicators.append({ 'pattern': 'suspiciously low price per square foot', 'weight': 0.8, 'context': f'Price per square foot: ${price_per_sqft:.2f}' }) except (ValueError, ZeroDivisionError): pass # Check for impossible values try: year_built = int(data.get('year_built', 0)) if year_built < 1800 or year_built > 2024: fraud_indicators.append({ 'pattern': 'impossible year built', 'weight': 0.9, 'context': f'Year built: {year_built}' }) except ValueError: pass # Check for missing critical information critical_fields = ['property_name', 'property_type', 'address', 'market_value', 'square_footage'] missing_fields = [field for field in critical_fields if not data.get(field)] if missing_fields: fraud_indicators.append({ 'pattern': 'missing critical information', 'weight': 0.7, 'context': f'Missing fields: {", ".join(missing_fields)}' }) return { 'alert_level': alert_level, 'alert_score': alert_score, 'high_risk': high_risk, 'medium_risk': medium_risk, 'low_risk': low_risk, 'fraud_indicators': fraud_indicators } except Exception as e: logger.error(f"Error in fraud classification: {str(e)}") return { 'alert_level': 'error', 'alert_score': 1.0, 'high_risk': [], 'medium_risk': [], 'low_risk': [], 'fraud_indicators': [] } def generate_trust_score(text, image_analysis, pdf_analysis): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") aspects = [ "complete information provided", "verified location", "consistent data", "authentic documents", "authentic images", "reasonable pricing", "verified ownership", "proper documentation" ] result = classifier(text[:1000], aspects, multi_label=True) # Much stricter weights with higher emphasis on critical aspects weights = { "complete information provided": 0.25, "verified location": 0.20, "consistent data": 0.15, "authentic documents": 0.15, "authentic images": 0.10, "reasonable pricing": 0.05, "verified ownership": 0.05, "proper documentation": 0.05 } score = 0 reasoning_parts = [] # Much stricter scoring for each aspect for label, confidence in zip(result['labels'], result['scores']): adjusted_confidence = confidence # Stricter document verification if label == "authentic documents": if not pdf_analysis or len(pdf_analysis) == 0: adjusted_confidence = 0.0 else: doc_scores = [p.get('verification_score', 0) for p in pdf_analysis] adjusted_confidence = sum(doc_scores) / max(1, len(doc_scores)) # Heavily penalize if any document has low verification score if any(score < 0.7 for score in doc_scores): adjusted_confidence *= 0.4 # Additional penalty for missing documents if len(doc_scores) < 2: adjusted_confidence *= 0.5 # Stricter image verification elif label == "authentic images": if not image_analysis or len(image_analysis) == 0: adjusted_confidence = 0.0 else: img_scores = [i.get('authenticity_score', 0) for i in image_analysis] adjusted_confidence = sum(img_scores) / max(1, len(img_scores)) # Heavily penalize if any image has low authenticity score if any(score < 0.8 for score in img_scores): adjusted_confidence *= 0.4 # Additional penalty for AI-generated images if any(i.get('is_ai_generated', False) for i in image_analysis): adjusted_confidence *= 0.5 # Additional penalty for non-property related images if any(not i.get('is_property_related', False) for i in image_analysis): adjusted_confidence *= 0.6 # Stricter consistency check elif label == "consistent data": # Check for inconsistencies in the data if "inconsistent" in text.lower() or "suspicious" in text.lower(): adjusted_confidence *= 0.3 # Check for impossible values if "impossible" in text.lower() or "invalid" in text.lower(): adjusted_confidence *= 0.2 # Check for missing critical information if "missing" in text.lower() or "not provided" in text.lower(): adjusted_confidence *= 0.4 # Stricter completeness check elif label == "complete information provided": # Check for missing critical information if len(text) < 300 or "not provided" in text.lower() or "missing" in text.lower(): adjusted_confidence *= 0.4 # Check for vague or generic descriptions if "generic" in text.lower() or "vague" in text.lower(): adjusted_confidence *= 0.5 # Check for suspiciously short descriptions if len(text) < 150: adjusted_confidence *= 0.3 score += adjusted_confidence * weights.get(label, 0.1) reasoning_parts.append(f"{label} ({adjusted_confidence:.0%})") # Apply additional penalties for suspicious patterns if "suspicious" in text.lower() or "fraudulent" in text.lower(): score *= 0.5 # Apply penalties for suspiciously low values if "suspiciously low" in text.lower() or "unusually small" in text.lower(): score *= 0.6 # Apply penalties for inconsistencies if "inconsistent" in text.lower() or "mismatch" in text.lower(): score *= 0.6 # Apply penalties for missing critical information if "missing critical" in text.lower() or "incomplete" in text.lower(): score *= 0.7 # Ensure score is between 0 and 100 score = min(100, max(0, int(score * 100))) reasoning = f"Based on: {', '.join(reasoning_parts)}" return score, reasoning except Exception as e: logger.error(f"Error generating trust score: {str(e)}") return 20, "Could not assess trust." def generate_suggestions(text, data=None): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Create comprehensive context for analysis suggestion_context = text if data: suggestion_context += f""" Additional Context: Property Type: {data.get('property_type', '')} Location: {data.get('city', '')}, {data.get('state', '')} Size: {data.get('sq_ft', '')} sq.ft. Year Built: {data.get('year_built', '')} """ # Enhanced suggestion categories based on property context base_suggestions = { 'documentation': { 'label': "add more documentation", 'categories': [ "complete documentation provided", "missing essential documents", "incomplete paperwork", "documentation needs verification" ], 'weight': 2.0, 'improvements': { 'missing essential documents': [ "Add property deed or title documents", "Include recent property tax records", "Attach property registration documents" ], 'incomplete paperwork': [ "Complete all required legal documents", "Add missing ownership proof", "Include property survey documents" ] } }, 'details': { 'label': "enhance property details", 'categories': [ "detailed property information", "basic information only", "missing key details", "comprehensive description" ], 'weight': 1.8, 'improvements': { 'basic information only': [ "Add more details about property features", "Include information about recent renovations", "Describe unique selling points" ], 'missing key details': [ "Specify exact built-up area", "Add floor plan details", "Include maintenance costs" ] } }, 'images': { 'label': "improve visual content", 'categories': [ "high quality images provided", "poor image quality", "insufficient images", "missing key area photos" ], 'weight': 1.5, 'improvements': { 'poor image quality': [ "Add high-resolution property photos", "Include better lighting in images", "Provide professional photography" ], 'insufficient images': [ "Add more interior photos", "Include exterior and surrounding area images", "Add photos of amenities" ] } }, 'pricing': { 'label': "pricing information", 'categories': [ "detailed pricing breakdown", "basic price only", "missing price details", "unclear pricing terms" ], 'weight': 1.7, 'improvements': { 'basic price only': [ "Add detailed price breakdown", "Include maintenance charges", "Specify additional costs" ], 'missing price details': [ "Add price per square foot", "Include tax implications", "Specify payment terms" ] } }, 'location': { 'label': "location details", 'categories': [ "comprehensive location info", "basic location only", "missing location details", "unclear accessibility info" ], 'weight': 1.6, 'improvements': { 'basic location only': [ "Add nearby landmarks and distances", "Include transportation options", "Specify neighborhood facilities" ], 'missing location details': [ "Add exact GPS coordinates", "Include area development plans", "Specify distance to key facilities" ] } } } suggestions = [] confidence_scores = [] for aspect, config in base_suggestions.items(): # Analyze each aspect with context result = classifier(suggestion_context[:1000], config['categories']) # Get the most relevant category top_category = result['labels'][0] confidence = float(result['scores'][0]) # If the category indicates improvement needed (confidence < 0.6) if confidence < 0.6 and top_category in config['improvements']: weighted_confidence = confidence * config['weight'] for improvement in config['improvements'][top_category]: suggestions.append({ 'aspect': aspect, 'category': top_category, 'suggestion': improvement, 'confidence': weighted_confidence }) confidence_scores.append(weighted_confidence) # Sort suggestions by confidence and priority suggestions.sort(key=lambda x: x['confidence'], reverse=True) # Property type specific suggestions if data and data.get('property_type'): property_type = data['property_type'].lower() type_specific_suggestions = { 'residential': [ "Add information about school districts", "Include details about neighborhood safety", "Specify parking arrangements" ], 'commercial': [ "Add foot traffic statistics", "Include zoning information", "Specify business licenses required" ], 'industrial': [ "Add power supply specifications", "Include environmental clearances", "Specify loading/unloading facilities" ], 'land': [ "Add soil testing reports", "Include development potential analysis", "Specify available utilities" ] } for type_key, type_suggestions in type_specific_suggestions.items(): if type_key in property_type: for suggestion in type_suggestions: suggestions.append({ 'aspect': 'property_type_specific', 'category': 'type_specific_requirements', 'suggestion': suggestion, 'confidence': 0.8 # High confidence for type-specific suggestions }) # Add market-based suggestions if data and data.get('market_value'): try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if market_value > 10000000: # High-value property premium_suggestions = [ "Add virtual tour of the property", "Include detailed investment analysis", "Provide historical price trends" ] for suggestion in premium_suggestions: suggestions.append({ 'aspect': 'premium_property', 'category': 'high_value_requirements', 'suggestion': suggestion, 'confidence': 0.9 }) except ValueError: pass # Calculate overall completeness score completeness_score = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0 completeness_score = min(100, max(0, completeness_score * 100)) return { 'suggestions': suggestions[:10], # Return top 10 suggestions 'completeness_score': completeness_score, 'priority_aspects': [s['aspect'] for s in suggestions[:3]], 'improvement_summary': f"Focus on improving {', '.join([s['aspect'] for s in suggestions[:3]])}", 'total_suggestions': len(suggestions) } except Exception as e: logger.error(f"Error generating suggestions: {str(e)}") return { 'suggestions': [ { 'aspect': 'general', 'category': 'basic_requirements', 'suggestion': 'Please provide more property details', 'confidence': 0.5 } ], 'completeness_score': 0, 'priority_aspects': ['general'], 'improvement_summary': "Add basic property information", 'total_suggestions': 1 } def assess_text_quality(text): try: if not text or len(text.strip()) < 20: return { 'assessment': 'insufficient', 'score': 0, 'reasoning': 'Text too short.', 'is_ai_generated': False, 'quality_metrics': {} } classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced quality categories with more specific indicators quality_categories = [ "detailed and informative", "adequately detailed", "basic information", "vague description", "misleading content", "professional listing", "amateur listing", "spam-like content", "template-based content", "authentic description" ] # Analyze text with multiple aspects quality_result = classifier(text[:1000], quality_categories, multi_label=True) # Get top classifications with confidence scores top_classifications = [] for label, score in zip(quality_result['labels'][:3], quality_result['scores'][:3]): if score > 0.3: # Only include if confidence is above 30% top_classifications.append({ 'classification': label, 'confidence': float(score) }) # AI generation detection with multiple models ai_check = classifier(text[:1000], ["human-written", "AI-generated", "template-based", "authentic"]) is_ai_generated = ( (ai_check['labels'][0] == "AI-generated" and ai_check['scores'][0] > 0.6) or (ai_check['labels'][0] == "template-based" and ai_check['scores'][0] > 0.7) ) # Calculate quality metrics quality_metrics = { 'detail_level': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['detailed and informative', 'adequately detailed']), 'professionalism': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['professional listing', 'authentic description']), 'clarity': sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label not in ['vague description', 'misleading content', 'spam-like content']), 'authenticity': 1.0 - sum(score for label, score in zip(quality_result['labels'], quality_result['scores']) if label in ['template-based content', 'spam-like content']) } # Calculate overall score with weighted metrics weights = { 'detail_level': 0.3, 'professionalism': 0.25, 'clarity': 0.25, 'authenticity': 0.2 } score = sum(metric * weights[metric_name] for metric_name, metric in quality_metrics.items()) score = score * 100 # Convert to percentage # Adjust score for AI-generated content if is_ai_generated: score = score * 0.7 # Reduce score by 30% for AI-generated content # Generate detailed reasoning reasoning_parts = [] if top_classifications: primary_class = top_classifications[0]['classification'] reasoning_parts.append(f"Primary assessment: {primary_class}") if quality_metrics['detail_level'] > 0.7: reasoning_parts.append("Contains comprehensive details") elif quality_metrics['detail_level'] > 0.4: reasoning_parts.append("Contains adequate details") else: reasoning_parts.append("Lacks important details") if quality_metrics['professionalism'] > 0.7: reasoning_parts.append("Professional listing style") elif quality_metrics['professionalism'] < 0.4: reasoning_parts.append("Amateur listing style") if quality_metrics['clarity'] < 0.5: reasoning_parts.append("Content clarity issues detected") if is_ai_generated: reasoning_parts.append("Content appears to be AI-generated") return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', 'score': int(score), 'reasoning': '. '.join(reasoning_parts), 'is_ai_generated': is_ai_generated, 'quality_metrics': quality_metrics, 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error assessing text quality: {str(e)}") return { 'assessment': 'could not assess', 'score': 50, 'reasoning': 'Technical error.', 'is_ai_generated': False, 'quality_metrics': {}, 'top_classifications': [] } def verify_address(data): try: address_results = { 'address_exists': False, 'pincode_valid': False, 'city_state_match': False, 'coordinates_match': False, 'confidence': 0.0, 'issues': [], 'verification_score': 0.0 } if data['zip']: try: response = requests.get(f"https://api.postalpincode.in/pincode/{data['zip']}", timeout=5) if response.status_code == 200: pin_data = response.json() if pin_data[0]['Status'] == 'Success': address_results['pincode_valid'] = True post_offices = pin_data[0]['PostOffice'] cities = {po['Name'].lower() for po in post_offices} states = {po['State'].lower() for po in post_offices} if data['city'].lower() in cities or data['state'].lower() in states: address_results['city_state_match'] = True else: address_results['issues'].append("City/state may not match pincode") else: address_results['issues'].append(f"Invalid pincode: {data['zip']}") else: address_results['issues'].append("Pincode API error") except Exception as e: logger.error(f"Pincode API error: {str(e)}") address_results['issues'].append("Pincode validation failed") full_address = ', '.join(filter(None, [data['address'], data['city'], data['state'], data['country'], data['zip']])) for attempt in range(3): try: location = geocoder.geocode(full_address) if location: address_results['address_exists'] = True address_results['confidence'] = 0.9 if data['latitude'] and data['longitude']: try: provided_coords = (float(data['latitude']), float(data['longitude'])) geocoded_coords = (location.latitude, location.longitude) from geopy.distance import distance dist = distance(provided_coords, geocoded_coords).km address_results['coordinates_match'] = dist < 1.0 if not address_results['coordinates_match']: address_results['issues'].append(f"Coordinates {dist:.2f}km off") except: address_results['issues'].append("Invalid coordinates") break time.sleep(1) except Exception as e: logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") time.sleep(1) else: address_results['issues'].append("Address geocoding failed") verification_points = ( address_results['address_exists'] * 0.4 + address_results['pincode_valid'] * 0.3 + address_results['city_state_match'] * 0.2 + address_results['coordinates_match'] * 0.1 ) address_results['verification_score'] = verification_points return address_results except Exception as e: logger.error(f"Error verifying address: {str(e)}") address_results['issues'].append(str(e)) return address_results def perform_cross_validation(data): try: cross_checks = [] # Check bedroom count consistency try: bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 desc_bhk = re.findall(r'(\d+)\s*bhk', data['description'].lower()) if desc_bhk and int(desc_bhk[0]) != bedrooms: cross_checks.append({ 'check': 'bedroom_count', 'status': 'inconsistent', 'message': f"Description mentions {desc_bhk[0]} BHK, form says {bedrooms}" }) else: cross_checks.append({ 'check': 'bedroom_count', 'status': 'consistent', 'message': f"Bedrooms: {bedrooms}" }) except: cross_checks.append({ 'check': 'bedroom_count', 'status': 'invalid', 'message': 'Invalid bedroom data' }) # Check room count consistency try: bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0 total_rooms = int(data['total_rooms']) if data['total_rooms'] else 0 # More thorough room count validation if total_rooms > 0: if total_rooms < bedrooms + bathrooms: cross_checks.append({ 'check': 'room_count', 'status': 'inconsistent', 'message': f"Total rooms ({total_rooms}) less than bedrooms ({bedrooms}) + bathrooms ({bathrooms})" }) elif total_rooms > bedrooms + bathrooms + 5: # Allow for some extra rooms cross_checks.append({ 'check': 'room_count', 'status': 'suspicious', 'message': f"Total rooms ({total_rooms}) seems unusually high compared to bedrooms ({bedrooms}) + bathrooms ({bathrooms})" }) else: cross_checks.append({ 'check': 'room_count', 'status': 'consistent', 'message': f"Rooms consistent: {total_rooms} total, {bedrooms} bedrooms, {bathrooms} bathrooms" }) else: cross_checks.append({ 'check': 'room_count', 'status': 'missing', 'message': 'Total room count not provided' }) except: cross_checks.append({ 'check': 'room_count', 'status': 'invalid', 'message': 'Invalid room count data' }) # Check year built consistency try: year_built = int(data['year_built']) if data['year_built'] else 0 current_year = datetime.now().year if year_built > 0: if year_built > current_year: cross_checks.append({ 'check': 'year_built', 'status': 'invalid', 'message': f"Year built ({year_built}) is in the future" }) elif year_built < 1800: cross_checks.append({ 'check': 'year_built', 'status': 'suspicious', 'message': f"Year built ({year_built}) seems unusually old" }) elif current_year - year_built > 200: cross_checks.append({ 'check': 'year_built', 'status': 'suspicious', 'message': f"Property age ({current_year - year_built} years) seems unusually old" }) else: cross_checks.append({ 'check': 'year_built', 'status': 'reasonable', 'message': f"Year built reasonable: {year_built}" }) else: cross_checks.append({ 'check': 'year_built', 'status': 'missing', 'message': 'Year built not provided' }) except: cross_checks.append({ 'check': 'year_built', 'status': 'invalid', 'message': 'Invalid year built data' }) # Check square footage consistency try: sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 bedrooms = int(data['bedrooms']) if data['bedrooms'] else 0 if sq_ft > 0 and bedrooms > 0: sq_ft_per_bedroom = sq_ft / bedrooms if sq_ft_per_bedroom < 50: # Unusually small per bedroom cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'suspicious', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually small" }) elif sq_ft_per_bedroom > 1000: # Unusually large per bedroom cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'suspicious', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) seems unusually large" }) else: cross_checks.append({ 'check': 'sq_ft_per_bedroom', 'status': 'reasonable', 'message': f"Square footage per bedroom ({sq_ft_per_bedroom:.1f} sq.ft.) is reasonable" }) elif sq_ft > 0: cross_checks.append({ 'check': 'sq_ft', 'status': 'incomplete', 'message': f"Square footage provided: {sq_ft} sq.ft., but bedroom count missing" }) elif bedrooms > 0: cross_checks.append({ 'check': 'sq_ft', 'status': 'missing', 'message': f"Square footage not provided, but {bedrooms} bedrooms listed" }) else: cross_checks.append({ 'check': 'sq_ft', 'status': 'missing', 'message': 'Square footage not provided' }) except: cross_checks.append({ 'check': 'sq_ft', 'status': 'invalid', 'message': 'Invalid square footage data' }) # Check price per square foot try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 if market_value > 0 and sq_ft > 0: price_per_sqft = market_value / sq_ft # Check for suspiciously low price per sq ft if price_per_sqft < 100: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'suspiciously low', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously low" }) # Check for suspiciously high price per sq ft elif price_per_sqft > 50000: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'suspiciously high', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is suspiciously high" }) else: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'reasonable', 'message': f"Price per sq.ft.: ₹{price_per_sqft:.2f} is reasonable" }) elif market_value > 0: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'incomplete', 'message': f"Market value provided: ₹{market_value:,.2f}, but square footage missing" }) elif sq_ft > 0: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'incomplete', 'message': f"Square footage provided: {sq_ft} sq.ft., but market value missing" }) else: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'missing', 'message': 'Price per sq.ft. cannot be calculated (missing data)' }) except: cross_checks.append({ 'check': 'price_per_sqft', 'status': 'invalid', 'message': 'Invalid price per sq.ft. data' }) # Check location consistency try: latitude = float(data['latitude']) if data['latitude'] else 0 longitude = float(data['longitude']) if data['longitude'] else 0 address = data['address'].lower() if data['address'] else '' city = data['city'].lower() if data['city'] else '' state = data['state'].lower() if data['state'] else '' country = data['country'].lower() if data['country'] else 'india' # Check if coordinates are within India if latitude != 0 and longitude != 0: if 6.5 <= latitude <= 35.5 and 68.1 <= longitude <= 97.4: cross_checks.append({ 'check': 'coordinates', 'status': 'valid', 'message': 'Coordinates within India' }) else: cross_checks.append({ 'check': 'coordinates', 'status': 'invalid', 'message': 'Coordinates outside India' }) else: cross_checks.append({ 'check': 'coordinates', 'status': 'missing', 'message': 'Coordinates not provided' }) # Check if address contains city and state if address and city and state: if city in address and state in address: cross_checks.append({ 'check': 'address_consistency', 'status': 'consistent', 'message': 'Address contains city and state' }) else: cross_checks.append({ 'check': 'address_consistency', 'status': 'inconsistent', 'message': 'Address does not contain city or state' }) else: cross_checks.append({ 'check': 'address_consistency', 'status': 'incomplete', 'message': 'Address consistency check incomplete (missing data)' }) except: cross_checks.append({ 'check': 'location', 'status': 'invalid', 'message': 'Invalid location data' }) # Check property type consistency try: property_type = data['property_type'].lower() if data['property_type'] else '' description = data['description'].lower() if data['description'] else '' if property_type and description: property_types = ['apartment', 'house', 'condo', 'townhouse', 'villa', 'land', 'commercial'] found_types = [pt for pt in property_types if pt in description] if found_types and property_type not in found_types: cross_checks.append({ 'check': 'property_type', 'status': 'inconsistent', 'message': f"Description mentions {', '.join(found_types)}, but property type is {property_type}" }) else: cross_checks.append({ 'check': 'property_type', 'status': 'consistent', 'message': f"Property type consistent: {property_type}" }) else: cross_checks.append({ 'check': 'property_type', 'status': 'incomplete', 'message': 'Property type consistency check incomplete (missing data)' }) except: cross_checks.append({ 'check': 'property_type', 'status': 'invalid', 'message': 'Invalid property type data' }) # Check for suspiciously low market value try: market_value = float(data['market_value'].replace('₹', '').replace(',', '')) if data['market_value'] else 0 property_type = data['property_type'].lower() if data['property_type'] else '' if market_value > 0 and property_type: # Define minimum reasonable values for different property types min_values = { 'apartment': 500000, 'house': 1000000, 'condo': 800000, 'townhouse': 900000, 'villa': 2000000, 'land': 300000, 'commercial': 2000000 } min_value = min_values.get(property_type, 500000) if market_value < min_value: cross_checks.append({ 'check': 'market_value', 'status': 'suspiciously low', 'message': f"Market value (₹{market_value:,.2f}) seems suspiciously low for a {property_type}" }) else: cross_checks.append({ 'check': 'market_value', 'status': 'reasonable', 'message': f"Market value (₹{market_value:,.2f}) is reasonable for a {property_type}" }) elif market_value > 0: cross_checks.append({ 'check': 'market_value', 'status': 'incomplete', 'message': f"Market value provided: ₹{market_value:,.2f}, but property type missing" }) else: cross_checks.append({ 'check': 'market_value', 'status': 'missing', 'message': 'Market value not provided' }) except: cross_checks.append({ 'check': 'market_value', 'status': 'invalid', 'message': 'Invalid market value data' }) return cross_checks except Exception as e: logger.error(f"Error performing cross validation: {str(e)}") return [{ 'check': 'cross_validation', 'status': 'error', 'message': f'Error performing cross validation: {str(e)}' }] def analyze_location(data): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") location_text = ' '.join(filter(None, [ data['address'], data['city'], data['state'], data['country'], data['zip'], f"Lat: {data['latitude']}", f"Long: {data['longitude']}", data['nearby_landmarks'] ])) categories = ["complete", "partial", "minimal", "missing"] result = classifier(location_text, categories) location_quality = "unknown" if data['city'] and data['state']: for attempt in range(3): try: location = geocoder.geocode(f"{data['city']}, {data['state']}, India") if location: location_quality = "verified" break time.sleep(1) except: time.sleep(1) else: location_quality = "unverified" coord_check = "missing" if data['latitude'] and data['longitude']: try: lat, lng = float(data['latitude']), float(data['longitude']) coord_check = "in_india" if 6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5 else "outside_india" except: coord_check = "invalid" completeness = calculate_location_completeness(data) assessment = "complete" if completeness >= 80 else "partial" if completeness >= 50 else "minimal" return { 'assessment': assessment, 'confidence': float(result['scores'][0]), 'coordinates_check': coord_check, 'landmarks_provided': bool(data['nearby_landmarks']), 'completeness_score': completeness, 'location_quality': location_quality } except Exception as e: logger.error(f"Error analyzing location: {str(e)}") return { 'assessment': 'error', 'confidence': 0.0, 'coordinates_check': 'error', 'landmarks_provided': False, 'completeness_score': 0, 'location_quality': 'error' } def calculate_location_completeness(data): fields = ['address', 'city', 'state', 'country', 'zip', 'latitude', 'longitude', 'nearby_landmarks'] return int((sum(1 for f in fields if data[f]) / len(fields)) * 100) def analyze_price(data): try: price_str = data['market_value'].replace('$', '').replace(',', '').strip() price = float(price_str) if price_str else 0 sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data['sq_ft'] else 0 price_per_sqft = price / sq_ft if sq_ft else 0 if not price: return { 'assessment': 'no price', 'confidence': 0.0, 'price': 0, 'formatted_price': '₹0', 'price_per_sqft': 0, 'formatted_price_per_sqft': '₹0', 'price_range': 'unknown', 'location_price_assessment': 'cannot assess', 'has_price': False, 'market_trends': {}, 'price_factors': {}, 'risk_indicators': [] } # Use a more sophisticated model for price analysis classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Create a detailed context for price analysis price_context = f""" Property Type: {data.get('property_type', '')} Location: {data.get('city', '')}, {data.get('state', '')} Size: {sq_ft} sq.ft. Price: ₹{price:,.2f} Price per sq.ft.: ₹{price_per_sqft:,.2f} Property Status: {data.get('status', '')} Year Built: {data.get('year_built', '')} Bedrooms: {data.get('bedrooms', '')} Bathrooms: {data.get('bathrooms', '')} Amenities: {data.get('amenities', '')} """ # Enhanced price categories with more specific indicators price_categories = [ "reasonable market price", "suspiciously low price", "suspiciously high price", "average market price", "luxury property price", "budget property price", "premium property price", "mid-range property price", "overpriced for location", "underpriced for location", "price matches amenities", "price matches property age", "price matches location value", "price matches property condition", "price matches market trends" ] # Analyze price with multiple aspects price_result = classifier(price_context, price_categories, multi_label=True) # Get top classifications with enhanced confidence calculation top_classifications = [] for label, score in zip(price_result['labels'][:5], price_result['scores'][:5]): if score > 0.25: # Lower threshold for better sensitivity top_classifications.append({ 'classification': label, 'confidence': float(score) }) # Determine price range based on AI classification and market data price_range = 'unknown' if top_classifications: primary_class = top_classifications[0]['classification'] if 'luxury' in primary_class: price_range = 'luxury' elif 'premium' in primary_class: price_range = 'premium' elif 'mid-range' in primary_class: price_range = 'mid_range' elif 'budget' in primary_class: price_range = 'budget' # Enhanced location-specific price assessment location_assessment = "unknown" market_trends = {} if data.get('city') and price_per_sqft: city_lower = data['city'].lower() metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"] # Define price ranges for different city tiers if any(city in city_lower for city in metro_cities): market_trends = { 'city_tier': 'metro', 'avg_price_range': { 'min': 5000, 'max': 30000, 'trend': 'stable' }, 'price_per_sqft': { 'current': price_per_sqft, 'market_avg': 15000, 'deviation': abs(price_per_sqft - 15000) / 15000 * 100 } } location_assessment = ( "reasonable" if 5000 <= price_per_sqft <= 30000 else "suspiciously low" if price_per_sqft < 5000 else "suspiciously high" ) else: market_trends = { 'city_tier': 'non-metro', 'avg_price_range': { 'min': 1500, 'max': 15000, 'trend': 'stable' }, 'price_per_sqft': { 'current': price_per_sqft, 'market_avg': 7500, 'deviation': abs(price_per_sqft - 7500) / 7500 * 100 } } location_assessment = ( "reasonable" if 1500 <= price_per_sqft <= 15000 else "suspiciously low" if price_per_sqft < 1500 else "suspiciously high" ) # Enhanced price analysis factors price_factors = {} risk_indicators = [] # Property age factor try: year_built = int(data.get('year_built', 0)) current_year = datetime.now().year property_age = current_year - year_built if property_age > 0: depreciation_factor = max(0.5, 1 - (property_age * 0.01)) # 1% depreciation per year, min 50% price_factors['age_factor'] = { 'property_age': property_age, 'depreciation_factor': depreciation_factor, 'impact': 'high' if property_age > 30 else 'medium' if property_age > 15 else 'low' } except: price_factors['age_factor'] = {'error': 'Invalid year built'} # Size factor if sq_ft > 0: size_factor = { 'size': sq_ft, 'price_per_sqft': price_per_sqft, 'efficiency': 'high' if 800 <= sq_ft <= 2000 else 'medium' if 500 <= sq_ft <= 3000 else 'low' } price_factors['size_factor'] = size_factor # Add risk indicators based on size if sq_ft < 300: risk_indicators.append('Unusually small property size') elif sq_ft > 10000: risk_indicators.append('Unusually large property size') # Amenities factor if data.get('amenities'): amenities_list = [a.strip() for a in data['amenities'].split(',')] amenities_score = min(1.0, len(amenities_list) * 0.1) # 10% per amenity, max 100% price_factors['amenities_factor'] = { 'count': len(amenities_list), 'score': amenities_score, 'impact': 'high' if amenities_score > 0.7 else 'medium' if amenities_score > 0.4 else 'low' } # Calculate overall confidence with weighted factors confidence_weights = { 'primary_classification': 0.3, 'location_assessment': 0.25, 'age_factor': 0.2, 'size_factor': 0.15, 'amenities_factor': 0.1 } confidence_scores = [] # Primary classification confidence if top_classifications: confidence_scores.append(price_result['scores'][0] * confidence_weights['primary_classification']) # Location assessment confidence location_confidence = 0.8 if location_assessment == "reasonable" else 0.4 confidence_scores.append(location_confidence * confidence_weights['location_assessment']) # Age factor confidence if 'age_factor' in price_factors and 'depreciation_factor' in price_factors['age_factor']: age_confidence = price_factors['age_factor']['depreciation_factor'] confidence_scores.append(age_confidence * confidence_weights['age_factor']) # Size factor confidence if 'size_factor' in price_factors: size_confidence = 0.8 if price_factors['size_factor']['efficiency'] == 'high' else 0.6 confidence_scores.append(size_confidence * confidence_weights['size_factor']) # Amenities factor confidence if 'amenities_factor' in price_factors: amenities_confidence = price_factors['amenities_factor']['score'] confidence_scores.append(amenities_confidence * confidence_weights['amenities_factor']) overall_confidence = sum(confidence_scores) / sum(confidence_weights.values()) return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not classify', 'confidence': float(overall_confidence), 'price': price, 'formatted_price': f"₹{price:,.0f}", 'price_per_sqft': price_per_sqft, 'formatted_price_per_sqft': f"₹{price_per_sqft:,.2f}", 'price_range': price_range, 'location_price_assessment': location_assessment, 'has_price': True, 'market_trends': market_trends, 'price_factors': price_factors, 'risk_indicators': risk_indicators, 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error analyzing price: {str(e)}") return { 'assessment': 'error', 'confidence': 0.0, 'price': 0, 'formatted_price': '₹0', 'price_per_sqft': 0, 'formatted_price_per_sqft': '₹0', 'price_range': 'unknown', 'location_price_assessment': 'error', 'has_price': False, 'market_trends': {}, 'price_factors': {}, 'risk_indicators': [], 'top_classifications': [] } def analyze_legal_details(legal_text): try: if not legal_text or len(legal_text.strip()) < 5: return { 'assessment': 'insufficient', 'confidence': 0.0, 'summary': 'No legal details provided', 'completeness_score': 0, 'potential_issues': False, 'legal_metrics': {}, 'reasoning': 'No legal details provided for analysis', 'top_classifications': [] } classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") # Enhanced legal categories with more specific indicators categories = [ "comprehensive legal documentation", "basic legal documentation", "missing critical legal details", "potential legal issues", "standard property documentation", "title verification documents", "encumbrance certificates", "property tax records", "building permits", "land use certificates", "clear title documentation", "property registration documents", "ownership transfer documents", "legal compliance certificates", "property dispute records" ] # Create a more detailed context for analysis legal_context = f""" Legal Documentation Analysis: {legal_text[:1000]} Key aspects to verify: - Title and ownership documentation - Property registration status - Tax compliance - Building permits and approvals - Land use compliance - Encumbrance status - Dispute history """ # Analyze legal text with multiple aspects legal_result = classifier(legal_context, categories, multi_label=True) # Get top classifications with confidence scores top_classifications = [] for label, score in zip(legal_result['labels'][:3], legal_result['scores'][:3]): if score > 0.3: # Only include if confidence is above 30% top_classifications.append({ 'classification': label, 'confidence': float(score) }) # Generate summary using BART summary = summarize_text(legal_text[:1000]) # Calculate legal metrics with weighted scoring legal_metrics = { 'completeness': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['comprehensive legal documentation', 'standard property documentation']), 'documentation_quality': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['title verification documents', 'encumbrance certificates', 'clear title documentation']), 'compliance': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['building permits', 'land use certificates', 'legal compliance certificates']), 'risk_level': sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in ['missing critical legal details', 'potential legal issues', 'property dispute records']) } # Calculate completeness score with weighted components completeness_score = ( legal_metrics['completeness'] * 0.4 + legal_metrics['documentation_quality'] * 0.4 + legal_metrics['compliance'] * 0.2 ) * 100 # Determine if there are potential issues with threshold potential_issues = legal_metrics['risk_level'] > 0.3 # Generate detailed reasoning with specific points reasoning_parts = [] # Primary assessment if top_classifications: primary_class = top_classifications[0]['classification'] confidence = top_classifications[0]['confidence'] reasoning_parts.append(f"Primary assessment: {primary_class} (confidence: {confidence:.0%})") # Documentation completeness if legal_metrics['completeness'] > 0.7: reasoning_parts.append("Comprehensive legal documentation present") elif legal_metrics['completeness'] > 0.4: reasoning_parts.append("Basic legal documentation present") else: reasoning_parts.append("Insufficient legal documentation") # Documentation quality if legal_metrics['documentation_quality'] > 0.6: reasoning_parts.append("Quality documentation verified (title, encumbrance)") elif legal_metrics['documentation_quality'] > 0.3: reasoning_parts.append("Basic documentation quality verified") # Compliance status if legal_metrics['compliance'] > 0.6: reasoning_parts.append("Full compliance documentation present") elif legal_metrics['compliance'] > 0.3: reasoning_parts.append("Partial compliance documentation present") # Risk assessment if potential_issues: if legal_metrics['risk_level'] > 0.6: reasoning_parts.append("High risk: Multiple potential legal issues detected") else: reasoning_parts.append("Moderate risk: Some potential legal issues detected") else: reasoning_parts.append("No significant legal issues detected") # Calculate overall confidence overall_confidence = min(1.0, ( legal_metrics['completeness'] * 0.4 + legal_metrics['documentation_quality'] * 0.4 + (1 - legal_metrics['risk_level']) * 0.2 )) return { 'assessment': top_classifications[0]['classification'] if top_classifications else 'could not assess', 'confidence': float(overall_confidence), 'summary': summary, 'completeness_score': int(completeness_score), 'potential_issues': potential_issues, 'legal_metrics': legal_metrics, 'reasoning': '. '.join(reasoning_parts), 'top_classifications': top_classifications } except Exception as e: logger.error(f"Error analyzing legal details: {str(e)}") return { 'assessment': 'could not assess', 'confidence': 0.0, 'summary': 'Error analyzing legal details', 'completeness_score': 0, 'potential_issues': False, 'legal_metrics': {}, 'reasoning': 'Technical error occurred during analysis', 'top_classifications': [] } def verify_property_specs(data): try: specs_verification = { 'bedrooms_reasonable': True, 'bathrooms_reasonable': True, 'total_rooms_reasonable': True, 'parking_reasonable': True, 'sq_ft_reasonable': True, 'market_value_reasonable': True, 'issues': [], 'verification_score': 0.0 } # Validate bedrooms try: bedrooms = int(float(data['bedrooms'])) if data['bedrooms'] else 0 if bedrooms > 20 or bedrooms < 0: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bedrooms: {bedrooms}. Should be between 0 and 20.") except ValueError: specs_verification['bedrooms_reasonable'] = False specs_verification['issues'].append("Invalid bedroom data: must be a number") # Validate bathrooms try: bathrooms = float(data['bathrooms']) if data['bathrooms'] else 0 if bathrooms > 15 or bathrooms < 0: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append(f"Invalid number of bathrooms: {bathrooms}. Should be between 0 and 15.") except ValueError: specs_verification['bathrooms_reasonable'] = False specs_verification['issues'].append("Invalid bathroom data: must be a number") # Validate total rooms total_rooms = 0 if data['total_rooms']: try: total_rooms = int(float(data['total_rooms'])) if total_rooms > 0: # Only validate if total_rooms is provided min_required_rooms = bedrooms + math.ceil(bathrooms) # Round up for half bathrooms if total_rooms < min_required_rooms: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append( f"Total rooms ({total_rooms}) must be at least equal to bedrooms ({bedrooms}) + bathrooms ({bathrooms} = {min_required_rooms})" ) elif total_rooms > 50: specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append(f"Total rooms ({total_rooms}) seems unreasonably high") except (ValueError, TypeError): specs_verification['total_rooms_reasonable'] = False specs_verification['issues'].append("Invalid total rooms data: must be a number") # Validate parking try: parking = int(float(data['parking'])) if data['parking'] else 0 if parking > 20 or parking < 0: specs_verification['parking_reasonable'] = False specs_verification['issues'].append(f"Invalid parking spaces: {parking}. Should be between 0 and 20.") except ValueError: specs_verification['parking_reasonable'] = False specs_verification['issues'].append("Invalid parking data: must be a number") # Validate square feet sq_ft = 0 if data['sq_ft']: try: sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if sq_ft > 0: # Only validate if sq_ft is provided if sq_ft > 100000: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably high") elif sq_ft < 100: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage ({sq_ft}) seems unreasonably low") # Validate sq ft per bedroom if both are provided if bedrooms > 0: sq_ft_per_bedroom = sq_ft / bedrooms if sq_ft_per_bedroom < 50: specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append(f"Square footage per bedroom ({sq_ft_per_bedroom:.1f}) seems unreasonably low") except (ValueError, TypeError): specs_verification['sq_ft_reasonable'] = False specs_verification['issues'].append("Invalid square footage data: must be a number") # Validate market value try: market_value = float(re.sub(r'[^\d.]', '', data['market_value'])) if data['market_value'] else 0 if market_value > 0: # Only validate if market_value is provided if market_value > 1000000000: # 100 crore limit specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably high") elif market_value < 100000: # 1 lakh minimum specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Market value (₹{market_value:,.2f}) seems unreasonably low") # Validate price per sq ft if both are provided if sq_ft > 0: price_per_sqft = market_value / sq_ft if price_per_sqft < 100: # Less than ₹100 per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably low") elif price_per_sqft > 100000: # More than ₹1 lakh per sq ft specs_verification['market_value_reasonable'] = False specs_verification['issues'].append(f"Price per sq ft (₹{price_per_sqft:,.2f}) seems unreasonably high") except ValueError: specs_verification['market_value_reasonable'] = False specs_verification['issues'].append("Invalid market value data: must be a number") # Calculate verification score based on all checks valid_checks = sum([ specs_verification[f] for f in [ 'bedrooms_reasonable', 'bathrooms_reasonable', 'total_rooms_reasonable', 'parking_reasonable', 'sq_ft_reasonable', 'market_value_reasonable' ] ]) total_checks = 6 # Total number of checks specs_verification['verification_score'] = (valid_checks / total_checks) * 100 return specs_verification except Exception as e: logger.error(f"Error verifying specs: {str(e)}") return { 'bedrooms_reasonable': False, 'bathrooms_reasonable': False, 'total_rooms_reasonable': False, 'parking_reasonable': False, 'sq_ft_reasonable': False, 'market_value_reasonable': False, 'issues': [f"Error during verification: {str(e)}"], 'verification_score': 0.0 } def analyze_market_value(data): try: # Extract basic property information price = float(data['market_value'].replace('$', '').replace(',', '').strip()) if data.get('market_value') else 0 sq_ft = float(re.sub(r'[^\d.]', '', data['sq_ft'])) if data.get('sq_ft') else 0 year_built = int(data.get('year_built', 0)) if data.get('year_built') else 0 current_year = datetime.now().year property_age = current_year - year_built if year_built else 0 # Initialize market value components market_value_components = { 'base_value': 0, 'location_multiplier': 1.0, 'age_factor': 1.0, 'size_factor': 1.0, 'amenities_factor': 1.0, 'market_trend_factor': 1.0, 'condition_factor': 1.0 } # Calculate base value per sq ft based on city tier city_lower = data.get('city', '').lower() metro_cities = ["mumbai", "delhi", "bangalore", "hyderabad", "chennai", "kolkata", "pune"] if any(city in city_lower for city in metro_cities): base_price_per_sqft = 15000 # Metro city base price market_value_components['location_multiplier'] = 1.5 else: base_price_per_sqft = 7500 # Non-metro city base price market_value_components['location_multiplier'] = 1.0 # Calculate base value if sq_ft > 0: market_value_components['base_value'] = base_price_per_sqft * sq_ft # Age factor calculation with depreciation if property_age > 0: depreciation_rate = 0.01 # 1% depreciation per year max_depreciation = 0.5 # Maximum 50% depreciation age_factor = max(1 - max_depreciation, 1 - (property_age * depreciation_rate)) market_value_components['age_factor'] = age_factor # Size factor calculation if sq_ft > 0: if 800 <= sq_ft <= 2000: # Optimal size range market_value_components['size_factor'] = 1.2 elif 500 <= sq_ft <= 3000: # Acceptable size range market_value_components['size_factor'] = 1.0 else: # Unusual size market_value_components['size_factor'] = 0.8 # Amenities factor calculation if data.get('amenities'): amenities_list = [a.strip() for a in data['amenities'].split(',')] amenities_count = len(amenities_list) amenities_factor = min(1.5, 1 + (amenities_count * 0.1)) # 10% per amenity, max 50% bonus market_value_components['amenities_factor'] = amenities_factor # Market trend factor (based on property type and location) property_type = data.get('property_type', '').lower() if 'apartment' in property_type or 'flat' in property_type: market_value_components['market_trend_factor'] = 1.1 # Apartments trending up elif 'house' in property_type or 'villa' in property_type: market_value_components['market_trend_factor'] = 1.15 # Houses trending up more elif 'plot' in property_type or 'land' in property_type: market_value_components['market_trend_factor'] = 1.2 # Land trending up most # Condition factor (based on year built and amenities) if property_age <= 5: market_value_components['condition_factor'] = 1.2 elif property_age <= 15: market_value_components['condition_factor'] = 1.1 elif property_age <= 30: market_value_components['condition_factor'] = 1.0 else: market_value_components['condition_factor'] = 0.9 # Calculate final market value market_value = market_value_components['base_value'] for factor, value in market_value_components.items(): if factor != 'base_value': market_value *= value # Calculate price per sq ft for the estimated market value estimated_price_per_sqft = market_value / sq_ft if sq_ft > 0 else 0 # Calculate value metrics value_metrics = { 'price_to_value_ratio': price / market_value if market_value > 0 else 0, 'price_per_sqft_ratio': price / sq_ft if sq_ft > 0 else 0, 'estimated_price_per_sqft': estimated_price_per_sqft, 'value_appreciation': (market_value - price) / price * 100 if price > 0 else 0 } # Generate market insights market_insights = [] # Price vs Market Value insight if value_metrics['price_to_value_ratio'] > 1.2: market_insights.append("Property is overpriced compared to market value") elif value_metrics['price_to_value_ratio'] < 0.8: market_insights.append("Property is underpriced compared to market value") # Size insight if sq_ft < 300: market_insights.append("Property size is unusually small for the market") elif sq_ft > 10000: market_insights.append("Property size is unusually large for the market") # Age insight if property_age > 30: market_insights.append("Property age significantly impacts market value") # Location insight if market_value_components['location_multiplier'] > 1.0: market_insights.append("Property is in a premium location") # Market trend insight if market_value_components['market_trend_factor'] > 1.1: market_insights.append("Property type is trending upward in the market") return { 'estimated_market_value': market_value, 'formatted_market_value': f"₹{market_value:,.0f}", 'price_per_sqft': estimated_price_per_sqft, 'formatted_price_per_sqft': f"₹{estimated_price_per_sqft:,.2f}", 'value_components': market_value_components, 'value_metrics': value_metrics, 'market_insights': market_insights, 'confidence_score': min(0.95, 0.7 + (len(market_insights) * 0.05)) # Base 0.7 + 0.05 per insight, max 0.95 } except Exception as e: logger.error(f"Error analyzing market value: {str(e)}") return { 'estimated_market_value': 0, 'formatted_market_value': '₹0', 'price_per_sqft': 0, 'formatted_price_per_sqft': '₹0', 'value_components': {}, 'value_metrics': {}, 'market_insights': [], 'confidence_score': 0.0 } def assess_image_quality(img): try: width, height = img.size resolution = width * height quality_score = min(100, resolution // 20000) return { 'resolution': f"{width}x{height}", 'quality_score': quality_score } except Exception as e: logger.error(f"Error assessing image quality: {str(e)}") return { 'resolution': 'unknown', 'quality_score': 0 } def check_if_property_related(text): try: classifier = load_model("zero-shot-classification", "facebook/bart-large-mnli") result = classifier(text[:1000], ["property-related", "non-property-related"]) is_related = result['labels'][0] == "property-related" return { 'is_related': is_related, 'confidence': float(result['scores'][0]) } except Exception as e: logger.error(f"Error checking property relation: {str(e)}") return { 'is_related': False, 'confidence': 0.0 } # Update the load_model function to use memory optimizations @lru_cache(maxsize=3) # Limit cache size def load_model(task, model_name): try: logger.info(f"Loading model: {model_name} for task: {task}") # Use smaller, more efficient models if task == "zero-shot-classification": # Use smaller model for zero-shot classification model_name = "facebook/bart-large-mnli" # ~1.6GB return pipeline(task, model=model_name, device=-1) elif task == "summarization": # Use smaller model for summarization model_name = "facebook/bart-large-cnn" # ~1.6GB return pipeline(task, model=model_name, device=-1) elif task == "text-classification": # Use very small model for text classification model_name = "distilbert-base-uncased" # ~260MB return pipeline(task, model=model_name, device=-1) elif task == "feature-extraction": # Use small model for feature extraction model_name = "sentence-transformers/all-MiniLM-L6-v2" # ~80MB return pipeline(task, model=model_name, device=-1) else: # Default to small model for unknown tasks model_name = "distilbert-base-uncased" return pipeline(task, model=model_name, device=-1) except Exception as e: logger.error(f"Error loading model {model_name}: {str(e)}") raise # Add memory cleanup function def clear_model_cache(): """Clear model cache and free up memory""" load_model.cache_clear() gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() logger.info("Model cache cleared and memory freed") if __name__ == '__main__': # Set up ngrok http_tunnel = ngrok.connect(5000) print(f' * Public URL: {http_tunnel.public_url}') # Run Flask app in a separate thread def run_flask(): app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) flask_thread = threading.Thread(target=run_flask) flask_thread.daemon = True flask_thread.start() try: # Keep the main thread running while True: time.sleep(1) except KeyboardInterrupt: print(" * Shutting down server...") ngrok.disconnect(http_tunnel.public_url) ngrok.kill()