# lead-qualification / api_service_enhanced.py
# (Hugging Face page banner preserved as a comment — the raw lines
#  "sksameermujahid's picture / Upload 15 files / adb96cf verified"
#  are upload-page residue and are not valid Python.)
import os
import json
import requests
import time
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import logging
from typing import Dict, List, Optional, Any
import threading
import math
# Import centralized configuration
from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_app_config
# Import the AI recommendation engine; if the import or construction fails,
# fall back to a disabled state so the service can still start.
try:
    from ai_recommendation_engine import AIRecommendationEngine
    ai_engine = AIRecommendationEngine()
    AI_ENGINE_AVAILABLE = True
except Exception as e:
    # Logging must be configured before we can emit the warning below;
    # the module-level config further down is then a harmless no-op.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.warning(f"AI engine not available: {e}")
    ai_engine = None
    AI_ENGINE_AVAILABLE = False
# Import mock data service (optional fallback when the backend is unreachable)
try:
    from mock_data_service import mock_data_service
    MOCK_DATA_AVAILABLE = True
except ImportError:
    MOCK_DATA_AVAILABLE = False
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that replaces non-finite floats with safe sentinels.

    The stock encoder serializes inf/nan natively as ``Infinity``/``NaN``
    (invalid JSON for many clients) WITHOUT ever calling ``default()``, so
    the float checks there were dead code. We therefore sanitize the whole
    object tree in ``iterencode`` before encoding:
      +inf -> 999999999, -inf -> -999999999, nan -> 0.
    """

    @staticmethod
    def _sanitize(value):
        """Recursively replace non-finite floats inside dicts/lists/tuples."""
        if isinstance(value, float):
            if math.isinf(value):
                # Preserve the sign instead of mapping -inf to a huge positive.
                return 999999999 if value > 0 else -999999999
            if math.isnan(value):
                return 0
            return value
        if isinstance(value, dict):
            return {k: CustomJSONEncoder._sanitize(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [CustomJSONEncoder._sanitize(v) for v in value]
        return value

    def iterencode(self, o, _one_shot=False):
        # Sanitize up front because default() is never consulted for floats.
        return super().iterencode(self._sanitize(o), _one_shot)

    def default(self, obj):
        # Kept for any stray non-serializable object reaching the base hook.
        if isinstance(obj, float) and math.isinf(obj):
            return 999999999
        if isinstance(obj, float) and math.isnan(obj):
            return 0
        return super().default(obj)
def formatPrice(price):
    """Render *price* as an Indian-rupee string with thousands separators."""
    amount = f"{price:,.0f}"
    return "β‚Ή" + amount
def formatDuration(seconds):
    """Render a duration in seconds as an 'Hh Mm' string."""
    total_minutes, _ = divmod(seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}h {minutes}m"
class EnhancedLeadQualificationAPI:
def __init__(self):
    """Build the Flask app, load centralized config, init caches, register routes."""
    self.app = Flask(__name__)
    # NOTE(review): Flask.json_encoder was removed in Flask 2.3 — confirm the
    # pinned Flask version still honors this attribute.
    self.app.json_encoder = CustomJSONEncoder
    CORS(self.app)
    # Configuration - Centralized from config.py
    self.config = {
        'backend_url': get_backend_url(),
        'api_timeout': get_app_config()['api_timeout'],
        # Environment variables override the defaults shipped in config.py.
        'cache_duration': int(os.getenv('CACHE_DURATION', get_app_config()['cache_duration'])),
        'max_retries': int(os.getenv('MAX_RETRIES', get_app_config()['max_retries'])),
        'retry_delay': int(os.getenv('RETRY_DELAY', get_app_config()['retry_delay'])),
        'ngrok_headers': get_ngrok_headers()
    }
    # Cache for API responses; values and their insertion timestamps are
    # tracked in parallel dicts keyed by cache key.
    self.cache = {}
    self.cache_timestamps = {}
    # AI processing status tracking, keyed by customer id
    self.ai_processing_status = {}
    # Setup routes
    self.setup_routes()
    # Add no-cache headers to all responses
    @self.app.after_request
    def add_no_cache_headers(response):
        """Add headers to prevent any caching"""
        response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        response.headers['Pragma'] = 'no-cache'
        response.headers['Expires'] = '0'
        return response
    # Start cache cleanup thread
    self.start_cache_cleanup()
def get_ai_engine(self):
    """Return the module-level AI recommendation engine, or None if unavailable."""
    try:
        if not (AI_ENGINE_AVAILABLE and ai_engine):
            logger.warning("⚠️ AI engine not available")
            return None
        return ai_engine
    except Exception as e:
        logger.error(f"❌ Error getting AI engine: {e}")
        return None
def setup_routes(self):
"""Setup all API routes including AI-enhanced endpoints and frontend"""
# Frontend routes
@self.app.route('/')
def index():
    """Serve the main AI customer analysis & email automation dashboard."""
    template = 'perfect_ai_dashboard.html'
    return render_template(template, customer_id=None)
@self.app.route('/old')
def old_dashboard():
    """Serve the legacy dashboard, kept for reference."""
    template = 'ai_lead_analysis.html'
    return render_template(template, customer_id=None)
@self.app.route('/customer/<int:customer_id>')
def customer_analysis(customer_id):
    """Serve the analysis page pre-filled with the given customer ID."""
    return render_template('ai_lead_analysis.html',
                           customer_id=customer_id)
@self.app.route('/email-automation')
def email_automation_dashboard():
    """Serve the email automation dashboard page."""
    template = 'email_automation_dashboard.html'
    return render_template(template)
@self.app.route('/health')
def health():
    """Liveness probe for the frontend service."""
    payload = {
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
        'timestamp': datetime.now().isoformat()
    }
    return jsonify(payload)
@self.app.route('/api/frontend/status')
def frontend_status():
    """Describe the frontend service: templates and feature list."""
    payload = {
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
        'templates_available': ['ai_lead_analysis.html'],
        'features': [
            'AI Lead Analysis Dashboard',
            'Email Automation Testing',
            'Behavioral Analysis',
            'Real-time Analytics',
        ],
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
# API routes
@self.app.route('/api/health', methods=['GET'])
def health_check():
    """API health probe: reports backend URL, cache size and AI flags."""
    info = {
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'backend_url': self.config['backend_url'],
        'cache_size': len(self.cache),
        'ai_status': 'enabled',
        'ai_models': 'loaded',
    }
    return jsonify(info)
@self.app.route('/api/lead-analysis/<int:customer_id>', methods=['GET'])
def get_lead_analysis(customer_id):
    """Get complete lead analysis for a customer (original functionality)"""
    logger.info(f"πŸ” Lead analysis request for customer {customer_id}")
    try:
        # Serve from cache first; cache_data/get_cached_data manage expiry.
        cache_key = f'lead_analysis_{customer_id}'
        cached_data = self.get_cached_data(cache_key)
        if cached_data:
            logger.info(f"πŸ“‹ Returning cached data for customer {customer_id}")
            return jsonify(cached_data)
        # Try to fetch data from backend with improved fallback mechanism
        logger.info(f"🌐 Fetching data for customer {customer_id} with fallback support")
        backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
        logger.info(f"πŸ“₯ Data received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        if not backend_response:
            # Empty/None backend payload maps to a 404 for this customer.
            return jsonify({
                'success': False,
                'customer_id': customer_id,
                'error': 'No data found for this customer',
                'data': None
            }), 404
        # Process lead data (per-property normalization + summary)
        logger.info(f"βš™οΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)
        # Generate analytics (qualification, patterns, scores, timeline)
        logger.info(f"🧠 Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)
        # Create combined response; analytics keys are surfaced individually
        # so the frontend contract stays stable even if generate_analytics grows.
        combined_response = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            },
            'api_info': {
                'backend_url': self.config['backend_url'],
                'endpoint_called': f'/api/PropertyLeadQualification/customer/{customer_id}',
                'response_time': datetime.now().isoformat(),
                'data_points': len(backend_response) if isinstance(backend_response, list) else 0
            }
        }
        # Clean the response for JSON serialization (strip NaN/inf)
        cleaned_response = self.clean_for_json(combined_response)
        logger.info(f"βœ… Combined response created successfully for customer {customer_id}")
        # Cache the cleaned result so cache hits skip cleaning too.
        self.cache_data(cache_key, cleaned_response)
        logger.info(f"πŸ’Ύ Combined data cached for customer {customer_id}")
        return jsonify(cleaned_response)
    except Exception as e:
        logger.error(f"❌ Error in lead analysis for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate lead analysis: {str(e)}',
            'data': None
        }), 500
@self.app.route('/api/ai-analysis/<int:customer_id>', methods=['GET'])
def get_ai_analysis(customer_id):
    """Get AI-enhanced lead analysis for a customer"""
    logger.info(f"πŸ€– AI analysis request for customer {customer_id}")
    try:
        # Reuse the plain lead analysis when cached; otherwise rebuild it here.
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate regular analysis first with improved fallback
            logger.info(f"🌐 Fetching data for AI analysis - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            # NOTE: unlike /api/lead-analysis, the full analytics dict is
            # embedded as-is here rather than key-by-key.
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }
        # Now enhance with AI analysis (real engine or static mock insights)
        if AI_ENGINE_AVAILABLE and ai_engine:
            logger.info(f"🧠 Generating AI insights for customer {customer_id}")
            ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
            # Find AI-recommended properties
            logger.info(f"πŸ” Finding AI recommendations for customer {customer_id}")
            ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
        else:
            logger.warning("⚠️ AI engine not available, using mock AI insights")
            ai_insights = {
                'personality_type': 'Analytical',
                'decision_making_style': 'Data-driven',
                'preferred_communication': 'Email',
                'urgency_level': 'Medium',
                'budget_confidence': 'High',
                'location_preference': 'Premium areas',
                'property_type_preference': 'Luxury properties'
            }
            ai_recommendations = []
        # Combine everything. NOTE(review): .copy() is shallow, so the AI keys
        # are also written into the cached 'lead_analysis_*' entry's 'data'
        # dict — confirm that cross-contamination is intended.
        enhanced_response = analysis_data.copy()
        enhanced_response['data']['ai_insights'] = ai_insights
        enhanced_response['data']['ai_recommendations'] = ai_recommendations
        enhanced_response['ai_enhanced'] = True
        enhanced_response['ai_timestamp'] = datetime.now().isoformat()
        # Cache AI-enhanced result under a separate key
        ai_cache_key = f'ai_analysis_{customer_id}'
        self.cache_data(ai_cache_key, enhanced_response)
        logger.info(f"βœ… AI analysis completed for customer {customer_id}")
        return jsonify(enhanced_response)
    except Exception as e:
        logger.error(f"❌ Error in AI analysis for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate AI analysis: {str(e)}',
            'ai_error': True
        }), 500
@self.app.route('/api/ai-recommendations/<int:customer_id>', methods=['POST'])
def send_ai_recommendations(customer_id):
    """Start background AI recommendation processing + email delivery.

    Expects a JSON body with an 'email' field. Responds immediately with a
    'processing' status; progress is tracked in self.ai_processing_status
    and can be polled via /api/ai-status/<customer_id>.
    """
    logger.info(f"πŸ“§ AI recommendation email request for customer {customer_id}")
    try:
        # Parse leniently: request.get_json() returns None on a missing or
        # non-JSON body, which previously crashed on .get() with a 500.
        data = request.get_json(silent=True) or {}
        recipient_email = data.get('email')
        if not recipient_email:
            return jsonify({
                'success': False,
                'error': 'Email address is required'
            }), 400
        # Mark as processing up-front so status polling works immediately.
        self.ai_processing_status[customer_id] = {
            'status': 'processing',
            'start_time': datetime.now().isoformat(),
            'email': recipient_email
        }
        # Get or generate analysis data
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate analysis if not cached, with backend fallback.
            logger.info(f"🌐 Fetching data for recommendations - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                self.ai_processing_status[customer_id]['status'] = 'failed'
                self.ai_processing_status[customer_id]['error'] = 'No data found'
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }
        # Worker closure: runs the AI pipeline and records the outcome.
        def process_ai_recommendations():
            try:
                if AI_ENGINE_AVAILABLE and ai_engine:
                    result = ai_engine.process_customer_with_ai(customer_id, recipient_email, analysis_data)
                else:
                    # Mock AI processing result when the engine is unavailable.
                    result = {
                        'success': True,
                        'customer_id': customer_id,
                        'email_sent': True,
                        'recipient': recipient_email,
                        'subject': f"πŸ€– AI-Powered Property Recommendations for Customer {customer_id}",
                        'ai_insights': {
                            'personality_type': 'Analytical',
                            'decision_making_style': 'Data-driven',
                            'preferred_communication': 'Email',
                            'urgency_level': 'Medium',
                            'budget_confidence': 'High'
                        },
                        'recommendations_count': 5,
                        'mock_data': True,
                        'message': 'Mock AI processing completed (AI engine not available)'
                    }
                self.ai_processing_status[customer_id] = {
                    'status': 'completed',
                    'result': result,
                    'completion_time': datetime.now().isoformat()
                }
                logger.info(f"βœ… AI processing completed for customer {customer_id}")
            except Exception as e:
                self.ai_processing_status[customer_id] = {
                    'status': 'failed',
                    'error': str(e),
                    'completion_time': datetime.now().isoformat()
                }
                logger.error(f"❌ AI processing failed for customer {customer_id}: {e}")
        # Fire-and-forget daemon thread so the HTTP response returns at once.
        thread = threading.Thread(target=process_ai_recommendations, daemon=True)
        thread.start()
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'message': 'AI recommendation processing started',
            'status': 'processing',
            'email': recipient_email,
            'estimated_completion': '2-3 minutes'
        })
    except Exception as e:
        logger.error(f"❌ Error starting AI recommendations for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to start AI processing: {str(e)}'
        }), 500
@self.app.route('/api/ai-status/<int:customer_id>', methods=['GET'])
def get_ai_status(customer_id):
    """Report the AI processing status recorded for a customer."""
    fallback = {
        'status': 'not_started',
        'message': 'No AI processing initiated for this customer'
    }
    status = self.ai_processing_status.get(customer_id, fallback)
    payload = {
        'customer_id': customer_id,
        'ai_processing': status,
        'timestamp': datetime.now().isoformat()
    }
    # Strip NaN/inf before handing off to jsonify.
    return jsonify(self.clean_for_json(payload))
@self.app.route('/api/batch-ai-analysis', methods=['POST'])
def batch_ai_analysis():
    """Run AI-enhanced analysis for multiple customers in one request.

    Body: JSON with 'customer_ids' (list). Per-customer failures are
    reported inside 'results' without aborting the batch.
    """
    try:
        # Lenient parse: a missing/invalid JSON body previously made
        # request.get_json() return None and crash on .get() with a 500.
        data = request.get_json(silent=True) or {}
        customer_ids = data.get('customer_ids', [])
        if not customer_ids:
            return jsonify({
                'success': False,
                'error': 'No customer IDs provided'
            }), 400
        results = {}
        for customer_id in customer_ids:
            try:
                # Get regular analysis with improved fallback
                logger.info(f"🌐 Fetching data for batch analysis - customer {customer_id}")
                backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
                if not backend_response:
                    results[customer_id] = {
                        'success': False,
                        'error': 'No data found for this customer'
                    }
                    continue
                processed_data = self.process_lead_data(backend_response, customer_id)
                analytics = self.generate_analytics(backend_response, customer_id)
                analysis_data = {
                    'success': True,
                    'customer_id': customer_id,
                    'data': {
                        'lead_qualification': analytics.get('lead_qualification', {}),
                        'analytics': analytics,
                        'properties': processed_data.get('properties', []),
                        'summary': processed_data.get('summary', {})
                    }
                }
                # Add AI insights (real engine, or static mock values)
                if AI_ENGINE_AVAILABLE and ai_engine:
                    ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
                    ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
                else:
                    ai_insights = {
                        'personality_type': 'Analytical',
                        'decision_making_style': 'Data-driven',
                        'preferred_communication': 'Email',
                        'urgency_level': 'Medium',
                        'budget_confidence': 'High',
                        'location_preference': 'Premium areas',
                        'property_type_preference': 'Luxury properties'
                    }
                    ai_recommendations = []
                analysis_data['data']['ai_insights'] = ai_insights
                analysis_data['data']['ai_recommendations'] = ai_recommendations
                analysis_data['ai_enhanced'] = True
                # Strip NaN/inf before serialization.
                results[customer_id] = self.clean_for_json(analysis_data)
            except Exception as e:
                # Isolate per-customer failures; keep processing the rest.
                results[customer_id] = {
                    'success': False,
                    'error': str(e)
                }
        return jsonify({
            'success': True,
            'results': results,
            'total_processed': len(customer_ids),
            'ai_enhanced': True
        })
    except Exception as e:
        logger.error(f"❌ Error in batch AI analysis: {e}")
        return jsonify({
            'success': False,
            'error': f'Batch AI analysis failed: {str(e)}'
        }), 500
@self.app.route('/api/clear-cache', methods=['POST'])
def clear_cache():
    """Drop all cached responses plus the AI processing bookkeeping."""
    try:
        for store in (self.cache, self.cache_timestamps, self.ai_processing_status):
            store.clear()
        logger.info("πŸ—‘οΈ Cache and AI status cleared")
        return jsonify({
            'success': True,
            'message': 'Cache and AI processing status cleared successfully'
        })
    except Exception as e:
        logger.error(f"❌ Error clearing cache: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to clear cache: {str(e)}'
        }), 500
@self.app.route('/api/test-email', methods=['POST'])
def test_email():
    """Test SendGrid email functionality"""
    logger.info("πŸ§ͺ Testing SendGrid email functionality")
    try:
        # Debug request information
        # NOTE(review): logging raw headers and body may leak credentials/PII
        # into logs — consider removing once debugging is finished.
        logger.info(f"Request Content-Type: {request.content_type}")
        logger.info(f"Request Headers: {dict(request.headers)}")
        logger.info(f"Request Method: {request.method}")
        logger.info(f"Request Data: {request.get_data()}")
        # Handle both JSON and form data with better error handling
        data = {}
        if request.is_json:
            try:
                data = request.get_json() or {}
                logger.info(f"Parsed JSON data: {data}")
            except Exception as json_error:
                logger.error(f"JSON parsing error: {json_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid JSON: {str(json_error)}'
                }), 400
        else:
            try:
                data = request.form.to_dict() or {}
                logger.info(f"Parsed form data: {data}")
            except Exception as form_error:
                logger.error(f"Form parsing error: {form_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid form data: {str(form_error)}'
                }), 400
        recipient_email = data.get('email', '[email protected]')
        logger.info(f"πŸ“§ Testing email to: {recipient_email}")
        # Validate email format (minimal check: presence of '@')
        if not recipient_email or '@' not in recipient_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address'
            }), 400
        # Get AI engine; this local deliberately shadows the module-level
        # ai_engine inside this handler only.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Test SendGrid email
        test_result = ai_engine.test_sendgrid_email(recipient_email)
        return jsonify(test_result)
    except Exception as e:
        logger.error(f"Error testing email: {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/test-email-simple', methods=['GET'])
def test_email_simple():
    """GET variant of the SendGrid test using a fixed default recipient."""
    logger.info("πŸ§ͺ Testing SendGrid email functionality (simple GET)")
    try:
        recipient_email = '[email protected]'
        logger.info(f"πŸ“§ Testing email to: {recipient_email}")
        engine = self.get_ai_engine()
        if engine is None:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Delegate the actual send to the engine and relay its result.
        return jsonify(engine.test_sendgrid_email(recipient_email))
    except Exception as e:
        logger.error(f"Error testing email (simple): {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/automated-email', methods=['POST'])
def send_automated_email():
    """Send an automated email (currently served from mock data).

    Body: JSON with 'customer_id' (required), optional 'email_type' and
    'recipient_email'.
    """
    try:
        # request.json raises/returns None on a missing or mistyped body;
        # parse leniently so bad requests get a clean 400, not a 500.
        data = request.get_json(silent=True) or {}
        customer_id = data.get('customer_id')
        email_type = data.get('email_type', 'behavioral_trigger')
        recipient_email = data.get('recipient_email', '[email protected]')
        if not customer_id:
            return jsonify({
                'success': False,
                'error': 'Customer ID is required'
            }), 400
        # Generate mock email response (email service integrated)
        if MOCK_DATA_AVAILABLE:
            # Fetched for parity with the real flow; result currently unused.
            customer_data = mock_data_service.get_customer_data(customer_id)
            customer_profile = mock_data_service.get_customer_profile(customer_id)
            return jsonify({
                'success': True,
                'customer_id': customer_id,
                'recipient_email': recipient_email,
                'subject': f"πŸ€– AI-Powered Recommendations for {customer_profile.get('customerName', f'Customer {customer_id}')}",
                'email_type': email_type,
                'timestamp': datetime.now().isoformat(),
                'mock_data': True,
                'message': 'Mock email generated successfully (integrated email service)'
            })
        else:
            return jsonify({
                'success': False,
                'error': 'Email service not available and no mock data'
            }), 500
    except Exception as e:
        logger.error(f"Error sending automated email: {e}")
        return jsonify({
            'success': False,
            'error': f'Email service not available: {str(e)}'
        }), 500
@self.app.route('/api/email-status', methods=['GET'])
def get_email_status():
    """Report the status of the integrated email automation service."""
    payload = {
        'success': True,
        'service_status': 'integrated',
        'email_config': {
            'sender': '[email protected]',
            'recipient': '[email protected]',
            'smtp_server': 'Integrated Service'
        },
        'ai_engine_loaded': True,
        'timestamp': datetime.now().isoformat(),
        'message': 'Email service integrated into main API service'
    }
    return jsonify(payload)
@self.app.route('/api/email-preview/<int:customer_id>', methods=['GET'])
def email_preview(customer_id):
    """Generate email preview for a customer"""
    logger.info(f"πŸ“§ Generating email preview for customer {customer_id}")
    try:
        # Get customer data.
        # NOTE(review): '/api/lead-analysis/...' is a route on THIS service,
        # but make_api_request targets the configured backend_url — confirm
        # the backend actually exposes this path, or call the local pipeline.
        customer_data = self.make_api_request(f'/api/lead-analysis/{customer_id}')
        if not customer_data or 'data' not in customer_data:
            return jsonify({
                'success': False,
                'error': 'Customer data not found'
            }), 404
        # Process with AI engine (local name shadows the module global here)
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate email content; recipient defaults via query string.
        recipient_email = request.args.get('email', '[email protected]')
        email_result = ai_engine.process_customer_with_ai(
            customer_id, recipient_email, customer_data
        )
        if not email_result.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to generate email content'
            }), 500
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'email_preview': email_result,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error generating email preview: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/email-preview/<int:customer_id>')
def email_preview_page(customer_id):
    """Email preview page"""
    try:
        # Get email content.
        # NOTE(review): self.get_email_preview is not defined anywhere in the
        # visible code (the preview logic lives in the email_preview route
        # closure above) — this call will raise AttributeError at runtime.
        # Also, the keys read below ('email_content', 'recommendations_count')
        # do not match the /api/email-preview response ('email_preview').
        # Confirm the intended data source before relying on this page.
        email_response = self.get_email_preview(customer_id)
        email_data = email_response.json
        if not email_data.get('success'):
            return f"Error: {email_data.get('error', 'Unknown error')}"
        email_content = email_data['email_content']
        # Create HTML page with email content (doubled braces escape CSS
        # blocks inside the f-string).
        html_content = f"""
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <meta name="viewport" content="width=device-width, initial-scale=1.0">
            <title>Email Preview - Customer {customer_id}</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    margin: 20px;
                    background-color: #f5f5f5;
                }}
                .container {{
                    max-width: 800px;
                    margin: 0 auto;
                    background: white;
                    padding: 20px;
                    border-radius: 8px;
                    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
                }}
                .header {{
                    background: #2c3e50;
                    color: white;
                    padding: 20px;
                    border-radius: 8px;
                    margin-bottom: 20px;
                }}
                .email-info {{
                    background: #ecf0f1;
                    padding: 15px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }}
                .email-content {{
                    border: 1px solid #ddd;
                    border-radius: 5px;
                    overflow: hidden;
                }}
                .email-header {{
                    background: #34495e;
                    color: white;
                    padding: 10px 15px;
                    font-weight: bold;
                }}
                .email-body {{
                    padding: 0;
                }}
                .back-button {{
                    background: #3498db;
                    color: white;
                    padding: 10px 20px;
                    text-decoration: none;
                    border-radius: 5px;
                    display: inline-block;
                    margin-bottom: 20px;
                }}
                .back-button:hover {{
                    background: #2980b9;
                }}
            </style>
        </head>
        <body>
            <div class="container">
                <div class="header">
                    <h1>πŸ“§ Email Preview</h1>
                    <p>Customer ID: {customer_id}</p>
                </div>
                <a href="/customer/{customer_id}" class="back-button">← Back to Customer Analysis</a>
                <div class="email-info">
                    <h3>Email Details:</h3>
                    <p><strong>To:</strong> {email_content['recipient_email']}</p>
                    <p><strong>Subject:</strong> {email_content['subject']}</p>
                    <p><strong>Generated:</strong> {email_content['timestamp']}</p>
                    <p><strong>Recommendations:</strong> {email_data['recommendations_count']} properties</p>
                </div>
                <div class="email-content">
                    <div class="email-header">
                        Email Content Preview
                    </div>
                    <div class="email-body">
                        {email_content['html_content']}
                    </div>
                </div>
            </div>
        </body>
        </html>
        """
        return html_content
    except Exception as e:
        logger.error(f"Error creating email preview page: {e}")
        return f"Error: {str(e)}"
@self.app.route('/api/download-email/<filename>')
def download_email(filename):
    """Download a previously saved email HTML file from /tmp/emails.

    The filename comes from the URL, so it is normalized to a bare basename
    before being joined to the storage directory (defense-in-depth against
    path traversal on top of Flask's converter rules).
    """
    try:
        import os
        emails_dir = "/tmp/emails"
        # Only bare filenames are valid; anything containing path
        # components is rejected as not-found.
        safe_name = os.path.basename(filename)
        file_path = os.path.join(emails_dir, safe_name)
        if safe_name != filename or not os.path.exists(file_path):
            return jsonify({
                'success': False,
                'error': 'Email file not found'
            }), 404
        from flask import send_file
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.error(f"Error downloading email file: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to download email: {str(e)}'
        }), 500
@self.app.route('/api/list-emails')
def list_emails():
    """List all saved email HTML files with size, timestamp, download URL."""
    try:
        import os
        emails_dir = "/tmp/emails"
        if not os.path.exists(emails_dir):
            # No directory yet means no emails have been generated.
            return jsonify({
                'success': True,
                'emails': []
            })
        emails = []
        for filename in os.listdir(emails_dir):
            if filename.endswith('.html'):
                file_path = os.path.join(emails_dir, filename)
                file_stat = os.stat(file_path)
                emails.append({
                    'filename': filename,
                    'size': file_stat.st_size,
                    'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                    # Fixed: was a literal '/(unknown)' suffix instead of
                    # interpolating the actual filename.
                    'download_url': f'/api/download-email/{filename}'
                })
        # Sort by creation time (newest first); ISO strings sort correctly.
        emails.sort(key=lambda x: x['created'], reverse=True)
        return jsonify({
            'success': True,
            'emails': emails
        })
    except Exception as e:
        logger.error(f"Error listing email files: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to list emails: {str(e)}'
        }), 500
@self.app.route('/saved-emails')
def saved_emails_page():
    """Render an HTML page listing saved emails with download links."""
    try:
        import os
        emails_dir = "/tmp/emails"
        if not os.path.exists(emails_dir):
            emails = []
        else:
            emails = []
            for filename in os.listdir(emails_dir):
                if filename.endswith('.html'):
                    file_path = os.path.join(emails_dir, filename)
                    file_stat = os.stat(file_path)
                    emails.append({
                        'filename': filename,
                        'size': file_stat.st_size,
                        'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
                        # Fixed: was a literal '/(unknown)' suffix instead of
                        # interpolating the actual filename.
                        'download_url': f'/api/download-email/{filename}'
                    })
            # Sort by creation time (newest first); the fixed-width
            # timestamp format sorts correctly as a string.
            emails.sort(key=lambda x: x['created'], reverse=True)
        # Generate email cards HTML
        email_cards_html = ""
        if emails:
            for email in emails:
                email_cards_html += f"""
                <div class="email-card">
                    <div class="row align-items-center">
                        <div class="col-md-8">
                            <h5><i class="fas fa-file-alt"></i> {email.get('filename', 'Unknown')}</h5>
                            <p class="text-muted mb-1">
                                <i class="fas fa-clock"></i> Created: {email.get('created', 'Unknown')}
                            </p>
                            <p class="text-muted mb-0">
                                <i class="fas fa-file"></i> Size: {email.get('size', 0):,} bytes
                            </p>
                        </div>
                        <div class="col-md-4 text-end">
                            <a href="{email.get('download_url', '#')}" class="download-btn">
                                <i class="fas fa-download"></i> Download HTML
                            </a>
                        </div>
                    </div>
                </div>
                """
        # Generate content section (list or empty-state placeholder)
        if emails:
            content_section = f"""
            <div class="row">
                <div class="col-12">
                    <h3>πŸ“§ Generated Emails ({len(emails)} total)</h3>
                    {email_cards_html}
                </div>
            </div>
            """
        else:
            content_section = """
            <div class="row">
                <div class="col-12">
                    <div class="text-center py-5">
                        <i class="fas fa-inbox fa-3x text-muted mb-3"></i>
                        <h3>No emails generated yet</h3>
                        <p class="text-muted">Generate some AI recommendations to see saved emails here.</p>
                    </div>
                </div>
            </div>
            """
        html_content = f"""
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <meta name="viewport" content="width=device-width, initial-scale=1.0">
            <title>Saved Emails</title>
            <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet">
            <link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
            <style>
                body {{
                    background-color: #f8f9fa;
                    padding-top: 2rem;
                }}
                .header {{
                    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white;
                    padding: 2rem 0;
                    margin-bottom: 2rem;
                }}
                .email-card {{
                    background: white;
                    border-radius: 10px;
                    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
                    margin-bottom: 1rem;
                    padding: 1.5rem;
                }}
                .download-btn {{
                    background: #28a745;
                    color: white;
                    border: none;
                    padding: 0.5rem 1rem;
                    border-radius: 5px;
                    text-decoration: none;
                    display: inline-block;
                }}
                .download-btn:hover {{
                    background: #218838;
                    color: white;
                    text-decoration: none;
                }}
            </style>
        </head>
        <body>
            <div class="header">
                <div class="container">
                    <h1><i class="fas fa-envelope"></i> Saved Emails</h1>
                    <p class="mb-0">Download and view generated email content</p>
                </div>
            </div>
            <div class="container">
                <div class="row mb-4">
                    <div class="col-md-6">
                        <a href="/" class="btn btn-outline-primary">
                            <i class="fas fa-arrow-left"></i> Back to Home
                        </a>
                    </div>
                    <div class="col-md-6 text-end">
                        <button class="btn btn-outline-secondary" onclick="location.reload()">
                            <i class="fas fa-sync"></i> Refresh
                        </button>
                    </div>
                </div>
                {content_section}
            </div>
        </body>
        </html>
        """
        return html_content
    except Exception as e:
        logger.error(f"Error creating saved emails page: {e}")
        return f"Error: {str(e)}"
@self.app.route('/api/properties/parallel', methods=['GET'])
def get_properties_parallel():
    """Fetch all property pages from the backend in parallel.

    Query params: workers (default 8), page_size (default 100),
    max_pages (default 50). Returns the concatenated property list plus
    fetch statistics.
    """
    logger.info("πŸš€ Fetching properties using parallel processing...")
    try:
        from concurrent.futures import ThreadPoolExecutor, as_completed
        import requests
        # Configuration (overridable per request via query string)
        max_workers = int(request.args.get('workers', 8))
        page_size = int(request.args.get('page_size', 100))
        max_pages = int(request.args.get('max_pages', 50))
        logger.info(f"πŸ”§ Using {max_workers} workers, {page_size} properties per page, max {max_pages} pages")
        def fetch_properties_page(page_num):
            """Fetch a single page of properties; returns [] on any error."""
            try:
                url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails"
                params = {
                    'pageNumber': page_num,
                    'pageSize': page_size
                }
                # Bounded timeout: the previous timeout=None could hang a
                # worker thread forever on a stalled backend.
                # NOTE(review): verify=False disables TLS certificate checks
                # (presumably for the ngrok tunnel) — revisit before exposing
                # this to untrusted networks.
                response = requests.get(url, params=params,
                                        timeout=self.config['api_timeout'],
                                        verify=False)
                response.raise_for_status()
                data = response.json()
                # Handle different response structures
                if isinstance(data, list):
                    properties = data
                elif isinstance(data, dict):
                    properties = data.get('data', data.get('properties', data.get('results', [])))
                else:
                    properties = []
                logger.info(f"βœ… Page {page_num}: {len(properties)} properties")
                return properties
            except Exception as e:
                logger.error(f"❌ Error fetching page {page_num}: {e}")
                return []
        # Fan out one task per page; results are consumed in completion order.
        all_properties = []
        completed_pages = 0
        empty_pages = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit tasks for all pages
            future_to_page = {
                executor.submit(fetch_properties_page, page_num): page_num
                for page_num in range(1, max_pages + 1)
            }
            for future in as_completed(future_to_page):
                page_num = future_to_page[future]
                try:
                    properties = future.result()
                    if properties:
                        all_properties.extend(properties)
                        completed_pages += 1
                        logger.info(f"βœ… Parallel fetch - Page {page_num}: {len(properties)} properties (Total: {len(all_properties)})")
                    else:
                        empty_pages += 1
                        logger.info(f"πŸ“„ Page {page_num} is empty (Empty pages: {empty_pages})")
                        # NOTE: this counts empty results in completion order,
                        # not strictly consecutive page numbers, and already
                        # submitted futures keep running after the break.
                        if empty_pages >= 3:
                            logger.info("πŸ›‘ Stopping fetch - too many consecutive empty pages")
                            break
                except Exception as e:
                    logger.error(f"❌ Failed to fetch page {page_num}: {e}")
                    # Continue with other pages
        logger.info(f"πŸŽ‰ Parallel fetch completed: {len(all_properties)} total properties from {completed_pages} pages!")
        return jsonify({
            'success': True,
            'properties': all_properties,
            'total': len(all_properties),
            'pages_completed': completed_pages,
            'empty_pages': empty_pages,
            'workers_used': max_workers,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in parallel property fetch: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
            'properties': []
        }), 500
@self.app.route('/api/automated-email-analysis/<int:customer_id>', methods=['GET'])
def analyze_automated_emails(customer_id):
    """Analyze which automated emails WOULD be sent for a customer (dry run).

    Pipeline:
      1. Fetch the customer's property-interaction rows from the backend API;
         fall back to ``mock_data_service`` when the backend is unreachable.
      2. Run the local lead-processing and analytics models over that data.
      3. Assemble the ``customer_data`` payload and ask the AI engine which
         email triggers would fire (no email is actually sent here).

    Args:
        customer_id: Customer identifier taken from the URL path.

    Returns:
        The trigger-analysis JSON on success; 404 when neither backend nor
        mock data yields any properties; 500 when the AI engine is missing,
        trigger analysis fails, or an unexpected error occurs.
    """
    logger.info(f"πŸ€– Analyzing automated email triggers for customer {customer_id}")
    try:
        # Lead analysis is generated locally; the backend is only used as a
        # source of raw property data.
        logger.info(f"πŸ” Generating lead analysis for customer {customer_id} using AI model")
        # Try to fetch data from backend first for property data.
        backend_response = None
        try:
            logger.info(f"🌐 Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            logger.info(f"πŸ“₯ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        except Exception as e:
            # Backend failure is non-fatal: we fall through to mock data.
            logger.warning(f"⚠️ Backend API failed: {e}")
        # If the backend returned nothing, use mock data when available.
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"πŸ”„ Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
            logger.info(f"πŸ“₯ Mock data generated for customer {customer_id}: {len(backend_response)} properties")
        if not backend_response:
            return jsonify({
                'success': False,
                'error': 'No property data found for this customer'
            }), 404
        # Derive per-property records and summary from the raw rows.
        logger.info(f"βš™οΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)
        # Derive engagement/qualification analytics from the same rows.
        logger.info(f"🧠 Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)
        # Assemble the payload shape expected by the AI engine's trigger
        # analyzer (mirrors the /api analysis response structure).
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Ask the engine which automated email triggers would fire.
        trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data)
        if not trigger_analysis.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to analyze email triggers'
            }), 500
        return jsonify(trigger_analysis)
    except Exception as e:
        logger.error(f"Error analyzing automated emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/test-automated-emails/<int:customer_id>', methods=['POST'])
def test_automated_emails(customer_id):
    """Test-send every automated email whose trigger fires for a customer.

    Same data pipeline as ``analyze_automated_emails`` (backend fetch with
    mock fallback, local processing/analytics, trigger analysis), but then
    actually generates and sends one email per fired trigger to the address
    supplied in the JSON body.

    JSON body:
        email (str, optional): recipient address; defaults to
            '[email protected]'.

    Returns:
        JSON with per-trigger ``sent_emails`` / ``failed_emails`` lists on
        success; 404 when no property data exists; 500 on engine or
        analysis failure.
    """
    logger.info(f"πŸ§ͺ Testing automated emails for customer {customer_id}")
    try:
        data = request.get_json() or {}
        recipient_email = data.get('email', '[email protected]')
        # Lead analysis is generated locally; the backend only supplies
        # raw property data.
        logger.info(f"πŸ” Generating lead analysis for customer {customer_id} using AI model")
        # Try to fetch data from backend first for property data.
        backend_response = None
        try:
            logger.info(f"🌐 Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            logger.info(f"πŸ“₯ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        except Exception as e:
            # Non-fatal: fall through to mock data below.
            logger.warning(f"⚠️ Backend API failed: {e}")
        # If the backend returned nothing, use mock data when available.
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"πŸ”„ Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
            logger.info(f"πŸ“₯ Mock data generated for customer {customer_id}: {len(backend_response)} properties")
        if not backend_response:
            return jsonify({
                'success': False,
                'error': 'No property data found for this customer'
            }), 404
        # Derive per-property records and summary from the raw rows.
        logger.info(f"βš™οΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)
        # Derive engagement/qualification analytics from the same rows.
        logger.info(f"🧠 Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)
        # Assemble the payload shape expected by the AI engine's trigger
        # analyzer (same structure as analyze_automated_emails builds).
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Determine which triggers fire before sending anything.
        logger.info(f"πŸ€– Analyzing automated email triggers for customer {customer_id}")
        trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data)
        if not trigger_analysis.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to analyze email triggers'
            }), 500
        # Generate + send one email per fired trigger; failures are
        # collected rather than aborting the whole batch.
        sent_emails = []
        failed_emails = []
        for trigger in trigger_analysis.get('triggers', []):
            try:
                # Content generation uses the trigger, the customer payload,
                # and the engine's own insights.
                email_content = ai_engine.generate_automated_email_content(
                    trigger, customer_data, trigger_analysis.get('ai_insights', {})
                )
                if email_content.get('success'):
                    # Actually send the email for this trigger.
                    success = ai_engine.send_email(recipient_email, email_content)
                    if success:
                        sent_emails.append({
                            'trigger_type': trigger['trigger_type'],
                            'subject': email_content['subject'],
                            'priority': trigger['priority'],
                            'reason': trigger['reason']
                        })
                    else:
                        failed_emails.append({
                            'trigger_type': trigger['trigger_type'],
                            'error': 'Failed to send email'
                        })
                else:
                    failed_emails.append({
                        'trigger_type': trigger['trigger_type'],
                        'error': email_content.get('error', 'Failed to generate content')
                    })
            except Exception as e:
                failed_emails.append({
                    'trigger_type': trigger['trigger_type'],
                    'error': str(e)
                })
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'total_triggers': len(trigger_analysis.get('triggers', [])),
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'analysis_summary': trigger_analysis.get('analysis_summary', {}),
            'ai_insights': trigger_analysis.get('ai_insights', {}),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error testing automated emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/fetch-all-properties', methods=['GET'])
def fetch_all_properties():
    """Fetch all properties via parallel paging and store them in ChromaDB.

    Query parameters:
        workers (int): number of parallel fetch workers (default 10).
        page_size (int): properties requested per page (default 100).
        max_pages (int): upper bound on pages fetched (default 50).

    Returns:
        JSON with the final stored-property count and the parameters used
        on success; 500 when the AI engine is unavailable, the parallel
        fetch reports failure, or an unexpected error occurs.
    """
    try:
        logger.info("πŸš€ Starting fetch of ALL 600+ properties...")
        # Defaults are tuned for a catalogue of 600+ properties.
        workers = request.args.get('workers', 10, type=int)
        page_size = request.args.get('page_size', 100, type=int)
        max_pages = request.args.get('max_pages', 50, type=int)
        logger.info(f"πŸ“Š Fetch parameters: {workers} workers, {page_size} per page, max {max_pages} pages")
        # The AI engine owns the ChromaDB collection; bail if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Delegate the actual paged, parallel fetch + storage to the engine.
        success = ai_engine.fetch_all_properties_parallel(
            max_workers=workers,
            page_size=page_size,
            max_pages=max_pages
        )
        if success:
            # Report the final ChromaDB count; the count() call can fail on
            # a corrupted collection, so it is narrowed and logged instead
            # of the previous silent bare `except`.
            total_stored = 0
            if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
                try:
                    total_stored = ai_engine.properties_collection.count()
                except Exception as count_error:
                    logger.warning(f"Could not read ChromaDB property count: {count_error}")
                    total_stored = "Unknown"
            return jsonify({
                'success': True,
                'message': f'Successfully fetched and stored properties',
                'total_properties': total_stored,
                'parameters_used': {
                    'workers': workers,
                    'page_size': page_size,
                    'max_pages': max_pages
                },
                'timestamp': datetime.now().isoformat()
            })
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to fetch properties'
            }), 500
    except Exception as e:
        logger.error(f"❌ Error fetching all properties: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/multi-ai-recommendations/<int:customer_id>', methods=['POST'])
def send_multi_ai_recommendations(customer_id):
    """Send a batch of AI-powered recommendation emails of different themes.

    Iterates over up to ``email_count`` recommendation themes (property-,
    price-, location-based, etc.), generates a personalized email for each,
    and sends it to the requested address.

    JSON body:
        email (str, optional): recipient; defaults to '[email protected]'.
        email_count (int, optional): how many themed emails to send
            (capped at the number of themes, 10); defaults to 10.

    Returns:
        JSON summarizing sent/failed emails plus the AI insights used;
        404 when no customer data exists; 500 on engine failure or error.

    NOTE(review): the theme (``rec_type``) is only used to customize the
    subject line — ``get_enhanced_ai_recommendations`` is called with the
    same arguments for every theme, so all emails may carry the same
    recommendations. Confirm whether the engine call should receive the
    theme (compare ``get_multi_ai_property_recommendations`` used by the
    10-email test endpoint).
    """
    logger.info(f"πŸ€– Multi-AI recommendation request for customer {customer_id}")
    try:
        data = request.get_json() or {}
        recipient_email = data.get('email', '[email protected]')
        email_count = data.get('email_count', 10)  # send 10 themed emails by default
        # Aggregated customer analysis (properties, analytics, qualification).
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Behavioral insights shared across all themed emails.
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Themes, in send order; email_count selects a prefix of this list.
        recommendation_types = [
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities'
        ]
        sent_emails = []
        failed_emails = []
        # One email per theme; failures are collected, not fatal.
        for i in range(min(email_count, len(recommendation_types))):
            rec_type = recommendation_types[i]
            try:
                logger.info(f"πŸ“§ Generating {rec_type} recommendations for customer {customer_id}")
                # See NOTE above: rec_type is not passed here.
                recommendations = ai_engine.get_enhanced_ai_recommendations(
                    analysis_data, ai_insights, count=5
                )
                # Build the personalized email body for this batch.
                email_result = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email
                )
                if email_result.get('success'):
                    # Only the subject is theme-specific.
                    custom_subject = self._get_custom_subject(rec_type, customer_id, ai_insights)
                    email_result['subject'] = custom_subject
                    # Dispatch via the engine's email transport.
                    success = ai_engine.send_email(recipient_email, email_result)
                    if success:
                        sent_emails.append({
                            'type': rec_type,
                            'subject': custom_subject,
                            'recommendations_count': len(recommendations),
                            'timestamp': datetime.now().isoformat()
                        })
                        logger.info(f"βœ… {rec_type} email sent successfully")
                    else:
                        failed_emails.append({
                            'type': rec_type,
                            'error': 'Failed to send email'
                        })
                else:
                    failed_emails.append({
                        'type': rec_type,
                        'error': email_result.get('error', 'Failed to generate email')
                    })
            except Exception as e:
                failed_emails.append({
                    'type': rec_type,
                    'error': str(e)
                })
                logger.error(f"❌ Error with {rec_type}: {e}")
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'requested_emails': email_count,
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'total_sent': len(sent_emails),
            'total_failed': len(failed_emails),
            'ai_insights': ai_insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error in multi-AI recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/chromadb-recommendations/<int:customer_id>', methods=['GET'])
def get_chromadb_recommendations(customer_id):
    """Get property recommendations via ChromaDB similarity search.

    Produces three recommendation sets (similarity-, preference-, and
    behavior-based), each annotated with AI-generated explanations.

    Args:
        customer_id: Customer identifier taken from the URL path.

    Returns:
        JSON with the three recommendation lists, the AI insights used,
        and a total count; 404 when no customer data exists; 500 on
        engine failure or unexpected error.
    """
    logger.info(f"πŸ” ChromaDB recommendations for customer {customer_id}")
    try:
        # Aggregated customer analysis (properties, analytics, qualification).
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Behavioral insights feed both preference extraction and explanations.
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Hoisted: preferences were previously re-extracted (identical args)
        # for every recommendation type and again per explanation pass.
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        viewed_properties = analysis_data['data']['properties']
        # Three independent recommendation strategies, 5 results each.
        recommendations = {
            'similarity_based': ai_engine.get_similarity_based_recommendations(
                viewed_properties,
                preferences,
                count=5
            ),
            'preference_based': ai_engine.get_preference_based_recommendations(
                preferences,
                viewed_properties,
                count=5
            ),
            'behavioral_based': ai_engine.get_behavioral_recommendations(
                analysis_data, ai_insights, count=5
            )
        }
        # Annotate every list with AI explanations (in-place key rebind).
        for rec_type, recs in recommendations.items():
            recommendations[rec_type] = ai_engine.add_ai_explanations(
                recs,
                preferences,
                ai_insights
            )
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recommendations': recommendations,
            'ai_insights': ai_insights,
            'total_recommendations': sum(len(recs) for recs in recommendations.values()),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error getting ChromaDB recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/test-sendgrid-connection', methods=['GET'])
def test_sendgrid_connection():
    """Run an end-to-end SendGrid connectivity check.

    Asks the AI engine to send a probe email to a fixed test address and
    reports the outcome. Returns 500 only when the engine itself is
    unavailable or an unexpected error occurs; a failed send is reported
    in the 200 payload via ``success: false``.
    """
    logger.info("πŸ§ͺ Testing SendGrid connection")
    try:
        # Guard clause: no engine means nothing to test.
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Fire the probe email and relay the engine's verdict verbatim.
        outcome = engine.test_sendgrid_email('[email protected]')
        payload = {
            'success': outcome.get('success', False),
            'message': 'SendGrid connection test completed',
            'details': outcome,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"❌ Error testing SendGrid: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/email-analysis-basis/<int:customer_id>', methods=['GET'])
def get_email_analysis_basis(customer_id):
    """Expose the data that email recommendations are based on.

    Returns the extracted customer preferences, AI insights, behavioral
    patterns, a sample of viewed properties, and a human-readable
    description of each of the 10 email-trigger themes.

    Args:
        customer_id: Customer identifier taken from the URL path.

    Returns:
        JSON with ``analysis_basis`` and ``email_triggers`` sections;
        404 when no customer data exists; 500 on engine failure or error.
    """
    logger.info(f"πŸ“Š Getting email analysis basis for customer {customer_id}")
    try:
        # Aggregated customer analysis (properties, analytics, qualification).
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Behavioral insights feed both preference and pattern extraction.
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Structured preferences (price range, types, locations, ...).
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        # Engagement/browsing pattern summary.
        behavioral_patterns = ai_engine.analyze_behavioral_patterns(analysis_data, ai_insights)
        # NOTE(review): the deep chained lookups below (e.g.
        # ['conversion_probability']['final_probability']) assume
        # _get_analysis_data_parallel always populates these keys; a missing
        # key surfaces as the generic 500 handler — confirm that is intended.
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'analysis_basis': {
                'customer_preferences': preferences,
                'ai_insights': ai_insights,
                'behavioral_patterns': behavioral_patterns,
                'viewed_properties': analysis_data['data']['properties'][:5],  # sample of viewed properties
                'lead_qualification': analysis_data['data']['lead_qualification'],
                'engagement_level': analysis_data['data']['analytics']['engagement_level'],
                'conversion_probability': analysis_data['data']['analytics']['conversion_probability']['final_probability']
            },
            'email_triggers': {
                'property_based': 'Based on property types you viewed most',
                'price_based': f"Based on your price range β‚Ή{preferences.get('price_range', {}).get('min_price', 0)} - β‚Ή{preferences.get('price_range', {}).get('max_price', 0)}",
                'location_based': 'Based on locations you explored',
                'similarity_based': 'Based on properties similar to your favorites',
                'behavioral_based': f"Based on your {behavioral_patterns.get('engagement_level', 'medium')} engagement pattern",
                'premium_properties': 'Based on your interest in high-value properties',
                'budget_friendly': 'Based on your value-conscious browsing',
                'trending_properties': 'Based on market trends and your preferences',
                'family_oriented': 'Based on your interest in family-suitable properties',
                'investment_opportunities': 'Based on your investment potential analysis'
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error getting email analysis basis: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/email-content-preview/<int:customer_id>', methods=['GET'])
def preview_all_email_content(customer_id):
    """Preview the content of all 10 themed emails without sending them.

    Ensures ChromaDB is initialized and populated, then for each of the
    10 email themes generates recommendations (with a similarity-search
    fallback when fewer than 10 are found) and renders the email content.

    Args:
        customer_id: Customer identifier taken from the URL path.

    Returns:
        JSON with one preview entry per theme (subject, HTML/text bodies,
        sample properties, per-theme AI insights, or an error entry);
        404 when no customer data exists; 500 on engine failure or error.
    """
    logger.info(f"πŸ‘οΈ Previewing all email content for customer {customer_id}")
    try:
        # Aggregated customer analysis (properties, analytics, qualification).
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Behavioral insights shared across all theme previews.
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Bootstrap ChromaDB if the collection attribute is absent or None.
        if not hasattr(ai_engine, 'properties_collection') or ai_engine.properties_collection is None:
            logger.info("πŸš€ ChromaDB not initialized, initializing...")
            ai_engine.initialize_chromadb()
        # Populate the collection if it is empty (or unreadable): every
        # branch below ends in auto_fetch_and_store_properties as fallback.
        try:
            if ai_engine.properties_collection:
                collection_count = ai_engine.properties_collection.count()
                logger.info(f"πŸ“Š ChromaDB has {collection_count} properties")
                if collection_count == 0:
                    logger.info("πŸš€ ChromaDB is empty, auto-fetching properties...")
                    ai_engine.auto_fetch_and_store_properties()
            else:
                logger.warning("⚠️ ChromaDB collection is None, auto-fetching properties...")
                ai_engine.auto_fetch_and_store_properties()
        except Exception as count_error:
            logger.warning(f"⚠️ Could not check ChromaDB count: {count_error}")
            logger.info("πŸš€ Auto-fetching properties as fallback...")
            ai_engine.auto_fetch_and_store_properties()
        # The 10 email themes, previewed in this order.
        email_types = [
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities'
        ]
        email_previews = []
        for email_type in email_types:
            try:
                # Theme-specific recommendations; request 12 so the email
                # has headroom after de-duplication.
                recommendations = ai_engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=12
                )
                logger.info(f"πŸ€– AI Model for {email_type} preview: {len(recommendations)} properties")
                # Top up with a generic similarity search when the theme
                # model returns fewer than 10 results.
                if len(recommendations) < 10:
                    fallback_recommendations = ai_engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 12
                    )
                    # Merge fallback results, skipping duplicates by id.
                    seen_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations}
                    for prop in fallback_recommendations:
                        prop_id = str(prop.get('id', prop.get('propertyId', '')))
                        if prop_id not in seen_ids and len(recommendations) < 12:
                            recommendations.append(prop)
                            seen_ids.add(prop_id)
                # Render (but do not send) the themed email.
                email_content = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, '[email protected]', email_type
                )
                if email_content.get('success'):
                    email_previews.append({
                        'email_type': email_type,
                        'subject': email_content.get('subject', self._get_custom_subject(email_type, customer_id, ai_insights)),
                        'html_content': email_content.get('html_content', ''),
                        'text_content': email_content.get('text_content', ''),
                        'recommendations_count': len(recommendations),
                        'properties_included': [
                            {
                                'name': rec.get('property_name', rec.get('name', rec.get('title', 'Property'))),
                                'price': rec.get('price', rec.get('marketValue', rec.get('amount', 0))),
                                'type': rec.get('property_type', rec.get('type', rec.get('propertyTypeName', 'N/A'))),
                                'score': rec.get('ai_score', rec.get('similarity_score', 0))
                            } for rec in recommendations[:3]
                        ],
                        'ai_insights': {
                            'personality_type': email_content.get('personalization', {}).get('personality_type', 'N/A'),
                            'decision_style': ai_insights.get('recommendation_strategy', 'N/A'),
                            'urgency_level': email_content.get('personalization', {}).get('urgency_level', 'N/A'),
                            'peak_activity': ai_insights.get('peak_time', 'N/A')
                        }
                    })
                else:
                    # Content generation failed: record an error entry so
                    # the preview list still has one item per theme.
                    email_previews.append({
                        'email_type': email_type,
                        'subject': self._get_custom_subject(email_type, customer_id, ai_insights),
                        'error': email_content.get('error', 'Failed to generate content'),
                        'recommendations_count': 0,
                        'properties_included': []
                    })
            except Exception as e:
                logger.error(f"❌ Error generating {email_type}: {e}")
                email_previews.append({
                    'email_type': email_type,
                    'subject': self._get_custom_subject(email_type, customer_id, ai_insights),
                    'error': str(e),
                    'recommendations_count': 0,
                    'properties_included': []
                })
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'email_previews': email_previews,
            'total_emails': len(email_previews),
            'ai_insights': ai_insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error previewing email content: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/test-all-10-emails/<int:customer_id>', methods=['POST'])
def test_send_all_10_emails(customer_id):
    """Test-send all 10 themed email types with ChromaDB-sourced properties.

    Ensures ChromaDB is populated, then for each of the 10 themes pulls
    recommendations from the theme-specific AI model (with a similarity
    fallback), renders a personalized email, and sends it.

    JSON body:
        email (str, optional): recipient; defaults to '[email protected]'.

    Returns:
        JSON ``test_results`` with per-theme sent/failed lists;
        404 when no customer data exists; 500 on engine failure or error.
    """
    logger.info(f"πŸ§ͺ Testing all 10 email types for customer {customer_id}")
    try:
        data = request.get_json() or {}
        recipient_email = data.get('email', '[email protected]')
        # Aggregated customer analysis (properties, analytics, qualification).
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # The AI engine may be lazily constructed; bail out if unavailable.
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Behavioral insights shared across all themed emails.
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Make sure the ChromaDB collection exists and has properties;
        # every failure path falls back to auto-fetching.
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                collection_count = ai_engine.properties_collection.count()
                if collection_count == 0:
                    logger.info("πŸš€ ChromaDB is empty, auto-fetching properties...")
                    ai_engine.auto_fetch_and_store_properties()
            except Exception as count_error:
                logger.warning(f"⚠️ Could not check ChromaDB count: {count_error}")
                # Count unreadable — fetch anyway as a best effort.
                ai_engine.auto_fetch_and_store_properties()
        else:
            logger.warning("⚠️ ChromaDB not initialized, auto-initializing...")
            ai_engine.initialize_chromadb()
            ai_engine.auto_fetch_and_store_properties()
        # Theme -> human-readable description of its retrieval strategy
        # (descriptions are reported in the response; the actual retrieval
        # is done by get_multi_ai_property_recommendations per theme).
        email_strategies = {
            'property_based': 'Get properties matching customer\'s preferred property types',
            'price_based': 'Get properties within customer\'s budget range',
            'location_based': 'Get properties in customer\'s preferred locations',
            'similarity_based': 'Get properties similar to customer\'s viewed properties',
            'behavioral_based': 'Get properties based on customer behavior patterns',
            'premium_properties': 'Get high-end luxury properties',
            'budget_friendly': 'Get value-for-money properties',
            'trending_properties': 'Get currently trending market properties',
            'family_oriented': 'Get family-suitable properties with amenities',
            'investment_opportunities': 'Get properties with high ROI potential'
        }
        sent_emails = []
        failed_emails = []
        for email_type, strategy in email_strategies.items():
            try:
                logger.info(f"πŸ“§ Processing {email_type} email with strategy: {strategy}")
                # Request 15 so at least 10 survive de-duplication.
                recommendations = ai_engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=15
                )
                logger.info(f"πŸ€– AI Model for {email_type} retrieved {len(recommendations)} properties")
                # Top up with a generic similarity search when the theme
                # model returns fewer than 10 results.
                if len(recommendations) < 10:
                    logger.warning(f"⚠️ Only {len(recommendations)} properties for {email_type}, trying fallback...")
                    fallback_recommendations = ai_engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 15
                    )
                    # Merge fallback results, skipping duplicates by id.
                    seen_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations}
                    for prop in fallback_recommendations:
                        prop_id = str(prop.get('id', prop.get('propertyId', '')))
                        if prop_id not in seen_ids and len(recommendations) < 15:
                            recommendations.append(prop)
                            seen_ids.add(prop_id)
                    logger.info(f"πŸ”„ After fallback: {len(recommendations)} properties for {email_type}")
                # Render the themed email for this recipient.
                email_result = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email, email_type
                )
                # NOTE(review): if the engine returns a dict without a
                # 'success' key (or a non-dict), this substitutes
                # {'success': True}, which masks generation failures and
                # proceeds to send with no content — confirm intended.
                if isinstance(email_result, dict) and 'success' in email_result:
                    email_content = email_result
                else:
                    email_content = {'success': True}
                if email_content.get('success'):
                    # Subject is theme-specific even though the body came
                    # from the generic generator.
                    custom_subject = self._get_custom_subject(email_type, customer_id, ai_insights)
                    email_content['subject'] = custom_subject
                    # Dispatch via the engine's email transport.
                    success = ai_engine.send_email(recipient_email, email_content)
                    if success:
                        sent_emails.append({
                            'type': email_type,
                            'subject': custom_subject,
                            'strategy': strategy,
                            'recommendations_count': len(recommendations),
                            'properties_from_chromadb': True,
                            'timestamp': datetime.now().isoformat(),
                            'sample_properties': [
                                {
                                    'name': rec.get('property_name', 'Property'),
                                    'price': rec.get('price', 0),
                                    'type': rec.get('property_type', 'N/A'),
                                    'ai_score': rec.get('ai_score', 0)
                                } for rec in recommendations[:3]
                            ]
                        })
                        logger.info(f"βœ… {email_type} email sent successfully")
                    else:
                        failed_emails.append({
                            'type': email_type,
                            'strategy': strategy,
                            'error': 'Failed to send email via SendGrid'
                        })
                else:
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': email_content.get('error', 'Failed to generate email content')
                    })
            except Exception as e:
                failed_emails.append({
                    'type': email_type,
                    'strategy': strategy,
                    'error': str(e)
                })
                logger.error(f"❌ Error with {email_type}: {e}")
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'test_results': {
                'total_attempted': len(email_strategies),
                'total_sent': len(sent_emails),
                'total_failed': len(failed_emails),
                'sent_emails': sent_emails,
                'failed_emails': failed_emails
            },
            'ai_insights': ai_insights,
            'chromadb_status': 'Active - Properties retrieved from vector database',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error testing all 10 emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/chromadb-status', methods=['GET'])
def check_chromadb_status():
    """Report whether ChromaDB is initialized and how many properties it holds.

    Returns a 200 payload describing one of three states: not initialized,
    initialized with a readable count, or initialized but unreadable
    (possibly corrupted). A missing AI engine or unexpected error yields 500.
    """
    logger.info("πŸ” Checking ChromaDB status")
    try:
        # Guard clause: no engine means no ChromaDB to inspect.
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # State 1: collection attribute absent or empty — not initialized.
        collection = getattr(engine, 'properties_collection', None)
        if not collection:
            return jsonify({
                'success': True,
                'chromadb_initialized': False,
                'property_count': 0,
                'message': 'ChromaDB not initialized'
            })
        # State 2: read count + name; a failure here means the collection
        # exists but cannot be accessed (state 3).
        try:
            n_props = collection.count()
            name = collection.name
        except Exception as count_error:
            return jsonify({
                'success': False,
                'chromadb_initialized': True,
                'error': f'Collection access error: {count_error}',
                'message': 'ChromaDB collection may be corrupted'
            })
        status = 'ready' if n_props > 0 else 'empty'
        return jsonify({
            'success': True,
            'chromadb_initialized': True,
            'collection_name': name,
            'property_count': n_props,
            'status': status,
            'message': f'ChromaDB collection "{name}" has {n_props} properties',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error checking ChromaDB status: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@self.app.route('/api/saved-emails', methods=['GET'])
def list_saved_emails():
    """List all saved email HTML files under ./saved_emails, newest first.

    Returns:
        JSON with ``saved_emails`` (filename, filepath, email_type,
        created_time, size_kb per file), ``total_count``, and ``message``;
        an empty list when the directory does not exist; 500 on error.
    """
    try:
        import os
        import glob
        from datetime import datetime
        emails_dir = "./saved_emails"
        if not os.path.exists(emails_dir):
            return jsonify({'saved_emails': [], 'message': 'No saved emails directory found'})
        # Every saved email is an HTML file in the flat directory.
        email_files = glob.glob(os.path.join(emails_dir, "*.html"))
        entries = []  # (mtime, record) pairs for a precise numeric sort
        for file_path in email_files:
            file_name = os.path.basename(file_path)
            file_stats = os.stat(file_path)
            created_time = datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
            # The email type is encoded as the filename's first '_'-segment.
            # (No .replace('_', ' ') needed: split('_')[0] cannot contain '_'.)
            email_type = file_name.split('_')[0] if '_' in file_name else 'unknown'
            entries.append((file_stats.st_mtime, {
                'filename': file_name,
                'filepath': file_path,
                'email_type': email_type.title(),
                'created_time': created_time,
                'size_kb': round(file_stats.st_size / 1024, 2)
            }))
        # Sort by raw mtime (newest first): keeps sub-second ordering that
        # the previous formatted-string sort lost.
        entries.sort(key=lambda pair: pair[0], reverse=True)
        saved_emails = [record for _, record in entries]
        return jsonify({
            'saved_emails': saved_emails,
            'total_count': len(saved_emails),
            'message': f'Found {len(saved_emails)} saved email files'
        })
    except Exception as e:
        logger.error(f"❌ Error listing saved emails: {e}")
        return jsonify({'error': str(e)}), 500
@self.app.route('/view-email/<filename>')
def view_saved_email(filename):
    """Serve a saved email HTML file for inline viewing.

    Security: ``filename`` comes straight from the URL, so it is reduced
    to its basename before being joined to the saved-emails directory —
    this prevents path-traversal (e.g. encoded '../') from reading files
    outside ./saved_emails.

    Returns:
        The file as text/html; a 404 string when it does not exist;
        a 500 string on unexpected error.
    """
    try:
        import os
        from flask import send_file
        emails_dir = "./saved_emails"
        # Strip any directory components from the user-supplied name.
        safe_name = os.path.basename(filename)
        file_path = os.path.join(emails_dir, safe_name)
        if not os.path.exists(file_path):
            return "Email file not found", 404
        return send_file(file_path, mimetype='text/html')
    except Exception as e:
        logger.error(f"❌ Error serving email file: {e}")
        return f"Error loading email: {str(e)}", 500
@self.app.route('/api/DebtFinancing', methods=['PUT'])
def update_debt_financing():
    """Update a customer's debt-financing record.

    Validates the JSON body, builds a normalized record, attempts to
    persist it via the backend API, and falls back to an in-process
    acknowledgement when the backend is unreachable.

    JSON body (required): customer_id, debt_amount, financing_type.
    Optional: interest_rate, term_months, monthly_payment, status.

    Returns:
        The backend (or fallback) response JSON; 400 on missing body or
        fields; 500 on unexpected error.
    """
    logger.info("πŸ’° Debt financing update request")
    try:
        data = request.get_json()
        # Guard: a body is mandatory for this endpoint.
        if not data:
            return jsonify({
                'success': False,
                'error': 'No data provided'
            }), 400
        # Reject on the first missing required field, matching the
        # single-field error message format.
        required_fields = ['customer_id', 'debt_amount', 'financing_type']
        missing = next((name for name in required_fields if name not in data), None)
        if missing is not None:
            return jsonify({
                'success': False,
                'error': f'Missing required field: {missing}'
            }), 400
        # Normalize the incoming payload into the record we persist;
        # optional fields get conservative defaults.
        debt_record = {
            'customer_id': data.get('customer_id'),
            'debt_amount': data.get('debt_amount'),
            'financing_type': data.get('financing_type'),
            'interest_rate': data.get('interest_rate', 0),
            'term_months': data.get('term_months', 0),
            'monthly_payment': data.get('monthly_payment', 0),
            'status': data.get('status', 'pending'),
            'updated_at': datetime.now().isoformat(),
            'created_at': datetime.now().isoformat()
        }
        # Best effort: persist via the backend; failure only logs a warning.
        backend_response = None
        try:
            backend_response = self.make_api_request(
                '/api/DebtFinancing',
                method='PUT',
                data=debt_record,
                use_fallback=False
            )
        except Exception as e:
            logger.warning(f"⚠️ Backend API failed for debt financing: {e}")
        # Backend unreachable or empty: acknowledge locally in fallback mode.
        if not backend_response:
            logger.info("πŸ”„ Using fallback for debt financing update")
            backend_response = {
                'success': True,
                'message': 'Debt financing updated successfully (fallback mode)',
                'data': debt_record,
                'fallback': True
            }
        return jsonify(backend_response)
    except Exception as e:
        logger.error(f"❌ Error updating debt financing: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to update debt financing: {str(e)}'
        }), 500
@self.app.route('/api/Email/sendPlainText', methods=['POST'])
def send_plain_text_email():
    """Send a plain-text email by proxying to the external email API.

    Accepts `toEmail`, `subject` and `body` either as a JSON payload or as
    query-string parameters. Returns 400 for missing/invalid input, 200 on
    success, and maps upstream failures to 500 (error), 503 (unreachable)
    or 504 (timeout).
    """
    logger.info("πŸ“§ Plain text email send request")
    try:
        # Get parameters from query string or JSON body
        if request.is_json:
            data = request.get_json()
            to_email = data.get('toEmail')
            subject = data.get('subject')
            body = data.get('body')
        else:
            to_email = request.args.get('toEmail')
            subject = request.args.get('subject')
            body = request.args.get('body')
        # Validate required parameters
        if not to_email:
            return jsonify({
                'success': False,
                'error': 'toEmail parameter is required'
            }), 400
        if not subject:
            return jsonify({
                'success': False,
                'error': 'subject parameter is required'
            }), 400
        if not body:
            return jsonify({
                'success': False,
                'error': 'body parameter is required'
            }), 400
        # Validate email format (minimal sanity check only; full validation
        # is left to the upstream email service)
        if '@' not in to_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address format'
            }), 400
        # Prepare email data for external API
        email_data = {
            'toEmail': to_email,
            'subject': subject,
            'body': body
        }
        # Send email using external API
        external_api_url = f"{self.config['backend_url']}/api/Email/sendPlainText"
        try:
            logger.info(f"🌐 Sending email via external API: {external_api_url}")
            # NOTE: the upstream service takes the fields as query params,
            # hence `params=` rather than `json=`.
            response = requests.post(
                external_api_url,
                params=email_data,
                headers=self.config['ngrok_headers'],
                timeout=30
            )
            if response.status_code == 200:
                logger.info(f"βœ… Email sent successfully to {to_email}")
                return jsonify({
                    'success': True,
                    'message': 'Email sent successfully',
                    'recipient': to_email,
                    'subject': subject,
                    'timestamp': datetime.now().isoformat()
                })
            else:
                logger.error(f"❌ External email API failed: {response.status_code} - {response.text}")
                return jsonify({
                    'success': False,
                    'error': f'Email service failed: {response.status_code}',
                    'details': response.text
                }), 500
        except requests.exceptions.Timeout:
            # Upstream exceeded the 30s budget -> 504 Gateway Timeout
            logger.error("⏰ Email API request timeout")
            return jsonify({
                'success': False,
                'error': 'Email service timeout'
            }), 504
        except requests.exceptions.ConnectionError:
            # Upstream unreachable -> 503 Service Unavailable
            logger.error("πŸ”Œ Email API connection error")
            return jsonify({
                'success': False,
                'error': 'Email service unavailable'
            }), 503
        except Exception as e:
            logger.error(f"❌ Email API error: {e}")
            return jsonify({
                'success': False,
                'error': f'Email service error: {str(e)}'
            }), 500
    except Exception as e:
        # Catch-all for failures outside the upstream call (e.g. body parsing)
        logger.error(f"❌ Error sending plain text email: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to send email: {str(e)}'
        }), 500
@self.app.route('/api/email-automation/<int:customer_id>', methods=['POST'])
def trigger_email_automation(customer_id):
    """Trigger comprehensive email automation based on user tracking analysis.

    JSON body: `email` (required recipient) and optional `email_type`
    selecting which campaign(s) to send: 'all' (default), 'new_properties',
    'recommendations' or 'peak_time'. Reuses cached analysis when available,
    otherwise rebuilds it from the backend lead-qualification endpoint.
    """
    logger.info(f"πŸ“§ Triggering email automation for customer {customer_id}")
    try:
        data = request.get_json() or {}
        email_type = data.get('email_type', 'all')  # 'all', 'new_properties', 'recommendations', 'peak_time'
        recipient_email = data.get('email')
        if not recipient_email:
            return jsonify({
                'success': False,
                'error': 'Email address is required'
            }), 400
        # Get customer analysis data (use the same method as the main analysis)
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate fresh analysis data
            logger.info(f"🌐 Fetching fresh data for email automation - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({
                    'success': False,
                    'error': 'No customer data found'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            # Shape mirrors the payload produced by the main analysis endpoint
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }
        # Get AI engine for analysis
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate AI insights from tracking data
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        results = []
        # Send different types of emails based on request; each helper returns
        # a per-email result dict that is aggregated into the response.
        if email_type in ['all', 'new_properties']:
            new_prop_result = self._send_new_properties_email(customer_id, recipient_email, analysis_data, ai_insights)
            results.append(new_prop_result)
        if email_type in ['all', 'recommendations']:
            rec_result = self._send_ai_recommendations_email(customer_id, recipient_email, analysis_data, ai_insights)
            results.append(rec_result)
        if email_type in ['all', 'peak_time']:
            peak_result = self._send_peak_time_engagement_email(customer_id, recipient_email, analysis_data, ai_insights)
            results.append(peak_result)
        return jsonify({
            'success': True,
            'message': f'Email automation completed for customer {customer_id}',
            'customer_id': customer_id,
            'recipient': recipient_email,
            'email_type': email_type,
            'ai_insights': ai_insights,
            'results': results,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"❌ Error in email automation: {e}")
        return jsonify({
            'success': False,
            'error': f'Email automation failed: {str(e)}'
        }), 500
def _send_new_properties_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Send an email about newly added properties based on user tracking.

    Derives preferences from `analysis_data`, picks matching "new" listings,
    composes a plain-text email and dispatches it via the external API.
    Returns a result dict with `type`, `success` and either delivery metadata
    or an `error`/`message` field. `ai_insights` is currently unused here
    beyond being part of the common helper signature.
    """
    logger.info(f"🏑 Sending new properties email for customer {customer_id}")
    try:
        # Analyze user preferences from tracking data
        user_preferences = self._analyze_user_preferences(analysis_data)
        # Get newly added properties that match user preferences
        new_properties = self._get_new_properties_for_user(user_preferences)
        if not new_properties:
            # Nothing to advertise: report a non-error "skip" result
            return {
                'type': 'new_properties',
                'success': False,
                'message': 'No new properties matching user preferences'
            }
        # Create email content
        subject = f"🏑 New Properties Just Added - Perfect Matches for You!"
        body_lines = [
            f"Dear Valued Customer,",
            "",
            "πŸŽ‰ Great news! We've just added new properties that match your preferences perfectly!",
            "",
            "πŸ“Š YOUR PREFERENCES (Based on Your Activity):",
            f"β€’ Preferred Property Type: {user_preferences.get('property_type', 'Villa')}",
            f"β€’ Budget Range: ${user_preferences.get('min_budget', 0):,} - ${user_preferences.get('max_budget', 0):,}",
            f"β€’ Preferred Locations: {', '.join(user_preferences.get('locations', ['Dubai']))}",
            f"β€’ Bedrooms: {user_preferences.get('bedrooms', '3+')}",
            "",
            "πŸ†• NEWLY ADDED PROPERTIES:"
        ]
        # Add new properties (cap at 5 to keep the email short)
        for i, prop in enumerate(new_properties[:5], 1):
            body_lines.extend([
                f"{i}. {prop.get('title', 'New Property')}",
                f" πŸ“ Location: {prop.get('location', 'N/A')}",
                f" πŸ’° Price: ${prop.get('price', 0):,}",
                f" 🏠 Type: {prop.get('propertyTypeName', 'Villa')}",
                f" πŸ›οΈ Bedrooms: {prop.get('bedrooms', 'N/A')}",
                f" ⭐ Match Score: {prop.get('match_score', 95):.0f}%",
                ""
            ])
        body_lines.extend([
            "⚑ ACT FAST!",
            "These properties are fresh on the market and likely to be in high demand.",
            "",
            "πŸ’‘ NEXT STEPS:",
            "β€’ Schedule a viewing immediately",
            "β€’ Contact our team for more details",
            "β€’ Get pre-approval for faster processing",
            "",
            "Best regards,",
            "Your Property Match Team"
        ])
        email_body = "\n".join(body_lines)
        # Send email
        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'new_properties'
        result['properties_count'] = len(new_properties)
        # NOTE(review): hard-coded insight values here, unlike the other two
        # senders which read them from ai_insights — presumably placeholder
        result['ai_insights'] = {
            'personality_type': 'Analytical',
            'urgency_level': 'Medium',
            'peak_time': 'Evening'
        }
        return result
    except Exception as e:
        logger.error(f"❌ Error sending new properties email: {e}")
        return {
            'type': 'new_properties',
            'success': False,
            'error': str(e)
        }
def _send_ai_recommendations_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Send AI-powered property recommendations based on user tracking analysis.

    Asks the AI engine for up to 5 enhanced recommendations, renders them into
    a plain-text email personalised with the behavioural insights, and sends
    it via the external API. Returns a result dict with `type`, `success`,
    delivery info, the insight summary used, and `recommendations_count`.
    """
    logger.info(f"πŸ€– Sending AI recommendations email for customer {customer_id}")
    try:
        # Get AI engine for recommendations
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return {
                'type': 'ai_recommendations',
                'success': False,
                'error': 'AI engine not available'
            }
        # Generate recommendations based on tracking data
        recommendations = ai_engine.get_enhanced_ai_recommendations(analysis_data, ai_insights, count=5)
        # Create personalized email content from the behavioural profile
        personality_type = ai_insights.get('ai_personality_type', 'Analytical')
        urgency_level = ai_insights.get('urgency_level', 'Medium')
        decision_style = ai_insights.get('recommendation_strategy', 'Data-driven')
        peak_time = ai_insights.get('peak_time', 'Evening')
        subject = f"πŸ€– AI-Powered Recommendations - Tailored Just for You!"
        body_lines = [
            f"Dear Valued Customer,",
            "",
            "🧠 Our AI has analyzed your browsing behavior and preferences to bring you these personalized recommendations:",
            "",
            "πŸ“Š YOUR BEHAVIOR ANALYSIS:",
            f"β€’ Personality Type: {personality_type}",
            f"β€’ Decision Making Style: {decision_style}",
            f"β€’ Engagement Level: {ai_insights.get('engagement_level', 'High')}",
            f"β€’ Urgency Level: {urgency_level}",
            f"β€’ Peak Activity Time: {peak_time}",
            "",
            "🎯 AI RECOMMENDATIONS:"
        ]
        # Add AI recommendations (defensive: engine may return non-list)
        if recommendations and isinstance(recommendations, list):
            for i, prop in enumerate(recommendations[:5], 1):
                body_lines.extend([
                    f"{i}. {prop.get('property_name', prop.get('title', 'Recommended Property'))}",
                    f" πŸ“ Location: {prop.get('location', 'N/A')}",
                    f" πŸ’° Price: ${prop.get('price', 0):,}",
                    f" 🎯 AI Match Score: {prop.get('ai_score', prop.get('match_score', 0)):.1f}%",
                    f" πŸ’‘ Why Recommended: {prop.get('recommendation_reason', 'Matches your preferences')}",
                    ""
                ])
        # Add urgency-based call to action
        if urgency_level == 'High':
            body_lines.extend([
                "πŸ”₯ URGENT RECOMMENDATION:",
                "Based on your high engagement, we recommend acting quickly on these properties!",
                ""
            ])
        body_lines.extend([
            "πŸ’‘ PERSONALIZED NEXT STEPS:",
            "β€’ Review properties that match your personality type",
            "β€’ Schedule viewings during your peak activity time",
            "β€’ Contact our AI-powered assistant for more details",
            "",
            "Best regards,",
            "AI-Powered Property Recommendation System"
        ])
        email_body = "\n".join(body_lines)
        # Send email
        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'ai_recommendations'
        result['ai_insights'] = {
            'personality_type': personality_type,
            'decision_making_style': decision_style,
            'urgency_level': urgency_level,
            'peak_time': peak_time
        }
        result['recommendations_count'] = len(recommendations) if recommendations else 0
        return result
    except Exception as e:
        logger.error(f"❌ Error sending AI recommendations email: {e}")
        return {
            'type': 'ai_recommendations',
            'success': False,
            'error': str(e)
        }
def _send_peak_time_engagement_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Send an engagement email keyed to the user's peak activity time.

    Reads the peak-time/engagement profile from `ai_insights`, lists up to 3
    time-sensitive listings, and sends the composed plain-text email via the
    external API. Returns a result dict with `type`, `success`, delivery
    info, `peak_time`, `engagement_level` and an insight summary.
    """
    logger.info(f"⏰ Sending peak time engagement email for customer {customer_id}")
    try:
        # Analyze peak time from tracking data
        peak_time = ai_insights.get('peak_time', 'Evening')
        engagement_level = ai_insights.get('engagement_level', 'High')
        # Extract AI insights for display
        personality_type = ai_insights.get('ai_personality_type', 'Analytical')
        urgency_level = ai_insights.get('urgency_level', 'Medium')
        # Create time-sensitive email content
        subject = f"⏰ Perfect Time to Explore Properties - Based on Your Activity Pattern!"
        body_lines = [
            f"Dear Valued Customer,",
            "",
            f"⏰ We've noticed you're most active during {peak_time} hours!",
            f"Your engagement level: {engagement_level}",
            "",
            "πŸ“ˆ OPTIMAL VIEWING TIME:",
            f"β€’ Your Peak Activity: {peak_time}",
            f"β€’ Best Time for Property Tours: {self._get_optimal_viewing_time(peak_time)}",
            f"β€’ Recommended Action Time: {self._get_recommended_action_time(peak_time)}",
            "",
            "🎯 TIME-SENSITIVE OPPORTUNITIES:"
        ]
        # Get time-sensitive properties (currently a static showcase list)
        time_sensitive_props = self._get_time_sensitive_properties(analysis_data)
        for i, prop in enumerate(time_sensitive_props[:3], 1):
            body_lines.extend([
                f"{i}. {prop.get('title', 'Time-Sensitive Property')}",
                f" πŸ“ Location: {prop.get('location', 'N/A')}",
                f" πŸ’° Price: ${prop.get('price', 0):,}",
                f" ⏰ Best Viewing Time: {prop.get('optimal_time', peak_time)}",
                f" 🚨 Urgency: {prop.get('urgency', 'Medium')}",
                ""
            ])
        body_lines.extend([
            "πŸ’‘ PEAK TIME STRATEGY:",
            f"β€’ Schedule viewings during {peak_time} for maximum engagement",
            "β€’ Take advantage of your high focus period",
            "β€’ Make decisions when you're most alert",
            "",
            "πŸ“ž IMMEDIATE ACTION:",
            "β€’ Call now to schedule a viewing",
            "β€’ Get priority booking during your peak time",
            "β€’ Receive personalized assistance",
            "",
            "Best regards,",
            "Your Peak Time Property Team"
        ])
        email_body = "\n".join(body_lines)
        # Send email
        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'peak_time_engagement'
        result['peak_time'] = peak_time
        result['engagement_level'] = engagement_level
        result['ai_insights'] = {
            'personality_type': personality_type,
            'urgency_level': urgency_level,
            'peak_time': peak_time
        }
        return result
    except Exception as e:
        logger.error(f"❌ Error sending peak time engagement email: {e}")
        return {
            'type': 'peak_time_engagement',
            'success': False,
            'error': str(e)
        }
def _send_email_via_api(self, recipient_email: str, subject: str, body: str) -> Dict:
    """POST a plain-text email to the external email service and report the outcome.

    Returns a dict with `success` plus either delivery metadata or error
    details; never raises.
    """
    try:
        endpoint = f"{self.config['backend_url']}/api/Email/sendPlainText"
        logger.info(f"🌐 Sending email via external API to {recipient_email}")
        # The upstream API expects the fields as query parameters.
        response = requests.post(
            endpoint,
            params={'toEmail': recipient_email, 'subject': subject, 'body': body},
            headers=self.config['ngrok_headers'],
            timeout=30
        )
        if response.status_code != 200:
            logger.error(f"❌ External email API failed: {response.status_code} - {response.text}")
            return {
                'success': False,
                'error': f'Email service failed: {response.status_code}',
                'details': response.text
            }
        logger.info(f"βœ… Email sent successfully to {recipient_email}")
        return {
            'success': True,
            'message': 'Email sent successfully',
            'recipient': recipient_email,
            'subject': subject
        }
    except Exception as e:
        logger.error(f"❌ Email API error: {e}")
        return {
            'success': False,
            'error': f'Email service error: {str(e)}'
        }
def _analyze_user_preferences(self, analysis_data: Dict) -> Dict:
    """Derive property preferences (type, budget, locations) from tracking analytics.

    Starts from safe defaults and overrides them with the most-viewed values
    found in `analysis_data['data']['analytics']`.

    Bug fix: the original called `max()` directly on the analytics dicts, so
    an empty `property_types`/`price_ranges` dict raised ValueError and the
    exception fallback returned a dict missing the `price_range` key. Empty
    dicts are now skipped and every return path carries the same keys.
    """
    # Sensible defaults, used when analytics are missing or empty.
    preferences = {
        'property_type': 'Villa',
        'min_budget': 500000,
        'max_budget': 2000000,
        'locations': ['Dubai', 'Abu Dhabi'],
        'bedrooms': '3+',
        'price_range': '500K-2M'
    }
    try:
        analytics = (analysis_data or {}).get('data', {}).get('analytics', {})
        # Most frequently viewed property type, if any were recorded.
        property_types = analytics.get('property_types') or {}
        if property_types:
            preferences['property_type'] = max(property_types.items(), key=lambda kv: kv[1])[0]
        # Most frequently viewed price bracket.
        price_ranges = analytics.get('price_ranges') or {}
        if price_ranges:
            preferences['price_range'] = max(price_ranges.items(), key=lambda kv: kv[1])[0]
        # Top three locations by view count.
        locations = analytics.get('locations') or {}
        if locations:
            top = sorted(locations.items(), key=lambda kv: kv[1], reverse=True)[:3]
            preferences['locations'] = [name for name, _ in top]
        return preferences
    except Exception as e:
        logger.error(f"❌ Error analyzing user preferences: {e}")
        # Defaults (possibly partially overridden) — consistent key set.
        return preferences
def _get_new_properties_for_user(self, user_preferences: Dict) -> List[Dict]:
    """Get newly added properties that match user preferences.

    Mock implementation: a real version would query a listings store. One
    candidate is generated per preferred location (max two), priced upward
    from the minimum budget and discarded if it exceeds the maximum.
    """
    try:
        preferred_type = user_preferences.get('property_type', 'Villa')
        preferred_locations = user_preferences.get('locations', ['Dubai'])
        budget_floor = user_preferences.get('min_budget', 500000)
        budget_ceiling = user_preferences.get('max_budget', 2000000)

        matches = []
        for idx, city in enumerate(preferred_locations[:2]):
            asking_price = budget_floor + (idx * 300000)
            if asking_price > budget_ceiling:
                continue
            matches.append({
                'title': f'New {preferred_type} in {city}',
                'location': city,
                'price': asking_price,
                'propertyTypeName': preferred_type,
                'bedrooms': '3+',
                'match_score': 95 + idx,
                'is_new': True,
                'added_date': datetime.now().strftime('%Y-%m-%d')
            })
        return matches
    except Exception as e:
        logger.error(f"❌ Error getting new properties: {e}")
        return []
def _get_optimal_viewing_time(self, peak_time: str) -> str:
    """Map a peak-activity label to a suggested property-viewing time window."""
    if peak_time == 'Morning':
        return '9:00 AM - 11:00 AM'
    if peak_time == 'Afternoon':
        return '2:00 PM - 4:00 PM'
    if peak_time == 'Night':
        return '7:00 PM - 9:00 PM'
    # 'Evening' and any unrecognised label share the default slot.
    return '6:00 PM - 8:00 PM'
def _get_recommended_action_time(self, peak_time: str) -> str:
    """Map a peak-activity label to the window in which to prompt user action."""
    schedule = {
        'Morning': '10:00 AM - 12:00 PM',
        'Afternoon': '3:00 PM - 5:00 PM',
        'Evening': '7:00 PM - 9:00 PM',
        'Night': '8:00 PM - 10:00 PM',
    }
    # Unknown labels fall back to the evening slot.
    return schedule.get(peak_time, '7:00 PM - 9:00 PM')
def _get_time_sensitive_properties(self, analysis_data: Dict) -> List[Dict]:
    """Return time-sensitive property offers.

    Static showcase inventory: a production version would rank real listings
    by urgency derived from `analysis_data` (currently unused).
    """
    try:
        offers = [
            ('Limited Time Offer - Premium Villa', 'Dubai Marina', 1500000, 'Evening', 'High'),
            ('Flash Sale - Luxury Apartment', 'Downtown Dubai', 800000, 'Afternoon', 'Very High'),
            ('Exclusive Preview - New Development', 'Palm Jumeirah', 2500000, 'Morning', 'Medium'),
        ]
        return [
            {
                'title': title,
                'location': location,
                'price': price,
                'optimal_time': optimal_time,
                'urgency': urgency
            }
            for title, location, price, optimal_time, urgency in offers
        ]
    except Exception as e:
        logger.error(f"❌ Error getting time-sensitive properties: {e}")
        return []
# Helper methods for multi-AI system
def _get_analysis_data_parallel(self, customer_id: int) -> Dict:
    """Fetch (or rebuild) the full analysis payload for a customer.

    Resolution order: cache -> backend API -> mock data service. The rebuilt
    payload is cached before being returned. Returns None when no data
    source yields anything or an unexpected error occurs.

    NOTE(review): despite the name, this implementation runs sequentially —
    "parallel" presumably refers to how callers fan out over customers.
    """
    try:
        # Try to get from cache first
        cache_key = f'lead_analysis_{customer_id}'
        cached_data = self.get_cached_data(cache_key)
        if cached_data:
            logger.info(f"πŸ“‹ Returning cached analysis data for customer {customer_id}")
            return cached_data
        # Fetch from backend
        backend_response = None
        try:
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
        except Exception as e:
            logger.warning(f"⚠️ Backend API failed: {e}")
        # If backend fails, use mock data
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"πŸ”„ Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
        if not backend_response:
            return None
        # Process the data into the shared analysis payload shape
        processed_data = self.process_lead_data(backend_response, customer_id)
        analytics = self.generate_analytics(backend_response, customer_id)
        analysis_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': analytics,
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }
        # Cache the result so subsequent calls are cheap
        self.cache_data(cache_key, analysis_data)
        return analysis_data
    except Exception as e:
        logger.error(f"❌ Error getting analysis data for customer {customer_id}: {e}")
        return None
