import os import json import requests import time from datetime import datetime, timedelta from flask import Flask, request, jsonify, render_template from flask_cors import CORS import logging from typing import Dict, List, Optional, Any import threading import math # Import centralized configuration from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_app_config # Import the AI recommendation engine try: from ai_recommendation_engine import AIRecommendationEngine ai_engine = AIRecommendationEngine() AI_ENGINE_AVAILABLE = True except Exception as e: logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) logger.warning(f"AI engine not available: {e}") ai_engine = None AI_ENGINE_AVAILABLE = False # Import mock data service try: from mock_data_service import mock_data_service MOCK_DATA_AVAILABLE = True except ImportError: MOCK_DATA_AVAILABLE = False # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CustomJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, float) and math.isinf(obj): return 999999999 if isinstance(obj, float) and math.isnan(obj): return 0 return super().default(obj) def formatPrice(price): """Format price in Indian currency""" return f"โ‚น{price:,.0f}" def formatDuration(seconds): """Format duration in hours and minutes""" hours = seconds // 3600 minutes = (seconds % 3600) // 60 return f"{hours}h {minutes}m" class EnhancedLeadQualificationAPI: def __init__(self): self.app = Flask(__name__) self.app.json_encoder = CustomJSONEncoder CORS(self.app) # Configuration - Centralized from config.py self.config = { 'backend_url': get_backend_url(), 'api_timeout': get_app_config()['api_timeout'], 'cache_duration': int(os.getenv('CACHE_DURATION', get_app_config()['cache_duration'])), 'max_retries': int(os.getenv('MAX_RETRIES', get_app_config()['max_retries'])), 'retry_delay': int(os.getenv('RETRY_DELAY', get_app_config()['retry_delay'])), 
'ngrok_headers': get_ngrok_headers() } # Cache for API responses self.cache = {} self.cache_timestamps = {} # AI processing status tracking self.ai_processing_status = {} # Setup routes self.setup_routes() # Add no-cache headers to all responses @self.app.after_request def add_no_cache_headers(response): """Add headers to prevent any caching""" response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response # Start cache cleanup thread self.start_cache_cleanup() def get_ai_engine(self): """Get the AI recommendation engine instance""" try: if AI_ENGINE_AVAILABLE and ai_engine: return ai_engine else: logger.warning("โš ๏ธ AI engine not available") return None except Exception as e: logger.error(f"โŒ Error getting AI engine: {e}") return None def setup_routes(self): """Setup all API routes including AI-enhanced endpoints and frontend""" # Frontend routes @self.app.route('/') def index(): """Perfect AI Customer Analysis & Email Automation Dashboard""" return render_template('perfect_ai_dashboard.html', customer_id=None) @self.app.route('/old') def old_dashboard(): """Old dashboard for reference""" return render_template('ai_lead_analysis.html', customer_id=None) @self.app.route('/customer/') def customer_analysis(customer_id): """Customer analysis page with pre-filled customer ID""" return render_template('ai_lead_analysis.html', customer_id=customer_id) @self.app.route('/email-automation') def email_automation_dashboard(): """Email automation dashboard""" return render_template('email_automation_dashboard.html') @self.app.route('/health') def health(): """Health check endpoint""" return jsonify({ 'success': True, 'service': 'AI Lead Analysis System', 'status': 'running', 'timestamp': datetime.now().isoformat() }) @self.app.route('/api/frontend/status') def frontend_status(): """Get frontend service status""" return jsonify({ 'success': True, 'service': 'AI Lead 
Analysis System', 'status': 'running', 'templates_available': [ 'ai_lead_analysis.html' ], 'features': [ 'AI Lead Analysis Dashboard', 'Email Automation Testing', 'Behavioral Analysis', 'Real-time Analytics' ], 'timestamp': datetime.now().isoformat() }) # API routes @self.app.route('/api/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'backend_url': self.config['backend_url'], 'cache_size': len(self.cache), 'ai_status': 'enabled', 'ai_models': 'loaded' }) @self.app.route('/api/lead-analysis/', methods=['GET']) def get_lead_analysis(customer_id): """Get complete lead analysis for a customer (original functionality)""" logger.info(f"๐Ÿ” Lead analysis request for customer {customer_id}") try: cache_key = f'lead_analysis_{customer_id}' cached_data = self.get_cached_data(cache_key) if cached_data: logger.info(f"๐Ÿ“‹ Returning cached data for customer {customer_id}") return jsonify(cached_data) # Try to fetch data from backend with improved fallback mechanism logger.info(f"๐ŸŒ Fetching data for customer {customer_id} with fallback support") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') logger.info(f"๐Ÿ“ฅ Data received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}") if not backend_response: return jsonify({ 'success': False, 'customer_id': customer_id, 'error': 'No data found for this customer', 'data': None }), 404 # Process lead data logger.info(f"โš™๏ธ Processing lead data for customer {customer_id}") processed_data = self.process_lead_data(backend_response, customer_id) # Generate analytics logger.info(f"๐Ÿง  Generating analytics for customer {customer_id}") analytics = self.generate_analytics(backend_response, customer_id) # Create combined response combined_response = { 'success': True, 'customer_id': customer_id, 'timestamp': 
datetime.now().isoformat(), 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': { 'engagement_level': analytics.get('engagement_level'), 'preferred_property_types': analytics.get('preferred_property_types', []), 'price_preferences': analytics.get('price_preferences', {}), 'viewing_patterns': analytics.get('viewing_patterns', {}), 'property_analysis': analytics.get('property_analysis', {}), 'recommendations': analytics.get('recommendations', []), 'risk_assessment': analytics.get('risk_assessment', {}), 'opportunity_score': analytics.get('opportunity_score'), 'lead_timeline': analytics.get('lead_timeline', []), 'conversion_probability': analytics.get('conversion_probability', {}) }, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) }, 'api_info': { 'backend_url': self.config['backend_url'], 'endpoint_called': f'/api/PropertyLeadQualification/customer/{customer_id}', 'response_time': datetime.now().isoformat(), 'data_points': len(backend_response) if isinstance(backend_response, list) else 0 } } # Clean the response for JSON serialization cleaned_response = self.clean_for_json(combined_response) logger.info(f"โœ… Combined response created successfully for customer {customer_id}") # Cache the result self.cache_data(cache_key, cleaned_response) logger.info(f"๐Ÿ’พ Combined data cached for customer {customer_id}") return jsonify(cleaned_response) except Exception as e: logger.error(f"โŒ Error in lead analysis for customer {customer_id}: {e}") return jsonify({ 'success': False, 'customer_id': customer_id, 'error': f'Failed to generate lead analysis: {str(e)}', 'data': None }), 500 @self.app.route('/api/ai-analysis/', methods=['GET']) def get_ai_analysis(customer_id): """Get AI-enhanced lead analysis for a customer""" logger.info(f"๐Ÿค– AI analysis request for customer {customer_id}") try: # First get the regular analysis cache_key = f'lead_analysis_{customer_id}' analysis_data = 
self.get_cached_data(cache_key) if not analysis_data: # Generate regular analysis first with improved fallback logger.info(f"๐ŸŒ Fetching data for AI analysis - customer {customer_id}") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') if not backend_response: return jsonify({ 'success': False, 'customer_id': customer_id, 'error': 'No data found for this customer' }), 404 processed_data = self.process_lead_data(backend_response, customer_id) analytics = self.generate_analytics(backend_response, customer_id) analysis_data = { 'success': True, 'customer_id': customer_id, 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': analytics, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) } } # Now enhance with AI analysis if AI_ENGINE_AVAILABLE and ai_engine: logger.info(f"๐Ÿง  Generating AI insights for customer {customer_id}") ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data) # Find AI-recommended properties logger.info(f"๐Ÿ” Finding AI recommendations for customer {customer_id}") ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights) else: logger.warning("โš ๏ธ AI engine not available, using mock AI insights") ai_insights = { 'personality_type': 'Analytical', 'decision_making_style': 'Data-driven', 'preferred_communication': 'Email', 'urgency_level': 'Medium', 'budget_confidence': 'High', 'location_preference': 'Premium areas', 'property_type_preference': 'Luxury properties' } ai_recommendations = [] # Combine everything enhanced_response = analysis_data.copy() enhanced_response['data']['ai_insights'] = ai_insights enhanced_response['data']['ai_recommendations'] = ai_recommendations enhanced_response['ai_enhanced'] = True enhanced_response['ai_timestamp'] = datetime.now().isoformat() # Cache AI-enhanced result ai_cache_key = f'ai_analysis_{customer_id}' self.cache_data(ai_cache_key, 
enhanced_response) logger.info(f"โœ… AI analysis completed for customer {customer_id}") return jsonify(enhanced_response) except Exception as e: logger.error(f"โŒ Error in AI analysis for customer {customer_id}: {e}") return jsonify({ 'success': False, 'customer_id': customer_id, 'error': f'Failed to generate AI analysis: {str(e)}', 'ai_error': True }), 500 @self.app.route('/api/ai-recommendations/', methods=['POST']) def send_ai_recommendations(customer_id): """Send AI-powered recommendations via email""" logger.info(f"๐Ÿ“ง AI recommendation email request for customer {customer_id}") try: # Get email from request data = request.get_json() recipient_email = data.get('email') if not recipient_email: return jsonify({ 'success': False, 'error': 'Email address is required' }), 400 # Mark as processing self.ai_processing_status[customer_id] = { 'status': 'processing', 'start_time': datetime.now().isoformat(), 'email': recipient_email } # Get or generate analysis data cache_key = f'lead_analysis_{customer_id}' analysis_data = self.get_cached_data(cache_key) if not analysis_data: # Generate analysis if not cached with improved fallback logger.info(f"๐ŸŒ Fetching data for recommendations - customer {customer_id}") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') if not backend_response: self.ai_processing_status[customer_id]['status'] = 'failed' self.ai_processing_status[customer_id]['error'] = 'No data found' return jsonify({ 'success': False, 'customer_id': customer_id, 'error': 'No data found for this customer' }), 404 processed_data = self.process_lead_data(backend_response, customer_id) analytics = self.generate_analytics(backend_response, customer_id) analysis_data = { 'success': True, 'customer_id': customer_id, 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': analytics, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) } } # 
Process with AI in background def process_ai_recommendations(): try: if AI_ENGINE_AVAILABLE and ai_engine: result = ai_engine.process_customer_with_ai(customer_id, recipient_email, analysis_data) else: # Mock AI processing result result = { 'success': True, 'customer_id': customer_id, 'email_sent': True, 'recipient': recipient_email, 'subject': f"๐Ÿค– AI-Powered Property Recommendations for Customer {customer_id}", 'ai_insights': { 'personality_type': 'Analytical', 'decision_making_style': 'Data-driven', 'preferred_communication': 'Email', 'urgency_level': 'Medium', 'budget_confidence': 'High' }, 'recommendations_count': 5, 'mock_data': True, 'message': 'Mock AI processing completed (AI engine not available)' } self.ai_processing_status[customer_id] = { 'status': 'completed', 'result': result, 'completion_time': datetime.now().isoformat() } logger.info(f"โœ… AI processing completed for customer {customer_id}") except Exception as e: self.ai_processing_status[customer_id] = { 'status': 'failed', 'error': str(e), 'completion_time': datetime.now().isoformat() } logger.error(f"โŒ AI processing failed for customer {customer_id}: {e}") # Start background processing thread = threading.Thread(target=process_ai_recommendations) thread.daemon = True thread.start() return jsonify({ 'success': True, 'customer_id': customer_id, 'message': 'AI recommendation processing started', 'status': 'processing', 'email': recipient_email, 'estimated_completion': '2-3 minutes' }) except Exception as e: logger.error(f"โŒ Error starting AI recommendations for customer {customer_id}: {e}") return jsonify({ 'success': False, 'customer_id': customer_id, 'error': f'Failed to start AI processing: {str(e)}' }), 500 @self.app.route('/api/ai-status/', methods=['GET']) def get_ai_status(customer_id): """Get AI processing status for a customer""" status = self.ai_processing_status.get(customer_id, { 'status': 'not_started', 'message': 'No AI processing initiated for this customer' }) response_data = 
{ 'customer_id': customer_id, 'ai_processing': status, 'timestamp': datetime.now().isoformat() } # Clean for JSON serialization cleaned_response = self.clean_for_json(response_data) return jsonify(cleaned_response) @self.app.route('/api/batch-ai-analysis', methods=['POST']) def batch_ai_analysis(): """Get AI analysis for multiple customers""" try: data = request.get_json() customer_ids = data.get('customer_ids', []) if not customer_ids: return jsonify({ 'success': False, 'error': 'No customer IDs provided' }), 400 results = {} for customer_id in customer_ids: try: # Get regular analysis with improved fallback logger.info(f"๐ŸŒ Fetching data for batch analysis - customer {customer_id}") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') if not backend_response: results[customer_id] = { 'success': False, 'error': 'No data found for this customer' } continue processed_data = self.process_lead_data(backend_response, customer_id) analytics = self.generate_analytics(backend_response, customer_id) analysis_data = { 'success': True, 'customer_id': customer_id, 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': analytics, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) } } # Add AI insights if AI_ENGINE_AVAILABLE and ai_engine: ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data) ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights) else: ai_insights = { 'personality_type': 'Analytical', 'decision_making_style': 'Data-driven', 'preferred_communication': 'Email', 'urgency_level': 'Medium', 'budget_confidence': 'High', 'location_preference': 'Premium areas', 'property_type_preference': 'Luxury properties' } ai_recommendations = [] analysis_data['data']['ai_insights'] = ai_insights analysis_data['data']['ai_recommendations'] = ai_recommendations analysis_data['ai_enhanced'] = True cleaned_response = 
self.clean_for_json(analysis_data) results[customer_id] = cleaned_response except Exception as e: results[customer_id] = { 'success': False, 'error': str(e) } return jsonify({ 'success': True, 'results': results, 'total_processed': len(customer_ids), 'ai_enhanced': True }) except Exception as e: logger.error(f"โŒ Error in batch AI analysis: {e}") return jsonify({ 'success': False, 'error': f'Batch AI analysis failed: {str(e)}' }), 500 @self.app.route('/api/clear-cache', methods=['POST']) def clear_cache(): """Clear all cached data including AI processing status""" try: self.cache.clear() self.cache_timestamps.clear() self.ai_processing_status.clear() logger.info("๐Ÿ—‘๏ธ Cache and AI status cleared") return jsonify({ 'success': True, 'message': 'Cache and AI processing status cleared successfully' }) except Exception as e: logger.error(f"โŒ Error clearing cache: {e}") return jsonify({ 'success': False, 'error': f'Failed to clear cache: {str(e)}' }), 500 @self.app.route('/api/test-email', methods=['POST']) def test_email(): """Test SendGrid email functionality""" logger.info("๐Ÿงช Testing SendGrid email functionality") try: # Debug request information logger.info(f"Request Content-Type: {request.content_type}") logger.info(f"Request Headers: {dict(request.headers)}") logger.info(f"Request Method: {request.method}") logger.info(f"Request Data: {request.get_data()}") # Handle both JSON and form data with better error handling data = {} if request.is_json: try: data = request.get_json() or {} logger.info(f"Parsed JSON data: {data}") except Exception as json_error: logger.error(f"JSON parsing error: {json_error}") return jsonify({ 'success': False, 'error': f'Invalid JSON: {str(json_error)}' }), 400 else: try: data = request.form.to_dict() or {} logger.info(f"Parsed form data: {data}") except Exception as form_error: logger.error(f"Form parsing error: {form_error}") return jsonify({ 'success': False, 'error': f'Invalid form data: {str(form_error)}' }), 400 
recipient_email = data.get('email', 'shaiksameermujahid@gmail.com') logger.info(f"๐Ÿ“ง Testing email to: {recipient_email}") # Validate email format if not recipient_email or '@' not in recipient_email: return jsonify({ 'success': False, 'error': 'Invalid email address' }), 400 # Get AI engine ai_engine = self.get_ai_engine() if not ai_engine: return jsonify({ 'success': False, 'error': 'AI engine not available' }), 500 # Test SendGrid email test_result = ai_engine.test_sendgrid_email(recipient_email) return jsonify(test_result) except Exception as e: logger.error(f"Error testing email: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return jsonify({ 'success': False, 'error': str(e) }), 500 @self.app.route('/api/test-email-simple', methods=['GET']) def test_email_simple(): """Simple test SendGrid email functionality (GET method)""" logger.info("๐Ÿงช Testing SendGrid email functionality (simple GET)") try: # Use default email for testing recipient_email = 'shaiksameermujahid@gmail.com' logger.info(f"๐Ÿ“ง Testing email to: {recipient_email}") # Get AI engine ai_engine = self.get_ai_engine() if not ai_engine: return jsonify({ 'success': False, 'error': 'AI engine not available' }), 500 # Test SendGrid email test_result = ai_engine.test_sendgrid_email(recipient_email) return jsonify(test_result) except Exception as e: logger.error(f"Error testing email (simple): {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") return jsonify({ 'success': False, 'error': str(e) }), 500 @self.app.route('/api/automated-email', methods=['POST']) def send_automated_email(): """Send automated email""" try: data = request.json customer_id = data.get('customer_id') email_type = data.get('email_type', 'behavioral_trigger') recipient_email = data.get('recipient_email', 'shaiksameermujahid@gmail.com') if not customer_id: return jsonify({ 'success': False, 'error': 'Customer ID is required' }), 400 # Generate mock email response 
(email service integrated) if MOCK_DATA_AVAILABLE: customer_data = mock_data_service.get_customer_data(customer_id) customer_profile = mock_data_service.get_customer_profile(customer_id) return jsonify({ 'success': True, 'customer_id': customer_id, 'recipient_email': recipient_email, 'subject': f"๐Ÿค– AI-Powered Recommendations for {customer_profile.get('customerName', f'Customer {customer_id}')}", 'email_type': email_type, 'timestamp': datetime.now().isoformat(), 'mock_data': True, 'message': 'Mock email generated successfully (integrated email service)' }) else: return jsonify({ 'success': False, 'error': 'Email service not available and no mock data' }), 500 except Exception as e: logger.error(f"Error sending automated email: {e}") return jsonify({ 'success': False, 'error': f'Email service not available: {str(e)}' }), 500 @self.app.route('/api/email-status', methods=['GET']) def get_email_status(): """Get email automation status""" return jsonify({ 'success': True, 'service_status': 'integrated', 'email_config': { 'sender': 'sameermujahid7777@gmail.com', 'recipient': 'sameermujahid7777@gmail.com', 'smtp_server': 'Integrated Service' }, 'ai_engine_loaded': True, 'timestamp': datetime.now().isoformat(), 'message': 'Email service integrated into main API service' }) @self.app.route('/api/email-preview/', methods=['GET']) def email_preview(customer_id): """Generate email preview for a customer""" logger.info(f"๐Ÿ“ง Generating email preview for customer {customer_id}") try: # Get customer data customer_data = self.make_api_request(f'/api/lead-analysis/{customer_id}') if not customer_data or 'data' not in customer_data: return jsonify({ 'success': False, 'error': 'Customer data not found' }), 404 # Process with AI engine ai_engine = self.get_ai_engine() if not ai_engine: return jsonify({ 'success': False, 'error': 'AI engine not available' }), 500 # Generate email content recipient_email = request.args.get('email', 'customer@example.com') email_result = 
ai_engine.process_customer_with_ai( customer_id, recipient_email, customer_data ) if not email_result.get('success'): return jsonify({ 'success': False, 'error': 'Failed to generate email content' }), 500 return jsonify({ 'success': True, 'customer_id': customer_id, 'email_preview': email_result, 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.error(f"Error generating email preview: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @self.app.route('/email-preview/') def email_preview_page(customer_id): """Email preview page""" try: # Get email content email_response = self.get_email_preview(customer_id) email_data = email_response.json if not email_data.get('success'): return f"Error: {email_data.get('error', 'Unknown error')}" email_content = email_data['email_content'] # Create HTML page with email content html_content = f""" Email Preview - Customer {customer_id}

๐Ÿ“ง Email Preview

Customer ID: {customer_id}

โ† Back to Customer Analysis
""" return html_content except Exception as e: logger.error(f"Error creating email preview page: {e}") return f"Error: {str(e)}" @self.app.route('/api/download-email/') def download_email(filename): """Download saved email file""" try: import os emails_dir = "/tmp/emails" file_path = os.path.join(emails_dir, filename) if not os.path.exists(file_path): return jsonify({ 'success': False, 'error': 'Email file not found' }), 404 from flask import send_file return send_file(file_path, as_attachment=True) except Exception as e: logger.error(f"Error downloading email file: {e}") return jsonify({ 'success': False, 'error': f'Failed to download email: {str(e)}' }), 500 @self.app.route('/api/list-emails') def list_emails(): """List all saved email files""" try: import os emails_dir = "/tmp/emails" if not os.path.exists(emails_dir): return jsonify({ 'success': True, 'emails': [] }) emails = [] for filename in os.listdir(emails_dir): if filename.endswith('.html'): file_path = os.path.join(emails_dir, filename) file_stat = os.stat(file_path) emails.append({ 'filename': filename, 'size': file_stat.st_size, 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), 'download_url': f'/api/download-email/{filename}' }) # Sort by creation time (newest first) emails.sort(key=lambda x: x['created'], reverse=True) return jsonify({ 'success': True, 'emails': emails }) except Exception as e: logger.error(f"Error listing email files: {e}") return jsonify({ 'success': False, 'error': f'Failed to list emails: {str(e)}' }), 500 @self.app.route('/saved-emails') def saved_emails_page(): """Page to view and download saved emails""" try: import os emails_dir = "/tmp/emails" if not os.path.exists(emails_dir): emails = [] else: emails = [] for filename in os.listdir(emails_dir): if filename.endswith('.html'): file_path = os.path.join(emails_dir, filename) file_stat = os.stat(file_path) emails.append({ 'filename': filename, 'size': file_stat.st_size, 'created': 
datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'), 'download_url': f'/api/download-email/{filename}' }) # Sort by creation time (newest first) emails.sort(key=lambda x: x['created'], reverse=True) # Generate email cards HTML email_cards_html = "" if emails: for email in emails: email_cards_html += f""" """ # Generate content section if emails: content_section = f"""

๐Ÿ“ง Generated Emails ({len(emails)} total)

{email_cards_html}
""" else: content_section = """

No emails generated yet

Generate some AI recommendations to see saved emails here.

""" html_content = f""" Saved Emails

Saved Emails

Download and view generated email content

{content_section}
""" return html_content except Exception as e: logger.error(f"Error creating saved emails page: {e}") return f"Error: {str(e)}" @self.app.route('/api/properties/parallel', methods=['GET']) def get_properties_parallel(): """Get properties using parallel processing for better performance""" logger.info("๐Ÿš€ Fetching properties using parallel processing...") try: from concurrent.futures import ThreadPoolExecutor, as_completed import requests # Configuration max_workers = int(request.args.get('workers', 8)) page_size = int(request.args.get('page_size', 100)) max_pages = int(request.args.get('max_pages', 50)) logger.info(f"๐Ÿ”ง Using {max_workers} workers, {page_size} properties per page, max {max_pages} pages") def fetch_properties_page(page_num): """Fetch a single page of properties""" try: url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails" params = { 'pageNumber': page_num, 'pageSize': page_size } response = requests.get(url, params=params, timeout=None, verify=False) response.raise_for_status() data = response.json() # Handle different response structures if isinstance(data, list): properties = data elif isinstance(data, dict): properties = data.get('data', data.get('properties', data.get('results', []))) else: properties = [] logger.info(f"โœ… Page {page_num}: {len(properties)} properties") return properties except Exception as e: logger.error(f"โŒ Error fetching page {page_num}: {e}") return [] # Use ThreadPoolExecutor for parallel fetching all_properties = [] completed_pages = 0 empty_pages = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit tasks for all pages future_to_page = { executor.submit(fetch_properties_page, page_num): page_num for page_num in range(1, max_pages + 1) } for future in as_completed(future_to_page): page_num = future_to_page[future] try: properties = future.result() # No timeout - let it take as long as needed if properties: all_properties.extend(properties) completed_pages += 1 
logger.info(f"โœ… Parallel fetch - Page {page_num}: {len(properties)} properties (Total: {len(all_properties)})") else: empty_pages += 1 logger.info(f"๐Ÿ“„ Page {page_num} is empty (Empty pages: {empty_pages})") # Stop if we've hit too many empty pages in a row if empty_pages >= 3: logger.info("๐Ÿ›‘ Stopping fetch - too many consecutive empty pages") break except Exception as e: logger.error(f"โŒ Failed to fetch page {page_num}: {e}") # Continue with other pages logger.info(f"๐ŸŽ‰ Parallel fetch completed: {len(all_properties)} total properties from {completed_pages} pages!") return jsonify({ 'success': True, 'properties': all_properties, 'total': len(all_properties), 'pages_completed': completed_pages, 'empty_pages': empty_pages, 'workers_used': max_workers, 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.error(f"Error in parallel property fetch: {e}") return jsonify({ 'success': False, 'error': str(e), 'properties': [] }), 500 @self.app.route('/api/automated-email-analysis/', methods=['GET']) def analyze_automated_emails(customer_id): """Analyze what automated emails would be sent for a customer""" logger.info(f"๐Ÿค– Analyzing automated email triggers for customer {customer_id}") try: # Get customer analysis using our AI model (not from backend) logger.info(f"๐Ÿ” Generating lead analysis for customer {customer_id} using AI model") # Try to fetch data from backend first for property data backend_response = None try: logger.info(f"๐ŸŒ Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') logger.info(f"๐Ÿ“ฅ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}") except Exception as e: logger.warning(f"โš ๏ธ Backend API failed: {e}") # If backend fails, use mock data if not backend_response and 
MOCK_DATA_AVAILABLE: logger.info(f"๐Ÿ”„ Using mock data for customer {customer_id}") backend_response = mock_data_service.get_customer_data(customer_id) logger.info(f"๐Ÿ“ฅ Mock data generated for customer {customer_id}: {len(backend_response)} properties") if not backend_response: return jsonify({ 'success': False, 'error': 'No property data found for this customer' }), 404 # Process lead data using our AI model logger.info(f"โš™๏ธ Processing lead data for customer {customer_id}") processed_data = self.process_lead_data(backend_response, customer_id) # Generate analytics using our AI model logger.info(f"๐Ÿง  Generating analytics for customer {customer_id}") analytics = self.generate_analytics(backend_response, customer_id) # Create customer analysis data structure customer_data = { 'success': True, 'customer_id': customer_id, 'timestamp': datetime.now().isoformat(), 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': { 'engagement_level': analytics.get('engagement_level'), 'preferred_property_types': analytics.get('preferred_property_types', []), 'price_preferences': analytics.get('price_preferences', {}), 'viewing_patterns': analytics.get('viewing_patterns', {}), 'property_analysis': analytics.get('property_analysis', {}), 'recommendations': analytics.get('recommendations', []), 'risk_assessment': analytics.get('risk_assessment', {}), 'opportunity_score': analytics.get('opportunity_score'), 'lead_timeline': analytics.get('lead_timeline', []), 'conversion_probability': analytics.get('conversion_probability', {}) }, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) } } # Get AI engine ai_engine = self.get_ai_engine() if not ai_engine: return jsonify({ 'success': False, 'error': 'AI engine not available' }), 500 # Analyze automated email triggers using our AI model trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data) if not trigger_analysis.get('success'): return 
jsonify({ 'success': False, 'error': 'Failed to analyze email triggers' }), 500 return jsonify(trigger_analysis) except Exception as e: logger.error(f"Error analyzing automated emails: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @self.app.route('/api/test-automated-emails/', methods=['POST']) def test_automated_emails(customer_id): """Test sending all automated emails for a customer""" logger.info(f"๐Ÿงช Testing automated emails for customer {customer_id}") try: data = request.get_json() or {} recipient_email = data.get('email', 'shaiksameermujahid@gmail.com') # Get customer analysis using our AI model (not from backend) logger.info(f"๐Ÿ” Generating lead analysis for customer {customer_id} using AI model") # Try to fetch data from backend first for property data backend_response = None try: logger.info(f"๐ŸŒ Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}") backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') logger.info(f"๐Ÿ“ฅ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}") except Exception as e: logger.warning(f"โš ๏ธ Backend API failed: {e}") # If backend fails, use mock data if not backend_response and MOCK_DATA_AVAILABLE: logger.info(f"๐Ÿ”„ Using mock data for customer {customer_id}") backend_response = mock_data_service.get_customer_data(customer_id) logger.info(f"๐Ÿ“ฅ Mock data generated for customer {customer_id}: {len(backend_response)} properties") if not backend_response: return jsonify({ 'success': False, 'error': 'No property data found for this customer' }), 404 # Process lead data using our AI model logger.info(f"โš™๏ธ Processing lead data for customer {customer_id}") processed_data = self.process_lead_data(backend_response, customer_id) # Generate analytics using our AI model logger.info(f"๐Ÿง  Generating analytics for customer 
{customer_id}") analytics = self.generate_analytics(backend_response, customer_id) # Create customer analysis data structure customer_data = { 'success': True, 'customer_id': customer_id, 'timestamp': datetime.now().isoformat(), 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': { 'engagement_level': analytics.get('engagement_level'), 'preferred_property_types': analytics.get('preferred_property_types', []), 'price_preferences': analytics.get('price_preferences', {}), 'viewing_patterns': analytics.get('viewing_patterns', {}), 'property_analysis': analytics.get('property_analysis', {}), 'recommendations': analytics.get('recommendations', []), 'risk_assessment': analytics.get('risk_assessment', {}), 'opportunity_score': analytics.get('opportunity_score'), 'lead_timeline': analytics.get('lead_timeline', []), 'conversion_probability': analytics.get('conversion_probability', {}) }, 'properties': processed_data.get('properties', []), 'summary': processed_data.get('summary', {}) } } # Get AI engine ai_engine = self.get_ai_engine() if not ai_engine: return jsonify({ 'success': False, 'error': 'AI engine not available' }), 500 # Analyze triggers using our AI model logger.info(f"๐Ÿค– Analyzing automated email triggers for customer {customer_id}") trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data) if not trigger_analysis.get('success'): return jsonify({ 'success': False, 'error': 'Failed to analyze email triggers' }), 500 # Send emails for each trigger sent_emails = [] failed_emails = [] for trigger in trigger_analysis.get('triggers', []): try: # Generate email content using our AI model email_content = ai_engine.generate_automated_email_content( trigger, customer_data, trigger_analysis.get('ai_insights', {}) ) if email_content.get('success'): # Send email success = ai_engine.send_email(recipient_email, email_content) if success: sent_emails.append({ 'trigger_type': trigger['trigger_type'], 'subject': 
@self.app.route('/api/fetch-all-properties', methods=['GET'])
def fetch_all_properties():
    """Ask the AI engine to fetch all properties (600+ expected) in parallel.

    Query parameters (all optional, parsed as integers; a value that does
    not parse falls back to its default):
        workers: forwarded to the engine as ``max_workers`` (default 10).
        page_size: forwarded as ``page_size`` (default 100).
        max_pages: forwarded as ``max_pages`` (default 50).

    Returns:
        A JSON response. On success it carries the property count reported
        by ``ai_engine.properties_collection`` (``"Unknown"`` when the count
        cannot be read, ``0`` when the engine exposes no collection) and the
        parameters used. Responds 500 when the AI engine is unavailable,
        the fetch reports failure, or an unexpected error occurs.
    """
    try:
        logger.info("๐Ÿš€ Starting fetch of ALL 600+ properties...")

        # Defaults are sized for the expected 600+ properties
        workers = request.args.get('workers', 10, type=int)
        page_size = request.args.get('page_size', 100, type=int)
        max_pages = request.args.get('max_pages', 50, type=int)

        logger.info(f"๐Ÿ“Š Fetch parameters: {workers} workers, {page_size} per page, max {max_pages} pages")

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        success = ai_engine.fetch_all_properties_parallel(
            max_workers=workers,
            page_size=page_size,
            max_pages=max_pages
        )

        if not success:
            return jsonify({
                'success': False,
                'error': 'Failed to fetch properties'
            }), 500

        # Report how many properties the collection holds after the fetch
        total_stored = 0
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                total_stored = ai_engine.properties_collection.count()
            except Exception as count_error:
                # Counting is best-effort: the fetch itself succeeded, so keep
                # the success response but record why the count is missing.
                logger.warning(f"Could not read ChromaDB property count: {count_error}")
                total_stored = "Unknown"

        return jsonify({
            'success': True,
            'message': 'Successfully fetched and stored properties',
            'total_properties': total_stored,
            'parameters_used': {
                'workers': workers,
                'page_size': page_size,
                'max_pages': max_pages
            },
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"โŒ Error fetching all properties: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
# The view takes ``customer_id``, so the rule must declare it as a URL variable;
# without it Flask would call the view with no arguments.
@self.app.route('/api/chromadb-recommendations/<int:customer_id>', methods=['GET'])
def get_chromadb_recommendations(customer_id):
    """Return similarity-, preference- and behaviour-based recommendations.

    Args:
        customer_id: Customer identifier taken from the URL.

    Returns:
        A JSON response with one recommendation list per strategy (each
        passed through ``ai_engine.add_ai_explanations``), the AI insights
        and the total number of recommendations. Responds 404 when no
        analysis data exists, 500 when the AI engine is unavailable or an
        unexpected error occurs.
    """
    logger.info(f"๐Ÿ” ChromaDB recommendations for customer {customer_id}")

    try:
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)

        # Computed once and reused below; the inputs never change within the
        # request. NOTE(review): assumes the engine calls do not mutate the
        # preferences object they receive - confirm against the engine.
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        viewed_properties = (analysis_data.get('data') or {}).get('properties') or []

        raw_recommendations = {
            'similarity_based': ai_engine.get_similarity_based_recommendations(
                viewed_properties, preferences, count=5
            ),
            'preference_based': ai_engine.get_preference_based_recommendations(
                preferences, viewed_properties, count=5
            ),
            'behavioral_based': ai_engine.get_behavioral_recommendations(
                analysis_data, ai_insights, count=5
            )
        }

        # Attach AI explanations to every strategy's result list
        recommendations = {
            rec_type: ai_engine.add_ai_explanations(recs, preferences, ai_insights)
            for rec_type, recs in raw_recommendations.items()
        }

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recommendations': recommendations,
            'ai_insights': ai_insights,
            'total_recommendations': sum(len(recs) for recs in recommendations.values()),
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"โŒ Error getting ChromaDB recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
# The view takes ``customer_id``, so the rule must declare it as a URL variable.
@self.app.route('/api/email-analysis-basis/<int:customer_id>', methods=['GET'])
def get_email_analysis_basis(customer_id):
    """Explain what each recommendation e-mail type is based on.

    Args:
        customer_id: Customer identifier taken from the URL.

    Returns:
        A JSON response containing the extracted customer preferences, the
        AI insights, behavioural patterns, up to five viewed properties and
        a human-readable reason for each of the ten e-mail types. Responds
        404 when no analysis data exists, 500 when the AI engine is
        unavailable or an unexpected error occurs.
    """
    logger.info(f"๐Ÿ“Š Getting email analysis basis for customer {customer_id}")

    try:
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        behavioral_patterns = ai_engine.analyze_behavioral_patterns(analysis_data, ai_insights)

        # Other handlers in this file build these sections with .get()
        # defaults, so any of them may be absent or empty; read them
        # defensively instead of failing the whole request with a KeyError.
        data_section = analysis_data.get('data') or {}
        analytics = data_section.get('analytics') or {}
        conversion = analytics.get('conversion_probability') or {}
        price_range = (preferences or {}).get('price_range') or {}
        min_price = price_range.get('min_price', 0)
        max_price = price_range.get('max_price', 0)
        engagement_pattern = (behavioral_patterns or {}).get('engagement_level', 'medium')

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'analysis_basis': {
                'customer_preferences': preferences,
                'ai_insights': ai_insights,
                'behavioral_patterns': behavioral_patterns,
                # Only a sample of the viewed properties is returned
                'viewed_properties': (data_section.get('properties') or [])[:5],
                'lead_qualification': data_section.get('lead_qualification', {}),
                'engagement_level': analytics.get('engagement_level'),
                'conversion_probability': conversion.get('final_probability')
            },
            'email_triggers': {
                'property_based': 'Based on property types you viewed most',
                'price_based': f"Based on your price range โ‚น{min_price} - โ‚น{max_price}",
                'location_based': 'Based on locations you explored',
                'similarity_based': 'Based on properties similar to your favorites',
                'behavioral_based': f"Based on your {engagement_pattern} engagement pattern",
                'premium_properties': 'Based on your interest in high-value properties',
                'budget_friendly': 'Based on your value-conscious browsing',
                'trending_properties': 'Based on market trends and your preferences',
                'family_oriented': 'Based on your interest in family-suitable properties',
                'investment_opportunities': 'Based on your investment potential analysis'
            },
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"โŒ Error getting email analysis basis: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
properties...") ai_engine.auto_fetch_and_store_properties() except Exception as count_error: logger.warning(f"โš ๏ธ Could not check ChromaDB count: {count_error}") logger.info("๐Ÿš€ Auto-fetching properties as fallback...") ai_engine.auto_fetch_and_store_properties() # Email types to generate email_types = [ 'property_based', 'price_based', 'location_based', 'similarity_based', 'behavioral_based', 'premium_properties', 'budget_friendly', 'trending_properties', 'family_oriented', 'investment_opportunities' ] email_previews = [] for email_type in email_types: try: # Get recommendations using specialized AI model for this type recommendations = ai_engine.get_multi_ai_property_recommendations( analysis_data, ai_insights, email_type, count=12 ) logger.info(f"๐Ÿค– AI Model for {email_type} preview: {len(recommendations)} properties") # Add fallback if needed if len(recommendations) < 10: fallback_recommendations = ai_engine.search_similar_properties_chromadb( f"property {email_type.replace('_', ' ')} recommendation", [], 12 ) # Merge without duplicates seen_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations} for prop in fallback_recommendations: prop_id = str(prop.get('id', prop.get('propertyId', ''))) if prop_id not in seen_ids and len(recommendations) < 12: recommendations.append(prop) seen_ids.add(prop_id) # Generate email content email_content = ai_engine.generate_personalized_email( analysis_data, ai_insights, recommendations, 'preview@example.com', email_type ) if email_content.get('success'): email_previews.append({ 'email_type': email_type, 'subject': email_content.get('subject', self._get_custom_subject(email_type, customer_id, ai_insights)), 'html_content': email_content.get('html_content', ''), 'text_content': email_content.get('text_content', ''), 'recommendations_count': len(recommendations), 'properties_included': [ { 'name': rec.get('property_name', rec.get('name', rec.get('title', 'Property'))), 'price': rec.get('price', 
# The view takes ``customer_id``, so the rule must declare it as a URL variable.
@self.app.route('/api/test-all-10-emails/<int:customer_id>', methods=['POST'])
def test_send_all_10_emails(customer_id):
    """Generate and send one test e-mail for each of the ten e-mail types.

    Args:
        customer_id: Customer identifier taken from the URL.

    Request body (optional JSON object):
        email: Recipient address; a built-in default address is used when
            it is omitted.

    Returns:
        A JSON response listing the e-mails that were sent and those that
        failed, together with the AI insights. A failure for one e-mail
        type is recorded and does not stop the remaining types. Responds
        404 when no analysis data exists, 500 when the AI engine is
        unavailable or an unexpected error occurs.
    """
    logger.info(f"๐Ÿงช Testing all 10 email types for customer {customer_id}")

    try:
        # silent=True makes a missing or non-JSON body fall back to the
        # defaults; a plain get_json() raises before "or {}" can apply.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        # NOTE(review): the fallback recipient is a hard-coded personal
        # address - consider moving it to configuration.
        recipient_email = payload.get('email', 'shaiksameermujahid@gmail.com')

        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)

        # Make sure ChromaDB holds properties before asking for recommendations
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                if ai_engine.properties_collection.count() == 0:
                    logger.info("๐Ÿš€ ChromaDB is empty, auto-fetching properties...")
                    ai_engine.auto_fetch_and_store_properties()
            except Exception as count_error:
                logger.warning(f"โš ๏ธ Could not check ChromaDB count: {count_error}")
                # Try to fetch anyway
                ai_engine.auto_fetch_and_store_properties()
        else:
            logger.warning("โš ๏ธ ChromaDB not initialized, auto-initializing...")
            ai_engine.initialize_chromadb()
            ai_engine.auto_fetch_and_store_properties()

        # E-mail type -> description of its query strategy (echoed in the response)
        email_strategies = {
            'property_based': "Get properties matching customer's preferred property types",
            'price_based': "Get properties within customer's budget range",
            'location_based': "Get properties in customer's preferred locations",
            'similarity_based': "Get properties similar to customer's viewed properties",
            'behavioral_based': 'Get properties based on customer behavior patterns',
            'premium_properties': 'Get high-end luxury properties',
            'budget_friendly': 'Get value-for-money properties',
            'trending_properties': 'Get currently trending market properties',
            'family_oriented': 'Get family-suitable properties with amenities',
            'investment_opportunities': 'Get properties with high ROI potential'
        }

        sent_emails = []
        failed_emails = []

        for email_type, strategy in email_strategies.items():
            try:
                logger.info(f"๐Ÿ“ง Processing {email_type} email with strategy: {strategy}")

                # Request 15 so that at least 10 are likely to come back
                recommendations = ai_engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=15
                )
                logger.info(f"๐Ÿค– AI Model for {email_type} retrieved {len(recommendations)} properties")

                if len(recommendations) < 10:
                    logger.warning(f"โš ๏ธ Only {len(recommendations)} properties for {email_type}, trying fallback...")
                    fallback_recommendations = ai_engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 15
                    )

                    # Top up to 15 entries, skipping properties already present
                    seen_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations}
                    for prop in fallback_recommendations:
                        prop_id = str(prop.get('id', prop.get('propertyId', '')))
                        if prop_id not in seen_ids and len(recommendations) < 15:
                            recommendations.append(prop)
                            seen_ids.add(prop_id)

                    logger.info(f"๐Ÿ”„ After fallback: {len(recommendations)} properties for {email_type}")

                email_result = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email, email_type
                )

                # NOTE(review): a result that is not a dict carrying
                # 'success' is replaced by a bare success marker, so an
                # e-mail with only a subject is sent - confirm this is
                # intended. Behaviour is kept unchanged here.
                if isinstance(email_result, dict) and 'success' in email_result:
                    email_content = email_result
                else:
                    email_content = {'success': True}

                if not email_content.get('success'):
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': email_content.get('error', 'Failed to generate email content')
                    })
                    continue

                custom_subject = self._get_custom_subject(email_type, customer_id, ai_insights)
                email_content['subject'] = custom_subject

                if not ai_engine.send_email(recipient_email, email_content):
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': 'Failed to send email via SendGrid'
                    })
                    continue

                sent_emails.append({
                    'type': email_type,
                    'subject': custom_subject,
                    'strategy': strategy,
                    'recommendations_count': len(recommendations),
                    'properties_from_chromadb': True,
                    'timestamp': datetime.now().isoformat(),
                    'sample_properties': [
                        {
                            'name': rec.get('property_name', 'Property'),
                            'price': rec.get('price', 0),
                            'type': rec.get('property_type', 'N/A'),
                            'ai_score': rec.get('ai_score', 0)
                        } for rec in recommendations[:3]
                    ]
                })
                logger.info(f"โœ… {email_type} email sent successfully")

            except Exception as e:
                # One failing e-mail type must not abort the remaining ones
                failed_emails.append({
                    'type': email_type,
                    'strategy': strategy,
                    'error': str(e)
                })
                logger.error(f"โŒ Error with {email_type}: {e}")

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'test_results': {
                'total_attempted': len(email_strategies),
                'total_sent': len(sent_emails),
                'total_failed': len(failed_emails),
                'sent_emails': sent_emails,
                'failed_emails': failed_emails
            },
            'ai_insights': ai_insights,
            'chromadb_status': 'Active - Properties retrieved from vector database',
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"โŒ Error testing all 10 emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/saved-emails', methods=['GET'])
def list_saved_emails():
    """List the ``*.html`` files in ``./saved_emails``, newest first.

    Returns:
        A JSON response with one entry per file (name, path, e-mail type
        derived from the text before the first underscore in the file name,
        modification time and size in KB) plus a total count. When the
        directory does not exist an empty list is returned. Responds 500 on
        an unexpected error.
    """
    try:
        import glob  # not imported at module level; os and datetime are

        emails_dir = "./saved_emails"

        if not os.path.exists(emails_dir):
            return jsonify({'saved_emails': [], 'message': 'No saved emails directory found'})

        saved_emails = []
        for file_path in glob.glob(os.path.join(emails_dir, "*.html")):
            file_name = os.path.basename(file_path)
            file_stats = os.stat(file_path)
            # "created_time" is taken from the file's modification time
            created_time = datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')

            # The prefix before the first underscore names the e-mail type
            email_type = file_name.split('_')[0] if '_' in file_name else 'unknown'

            saved_emails.append({
                'filename': file_name,
                'filepath': file_path,
                'email_type': email_type.title(),
                'created_time': created_time,
                'size_kb': round(file_stats.st_size / 1024, 2)
            })

        # The timestamp format sorts chronologically as plain text
        saved_emails.sort(key=lambda entry: entry['created_time'], reverse=True)

        return jsonify({
            'saved_emails': saved_emails,
            'total_count': len(saved_emails),
            'message': f'Found {len(saved_emails)} saved email files'
        })

    except Exception as e:
        logger.error(f"โŒ Error listing saved emails: {e}")
        return jsonify({'error': str(e)}), 500

# The view takes ``filename``, so the rule must declare it as a URL variable.
@self.app.route('/view-email/<filename>')
def view_saved_email(filename):
    """Serve one saved e-mail file from ``./saved_emails`` as HTML.

    Args:
        filename: Name of the file inside the saved e-mails directory,
            taken from the URL.

    Returns:
        The file contents with mimetype ``text/html``; a 404 text response
        when the file does not exist or resolves outside the saved e-mails
        directory; a 500 text response on an unexpected error.
    """
    try:
        from flask import send_file

        emails_dir = os.path.realpath("./saved_emails")
        # Resolve the requested path and verify it is still inside the
        # e-mails directory, so a crafted filename cannot escape it.
        file_path = os.path.realpath(os.path.join(emails_dir, filename))
        inside_emails_dir = file_path.startswith(emails_dir + os.sep)

        if not inside_emails_dir or not os.path.isfile(file_path):
            return "Email file not found", 404

        # An absolute path keeps the existence check above and the file
        # that is actually sent pointing at the same location.
        return send_file(file_path, mimetype='text/html')

    except Exception as e:
        logger.error(f"โŒ Error serving email file: {e}")
        return f"Error loading email: {str(e)}", 500
@self.app.route('/api/Email/sendPlainText', methods=['POST'])
def send_plain_text_email():
    """Forward a plain-text e-mail to the backend's ``sendPlainText`` API.

    The ``toEmail``, ``subject`` and ``body`` values are read from the JSON
    body when the request is JSON, otherwise from the query string. They
    are forwarded to the backend as query parameters.

    Returns:
        A JSON response. 200 on success; 400 when a value is missing, the
        JSON body is not an object, or ``toEmail`` is not a string
        containing ``@``; 504 on backend timeout; 503 when the backend is
        unreachable; 500 when the backend answers with a non-200 status or
        an unexpected error occurs.
    """
    logger.info("๐Ÿ“ง Plain text email send request")

    try:
        if request.is_json:
            # silent=True: malformed JSON yields None rather than raising,
            # so it is reported below as a client error, not a 500.
            fields = request.get_json(silent=True)
            if not isinstance(fields, dict):
                return jsonify({
                    'success': False,
                    'error': 'Request body must be a JSON object'
                }), 400
        else:
            fields = request.args

        to_email = fields.get('toEmail')
        subject = fields.get('subject')
        body = fields.get('body')

        # Required values are checked in a fixed order: toEmail, subject, body
        for field_name, value in (('toEmail', to_email), ('subject', subject), ('body', body)):
            if not value:
                return jsonify({
                    'success': False,
                    'error': f'{field_name} parameter is required'
                }), 400

        # A JSON body may carry a non-string here; reject it as a bad address
        if not isinstance(to_email, str) or '@' not in to_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address format'
            }), 400

        email_data = {
            'toEmail': to_email,
            'subject': subject,
            'body': body
        }

        external_api_url = f"{self.config['backend_url']}/api/Email/sendPlainText"

        try:
            logger.info(f"๐ŸŒ Sending email via external API: {external_api_url}")

            response = requests.post(
                external_api_url,
                params=email_data,
                headers=self.config['ngrok_headers'],
                timeout=30
            )

            if response.status_code != 200:
                logger.error(f"โŒ External email API failed: {response.status_code} - {response.text}")
                return jsonify({
                    'success': False,
                    'error': f'Email service failed: {response.status_code}',
                    'details': response.text
                }), 500

            logger.info(f"โœ… Email sent successfully to {to_email}")
            return jsonify({
                'success': True,
                'message': 'Email sent successfully',
                'recipient': to_email,
                'subject': subject,
                'timestamp': datetime.now().isoformat()
            })

        except requests.exceptions.Timeout:
            logger.error("โฐ Email API request timeout")
            return jsonify({
                'success': False,
                'error': 'Email service timeout'
            }), 504
        except requests.exceptions.ConnectionError:
            logger.error("๐Ÿ”Œ Email API connection error")
            return jsonify({
                'success': False,
                'error': 'Email service unavailable'
            }), 503
        except Exception as e:
            logger.error(f"โŒ Email API error: {e}")
            return jsonify({
                'success': False,
                'error': f'Email service error: {str(e)}'
            }), 500

    except Exception as e:
        logger.error(f"โŒ Error sending plain text email: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to send email: {str(e)}'
        }), 500
def _send_new_properties_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Send an e-mail listing newly added properties that match the user.

    Args:
        customer_id: Customer identifier (used for logging only).
        recipient_email: Address the e-mail is sent to.
        analysis_data: Customer analysis data from which the user
            preferences are derived.
        ai_insights: AI insights for the customer; the personality type,
            urgency level and peak time are echoed in the result.

    Returns:
        The dict returned by ``_send_email_via_api`` extended with
        ``type``, ``properties_count`` and ``ai_insights``. When no matching
        properties exist, or an error occurs, a dict with
        ``success: False`` and a ``message`` / ``error`` entry is returned
        instead; this method does not raise.
    """
    logger.info(f"๐Ÿก Sending new properties email for customer {customer_id}")

    try:
        user_preferences = self._analyze_user_preferences(analysis_data)
        new_properties = self._get_new_properties_for_user(user_preferences)

        if not new_properties:
            return {
                'type': 'new_properties',
                'success': False,
                'message': 'No new properties matching user preferences'
            }

        subject = "๐Ÿก New Properties Just Added - Perfect Matches for You!"

        body_lines = [
            "Dear Valued Customer,",
            "",
            "๐ŸŽ‰ Great news! We've just added new properties that match your preferences perfectly!",
            "",
            "๐Ÿ“Š YOUR PREFERENCES (Based on Your Activity):",
            f"โ€ข Preferred Property Type: {user_preferences.get('property_type', 'Villa')}",
            f"โ€ข Budget Range: ${user_preferences.get('min_budget', 0):,} - ${user_preferences.get('max_budget', 0):,}",
            f"โ€ข Preferred Locations: {', '.join(user_preferences.get('locations', ['Dubai']))}",
            f"โ€ข Bedrooms: {user_preferences.get('bedrooms', '3+')}",
            "",
            "๐Ÿ†• NEWLY ADDED PROPERTIES:"
        ]

        # Only the first five matches are listed in the e-mail body
        for i, prop in enumerate(new_properties[:5], 1):
            body_lines.extend([
                f"{i}. {prop.get('title', 'New Property')}",
                f" ๐Ÿ“ Location: {prop.get('location', 'N/A')}",
                f" ๐Ÿ’ฐ Price: ${prop.get('price', 0):,}",
                f" ๐Ÿ  Type: {prop.get('propertyTypeName', 'Villa')}",
                f" ๐Ÿ›๏ธ Bedrooms: {prop.get('bedrooms', 'N/A')}",
                f" โญ Match Score: {prop.get('match_score', 95):.0f}%",
                ""
            ])

        body_lines.extend([
            "โšก ACT FAST!",
            "These properties are fresh on the market and likely to be in high demand.",
            "",
            "๐Ÿ’ก NEXT STEPS:",
            "โ€ข Schedule a viewing immediately",
            "โ€ข Contact our team for more details",
            "โ€ข Get pre-approval for faster processing",
            "",
            "Best regards,",
            "Your Property Match Team"
        ])

        email_body = "\n".join(body_lines)

        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'new_properties'
        # Counts every matching property, not just the five listed above
        result['properties_count'] = len(new_properties)

        # Report the insights actually computed for this customer, using the
        # same keys and fallbacks as the AI recommendations e-mail.
        insights = ai_insights or {}
        result['ai_insights'] = {
            'personality_type': insights.get('ai_personality_type', 'Analytical'),
            'urgency_level': insights.get('urgency_level', 'Medium'),
            'peak_time': insights.get('peak_time', 'Evening')
        }

        return result

    except Exception as e:
        logger.error(f"โŒ Error sending new properties email: {e}")
        return {
            'type': 'new_properties',
            'success': False,
            'error': str(e)
        }
logger.info(f"๐Ÿค– Sending AI recommendations email for customer {customer_id}") try: # Get AI engine for recommendations ai_engine = self.get_ai_engine() if not ai_engine: return { 'type': 'ai_recommendations', 'success': False, 'error': 'AI engine not available' } # Generate recommendations based on tracking data recommendations = ai_engine.get_enhanced_ai_recommendations(analysis_data, ai_insights, count=5) # Create personalized email content personality_type = ai_insights.get('ai_personality_type', 'Analytical') urgency_level = ai_insights.get('urgency_level', 'Medium') decision_style = ai_insights.get('recommendation_strategy', 'Data-driven') peak_time = ai_insights.get('peak_time', 'Evening') subject = f"๐Ÿค– AI-Powered Recommendations - Tailored Just for You!" body_lines = [ f"Dear Valued Customer,", "", "๐Ÿง  Our AI has analyzed your browsing behavior and preferences to bring you these personalized recommendations:", "", "๐Ÿ“Š YOUR BEHAVIOR ANALYSIS:", f"โ€ข Personality Type: {personality_type}", f"โ€ข Decision Making Style: {decision_style}", f"โ€ข Engagement Level: {ai_insights.get('engagement_level', 'High')}", f"โ€ข Urgency Level: {urgency_level}", f"โ€ข Peak Activity Time: {peak_time}", "", "๐ŸŽฏ AI RECOMMENDATIONS:" ] # Add AI recommendations if recommendations and isinstance(recommendations, list): for i, prop in enumerate(recommendations[:5], 1): body_lines.extend([ f"{i}. 
{prop.get('property_name', prop.get('title', 'Recommended Property'))}", f" ๐Ÿ“ Location: {prop.get('location', 'N/A')}", f" ๐Ÿ’ฐ Price: ${prop.get('price', 0):,}", f" ๐ŸŽฏ AI Match Score: {prop.get('ai_score', prop.get('match_score', 0)):.1f}%", f" ๐Ÿ’ก Why Recommended: {prop.get('recommendation_reason', 'Matches your preferences')}", "" ]) # Add urgency-based call to action if urgency_level == 'High': body_lines.extend([ "๐Ÿ”ฅ URGENT RECOMMENDATION:", "Based on your high engagement, we recommend acting quickly on these properties!", "" ]) body_lines.extend([ "๐Ÿ’ก PERSONALIZED NEXT STEPS:", "โ€ข Review properties that match your personality type", "โ€ข Schedule viewings during your peak activity time", "โ€ข Contact our AI-powered assistant for more details", "", "Best regards,", "AI-Powered Property Recommendation System" ]) email_body = "\n".join(body_lines) # Send email result = self._send_email_via_api(recipient_email, subject, email_body) result['type'] = 'ai_recommendations' result['ai_insights'] = { 'personality_type': personality_type, 'decision_making_style': decision_style, 'urgency_level': urgency_level, 'peak_time': peak_time } result['recommendations_count'] = len(recommendations) if recommendations else 0 return result except Exception as e: logger.error(f"โŒ Error sending AI recommendations email: {e}") return { 'type': 'ai_recommendations', 'success': False, 'error': str(e) } def _send_peak_time_engagement_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict: """Send email during user's peak engagement time""" logger.info(f"โฐ Sending peak time engagement email for customer {customer_id}") try: # Analyze peak time from tracking data peak_time = ai_insights.get('peak_time', 'Evening') engagement_level = ai_insights.get('engagement_level', 'High') # Extract AI insights for display personality_type = ai_insights.get('ai_personality_type', 'Analytical') urgency_level = ai_insights.get('urgency_level', 
'Medium') # Create time-sensitive email content subject = f"โฐ Perfect Time to Explore Properties - Based on Your Activity Pattern!" body_lines = [ f"Dear Valued Customer,", "", f"โฐ We've noticed you're most active during {peak_time} hours!", f"Your engagement level: {engagement_level}", "", "๐Ÿ“ˆ OPTIMAL VIEWING TIME:", f"โ€ข Your Peak Activity: {peak_time}", f"โ€ข Best Time for Property Tours: {self._get_optimal_viewing_time(peak_time)}", f"โ€ข Recommended Action Time: {self._get_recommended_action_time(peak_time)}", "", "๐ŸŽฏ TIME-SENSITIVE OPPORTUNITIES:" ] # Get time-sensitive properties time_sensitive_props = self._get_time_sensitive_properties(analysis_data) for i, prop in enumerate(time_sensitive_props[:3], 1): body_lines.extend([ f"{i}. {prop.get('title', 'Time-Sensitive Property')}", f" ๐Ÿ“ Location: {prop.get('location', 'N/A')}", f" ๐Ÿ’ฐ Price: ${prop.get('price', 0):,}", f" โฐ Best Viewing Time: {prop.get('optimal_time', peak_time)}", f" ๐Ÿšจ Urgency: {prop.get('urgency', 'Medium')}", "" ]) body_lines.extend([ "๐Ÿ’ก PEAK TIME STRATEGY:", f"โ€ข Schedule viewings during {peak_time} for maximum engagement", "โ€ข Take advantage of your high focus period", "โ€ข Make decisions when you're most alert", "", "๐Ÿ“ž IMMEDIATE ACTION:", "โ€ข Call now to schedule a viewing", "โ€ข Get priority booking during your peak time", "โ€ข Receive personalized assistance", "", "Best regards,", "Your Peak Time Property Team" ]) email_body = "\n".join(body_lines) # Send email result = self._send_email_via_api(recipient_email, subject, email_body) result['type'] = 'peak_time_engagement' result['peak_time'] = peak_time result['engagement_level'] = engagement_level result['ai_insights'] = { 'personality_type': personality_type, 'urgency_level': urgency_level, 'peak_time': peak_time } return result except Exception as e: logger.error(f"โŒ Error sending peak time engagement email: {e}") return { 'type': 'peak_time_engagement', 'success': False, 'error': str(e) } def 
_send_email_via_api(self, recipient_email: str, subject: str, body: str) -> Dict: """Send email using the external API""" try: email_data = { 'toEmail': recipient_email, 'subject': subject, 'body': body } external_api_url = f"{self.config['backend_url']}/api/Email/sendPlainText" logger.info(f"๐ŸŒ Sending email via external API to {recipient_email}") response = requests.post( external_api_url, params=email_data, headers=self.config['ngrok_headers'], timeout=30 ) if response.status_code == 200: logger.info(f"โœ… Email sent successfully to {recipient_email}") return { 'success': True, 'message': 'Email sent successfully', 'recipient': recipient_email, 'subject': subject } else: logger.error(f"โŒ External email API failed: {response.status_code} - {response.text}") return { 'success': False, 'error': f'Email service failed: {response.status_code}', 'details': response.text } except Exception as e: logger.error(f"โŒ Email API error: {e}") return { 'success': False, 'error': f'Email service error: {str(e)}' } def _analyze_user_preferences(self, analysis_data: Dict) -> Dict: """Analyze user preferences from tracking data""" try: # Extract preferences from analysis data preferences = { 'property_type': 'Villa', # Default 'min_budget': 500000, 'max_budget': 2000000, 'locations': ['Dubai', 'Abu Dhabi'], 'bedrooms': '3+', 'price_range': '500K-2M' } # Analyze from tracking data if available if 'data' in analysis_data and 'analytics' in analysis_data['data']: analytics = analysis_data['data']['analytics'] # Extract property type preferences if 'property_types' in analytics: most_viewed = max(analytics['property_types'].items(), key=lambda x: x[1]) preferences['property_type'] = most_viewed[0] # Extract budget preferences if 'price_ranges' in analytics: most_viewed = max(analytics['price_ranges'].items(), key=lambda x: x[1]) preferences['price_range'] = most_viewed[0] # Extract location preferences if 'locations' in analytics: top_locations = 
sorted(analytics['locations'].items(), key=lambda x: x[1], reverse=True)[:3] preferences['locations'] = [loc[0] for loc in top_locations] return preferences except Exception as e: logger.error(f"โŒ Error analyzing user preferences: {e}") return { 'property_type': 'Villa', 'min_budget': 500000, 'max_budget': 2000000, 'locations': ['Dubai'], 'bedrooms': '3+' } def _get_new_properties_for_user(self, user_preferences: Dict) -> List[Dict]: """Get newly added properties that match user preferences""" try: # This would typically fetch from a database of new properties # For now, we'll generate mock new properties based on preferences property_type = user_preferences.get('property_type', 'Villa') locations = user_preferences.get('locations', ['Dubai']) min_budget = user_preferences.get('min_budget', 500000) max_budget = user_preferences.get('max_budget', 2000000) new_properties = [] # Generate mock new properties for i, location in enumerate(locations[:2]): price = min_budget + (i * 300000) if price <= max_budget: new_properties.append({ 'title': f'New {property_type} in {location}', 'location': location, 'price': price, 'propertyTypeName': property_type, 'bedrooms': '3+', 'match_score': 95 + i, 'is_new': True, 'added_date': datetime.now().strftime('%Y-%m-%d') }) return new_properties except Exception as e: logger.error(f"โŒ Error getting new properties: {e}") return [] def _get_optimal_viewing_time(self, peak_time: str) -> str: """Get optimal viewing time based on peak activity""" time_mapping = { 'Morning': '9:00 AM - 11:00 AM', 'Afternoon': '2:00 PM - 4:00 PM', 'Evening': '6:00 PM - 8:00 PM', 'Night': '7:00 PM - 9:00 PM' } return time_mapping.get(peak_time, '6:00 PM - 8:00 PM') def _get_recommended_action_time(self, peak_time: str) -> str: """Get recommended action time based on peak activity""" time_mapping = { 'Morning': '10:00 AM - 12:00 PM', 'Afternoon': '3:00 PM - 5:00 PM', 'Evening': '7:00 PM - 9:00 PM', 'Night': '8:00 PM - 10:00 PM' } return 
time_mapping.get(peak_time, '7:00 PM - 9:00 PM') def _get_time_sensitive_properties(self, analysis_data: Dict) -> List[Dict]: """Get time-sensitive properties based on analysis""" try: # Generate time-sensitive properties return [ { 'title': 'Limited Time Offer - Premium Villa', 'location': 'Dubai Marina', 'price': 1500000, 'optimal_time': 'Evening', 'urgency': 'High' }, { 'title': 'Flash Sale - Luxury Apartment', 'location': 'Downtown Dubai', 'price': 800000, 'optimal_time': 'Afternoon', 'urgency': 'Very High' }, { 'title': 'Exclusive Preview - New Development', 'location': 'Palm Jumeirah', 'price': 2500000, 'optimal_time': 'Morning', 'urgency': 'Medium' } ] except Exception as e: logger.error(f"โŒ Error getting time-sensitive properties: {e}") return [] # Helper methods for multi-AI system def _get_analysis_data_parallel(self, customer_id: int) -> Dict: """Get analysis data for a customer using parallel processing""" try: # Try to get from cache first cache_key = f'lead_analysis_{customer_id}' cached_data = self.get_cached_data(cache_key) if cached_data: logger.info(f"๐Ÿ“‹ Returning cached analysis data for customer {customer_id}") return cached_data # Fetch from backend backend_response = None try: backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}') except Exception as e: logger.warning(f"โš ๏ธ Backend API failed: {e}") # If backend fails, use mock data if not backend_response and MOCK_DATA_AVAILABLE: logger.info(f"๐Ÿ”„ Using mock data for customer {customer_id}") backend_response = mock_data_service.get_customer_data(customer_id) if not backend_response: return None # Process the data processed_data = self.process_lead_data(backend_response, customer_id) analytics = self.generate_analytics(backend_response, customer_id) analysis_data = { 'success': True, 'customer_id': customer_id, 'timestamp': datetime.now().isoformat(), 'data': { 'lead_qualification': analytics.get('lead_qualification', {}), 'analytics': 
def clean_for_json(self, obj):
    """Return a copy of ``obj`` that is safe to pass to ``json.dumps``.

    Walks dicts, lists and tuples recursively and normalises leaf values:

    * NumPy arrays become (cleaned) nested lists.
    * NumPy scalars become the equivalent built-in Python value.
    * ``+inf`` -> 999999999, ``-inf`` -> -999999999, ``NaN`` -> 0.

    Everything else is returned unchanged. NumPy is optional: when it is
    not installed only the plain-Python rules apply.

    Args:
        obj: Arbitrary (possibly nested) value.

    Returns:
        The cleaned structure; tuples are returned as lists.
    """
    # Import once per call instead of once per leaf, and tolerate absence.
    try:
        import numpy as np
    except ImportError:
        np = None

    def clean(value):
        if isinstance(value, dict):
            return {key: clean(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [clean(item) for item in value]
        if np is not None:
            if isinstance(value, np.ndarray):
                # Re-clean the converted list so NaN/inf inside arrays are handled too
                return clean(value.tolist())
            if isinstance(value, np.generic):
                # Covers every NumPy scalar type and keeps ints as ints
                value = value.item()
        if isinstance(value, float):
            if math.isnan(value):
                return 0
            if math.isinf(value):
                return 999999999 if value > 0 else -999999999
        return value

    return clean(obj)
def get_fallback_data(self, endpoint: str, method: str, data: Dict = None) -> Any:
    """Produce substitute data for an endpoint whose backend request failed.

    If the endpoint contains ``/customer/<id>`` and the mock data service
    is available, the mock record for that customer is returned. Otherwise
    ``PropertyLeadQualification`` endpoints yield an empty list and every
    other endpoint yields a "temporarily unavailable" marker dict.

    Args:
        endpoint: Backend path that was requested.
        method: HTTP method of the failed request (currently unused).
        data: Request payload of the failed request (currently unused).

    Returns:
        Mock customer data, ``[]``, a fallback marker dict, or ``None`` if
        building the fallback itself raised.
    """
    import re  # local import: ``re`` is not among the module-level imports

    try:
        # Extract the customer ID from the last '/customer/' segment. Anything
        # after the ID (trailing slash, sub-path, query string, fragment) is
        # ignored so such URLs still resolve to the right customer.
        customer_id = None
        if '/customer/' in endpoint:
            tail = endpoint.rpartition('/customer/')[2]
            id_text = re.split(r'[/?#]', tail, maxsplit=1)[0]
            try:
                customer_id = int(id_text)
            except ValueError:
                pass

        # Use mock data service as fallback
        if customer_id and MOCK_DATA_AVAILABLE:
            logger.info(f"๐Ÿ”„ Using mock data fallback for customer {customer_id}")
            return mock_data_service.get_customer_data(customer_id)

        # Lead-qualification endpoints get an empty list
        if 'PropertyLeadQualification' in endpoint:
            return []

        # All remaining endpoints (including DebtFinancing) share one marker
        return {'success': False, 'message': 'Service temporarily unavailable', 'fallback': True}
    except Exception as e:
        logger.error(f"โŒ Fallback data generation failed: {e}")
        return None
def generate_detailed_lead_qualification(self, processed_data: Dict) -> Dict:
    """Score a lead out of 100 and classify it as HOT, WARM, LUKEWARM or COLD.

    The score is the sum of five factors:

    1. Engagement (0-30): 30% of ``summary['engagement_score']``.
    2. Property diversity (0-15): 5 points per distinct property type.
    3. Recent activity (0-20): based on days since ``summary['last_activity']``.
    4. Price consistency (0-15): narrower viewed price range scores higher;
       omitted when no property has a positive price.
    5. Viewing depth (0-20): 5 points per property viewed more than once.

    Args:
        processed_data: Dict with ``summary`` and ``properties`` entries, as
            produced by ``process_lead_data``.

    Returns:
        Dict with ``lead_score``, ``lead_status``, ``status_color``,
        ``status_description``, ``max_possible_score``, per-factor details
        under ``factors`` and a condensed ``summary``.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']

    lead_score = 0
    factors = {}

    # 1. Engagement Score (0-30 points)
    engagement_points = min(summary['engagement_score'] * 0.3, 30)
    lead_score += engagement_points
    factors['engagement'] = {
        'score': summary['engagement_score'],
        'points': engagement_points,
        'max_points': 30,
        'description': f"Based on {summary['total_views']} views and {formatDuration(summary['total_duration'])} total viewing time"
    }

    # 2. Property Diversity (0-15 points)
    property_types = len(summary['property_types'])
    diversity_points = min(property_types * 5, 15)
    lead_score += diversity_points
    factors['property_diversity'] = {
        'score': property_types,
        'points': diversity_points,
        'max_points': 15,
        'description': f"Viewed {property_types} different property types"
    }

    # 3. Recent Activity (0-20 points)
    days_since = None  # stays None when the timestamp is missing or unparseable
    recent_activity_points = 0
    last_activity_raw = summary.get('last_activity')
    if last_activity_raw:
        try:
            last_activity = datetime.fromisoformat(last_activity_raw.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            last_activity = None
        if last_activity is not None:
            # Take "now" in the timestamp's own timezone (or naive when it has
            # none); mixing naive and aware datetimes raises TypeError, which
            # used to be swallowed and zeroed this factor for 'Z' timestamps.
            days_since = max(0, (datetime.now(last_activity.tzinfo) - last_activity).days)
            if days_since <= 1:
                recent_activity_points = 20
            elif days_since <= 3:
                recent_activity_points = 15
            elif days_since <= 7:
                recent_activity_points = 10
            elif days_since <= 30:
                recent_activity_points = 5
    lead_score += recent_activity_points
    factors['recent_activity'] = {
        'score': days_since if days_since is not None else 999,
        'points': recent_activity_points,
        'max_points': 20,
        'description': f"Last activity was {days_since if days_since is not None else 'unknown'} days ago"
    }

    # 4. Price Range Consistency (0-15 points)
    # Missing or null prices are treated as 0 and therefore excluded.
    prices = [price for price in ((p.get('price') or 0) for p in properties) if price > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        consistency_points = price_consistency * 15
        lead_score += consistency_points
        factors['price_consistency'] = {
            'score': round(price_consistency * 100, 1),
            'points': consistency_points,
            'max_points': 15,
            'description': f"Price range consistency: {round(price_consistency * 100, 1)}%"
        }

    # 5. Viewing Depth (0-20 points)
    deep_views = sum(1 for p in properties if (p.get('viewCount') or 0) > 1)
    depth_points = min(deep_views * 5, 20)
    lead_score += depth_points
    factors['viewing_depth'] = {
        'score': deep_views,
        'points': depth_points,
        'max_points': 20,
        'description': f"{deep_views} properties viewed multiple times"
    }

    # Determine lead status
    if lead_score >= 70:
        lead_status = "HOT"
        status_color = "#e74c3c"
        status_description = "High probability of conversion. Immediate follow-up recommended."
    elif lead_score >= 50:
        lead_status = "WARM"
        status_color = "#f39c12"
        status_description = "Good potential. Regular follow-up needed."
    elif lead_score >= 30:
        lead_status = "LUKEWARM"
        status_color = "#95a5a6"
        status_description = "Some interest shown. Nurturing required."
    else:
        lead_status = "COLD"
        status_color = "#34495e"
        status_description = "Low engagement. Re-engagement campaign needed."

    return {
        'lead_score': round(lead_score, 1),
        'lead_status': lead_status,
        'status_color': status_color,
        'status_description': status_description,
        'max_possible_score': 100,
        'factors': factors,
        'summary': {
            'total_properties_viewed': summary['total_properties'],
            'total_views': summary['total_views'],
            'total_duration': summary['total_duration'],
            'average_price': summary['average_price'],
            'engagement_score': summary['engagement_score']
        }
    }
def days_since_last_view(self, last_viewed: str) -> int:
    """Return the number of whole days elapsed since ``last_viewed``.

    Args:
        last_viewed: ISO-8601 timestamp; a trailing ``Z`` is accepted as UTC.

    Returns:
        Days since the timestamp, never negative. ``999`` is returned when
        the value is empty or cannot be parsed.
    """
    if not last_viewed:
        return 999
    try:
        last_view_date = datetime.fromisoformat(last_viewed.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        return 999
    # Compare in the timestamp's own timezone (naive when it has none).
    # Subtracting an aware datetime from a naive now() raises TypeError, which
    # previously made every 'Z'-suffixed timestamp report 999.
    now = datetime.now(last_view_date.tzinfo)
    return max(0, (now - last_view_date).days)
def analyze_properties_detailed(self, properties: List[Dict]) -> Dict:
    """Summarise the supplied properties.

    Currently this only reports how many properties were passed in, under
    the key ``analyzed_properties``; no per-property analysis is performed.
    """
    property_count = len(properties)
    return dict(analyzed_properties=property_count)
def calculate_opportunity_score(self, processed_data: Dict) -> float:
    """Compute an opportunity score capped at 100.

    Starts from ``summary['engagement_score']`` and adds:

    * +20 when ``average_price`` exceeds 20,000,000
    * +10 when more than two property types were viewed
    * +15 when the last activity was within 7 days, else +10 within 30 days

    Args:
        processed_data: Dict whose ``summary`` entry carries
            ``engagement_score``, ``average_price``, ``property_types`` and
            optionally ``last_activity`` (ISO-8601, trailing ``Z`` accepted).

    Returns:
        The score, at most 100.
    """
    summary = processed_data['summary']

    # Base score from engagement
    score = summary['engagement_score']

    # Bonus for high-value properties
    if summary['average_price'] > 20000000:
        score += 20

    # Bonus for multiple property types
    if len(summary['property_types']) > 2:
        score += 10

    # Bonus for recent activity
    last_activity_raw = summary.get('last_activity')
    if last_activity_raw:
        try:
            last_activity = datetime.fromisoformat(last_activity_raw.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            last_activity = None  # unparseable timestamp: no recency bonus
        if last_activity is not None:
            # Use "now" in the timestamp's timezone; a naive now() minus an
            # aware timestamp raises TypeError, which used to be swallowed so
            # the bonus was never granted for 'Z' timestamps.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
            if days_since <= 7:
                score += 15
            elif days_since <= 30:
                score += 10

    return min(score, 100)
calculate_conversion_probability(self, processed_data: Dict) -> Dict: """Calculate probability of conversion based on various factors""" summary = processed_data['summary'] properties = processed_data['properties'] # Base probability base_probability = min(summary['engagement_score'], 100) # Adjustments based on factors adjustments = {} # Price range consistency prices = [p['price'] for p in properties if p['price'] > 0] if prices: price_range = max(prices) - min(prices) price_consistency = 1 - (price_range / max(prices)) adjustments['price_consistency'] = price_consistency * 10 # Recent activity if summary.get('last_activity'): try: last_activity = datetime.fromisoformat(summary['last_activity'].replace('Z', '+00:00')) days_since = (datetime.now() - last_activity).days if days_since <= 1: adjustments['recent_activity'] = 15 elif days_since <= 3: adjustments['recent_activity'] = 10 elif days_since <= 7: adjustments['recent_activity'] = 5 except: pass # Property diversity property_types = len(summary['property_types']) adjustments['property_diversity'] = min(property_types * 3, 15) # Deep engagement deep_views = sum(1 for p in properties if p['viewCount'] > 1) adjustments['deep_engagement'] = min(deep_views * 5, 20) # Calculate final probability total_adjustment = sum(adjustments.values()) final_probability = min(base_probability + total_adjustment, 100) return { 'base_probability': base_probability, 'adjustments': adjustments, 'total_adjustment': total_adjustment, 'final_probability': final_probability, 'confidence_level': 'High' if len(properties) >= 5 else 'Medium' if len(properties) >= 3 else 'Low' } def get_cached_data(self, key: str) -> Optional[Any]: """Get data from cache if not expired""" if key in self.cache and key in self.cache_timestamps: if time.time() - self.cache_timestamps[key] < self.config['cache_duration']: return self.cache[key] else: # Remove expired cache del self.cache[key] del self.cache_timestamps[key] return None def cache_data(self, key: 
str, data: Any): """Cache data with timestamp""" self.cache[key] = data self.cache_timestamps[key] = time.time() def start_cache_cleanup(self): """Start background thread to clean up expired cache""" def cleanup_cache(): while True: try: current_time = time.time() expired_keys = [ key for key, timestamp in self.cache_timestamps.items() if current_time - timestamp > self.config['cache_duration'] ] for key in expired_keys: self.cache.pop(key, None) self.cache_timestamps.pop(key, None) time.sleep(60) # Clean up every minute except Exception as e: logger.error(f"Cache cleanup error: {e}") time.sleep(60) cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True) cleanup_thread.start() def run(self, host: str = '0.0.0.0', port: int = 5000, debug: bool = False): """Run the Flask application""" logger.info(f"๐Ÿš€ Starting Enhanced Lead Qualification API Service with AI on {host}:{port}") logger.info(f"๐ŸŒ Backend URL: {self.config['backend_url']}") logger.info(f"๐Ÿค– AI Features: Enabled") logger.info(f"๐Ÿ“Š API Endpoints:") logger.info(f" - GET /api/health") logger.info(f" - GET /api/lead-analysis/") logger.info(f" - GET /api/ai-analysis/") logger.info(f" - POST /api/ai-recommendations/") logger.info(f" - POST /api/multi-ai-recommendations/") logger.info(f" - GET /api/chromadb-recommendations/") logger.info(f" - GET /api/ai-status/") logger.info(f" - POST /api/batch-ai-analysis") logger.info(f" - GET /api/fetch-all-properties") logger.info(f" - POST /api/clear-cache") logger.info(f" - POST /api/test-email") logger.info(f" - POST /api/automated-email") logger.info(f" - GET /api/email-status") logger.info(f" ๐Ÿ†• NEW EMAIL TESTING ENDPOINTS:") logger.info(f" - GET /api/test-sendgrid-connection") logger.info(f" - GET /api/email-analysis-basis/") logger.info(f" - GET /api/email-content-preview/") logger.info(f" - POST /api/test-all-10-emails/") logger.info(f" - GET /api/chromadb-status") self.app.run(host=host, port=port, debug=debug) if __name__ == '__main__': 
api_service = EnhancedLeadQualificationAPI() api_service.run(debug=True)