def _get_custom_subject(self, recommendation_type: str, customer_id: int, ai_insights: Dict) -> str:
    """Build an email subject line for the given recommendation type.

    Unknown types get a generic AI subject; high/very-high urgency prepends
    an attention-grabbing prefix.
    """
    # NOTE: personality_type is looked up for parity with the insight payload,
    # but the subject currently varies only on urgency.
    personality_type = ai_insights.get('personality_type', 'Analytical')
    urgency_level = ai_insights.get('urgency_level', 'Medium')

    subject_by_type = {
        'property_based': f"🏑 Perfect Properties Matched to Your Preferences - Customer {customer_id}",
        'price_based': f"πŸ’° Properties in Your Budget Range - Exclusive Deals for Customer {customer_id}",
        'location_based': f"πŸ“ Prime Locations Just for You - Customer {customer_id}",
        'similarity_based': f"πŸ” Properties Similar to Your Favorites - Customer {customer_id}",
        'behavioral_based': f"🧠 AI-Analyzed Recommendations Based on Your Behavior - Customer {customer_id}",
        'premium_properties': f"⭐ Premium Properties Collection - Customer {customer_id}",
        'budget_friendly': f"πŸ’΅ Best Value Properties for Smart Investors - Customer {customer_id}",
        'trending_properties': f"πŸ“ˆ Trending Properties in Hot Markets - Customer {customer_id}",
        'family_oriented': f"πŸ‘¨β€πŸ‘©β€πŸ‘§β€πŸ‘¦ Family-Perfect Properties - Customer {customer_id}",
        'investment_opportunities': f"πŸ’Ό Investment Opportunities with High ROI - Customer {customer_id}"
    }
    subject = subject_by_type.get(recommendation_type, f"πŸ€– AI Recommendations for Customer {customer_id}")

    if urgency_level == 'High':
        return f"⚑ URGENT: {subject}"
    if urgency_level == 'Very High':
        return f"πŸ”₯ HOT DEALS: {subject}"
    return subject
# Include all the original analysis methods from api_service.py
def clean_for_json(self, obj):
    """Recursively sanitise *obj* for serialisation with the stock JSON encoder.

    Non-finite floats are replaced (json would otherwise emit invalid
    `Infinity`/`NaN` tokens): +inf -> 999999999, -inf -> -999999999,
    NaN -> 0. NumPy scalar types are converted to native floats and NumPy
    arrays to nested lists. Dicts and lists are walked recursively; every
    other value passes through unchanged.

    Fix: the original imported numpy inside the per-value helper, paying the
    import-lookup cost on every leaf and raising ImportError for pure-Python
    payloads when numpy is not installed. The import is now done once per
    call and numpy handling is skipped gracefully when unavailable.
    """
    try:
        import numpy as np
    except ImportError:
        np = None

    def clean_value(value):
        # Normalise numpy scalars/arrays first so the float checks below apply.
        if np is not None:
            if isinstance(value, (np.float32, np.float64, np.int32, np.int64)):
                value = float(value)
            elif isinstance(value, np.ndarray):
                return value.tolist()
        if isinstance(value, float):
            if math.isinf(value):
                return 999999999 if value > 0 else -999999999
            if math.isnan(value):
                return 0
        return value

    def clean_container(node):
        if isinstance(node, dict):
            return {k: clean_container(v) if isinstance(v, (dict, list)) else clean_value(v)
                    for k, v in node.items()}
        if isinstance(node, list):
            return [clean_container(v) if isinstance(v, (dict, list)) else clean_value(v)
                    for v in node]
        return clean_value(node)

    return clean_container(obj)
def make_api_request(self, endpoint: str, method: str = 'GET', data: Dict = None, use_fallback: bool = True) -> Any:
    """Make an API request to the backend with retries and a fallback.

    Retries up to `max_retries` extra times with linearly increasing delay on
    timeouts, connection errors, unexpected exceptions, and retryable HTTP
    statuses (429/502/503/504). On 200 returns the parsed JSON body. After
    final failure, returns `get_fallback_data(...)` when `use_fallback` is
    True, otherwise None.
    """
    max_retries = self.config.get('max_retries', 3)
    retry_delay = self.config.get('retry_delay', 1)
    for attempt in range(max_retries + 1):
        try:
            url = f"{self.config['backend_url']}{endpoint}"
            logger.info(f"🌐 Making {method} request to: {url} (attempt {attempt + 1}/{max_retries + 1})")
            headers = self.config['ngrok_headers']
            # Add timeout for better reliability (larger budget on the final try)
            timeout = 30 if attempt < max_retries else 60
            if method.upper() == 'GET':
                response = requests.get(url, headers=headers, timeout=timeout)
            elif method.upper() == 'POST':
                response = requests.post(url, headers=headers, json=data, timeout=timeout)
            elif method.upper() == 'PUT':
                response = requests.put(url, headers=headers, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            if response.status_code == 200:
                logger.info(f"βœ… API request successful on attempt {attempt + 1}")
                return response.json()
            elif response.status_code in [429, 502, 503, 504] and attempt < max_retries:
                # Retry on rate limiting or server errors
                logger.warning(f"⚠️ API request failed with {response.status_code}, retrying in {retry_delay}s...")
                time.sleep(retry_delay * (attempt + 1))  # Exponential backoff
                continue
            else:
                # Non-retryable status (or retries exhausted): stop and fall through
                logger.error(f"❌ API request failed: {response.status_code} - {response.text}")
                if not use_fallback:
                    return None
                break
        except requests.exceptions.Timeout as e:
            logger.warning(f"⏰ Request timeout on attempt {attempt + 1}: {e}")
            if attempt < max_retries:
                time.sleep(retry_delay * (attempt + 1))
                continue
            else:
                logger.error(f"❌ All retry attempts failed due to timeout")
                break
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"πŸ”Œ Connection error on attempt {attempt + 1}: {e}")
            if attempt < max_retries:
                time.sleep(retry_delay * (attempt + 1))
                continue
            else:
                logger.error(f"❌ All retry attempts failed due to connection error")
                break
        except Exception as e:
            # Any other error (JSON decode, SSL, etc.): retry like the rest
            logger.error(f"❌ API request error on attempt {attempt + 1}: {e}")
            if attempt < max_retries:
                time.sleep(retry_delay * (attempt + 1))
                continue
            else:
                break
    # If all attempts failed and fallback is enabled, return fallback data
    if use_fallback:
        logger.info(f"πŸ”„ All API attempts failed, using fallback data for endpoint: {endpoint}")
        return self.get_fallback_data(endpoint, method, data)
    return None
def get_fallback_data(self, endpoint: str, method: str, data: Dict = None) -> Any:
    """Produce substitute data for an endpoint after all API attempts failed.

    Customer-specific endpoints are answered from the mock data service when
    available; lead-qualification endpoints get an empty list; everything
    else gets a generic "unavailable" stub. `method` and `data` are accepted
    for signature parity but unused.
    """
    try:
        # Customer-specific endpoints embed the id after '/customer/'.
        customer_id = None
        if '/customer/' in endpoint:
            try:
                customer_id = int(endpoint.split('/customer/')[-1])
            except (ValueError, IndexError):
                customer_id = None

        if MOCK_DATA_AVAILABLE and customer_id:
            logger.info(f"πŸ”„ Using mock data fallback for customer {customer_id}")
            return mock_data_service.get_customer_data(customer_id)

        if 'PropertyLeadQualification' in endpoint:
            return []
        # DebtFinancing and every other endpoint share the same stub payload.
        return {'success': False, 'message': 'Service temporarily unavailable', 'fallback': True}
    except Exception as e:
        logger.error(f"❌ Fallback data generation failed: {e}")
        return None
# All the original analysis methods from api_service.py are included below
def process_lead_data(self, data: List[Dict], customer_id: int) -> Dict:
    """Process and enhance lead qualification data.

    Aggregates per-property viewing stats into a summary (totals, property
    type distribution, engagement score, last activity) and enriches each
    property record with derived metrics. Empty input yields a zeroed
    structure so downstream analytics never special-case missing data.

    Bug fix: `last_activity` previously used `max()` over raw
    `prop.get('lastViewedAt')` values, which raises TypeError when some
    properties lack the timestamp (None is not orderable against str).
    Missing timestamps are now ignored.
    """
    if not data:
        return {
            'customer_id': customer_id,
            'properties': [],
            'summary': {
                'total_properties': 0,
                'total_views': 0,
                'total_duration': 0,
                'average_price': 0,
                'property_types': {},
                'engagement_score': 0
            }
        }
    # Calculate summary statistics
    total_views = sum(prop.get('viewCount', 0) for prop in data)
    total_duration = sum(prop.get('totalDuration', 0) for prop in data)
    total_price = sum(prop.get('price', 0) for prop in data)
    # Property type distribution
    property_types = {}
    for prop in data:
        prop_type = prop.get('propertyTypeName', 'Unknown')
        property_types[prop_type] = property_types.get(prop_type, 0) + 1
    # Calculate engagement score
    engagement_score = self.calculate_engagement_score(data)
    # Enhance each property with additional metrics
    enhanced_properties = []
    for prop in data:
        enhanced_prop = prop.copy()
        enhanced_prop['engagement_score'] = self.calculate_property_engagement(prop)
        enhanced_prop['price_per_sqm'] = self.estimate_price_per_sqm(prop.get('price', 0))
        enhanced_prop['view_frequency'] = self.calculate_view_frequency(prop)
        enhanced_prop['last_viewed_days_ago'] = self.days_since_last_view(prop.get('lastViewedAt'))
        enhanced_properties.append(enhanced_prop)
    # Most recent timestamp across properties, ignoring missing values.
    last_activity = max(
        (ts for ts in (prop.get('lastViewedAt') for prop in data) if ts is not None),
        default=None
    )
    return {
        'customer_id': customer_id,
        'properties': enhanced_properties,
        'summary': {
            'total_properties': len(data),
            'total_views': total_views,
            'total_duration': total_duration,
            'average_price': total_price / len(data) if data else 0,
            'property_types': property_types,
            'engagement_score': engagement_score,
            'last_activity': last_activity
        }
    }
def generate_analytics(self, lead_data: List[Dict], customer_id: int) -> Dict:
    """Generate comprehensive analytics from lead data.

    Builds the full insight payload consumed by the dashboard and the email
    automation: qualification scoring, engagement level, preferences,
    viewing patterns, recommendations, risk and conversion estimates.
    Returns `{'customer_id': ..., 'analytics': {}}` when there is no data.
    """
    if not lead_data:
        return {'customer_id': customer_id, 'analytics': {}}
    # Process the data
    processed_data = self.process_lead_data(lead_data, customer_id)
    # Generate detailed lead qualification
    lead_qualification = self.generate_detailed_lead_qualification(processed_data)
    # Generate insights — each section delegates to a dedicated analyzer method
    insights = {
        'customer_id': customer_id,
        'lead_qualification': lead_qualification,
        'engagement_level': self.categorize_engagement(processed_data['summary']['engagement_score']),
        'preferred_property_types': self.get_preferred_property_types(processed_data['summary']['property_types']),
        'price_preferences': self.analyze_price_preferences(lead_data),
        'viewing_patterns': self.analyze_viewing_patterns(lead_data),
        'property_analysis': self.analyze_properties_detailed(lead_data),
        'recommendations': self.generate_recommendations(processed_data),
        'risk_assessment': self.assess_risk(processed_data),
        'opportunity_score': self.calculate_opportunity_score(processed_data),
        'lead_timeline': self.generate_lead_timeline(lead_data),
        'conversion_probability': self.calculate_conversion_probability(processed_data)
    }
    return insights
# Add placeholder methods for the analysis functions
def generate_detailed_lead_qualification(self, processed_data: Dict) -> Dict:
    """Generate detailed lead qualification with HOT/WARM/LUKEWARM/COLD status.

    Scores the lead out of 100 across five weighted factors (engagement,
    property diversity, recency, price consistency, viewing depth) and maps
    the total onto a status bucket with a display colour and next-step text.

    Args:
        processed_data: Output of ``process_lead_data`` β€” a dict with
            ``summary`` (aggregate stats) and ``properties`` (enhanced rows).

    Returns:
        Dict with ``lead_score``, ``lead_status``, per-factor breakdown and
        a condensed summary block.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']
    lead_score = 0
    factors = {}
    # 1. Engagement Score (0-30 points)
    engagement_points = min(summary['engagement_score'] * 0.3, 30)
    lead_score += engagement_points
    factors['engagement'] = {
        'score': summary['engagement_score'],
        'points': engagement_points,
        'max_points': 30,
        'description': f"Based on {summary['total_views']} views and {formatDuration(summary['total_duration'])} total viewing time"
    }
    # 2. Property Diversity (0-15 points)
    property_types = len(summary['property_types'])
    diversity_points = min(property_types * 5, 15)
    lead_score += diversity_points
    factors['property_diversity'] = {
        'score': property_types,
        'points': diversity_points,
        'max_points': 15,
        'description': f"Viewed {property_types} different property types"
    }
    # 3. Recent Activity (0-20 points)
    recent_activity_points = 0
    days_since = None  # stays None when last_activity is missing or unparseable
    if summary.get('last_activity'):
        try:
            last_activity = datetime.fromisoformat(summary['last_activity'].replace('Z', '+00:00'))
            # Build "now" with the same awareness as the parsed timestamp:
            # naive datetime.now() minus an aware value raises TypeError,
            # which the old bare except silently swallowed, zeroing the
            # recency points for every 'Z'-suffixed timestamp.
            now = datetime.now(last_activity.tzinfo) if last_activity.tzinfo else datetime.now()
            days_since = (now - last_activity).days
            if days_since <= 1:
                recent_activity_points = 20
            elif days_since <= 3:
                recent_activity_points = 15
            elif days_since <= 7:
                recent_activity_points = 10
            elif days_since <= 30:
                recent_activity_points = 5
        except (ValueError, AttributeError, TypeError):
            days_since = None
    lead_score += recent_activity_points
    factors['recent_activity'] = {
        'score': days_since if days_since is not None else 999,  # 999 = unknown sentinel
        'points': recent_activity_points,
        'max_points': 20,
        'description': f"Last activity was {days_since if days_since is not None else 'unknown'} days ago"
    }
    # 4. Price Range Consistency (0-15 points): a tight budget window scores high
    prices = [p['price'] for p in properties if p['price'] > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        consistency_points = price_consistency * 15
        lead_score += consistency_points
        factors['price_consistency'] = {
            'score': round(price_consistency * 100, 1),
            'points': consistency_points,
            'max_points': 15,
            'description': f"Price range consistency: {round(price_consistency * 100, 1)}%"
        }
    # 5. Viewing Depth (0-20 points): repeat views signal serious interest
    deep_views = sum(1 for p in properties if p['viewCount'] > 1)
    depth_points = min(deep_views * 5, 20)
    lead_score += depth_points
    factors['viewing_depth'] = {
        'score': deep_views,
        'points': depth_points,
        'max_points': 20,
        'description': f"{deep_views} properties viewed multiple times"
    }
    # Map the aggregate score onto a status bucket
    if lead_score >= 70:
        lead_status = "HOT"
        status_color = "#e74c3c"
        status_description = "High probability of conversion. Immediate follow-up recommended."
    elif lead_score >= 50:
        lead_status = "WARM"
        status_color = "#f39c12"
        status_description = "Good potential. Regular follow-up needed."
    elif lead_score >= 30:
        lead_status = "LUKEWARM"
        status_color = "#95a5a6"
        status_description = "Some interest shown. Nurturing required."
    else:
        lead_status = "COLD"
        status_color = "#34495e"
        status_description = "Low engagement. Re-engagement campaign needed."
    return {
        'lead_score': round(lead_score, 1),
        'lead_status': lead_status,
        'status_color': status_color,
        'status_description': status_description,
        'max_possible_score': 100,
        'factors': factors,
        'summary': {
            'total_properties_viewed': summary['total_properties'],
            'total_views': summary['total_views'],
            'total_duration': summary['total_duration'],
            'average_price': summary['average_price'],
            'engagement_score': summary['engagement_score']
        }
    }
def calculate_engagement_score(self, properties: List[Dict]) -> float:
    """Return the mean engagement score across all viewed properties.

    Each property contributes up to 50 points for view count (10 per view,
    capped) plus up to 50 points for total viewing duration (duration
    divided by 1000, capped β€” presumably milliseconds to seconds; TODO
    confirm the unit against the tracking payload). Empty input scores 0.0.
    """
    if not properties:
        return 0.0
    per_property = [
        min(item.get('viewCount', 0) * 10, 50) + min(item.get('totalDuration', 0) / 1000, 50)
        for item in properties
    ]
    return sum(per_property) / len(per_property)
def calculate_property_engagement(self, property_data: Dict) -> float:
    """Engagement score (0-100) for a single property.

    Same weighting as the per-customer score: view count worth up to 50
    points (10 per view) and duration worth up to 50 points (duration/1000).
    """
    view_part = min(10 * property_data.get('viewCount', 0), 50)
    duration_part = min(property_data.get('totalDuration', 0) / 1000, 50)
    return view_part + duration_part
def estimate_price_per_sqm(self, price: float) -> float:
    """Rough price-per-square-metre estimate.

    Divides by a hard-coded 150 (presumably an assumed ~150 sqm average
    property size β€” TODO confirm). Non-positive prices yield 0.
    """
    if price <= 0:
        return 0
    return price / 150
def calculate_view_frequency(self, property_data: Dict) -> str:
    """Bucket a property's view count into a human-readable frequency label."""
    views = property_data.get('viewCount', 0)
    # Guard-clause ladder: exact matches first, then inclusive upper bounds.
    if views == 0:
        return "No views"
    if views == 1:
        return "Single view"
    if views <= 3:
        return "Low frequency"
    return "Medium frequency" if views <= 7 else "High frequency"
def days_since_last_view(self, last_viewed: str) -> int:
    """Return whole days elapsed since *last_viewed* (ISO-8601 string).

    Returns 999 as a sentinel for missing or unparseable timestamps, and
    clamps future timestamps to 0.

    Fix: the previous code subtracted an aware timestamp from the naive
    ``datetime.now()``, raising TypeError (swallowed by a bare except), so
    every 'Z'-suffixed timestamp was wrongly reported as 999. "Now" is now
    built with the same awareness as the parsed value.
    """
    if not last_viewed:
        return 999
    try:
        last_view_date = datetime.fromisoformat(last_viewed.replace('Z', '+00:00'))
        now = datetime.now(last_view_date.tzinfo) if last_view_date.tzinfo else datetime.now()
        return max(0, (now - last_view_date).days)
    except (ValueError, AttributeError, TypeError):
        return 999
def categorize_engagement(self, score: float) -> str:
    """Map a numeric engagement score onto a five-level label.

    Thresholds are inclusive lower bounds: 80/60/40/20; anything below 20
    is "Very Low".
    """
    bands = ((80, "Very High"), (60, "High"), (40, "Medium"), (20, "Low"))
    for threshold, label in bands:
        if score >= threshold:
            return label
    return "Very Low"
def get_preferred_property_types(self, property_types: Dict) -> List[str]:
    """Return up to three property-type names, most frequently viewed first.

    Ties keep the mapping's insertion order (Python's sort is stable even
    with reverse=True).
    """
    if not property_types:
        return []
    ranked = sorted(property_types, key=property_types.get, reverse=True)
    return ranked[:3]
def analyze_price_preferences(self, properties: List[Dict]) -> Dict:
    """Summarise the positive price points the customer has looked at.

    Returns an empty dict when there are no properties or no positive
    prices (zero/missing prices are treated as "price unknown").
    """
    valid_prices = [prop.get('price', 0) for prop in properties if prop.get('price', 0) > 0]
    if not valid_prices:
        return {}
    lowest, highest = min(valid_prices), max(valid_prices)
    return {
        'min_price': lowest,
        'max_price': highest,
        'avg_price': sum(valid_prices) / len(valid_prices),
        'price_range': highest - lowest,
    }
def analyze_viewing_patterns(self, properties: List[Dict]) -> Dict:
    """Aggregate view counts into morning/afternoon/evening buckets.

    Buckets by the hour of each property's last view: 06-11 morning,
    12-17 afternoon, everything else evening. Rows with missing or
    unparseable timestamps are skipped silently. Ties for the peak slot
    resolve toward the later slot, matching the original ternary chain.
    """
    if not properties:
        return {}
    buckets = {'morning': 0, 'afternoon': 0, 'evening': 0}
    for entry in properties:
        if not entry.get('lastViewedAt'):
            continue
        try:
            stamp = datetime.fromisoformat(entry['lastViewedAt'].replace('Z', '+00:00'))
            if 6 <= stamp.hour < 12:
                slot = 'morning'
            elif 12 <= stamp.hour < 18:
                slot = 'afternoon'
            else:
                slot = 'evening'
            buckets[slot] += entry.get('viewCount', 0)
        except Exception:
            # Best-effort aggregation: bad rows are ignored, as before.
            continue
    if buckets['morning'] > max(buckets['afternoon'], buckets['evening']):
        peak = 'morning'
    elif buckets['afternoon'] > buckets['evening']:
        peak = 'afternoon'
    else:
        peak = 'evening'
    return {
        'morning_views': buckets['morning'],
        'afternoon_views': buckets['afternoon'],
        'evening_views': buckets['evening'],
        'peak_viewing_time': peak,
    }
def analyze_properties_detailed(self, properties: List[Dict]) -> Dict:
    """Per-property deep analysis.

    Currently a stub that only reports how many properties were analysed.
    """
    analyzed_count = len(properties)
    return {'analyzed_properties': analyzed_count}
def generate_recommendations(self, processed_data: Dict) -> List[str]:
    """Build follow-up suggestions from the aggregated summary.

    Each rule is independent; messages are returned in the fixed order the
    rules are declared in.
    """
    summary = processed_data['summary']
    rules = (
        (summary['engagement_score'] < 30,
         "Low engagement detected. Consider follow-up communication."),
        (summary['total_views'] < 5,
         "Limited property exploration. Suggest more property options."),
        (summary['average_price'] > 20000000,  # above 20M
         "High-value customer. Consider premium properties."),
        (len(summary['property_types']) > 3,
         "Diverse property interests. Offer variety in recommendations."),
    )
    return [message for triggered, message in rules if triggered]
def assess_risk(self, processed_data: Dict) -> Dict:
    """Score churn/loss risk from engagement, exploration breadth and budget.

    Three independent checks each add a label and a weight; the total is
    capped at 100 and bucketed into Low / Medium / High.
    """
    summary = processed_data['summary']
    checks = (
        (summary['engagement_score'] < 20, "Very low engagement", 30),
        (summary['total_views'] < 3, "Limited property exploration", 20),
        (summary['average_price'] > 50000000, "Very high price range", 15),  # > 50M
    )
    risk_factors = [label for hit, label, _ in checks if hit]
    risk_score = sum(weight for hit, _, weight in checks if hit)
    if risk_score > 50:
        level = 'High'
    elif risk_score > 25:
        level = 'Medium'
    else:
        level = 'Low'
    return {
        'risk_score': min(risk_score, 100),
        'risk_level': level,
        'risk_factors': risk_factors,
    }
def calculate_opportunity_score(self, processed_data: Dict) -> float:
    """Score (0-100) of how promising this lead is right now.

    Starts from the engagement score and adds bonuses for high average
    price (+20 over 20M), breadth of interest (+10 for more than two
    property types) and recency (+15 within 7 days, +10 within 30).

    Fix: "now" is built with the same awareness as the parsed
    ``last_activity`` timestamp; previously naive ``datetime.now()`` minus
    an aware value raised TypeError, which the bare except swallowed, so
    the recency bonus was never applied to 'Z'-suffixed timestamps.
    """
    summary = processed_data['summary']
    # Base score from engagement
    score = summary['engagement_score']
    # Bonus for high-value properties
    if summary['average_price'] > 20000000:
        score += 20
    # Bonus for multiple property types
    if len(summary['property_types']) > 2:
        score += 10
    # Bonus for recent activity
    if summary.get('last_activity'):
        try:
            last_activity = datetime.fromisoformat(summary['last_activity'].replace('Z', '+00:00'))
            now = datetime.now(last_activity.tzinfo) if last_activity.tzinfo else datetime.now()
            days_since = (now - last_activity).days
            if days_since <= 7:
                score += 15
            elif days_since <= 30:
                score += 10
        except (ValueError, AttributeError, TypeError):
            pass
    return min(score, 100)
def generate_lead_timeline(self, properties: List[Dict]) -> List[Dict]:
    """Build a newest-first timeline of the customer's property views.

    Rows without a ``lastViewedAt`` timestamp, with an unparseable
    timestamp, or missing any required field are silently skipped
    (best-effort, matching the original behaviour).
    """
    events = []
    for record in properties:
        if not record.get('lastViewedAt'):
            continue
        try:
            when = datetime.fromisoformat(record['lastViewedAt'].replace('Z', '+00:00'))
            events.append({
                'date': when.isoformat(),
                'property_name': record['propertyName'],
                'property_type': record['propertyTypeName'],
                'price': record['price'],
                'views': record['viewCount'],
                'duration': record['totalDuration'],
                'engagement_score': record.get('engagement_score', 0),
            })
        except Exception:
            continue
    return sorted(events, key=lambda event: event['date'], reverse=True)
def calculate_conversion_probability(self, processed_data: Dict) -> Dict:
    """Estimate how likely this lead is to convert.

    Combines a base probability (the engagement score, capped at 100) with
    additive adjustments for price-range consistency, recency of activity,
    breadth of interest and repeat viewing. Returns the base, each
    adjustment, their sum, the capped final probability and a confidence
    level driven purely by sample size.

    Fix: "now" is built with the same awareness as the parsed
    ``last_activity`` timestamp; previously naive ``datetime.now()`` minus
    an aware value raised TypeError (swallowed by the bare except), so the
    recency adjustment was never applied to 'Z'-suffixed timestamps.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']
    # Base probability
    base_probability = min(summary['engagement_score'], 100)
    adjustments = {}
    # Price range consistency: a tight budget window signals serious intent
    prices = [p['price'] for p in properties if p['price'] > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        adjustments['price_consistency'] = price_consistency * 10
    # Recency of the last interaction
    if summary.get('last_activity'):
        try:
            last_activity = datetime.fromisoformat(summary['last_activity'].replace('Z', '+00:00'))
            now = datetime.now(last_activity.tzinfo) if last_activity.tzinfo else datetime.now()
            days_since = (now - last_activity).days
            if days_since <= 1:
                adjustments['recent_activity'] = 15
            elif days_since <= 3:
                adjustments['recent_activity'] = 10
            elif days_since <= 7:
                adjustments['recent_activity'] = 5
        except (ValueError, AttributeError, TypeError):
            pass
    # Breadth of interest
    property_types = len(summary['property_types'])
    adjustments['property_diversity'] = min(property_types * 3, 15)
    # Depth of engagement: properties viewed more than once
    deep_views = sum(1 for p in properties if p['viewCount'] > 1)
    adjustments['deep_engagement'] = min(deep_views * 5, 20)
    # Final probability, capped at 100
    total_adjustment = sum(adjustments.values())
    final_probability = min(base_probability + total_adjustment, 100)
    if len(properties) >= 5:
        confidence = 'High'
    elif len(properties) >= 3:
        confidence = 'Medium'
    else:
        confidence = 'Low'
    return {
        'base_probability': base_probability,
        'adjustments': adjustments,
        'total_adjustment': total_adjustment,
        'final_probability': final_probability,
        'confidence_level': confidence,
    }
def get_cached_data(self, key: str) -> Optional[Any]:
    """Return the cached value for *key*, or None if absent or expired.

    Expired entries are evicted eagerly on lookup so the cache does not
    serve stale data between background cleanup passes.
    """
    timestamp = self.cache_timestamps.get(key)
    if timestamp is None or key not in self.cache:
        return None
    if time.time() - timestamp < self.config['cache_duration']:
        return self.cache[key]
    # Stale: drop both sides of the entry.
    del self.cache[key]
    del self.cache_timestamps[key]
    return None
def cache_data(self, key: str, data: Any):
    """Store *data* under *key* and stamp it with the current wall-clock time."""
    now = time.time()
    self.cache[key] = data
    self.cache_timestamps[key] = now
def start_cache_cleanup(self):
    """Start background thread to clean up expired cache"""
    # Daemon worker: wakes roughly once a minute, evicts every entry older
    # than the configured cache_duration, and never blocks process exit.
    def cleanup_cache():
        while True:
            try:
                current_time = time.time()
                # Snapshot the expired keys first so we never mutate the
                # dict while iterating over it.
                expired_keys = [
                    key for key, timestamp in self.cache_timestamps.items()
                    if current_time - timestamp > self.config['cache_duration']
                ]
                for key in expired_keys:
                    # pop(..., None): tolerate keys already evicted by a
                    # concurrent lookup in get_cached_data.
                    self.cache.pop(key, None)
                    self.cache_timestamps.pop(key, None)
                time.sleep(60)  # Clean up every minute
            except Exception as e:
                # Never let the cleanup loop die; log and retry after a pause.
                logger.error(f"Cache cleanup error: {e}")
                time.sleep(60)
    cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True)
    cleanup_thread.start()
def run(self, host: str = '0.0.0.0', port: int = 5000, debug: bool = False):
    """Run the Flask application.

    Args:
        host: Interface to bind. The default 0.0.0.0 listens on all
            interfaces, exposing the service beyond localhost.
        port: TCP port for the HTTP server.
        debug: Enables Flask's debugger and reloader; development only.
    """
    # Log a startup banner and the advertised endpoint list, then block in
    # app.run(). NOTE(review): this endpoint list is maintained by hand and
    # may drift from the routes actually registered elsewhere β€” verify
    # against the route definitions.
    logger.info(f"πŸš€ Starting Enhanced Lead Qualification API Service with AI on {host}:{port}")
    logger.info(f"🌐 Backend URL: {self.config['backend_url']}")
    logger.info(f"πŸ€– AI Features: Enabled")
    logger.info(f"πŸ“Š API Endpoints:")
    logger.info(f"   - GET  /api/health")
    logger.info(f"   - GET  /api/lead-analysis/<customer_id>")
    logger.info(f"   - GET  /api/ai-analysis/<customer_id>")
    logger.info(f"   - POST /api/ai-recommendations/<customer_id>")
    logger.info(f"   - POST /api/multi-ai-recommendations/<customer_id>")
    logger.info(f"   - GET  /api/chromadb-recommendations/<customer_id>")
    logger.info(f"   - GET  /api/ai-status/<customer_id>")
    logger.info(f"   - POST /api/batch-ai-analysis")
    logger.info(f"   - GET  /api/fetch-all-properties")
    logger.info(f"   - POST /api/clear-cache")
    logger.info(f"   - POST /api/test-email")
    logger.info(f"   - POST /api/automated-email")
    logger.info(f"   - GET  /api/email-status")
    logger.info(f"   πŸ†• NEW EMAIL TESTING ENDPOINTS:")
    logger.info(f"   - GET  /api/test-sendgrid-connection")
    logger.info(f"   - GET  /api/email-analysis-basis/<customer_id>")
    logger.info(f"   - GET  /api/email-content-preview/<customer_id>")
    logger.info(f"   - POST /api/test-all-10-emails/<customer_id>")
    logger.info(f"   - GET  /api/chromadb-status")
    self.app.run(host=host, port=port, debug=debug)
if __name__ == '__main__':
    # Script entry point: instantiate the service and serve it with Flask's
    # built-in debug server (development use only).
    api_service = EnhancedLeadQualificationAPI()
    api_service.run(debug=True)