Spaces:
Sleeping
Sleeping
import os | |
import json | |
import requests | |
import time | |
from datetime import datetime, timedelta | |
from flask import Flask, request, jsonify, render_template | |
from flask_cors import CORS | |
import logging | |
from typing import Dict, List, Optional, Any | |
import threading | |
import math | |
# Import centralized configuration | |
from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_app_config | |
# Configure logging once, before the optional imports, so that their failure
# paths can log through the same module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Import the AI recommendation engine (optional dependency).
try:
    from ai_recommendation_engine import AIRecommendationEngine
    ai_engine = AIRecommendationEngine()
    AI_ENGINE_AVAILABLE = True
except Exception as e:
    # Deliberately broad: a failed import and a failing constructor both
    # disable the engine instead of preventing the service from starting.
    logger.warning(f"AI engine not available: {e}")
    ai_engine = None
    AI_ENGINE_AVAILABLE = False

# Import mock data service (optional dependency).
try:
    from mock_data_service import mock_data_service
    MOCK_DATA_AVAILABLE = True
except ImportError:
    # Keep the name bound so later references fail predictably, not with NameError.
    mock_data_service = None
    MOCK_DATA_AVAILABLE = False
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that replaces non-finite floats with finite integers.

    Infinite floats (of either sign) are emitted as ``999999999`` and NaN is
    emitted as ``0``.

    ``json.JSONEncoder`` serialises ``float`` natively and only calls
    ``default()`` for objects it cannot serialise, so handling inf/NaN in
    ``default()`` alone never takes effect. The values are therefore replaced
    in the object tree before encoding starts.
    """

    INFINITY_REPLACEMENT = 999999999
    NAN_REPLACEMENT = 0

    def default(self, obj):
        """Fallback hook for objects the base encoder cannot serialise."""
        if isinstance(obj, float) and math.isinf(obj):
            return self.INFINITY_REPLACEMENT
        if isinstance(obj, float) and math.isnan(obj):
            return self.NAN_REPLACEMENT
        return super().default(obj)

    @classmethod
    def _sanitize(cls, obj):
        """Return *obj* with inf/NaN floats replaced, recursing into containers."""
        if isinstance(obj, float):
            if math.isinf(obj):
                return cls.INFINITY_REPLACEMENT
            if math.isnan(obj):
                return cls.NAN_REPLACEMENT
            return obj
        if isinstance(obj, dict):
            # Keys are left untouched; only values are sanitised.
            return {key: cls._sanitize(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            # Tuples are encoded as JSON arrays anyway, so a list is equivalent.
            return [cls._sanitize(item) for item in obj]
        return obj

    def iterencode(self, o, _one_shot=False):
        """Encode *o* after replacing non-finite floats.

        Both ``encode()`` (used by ``json.dumps``) and ``json.dump`` go through
        ``iterencode``, so overriding it here covers every entry point.
        """
        return super().iterencode(self._sanitize(o), _one_shot)
def formatPrice(price):
    """Format a price in Indian currency.

    Returns the rupee sign followed by the amount rounded to a whole number
    with comma thousands separators, e.g. ``formatPrice(1234567)`` gives
    ``"\u20b91,234,567"``.
    """
    # The literal had been corrupted into mojibake; an escape sequence keeps
    # the rupee sign (U+20B9) intact regardless of the file's encoding.
    return f"\u20b9{price:,.0f}"
def formatDuration(seconds):
    """Format a duration given in seconds as ``"<hours>h <minutes>m"``.

    Leftover seconds are dropped. Fractional input is truncated to whole
    seconds first so the result never contains decimals (``7200.0`` gives
    ``"2h 0m"`` rather than ``"2.0h 0.0m"``).
    """
    hours, remainder = divmod(int(seconds), 3600)
    minutes = remainder // 60
    return f"{hours}h {minutes}m"
class EnhancedLeadQualificationAPI: | |
def __init__(self):
    """Create the Flask app, load configuration, and wire up routes and hooks.

    Sets up the response cache and the AI-processing status table, calls
    ``setup_routes()``, registers a hook that adds no-cache headers to every
    response, and starts the cache cleanup via ``start_cache_cleanup()``.
    """
    self.app = Flask(__name__)
    # NOTE(review): recent Flask releases replaced `app.json_encoder` with the
    # JSON provider API; confirm this assignment still takes effect on the
    # Flask version deployed here.
    self.app.json_encoder = CustomJSONEncoder
    CORS(self.app)

    # Configuration - centralized in config.py; some values can be overridden
    # through environment variables.
    app_config = get_app_config()
    self.config = {
        'backend_url': get_backend_url(),
        'api_timeout': app_config['api_timeout'],
        'cache_duration': int(os.getenv('CACHE_DURATION', app_config['cache_duration'])),
        'max_retries': int(os.getenv('MAX_RETRIES', app_config['max_retries'])),
        'retry_delay': int(os.getenv('RETRY_DELAY', app_config['retry_delay'])),
        'ngrok_headers': get_ngrok_headers()
    }

    # Cache for API responses
    self.cache = {}
    self.cache_timestamps = {}

    # AI processing status tracking
    self.ai_processing_status = {}

    # Setup routes
    self.setup_routes()

    def add_no_cache_headers(response):
        """Add headers to prevent any caching"""
        response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        response.headers['Pragma'] = 'no-cache'
        response.headers['Expires'] = '0'
        return response

    # The hook was defined but never attached to the app; register it
    # explicitly so the headers are applied to all responses.
    self.app.after_request(add_no_cache_headers)

    # Start cache cleanup thread
    self.start_cache_cleanup()
def get_ai_engine(self):
    """Return the module-level AI engine, or None when it is not usable.

    A warning is logged when the engine is unavailable; any unexpected error
    is logged and also results in None.
    """
    try:
        engine_ready = AI_ENGINE_AVAILABLE and ai_engine
        if not engine_ready:
            logger.warning("β οΈ AI engine not available")
            return None
        return ai_engine
    except Exception as err:
        logger.error(f"β Error getting AI engine: {err}")
        return None
def setup_routes(self): | |
"""Setup all API routes including AI-enhanced endpoints and frontend""" | |
# Frontend routes | |
def index():
    """Render the main AI customer analysis and email automation dashboard."""
    template_name = 'perfect_ai_dashboard.html'
    return render_template(template_name, customer_id=None)
def old_dashboard():
    """Render the previous dashboard, kept for reference."""
    template_name = 'ai_lead_analysis.html'
    return render_template(template_name, customer_id=None)
def customer_analysis(customer_id):
    """Render the lead analysis page with the given customer ID passed to the template."""
    template_context = {'customer_id': customer_id}
    return render_template('ai_lead_analysis.html', **template_context)
def email_automation_dashboard():
    """Render the email automation dashboard page."""
    template_name = 'email_automation_dashboard.html'
    return render_template(template_name)
def health():
    """Report that the service is running, with the current timestamp."""
    payload = {
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
    }
    payload['timestamp'] = datetime.now().isoformat()
    return jsonify(payload)
def frontend_status():
    """Return a static description of the frontend service plus a timestamp."""
    available_templates = ['ai_lead_analysis.html']
    feature_names = [
        'AI Lead Analysis Dashboard',
        'Email Automation Testing',
        'Behavioral Analysis',
        'Real-time Analytics',
    ]
    payload = {
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
        'templates_available': available_templates,
        'features': feature_names,
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
# API routes | |
def health_check():
    """Health check endpoint.

    Returns the backend URL, the number of cached entries and the AI engine
    state. The AI fields reflect whether the engine was actually loaded
    instead of always claiming it is enabled.
    """
    ai_ready = bool(AI_ENGINE_AVAILABLE and ai_engine)
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'backend_url': self.config['backend_url'],
        'cache_size': len(self.cache),
        'ai_status': 'enabled' if ai_ready else 'disabled',
        'ai_models': 'loaded' if ai_ready else 'unavailable'
    })
def get_lead_analysis(customer_id):
    """Return the complete (non-AI) lead analysis for one customer as JSON.

    A cached payload is returned when present. Otherwise the customer's
    records are requested from the backend, processed, analysed, cleaned for
    JSON, cached and returned. Responds 404 when the backend yields nothing
    and 500 when any step raises.
    """
    logger.info(f"π Lead analysis request for customer {customer_id}")
    try:
        cache_key = f'lead_analysis_{customer_id}'
        cached_payload = self.get_cached_data(cache_key)
        if cached_payload:
            logger.info(f"π Returning cached data for customer {customer_id}")
            return jsonify(cached_payload)

        # Nothing cached: ask the backend (make_api_request handles fallback).
        logger.info(f"π Fetching data for customer {customer_id} with fallback support")
        endpoint = f'/api/PropertyLeadQualification/customer/{customer_id}'
        records = self.make_api_request(endpoint)
        received = len(records) if isinstance(records, list) else 'No data'
        logger.info(f"π₯ Data received for customer {customer_id}: {received}")

        if not records:
            not_found = {
                'success': False,
                'customer_id': customer_id,
                'error': 'No data found for this customer',
                'data': None
            }
            return jsonify(not_found), 404

        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed = self.process_lead_data(records, customer_id)

        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(records, customer_id)

        # Analytics keys exposed to the client, each with its fallback value.
        analytics_fields = (
            ('engagement_level', None),
            ('preferred_property_types', []),
            ('price_preferences', {}),
            ('viewing_patterns', {}),
            ('property_analysis', {}),
            ('recommendations', []),
            ('risk_assessment', {}),
            ('opportunity_score', None),
            ('lead_timeline', []),
            ('conversion_probability', {}),
        )

        payload = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    field: analytics.get(field, fallback)
                    for field, fallback in analytics_fields
                },
                'properties': processed.get('properties', []),
                'summary': processed.get('summary', {})
            },
            'api_info': {
                'backend_url': self.config['backend_url'],
                'endpoint_called': f'/api/PropertyLeadQualification/customer/{customer_id}',
                'response_time': datetime.now().isoformat(),
                'data_points': len(records) if isinstance(records, list) else 0
            }
        }

        # Make the payload JSON-safe before it is cached and returned.
        cleaned_payload = self.clean_for_json(payload)
        logger.info(f"β Combined response created successfully for customer {customer_id}")

        self.cache_data(cache_key, cleaned_payload)
        logger.info(f"πΎ Combined data cached for customer {customer_id}")
        return jsonify(cleaned_payload)
    except Exception as err:
        logger.error(f"β Error in lead analysis for customer {customer_id}: {err}")
        failure = {
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate lead analysis: {str(err)}',
            'data': None
        }
        return jsonify(failure), 500
def get_ai_analysis(customer_id):
    """Get AI-enhanced lead analysis for a customer.

    Starts from the cached regular analysis (or builds one from backend data),
    adds AI insights and AI recommendations (mock insights when the engine is
    unavailable), caches the enhanced payload under ``ai_analysis_<id>`` and
    returns it. Responds 404 when the backend has no data and 500 on errors.
    """
    import copy

    logger.info(f"π€ AI analysis request for customer {customer_id}")
    try:
        # First get the regular analysis
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate regular analysis first with improved fallback
            logger.info(f"π Fetching data for AI analysis - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        # Now enhance with AI analysis
        if AI_ENGINE_AVAILABLE and ai_engine:
            logger.info(f"π§ Generating AI insights for customer {customer_id}")
            ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
            # Find AI-recommended properties
            logger.info(f"π Finding AI recommendations for customer {customer_id}")
            ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
        else:
            logger.warning("β οΈ AI engine not available, using mock AI insights")
            ai_insights = {
                'personality_type': 'Analytical',
                'decision_making_style': 'Data-driven',
                'preferred_communication': 'Email',
                'urgency_level': 'Medium',
                'budget_confidence': 'High',
                'location_preference': 'Premium areas',
                'property_type_preference': 'Luxury properties'
            }
            ai_recommendations = []

        # Deep-copy before adding AI fields: a shallow copy shares the nested
        # 'data' dict, so the writes below would leak into the cached
        # 'lead_analysis_*' entry.
        enhanced_response = copy.deepcopy(analysis_data)
        enhanced_response['data']['ai_insights'] = ai_insights
        enhanced_response['data']['ai_recommendations'] = ai_recommendations
        enhanced_response['ai_enhanced'] = True
        enhanced_response['ai_timestamp'] = datetime.now().isoformat()

        # Same JSON clean-up the other analysis endpoints apply before responding.
        enhanced_response = self.clean_for_json(enhanced_response)

        # Cache AI-enhanced result
        ai_cache_key = f'ai_analysis_{customer_id}'
        self.cache_data(ai_cache_key, enhanced_response)
        logger.info(f"β AI analysis completed for customer {customer_id}")
        return jsonify(enhanced_response)
    except Exception as e:
        logger.error(f"β Error in AI analysis for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate AI analysis: {str(e)}',
            'ai_error': True
        }), 500
def send_ai_recommendations(customer_id):
    """Send AI-powered recommendations via email.

    Expects a JSON object with an ``email`` field. Records a 'processing'
    entry in ``self.ai_processing_status``, obtains (or builds) the customer's
    analysis, then runs the AI processing on a daemon thread which updates the
    status entry to 'completed' or 'failed'. Returns immediately with a
    'processing' response; 400 for a missing email, 404 when the backend has
    no data, 500 on unexpected errors.
    """
    logger.info(f"π§ AI recommendation email request for customer {customer_id}")
    try:
        # A missing, malformed or non-object body is treated as "no email
        # supplied" (400) instead of crashing on `.get` with a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        recipient_email = data.get('email')
        if not recipient_email:
            return jsonify({
                'success': False,
                'error': 'Email address is required'
            }), 400

        # Mark as processing
        self.ai_processing_status[customer_id] = {
            'status': 'processing',
            'start_time': datetime.now().isoformat(),
            'email': recipient_email
        }

        # Get or generate analysis data
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate analysis if not cached with improved fallback
            logger.info(f"π Fetching data for recommendations - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                self.ai_processing_status[customer_id]['status'] = 'failed'
                self.ai_processing_status[customer_id]['error'] = 'No data found'
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        # Process with AI in background
        def process_ai_recommendations():
            """Run the AI processing and record its outcome in the status table."""
            try:
                if AI_ENGINE_AVAILABLE and ai_engine:
                    result = ai_engine.process_customer_with_ai(customer_id, recipient_email, analysis_data)
                else:
                    # Mock AI processing result
                    result = {
                        'success': True,
                        'customer_id': customer_id,
                        'email_sent': True,
                        'recipient': recipient_email,
                        'subject': f"π€ AI-Powered Property Recommendations for Customer {customer_id}",
                        'ai_insights': {
                            'personality_type': 'Analytical',
                            'decision_making_style': 'Data-driven',
                            'preferred_communication': 'Email',
                            'urgency_level': 'Medium',
                            'budget_confidence': 'High'
                        },
                        'recommendations_count': 5,
                        'mock_data': True,
                        'message': 'Mock AI processing completed (AI engine not available)'
                    }
                self.ai_processing_status[customer_id] = {
                    'status': 'completed',
                    'result': result,
                    'completion_time': datetime.now().isoformat()
                }
                logger.info(f"β AI processing completed for customer {customer_id}")
            except Exception as e:
                self.ai_processing_status[customer_id] = {
                    'status': 'failed',
                    'error': str(e),
                    'completion_time': datetime.now().isoformat()
                }
                logger.error(f"β AI processing failed for customer {customer_id}: {e}")

        # Start background processing; daemon so it never blocks shutdown.
        thread = threading.Thread(target=process_ai_recommendations, daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'message': 'AI recommendation processing started',
            'status': 'processing',
            'email': recipient_email,
            'estimated_completion': '2-3 minutes'
        })
    except Exception as e:
        logger.error(f"β Error starting AI recommendations for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to start AI processing: {str(e)}'
        }), 500
def get_ai_status(customer_id):
    """Return the recorded AI processing status for a customer.

    Falls back to a 'not_started' placeholder when nothing has been recorded
    for this customer.
    """
    not_started = {
        'status': 'not_started',
        'message': 'No AI processing initiated for this customer'
    }
    current = self.ai_processing_status.get(customer_id, not_started)
    payload = {
        'customer_id': customer_id,
        'ai_processing': current,
        'timestamp': datetime.now().isoformat()
    }
    # Make the payload JSON-safe before responding.
    return jsonify(self.clean_for_json(payload))
def batch_ai_analysis():
    """Get AI analysis for multiple customers.

    Expects a JSON object with a ``customer_ids`` list. Each customer is
    processed independently; a failure for one customer is reported in that
    customer's result entry and does not abort the batch. Returns 400 when no
    usable ID list is supplied and 500 on unexpected errors.
    """
    try:
        # A missing, malformed or non-object body yields a 400 below instead
        # of an AttributeError-driven 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        customer_ids = data.get('customer_ids', [])
        if not customer_ids:
            return jsonify({
                'success': False,
                'error': 'No customer IDs provided'
            }), 400
        if not isinstance(customer_ids, (list, tuple)):
            # A bare string would otherwise be iterated character by character.
            return jsonify({
                'success': False,
                'error': 'customer_ids must be a list'
            }), 400

        results = {}
        for customer_id in customer_ids:
            try:
                # Get regular analysis with improved fallback
                logger.info(f"π Fetching data for batch analysis - customer {customer_id}")
                backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
                if not backend_response:
                    results[customer_id] = {
                        'success': False,
                        'error': 'No data found for this customer'
                    }
                    continue
                processed_data = self.process_lead_data(backend_response, customer_id)
                analytics = self.generate_analytics(backend_response, customer_id)
                analysis_data = {
                    'success': True,
                    'customer_id': customer_id,
                    'data': {
                        'lead_qualification': analytics.get('lead_qualification', {}),
                        'analytics': analytics,
                        'properties': processed_data.get('properties', []),
                        'summary': processed_data.get('summary', {})
                    }
                }
                # Add AI insights
                if AI_ENGINE_AVAILABLE and ai_engine:
                    ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
                    ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
                else:
                    ai_insights = {
                        'personality_type': 'Analytical',
                        'decision_making_style': 'Data-driven',
                        'preferred_communication': 'Email',
                        'urgency_level': 'Medium',
                        'budget_confidence': 'High',
                        'location_preference': 'Premium areas',
                        'property_type_preference': 'Luxury properties'
                    }
                    ai_recommendations = []
                analysis_data['data']['ai_insights'] = ai_insights
                analysis_data['data']['ai_recommendations'] = ai_recommendations
                analysis_data['ai_enhanced'] = True
                results[customer_id] = self.clean_for_json(analysis_data)
            except Exception as e:
                results[customer_id] = {
                    'success': False,
                    'error': str(e)
                }

        return jsonify({
            'success': True,
            'results': results,
            'total_processed': len(customer_ids),
            'ai_enhanced': True
        })
    except Exception as e:
        logger.error(f"β Error in batch AI analysis: {e}")
        return jsonify({
            'success': False,
            'error': f'Batch AI analysis failed: {str(e)}'
        }), 500
def clear_cache():
    """Empty the response cache, its timestamps and the AI status table."""
    try:
        for store in (self.cache, self.cache_timestamps, self.ai_processing_status):
            store.clear()
        logger.info("ποΈ Cache and AI status cleared")
        confirmation = {
            'success': True,
            'message': 'Cache and AI processing status cleared successfully'
        }
        return jsonify(confirmation)
    except Exception as err:
        logger.error(f"β Error clearing cache: {err}")
        failure = {
            'success': False,
            'error': f'Failed to clear cache: {str(err)}'
        }
        return jsonify(failure), 500
def test_email():
    """Test SendGrid email functionality.

    Accepts the recipient as an ``email`` field in either a JSON object or
    form data, validates it, and asks the AI engine to send a test email.
    Returns 400 for unparseable input or an invalid address and 500 when the
    engine is unavailable or the send raises.
    """
    logger.info("π§ͺ Testing SendGrid email functionality")
    try:
        # Debug request information
        logger.info(f"Request Content-Type: {request.content_type}")
        logger.info(f"Request Method: {request.method}")
        # Headers and the raw body may carry credentials or personal data, so
        # they are only emitted at debug level.
        logger.debug(f"Request Headers: {dict(request.headers)}")
        logger.debug(f"Request Data: {request.get_data()}")

        # Handle both JSON and form data with better error handling
        data = {}
        if request.is_json:
            try:
                data = request.get_json() or {}
                logger.info(f"Parsed JSON data: {data}")
            except Exception as json_error:
                logger.error(f"JSON parsing error: {json_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid JSON: {str(json_error)}'
                }), 400
            if not isinstance(data, dict):
                # e.g. a JSON array or scalar: there is no 'email' field to read.
                return jsonify({
                    'success': False,
                    'error': 'Invalid JSON: expected an object'
                }), 400
        else:
            try:
                data = request.form.to_dict() or {}
                logger.info(f"Parsed form data: {data}")
            except Exception as form_error:
                logger.error(f"Form parsing error: {form_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid form data: {str(form_error)}'
                }), 400

        # NOTE(review): this default looks like a scrape-redacted placeholder
        # and contains no '@', so requests without an 'email' are rejected
        # below - confirm the intended default address.
        recipient_email = data.get('email', '[email protected]')
        logger.info(f"π§ Testing email to: {recipient_email}")

        # Validate email format (non-string values are rejected, not crashed on)
        if not isinstance(recipient_email, str) or '@' not in recipient_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address'
            }), 400

        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Test SendGrid email
        test_result = ai_engine.test_sendgrid_email(recipient_email)
        return jsonify(test_result)
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Error testing email: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def test_email_simple():
    """Send a SendGrid test email to the built-in default address (GET variant)."""
    logger.info("π§ͺ Testing SendGrid email functionality (simple GET)")
    try:
        # Fixed recipient: this endpoint takes no input.
        recipient_email = '[email protected]'
        logger.info(f"π§ Testing email to: {recipient_email}")

        ai_engine = self.get_ai_engine()
        if ai_engine:
            # Engine present: delegate the send and return its result as-is.
            return jsonify(ai_engine.test_sendgrid_email(recipient_email))

        unavailable = {
            'success': False,
            'error': 'AI engine not available'
        }
        return jsonify(unavailable), 500
    except Exception as err:
        logger.error(f"Error testing email (simple): {err}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        failure = {
            'success': False,
            'error': str(err)
        }
        return jsonify(failure), 500
def send_automated_email():
    """Send automated email.

    Expects a JSON object with ``customer_id`` (required) and optional
    ``email_type`` and ``recipient_email``. When the mock data service is
    available, returns a mock "email generated" response built from the
    customer's profile; otherwise responds 500. Returns 400 when
    ``customer_id`` is missing.
    """
    try:
        # A missing, malformed or non-object body falls through to the
        # 'Customer ID is required' 400 instead of raising on `.get`.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        customer_id = data.get('customer_id')
        email_type = data.get('email_type', 'behavioral_trigger')
        recipient_email = data.get('recipient_email', '[email protected]')
        if not customer_id:
            return jsonify({
                'success': False,
                'error': 'Customer ID is required'
            }), 400

        # Generate mock email response (email service integrated)
        if MOCK_DATA_AVAILABLE:
            # Only the profile is needed for the subject line; the previously
            # fetched customer data was never used. A missing profile falls
            # back to the generic customer label.
            customer_profile = mock_data_service.get_customer_profile(customer_id) or {}
            customer_label = customer_profile.get('customerName', f'Customer {customer_id}')
            return jsonify({
                'success': True,
                'customer_id': customer_id,
                'recipient_email': recipient_email,
                'subject': f"π€ AI-Powered Recommendations for {customer_label}",
                'email_type': email_type,
                'timestamp': datetime.now().isoformat(),
                'mock_data': True,
                'message': 'Mock email generated successfully (integrated email service)'
            })
        else:
            return jsonify({
                'success': False,
                'error': 'Email service not available and no mock data'
            }), 500
    except Exception as e:
        logger.error(f"Error sending automated email: {e}")
        return jsonify({
            'success': False,
            'error': f'Email service not available: {str(e)}'
        }), 500
def get_email_status():
    """Get email automation status.

    Returns the static email configuration summary. ``ai_engine_loaded``
    reflects whether the AI engine was actually loaded instead of always
    reporting True.
    """
    return jsonify({
        'success': True,
        'service_status': 'integrated',
        'email_config': {
            'sender': '[email protected]',
            'recipient': '[email protected]',
            'smtp_server': 'Integrated Service'
        },
        'ai_engine_loaded': bool(AI_ENGINE_AVAILABLE and ai_engine),
        'timestamp': datetime.now().isoformat(),
        'message': 'Email service integrated into main API service'
    })
def email_preview(customer_id):
    """Build an email preview for one customer and return it as JSON.

    Fetches the customer's analysis through ``make_api_request``, hands it to
    the AI engine together with the recipient taken from the ``email`` query
    parameter, and wraps the engine's result. Responds 404 when no customer
    data is found and 500 when the engine is missing or fails.
    """
    logger.info(f"π§ Generating email preview for customer {customer_id}")

    def failure(message, status):
        """Build the standard error response used by this handler."""
        return jsonify({'success': False, 'error': message}), status

    try:
        customer_data = self.make_api_request(f'/api/lead-analysis/{customer_id}')
        if not customer_data or 'data' not in customer_data:
            return failure('Customer data not found', 404)

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return failure('AI engine not available', 500)

        # The recipient can be overridden through the query string.
        recipient_email = request.args.get('email', '[email protected]')
        email_result = ai_engine.process_customer_with_ai(
            customer_id, recipient_email, customer_data
        )
        if not email_result.get('success'):
            return failure('Failed to generate email content', 500)

        preview = {
            'success': True,
            'customer_id': customer_id,
            'email_preview': email_result,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify(preview)
    except Exception as err:
        logger.error(f"Error generating email preview: {err}")
        return failure(str(err), 500)
def email_preview_page(customer_id):
    """Email preview page.

    Obtains the preview payload via ``self.get_email_preview`` and renders it
    as a standalone HTML page. On failure a plain ``Error: ...`` string is
    returned instead.

    Every value interpolated into the markup is HTML-escaped, except the email
    body itself, which is markup meant to be rendered.
    """
    import html

    def esc(value):
        """HTML-escape any value for safe use in text or attribute context."""
        return html.escape(str(value), quote=True)

    try:
        # Get email content
        # NOTE(review): `get_email_preview` is not defined in the visible part
        # of the file, and the payload keys read below ('email_content',
        # 'recommendations_count') differ from what `email_preview` returns -
        # confirm against the actual provider.
        email_response = self.get_email_preview(customer_id)
        # Flask views may return a (response, status) tuple; unwrap it so the
        # error payload can still be read.
        if isinstance(email_response, tuple):
            email_response = email_response[0]
        email_data = email_response.json or {}
        if not email_data.get('success'):
            return f"Error: {esc(email_data.get('error', 'Unknown error'))}"
        email_content = email_data['email_content']

        safe_customer_id = esc(customer_id)
        # Static stylesheet kept out of the f-string so braces need no doubling.
        page_css = """
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
            background-color: #f5f5f5;
        }
        .container {
            max-width: 800px;
            margin: 0 auto;
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        .header {
            background: #2c3e50;
            color: white;
            padding: 20px;
            border-radius: 8px;
            margin-bottom: 20px;
        }
        .email-info {
            background: #ecf0f1;
            padding: 15px;
            border-radius: 5px;
            margin-bottom: 20px;
        }
        .email-content {
            border: 1px solid #ddd;
            border-radius: 5px;
            overflow: hidden;
        }
        .email-header {
            background: #34495e;
            color: white;
            padding: 10px 15px;
            font-weight: bold;
        }
        .email-body {
            padding: 0;
        }
        .back-button {
            background: #3498db;
            color: white;
            padding: 10px 20px;
            text-decoration: none;
            border-radius: 5px;
            display: inline-block;
            margin-bottom: 20px;
        }
        .back-button:hover {
            background: #2980b9;
        }
        """

        # Create HTML page with email content
        html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Email Preview - Customer {safe_customer_id}</title>
    <style>{page_css}</style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>π§ Email Preview</h1>
            <p>Customer ID: {safe_customer_id}</p>
        </div>
        <a href="/customer/{safe_customer_id}" class="back-button">β Back to Customer Analysis</a>
        <div class="email-info">
            <h3>Email Details:</h3>
            <p><strong>To:</strong> {esc(email_content['recipient_email'])}</p>
            <p><strong>Subject:</strong> {esc(email_content['subject'])}</p>
            <p><strong>Generated:</strong> {esc(email_content['timestamp'])}</p>
            <p><strong>Recommendations:</strong> {esc(email_data['recommendations_count'])} properties</p>
        </div>
        <div class="email-content">
            <div class="email-header">
                Email Content Preview
            </div>
            <div class="email-body">
                {email_content['html_content']}
            </div>
        </div>
    </div>
</body>
</html>
"""
        return html_content
    except Exception as e:
        logger.error(f"Error creating email preview page: {e}")
        return f"Error: {esc(e)}"
def download_email(filename):
    """Download saved email file.

    Serves ``filename`` from ``/tmp/emails`` as an attachment. Returns a JSON
    404 when the name does not resolve to a regular file inside that
    directory, and a JSON 500 on unexpected errors.
    """
    try:
        import os
        from flask import send_file

        emails_dir = "/tmp/emails"
        base_dir = os.path.realpath(emails_dir)
        file_path = os.path.realpath(os.path.join(base_dir, filename))

        # Resolve symlinks and '..' first, then require the result to stay
        # inside the emails directory so a crafted name cannot reach other
        # files. Directories are rejected too.
        inside_emails_dir = file_path.startswith(base_dir + os.sep)
        if not inside_emails_dir or not os.path.isfile(file_path):
            return jsonify({
                'success': False,
                'error': 'Email file not found'
            }), 404

        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.error(f"Error downloading email file: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to download email: {str(e)}'
        }), 500
def list_emails():
    """Return JSON metadata for every saved ``.html`` email, newest first.

    An absent emails directory yields an empty list; any error yields a 500.
    """
    try:
        import os
        emails_dir = "/tmp/emails"
        if not os.path.exists(emails_dir):
            return jsonify({'success': True, 'emails': []})

        def describe(name):
            """Build the metadata record for one saved email file."""
            info = os.stat(os.path.join(emails_dir, name))
            return {
                'filename': name,
                'size': info.st_size,
                'created': datetime.fromtimestamp(info.st_ctime).isoformat(),
                'download_url': f'/api/download-email/{name}'
            }

        # Sort on the ISO timestamp string, newest first.
        emails = sorted(
            (describe(name) for name in os.listdir(emails_dir) if name.endswith('.html')),
            key=lambda record: record['created'],
            reverse=True,
        )
        return jsonify({'success': True, 'emails': emails})
    except Exception as err:
        logger.error(f"Error listing email files: {err}")
        failure = {
            'success': False,
            'error': f'Failed to list emails: {str(err)}'
        }
        return jsonify(failure), 500
def saved_emails_page():
    """Page to view and download saved emails.

    Lists the ``.html`` files in ``/tmp/emails`` (newest first) as cards with
    a download link, or shows an empty-state message when there are none.
    On failure a plain ``Error: ...`` string is returned.

    File names are HTML-escaped in text and URL-quoted in links, so a name
    containing markup characters cannot alter the page.
    """
    import html
    try:
        import os
        from urllib.parse import quote

        emails_dir = "/tmp/emails"
        emails = []
        if os.path.exists(emails_dir):
            for filename in os.listdir(emails_dir):
                if filename.endswith('.html'):
                    file_stat = os.stat(os.path.join(emails_dir, filename))
                    emails.append({
                        'filename': filename,
                        'size': file_stat.st_size,
                        'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
                        'download_url': f'/api/download-email/{quote(filename)}'
                    })
            # Sort by creation time (newest first); the fixed-width timestamp
            # format sorts chronologically as text.
            emails.sort(key=lambda x: x['created'], reverse=True)

        def render_card(email):
            """Return the HTML card for one saved email."""
            return f"""
            <div class="email-card">
                <div class="row align-items-center">
                    <div class="col-md-8">
                        <h5><i class="fas fa-file-alt"></i> {html.escape(email.get('filename', 'Unknown'))}</h5>
                        <p class="text-muted mb-1">
                            <i class="fas fa-clock"></i> Created: {html.escape(email.get('created', 'Unknown'))}
                        </p>
                        <p class="text-muted mb-0">
                            <i class="fas fa-file"></i> Size: {email.get('size', 0):,} bytes
                        </p>
                    </div>
                    <div class="col-md-4 text-end">
                        <a href="{html.escape(email.get('download_url', '#'), quote=True)}" class="download-btn">
                            <i class="fas fa-download"></i> Download HTML
                        </a>
                    </div>
                </div>
            </div>
            """

        # Generate content section
        if emails:
            # Join once instead of repeated string concatenation.
            email_cards_html = "".join(render_card(email) for email in emails)
            content_section = f"""
            <div class="row">
                <div class="col-12">
                    <h3>π§ Generated Emails ({len(emails)} total)</h3>
                    {email_cards_html}
                </div>
            </div>
            """
        else:
            content_section = """
            <div class="row">
                <div class="col-12">
                    <div class="text-center py-5">
                        <i class="fas fa-inbox fa-3x text-muted mb-3"></i>
                        <h3>No emails generated yet</h3>
                        <p class="text-muted">Generate some AI recommendations to see saved emails here.</p>
                    </div>
                </div>
            </div>
            """

        # Static stylesheet kept out of the f-string so braces need no doubling.
        page_css = """
        body {
            background-color: #f8f9fa;
            padding-top: 2rem;
        }
        .header {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 2rem 0;
            margin-bottom: 2rem;
        }
        .email-card {
            background: white;
            border-radius: 10px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
            margin-bottom: 1rem;
            padding: 1.5rem;
        }
        .download-btn {
            background: #28a745;
            color: white;
            border: none;
            padding: 0.5rem 1rem;
            border-radius: 5px;
            text-decoration: none;
            display: inline-block;
        }
        .download-btn:hover {
            background: #218838;
            color: white;
            text-decoration: none;
        }
        """

        # NOTE(review): the Bootstrap URL below contains a scrape-redacted
        # '[email protected]' segment where a package version is expected -
        # restore the intended 'bootstrap@<version>' path.
        html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Saved Emails</title>
    <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
    <style>{page_css}</style>
</head>
<body>
    <div class="header">
        <div class="container">
            <h1><i class="fas fa-envelope"></i> Saved Emails</h1>
            <p class="mb-0">Download and view generated email content</p>
        </div>
    </div>
    <div class="container">
        <div class="row mb-4">
            <div class="col-md-6">
                <a href="/" class="btn btn-outline-primary">
                    <i class="fas fa-arrow-left"></i> Back to Home
                </a>
            </div>
            <div class="col-md-6 text-end">
                <button class="btn btn-outline-secondary" onclick="location.reload()">
                    <i class="fas fa-sync"></i> Refresh
                </button>
            </div>
        </div>
        {content_section}
    </div>
</body>
</html>
"""
        return html_content
    except Exception as e:
        logger.error(f"Error creating saved emails page: {e}")
        return f"Error: {html.escape(str(e))}"
def get_properties_parallel():
    """Fetch property pages from the backend concurrently and return them as JSON.

    Query parameters (all optional). A value that cannot be parsed as an
    integer falls back to its default, and values below 1 are raised to 1,
    so a malformed query string no longer fails the whole request:
        workers:   number of worker threads (default 8)
        page_size: properties requested per page (default 100)
        max_pages: highest page number requested (default 50)

    Returns:
        A JSON response with ``success``, ``properties`` (ordered by page
        number), ``total``, ``pages_completed``, ``empty_pages``,
        ``workers_used`` and ``timestamp``. On an unexpected error, a JSON
        error payload with HTTP status 500.
    """
    logger.info("π Fetching properties using parallel processing...")
    try:
        from concurrent.futures import ThreadPoolExecutor, as_completed

        def positive_int_arg(name, default):
            """Read an integer query parameter, never returning less than 1."""
            # With type=int, a value that fails conversion yields the default.
            return max(1, request.args.get(name, default, type=int))

        # Configuration
        max_workers = positive_int_arg('workers', 8)
        page_size = positive_int_arg('page_size', 100)
        max_pages = positive_int_arg('max_pages', 50)
        # Number of empty pages (counted in completion order, not necessarily
        # adjacent page numbers) after which the fetch stops early.
        empty_page_limit = 3
        logger.info(f"π§ Using {max_workers} workers, {page_size} properties per page, max {max_pages} pages")

        def fetch_properties_page(page_num):
            """Fetch a single page of properties; returns [] on any failure."""
            try:
                url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails"
                params = {
                    'pageNumber': page_num,
                    'pageSize': page_size
                }
                # NOTE(review): timeout=None can block a worker forever and
                # verify=False disables TLS certificate checks. Both are kept
                # because they look deliberate - confirm before changing.
                response = requests.get(url, params=params, timeout=None, verify=False)
                response.raise_for_status()
                data = response.json()
                # Handle different response structures
                if isinstance(data, list):
                    properties = data
                elif isinstance(data, dict):
                    properties = data.get('data', data.get('properties', data.get('results', [])))
                else:
                    properties = []
                logger.info(f"β Page {page_num}: {len(properties)} properties")
                return properties
            except Exception as e:
                logger.error(f"β Error fetching page {page_num}: {e}")
                return []

        # Results are keyed by page number so the final list is in page order
        # rather than in whatever order the worker threads happen to finish.
        properties_by_page = {}
        total_fetched = 0
        completed_pages = 0
        empty_pages = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit tasks for all pages
            future_to_page = {
                executor.submit(fetch_properties_page, page_num): page_num
                for page_num in range(1, max_pages + 1)
            }
            for future in as_completed(future_to_page):
                page_num = future_to_page[future]
                try:
                    properties = future.result()  # No timeout - let it take as long as needed
                    if properties:
                        # list() rejects non-iterable payloads here, inside the try.
                        page_items = list(properties)
                        properties_by_page[page_num] = page_items
                        total_fetched += len(page_items)
                        completed_pages += 1
                        logger.info(f"β Parallel fetch - Page {page_num}: {len(page_items)} properties (Total: {total_fetched})")
                    else:
                        empty_pages += 1
                        logger.info(f"π Page {page_num} is empty (Empty pages: {empty_pages})")
                        if empty_pages >= empty_page_limit:
                            logger.info("π Stopping fetch - too many consecutive empty pages")
                            # Cancel work that has not started yet; otherwise
                            # leaving the executor context would still fetch
                            # every remaining page before returning.
                            for pending in future_to_page:
                                pending.cancel()
                            break
                except Exception as e:
                    logger.error(f"β Failed to fetch page {page_num}: {e}")
                    # Continue with other pages

        all_properties = [
            item
            for page_num in sorted(properties_by_page)
            for item in properties_by_page[page_num]
        ]
        logger.info(f"π Parallel fetch completed: {len(all_properties)} total properties from {completed_pages} pages!")
        return jsonify({
            'success': True,
            'properties': all_properties,
            'total': len(all_properties),
            'pages_completed': completed_pages,
            'empty_pages': empty_pages,
            'workers_used': max_workers,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in parallel property fetch: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
            'properties': []
        }), 500
def analyze_automated_emails(customer_id):
    """Analyze which automated emails would be triggered for a customer.

    Property data is requested from the backend; if that yields nothing and
    the mock data service is available, mock data is used instead. The data
    is passed through ``self.process_lead_data`` and
    ``self.generate_analytics``, and the combined structure is handed to the
    AI engine's ``analyze_automated_email_triggers``.

    Args:
        customer_id: Identifier of the customer to analyze.

    Returns:
        The trigger analysis as JSON on success; a JSON error with status
        404 when no property data is available, or 500 when the AI engine is
        missing, the analysis reports failure, or an exception is raised.
    """
    logger.info(f"π€ Analyzing automated email triggers for customer {customer_id}")
    try:
        # Get customer analysis using our AI model (not from backend)
        logger.info(f"π Generating lead analysis for customer {customer_id} using AI model")
        # Try to fetch data from backend first for property data
        backend_response = None
        try:
            logger.info(f"π Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            logger.info(f"π₯ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed: {e}")
        # If backend fails, use mock data
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
            # Guard the count: an empty/None mock result must reach the 404
            # branch below instead of raising TypeError in len().
            mock_count = len(backend_response) if backend_response else 0
            logger.info(f"π₯ Mock data generated for customer {customer_id}: {mock_count} properties")
        if not backend_response:
            return jsonify({
                'success': False,
                'error': 'No property data found for this customer'
            }), 404
        # Process lead data using our AI model
        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)
        # Generate analytics using our AI model
        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)
        # Create customer analysis data structure
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Analyze automated email triggers using our AI model
        trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data)
        if not trigger_analysis.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to analyze email triggers'
            }), 500
        return jsonify(trigger_analysis)
    except Exception as e:
        logger.error(f"Error analyzing automated emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def test_automated_emails(customer_id):
    """Generate and send every automated email triggered for a customer.

    The optional JSON request body may carry ``email`` (the recipient). A
    missing or non-JSON body is tolerated and the default recipient is used.
    Customer data is gathered the same way as for trigger analysis (backend
    first, mock data as fallback), then one email per trigger is generated
    and sent through the AI engine.

    Args:
        customer_id: Identifier of the customer whose triggers are tested.

    Returns:
        JSON listing ``sent_emails`` and ``failed_emails`` together with the
        trigger analysis summary; a JSON error with status 404 when no
        property data is available, or 500 on engine/analysis failure or an
        unexpected exception.
    """
    logger.info(f"π§ͺ Testing automated emails for customer {customer_id}")
    try:
        # silent=True: a missing/invalid JSON body yields None instead of
        # raising, which is what the "or {}" fallback is meant to handle.
        data = request.get_json(silent=True) or {}
        recipient_email = data.get('email', '[email protected]')
        # Get customer analysis using our AI model (not from backend)
        logger.info(f"π Generating lead analysis for customer {customer_id} using AI model")
        # Try to fetch data from backend first for property data
        backend_response = None
        try:
            logger.info(f"π Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            logger.info(f"π₯ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed: {e}")
        # If backend fails, use mock data
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
            # Guard the count so an empty/None mock result reaches the 404
            # branch below instead of raising TypeError in len().
            mock_count = len(backend_response) if backend_response else 0
            logger.info(f"π₯ Mock data generated for customer {customer_id}: {mock_count} properties")
        if not backend_response:
            return jsonify({
                'success': False,
                'error': 'No property data found for this customer'
            }), 404
        # Process lead data using our AI model
        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)
        # Generate analytics using our AI model
        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)
        # Create customer analysis data structure
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Analyze triggers using our AI model
        logger.info(f"π€ Analyzing automated email triggers for customer {customer_id}")
        trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data)
        if not trigger_analysis.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to analyze email triggers'
            }), 500
        # Send emails for each trigger
        triggers = trigger_analysis.get('triggers', [])
        sent_emails = []
        failed_emails = []
        for trigger in triggers:
            # Read the label defensively up front: it is also needed by the
            # error handler, where a KeyError would abort the whole request.
            trigger_type = trigger.get('trigger_type') if isinstance(trigger, dict) else None
            try:
                # Generate email content using our AI model
                email_content = ai_engine.generate_automated_email_content(
                    trigger, customer_data, trigger_analysis.get('ai_insights', {})
                )
                if not email_content.get('success'):
                    failed_emails.append({
                        'trigger_type': trigger_type,
                        'error': email_content.get('error', 'Failed to generate content')
                    })
                    continue
                # Send email
                if ai_engine.send_email(recipient_email, email_content):
                    # .get() keeps an already-sent email from being recorded
                    # as failed just because an optional field is missing.
                    sent_emails.append({
                        'trigger_type': trigger_type,
                        'subject': email_content.get('subject'),
                        'priority': trigger.get('priority'),
                        'reason': trigger.get('reason')
                    })
                else:
                    failed_emails.append({
                        'trigger_type': trigger_type,
                        'error': 'Failed to send email'
                    })
            except Exception as e:
                failed_emails.append({
                    'trigger_type': trigger_type,
                    'error': str(e)
                })
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'total_triggers': len(triggers),
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'analysis_summary': trigger_analysis.get('analysis_summary', {}),
            'ai_insights': trigger_analysis.get('ai_insights', {}),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error testing automated emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def fetch_all_properties():
    """Fetch all properties through the AI engine and report how many are stored.

    Query parameters (optional integers): ``workers`` (default 10),
    ``page_size`` (default 100) and ``max_pages`` (default 50). They are
    forwarded to the engine's ``fetch_all_properties_parallel``.

    Returns:
        JSON with ``total_properties`` (the collection count, or the string
        ``"Unknown"`` when the count cannot be read) and the parameters used;
        a JSON error with status 500 when the engine is unavailable, the
        fetch reports failure, or an exception is raised.
    """
    try:
        logger.info("π Starting fetch of ALL 600+ properties...")
        # Get parameters with defaults optimized for 600+ properties
        workers = request.args.get('workers', 10, type=int)
        page_size = request.args.get('page_size', 100, type=int)  # Increased page size
        max_pages = request.args.get('max_pages', 50, type=int)  # Increased max pages
        logger.info(f"π Fetch parameters: {workers} workers, {page_size} per page, max {max_pages} pages")
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Fetch all properties using parallel processing
        success = ai_engine.fetch_all_properties_parallel(
            max_workers=workers,
            page_size=page_size,
            max_pages=max_pages
        )
        if not success:
            return jsonify({
                'success': False,
                'error': 'Failed to fetch properties'
            }), 500
        # Get final count from ChromaDB
        total_stored = 0
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                total_stored = ai_engine.properties_collection.count()
            except Exception as count_error:
                # Was a bare "except:", which also swallowed KeyboardInterrupt
                # and SystemExit and hid the reason the count failed.
                logger.warning(f"Could not read stored property count: {count_error}")
                total_stored = "Unknown"
        return jsonify({
            'success': True,
            'message': 'Successfully fetched and stored properties',
            'total_properties': total_stored,
            'parameters_used': {
                'workers': workers,
                'page_size': page_size,
                'max_pages': max_pages
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error fetching all properties: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def send_multi_ai_recommendations(customer_id):
    """Send several AI-generated recommendation emails, one per recommendation type.

    The optional JSON request body may carry ``email`` (recipient) and
    ``email_count`` (how many of the ten recommendation types to send,
    default 10). ``email_count`` must be convertible to an integer;
    otherwise the request is rejected with status 400.

    Args:
        customer_id: Identifier of the customer to send recommendations to.

    Returns:
        JSON listing ``sent_emails`` and ``failed_emails`` with totals and
        the AI insights; a JSON error with status 400 for an invalid
        ``email_count``, 404 when no customer data is found, or 500 when the
        engine is unavailable or an exception is raised.
    """
    logger.info(f"π€ Multi-AI recommendation request for customer {customer_id}")
    try:
        # silent=True: a missing/invalid JSON body yields None instead of raising.
        data = request.get_json(silent=True) or {}
        recipient_email = data.get('email', '[email protected]')
        # Validate the count up front: a non-numeric value used to raise a
        # TypeError inside min() and surface as an opaque 500.
        try:
            email_count = int(data.get('email_count', 10))  # Send 10 emails by default
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'email_count must be an integer'
            }), 400
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Create different types of recommendations
        recommendation_types = [
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities'
        ]
        sent_emails = []
        failed_emails = []
        # Send multiple recommendation emails. max(..., 0) keeps a negative
        # count meaning "send nothing" rather than slicing from the end.
        for rec_type in recommendation_types[:max(email_count, 0)]:
            try:
                logger.info(f"π§ Generating {rec_type} recommendations for customer {customer_id}")
                recommendations = ai_engine.get_enhanced_ai_recommendations(
                    analysis_data, ai_insights, count=5
                )
                # Pass the recommendation type through, as the other email
                # handlers in this file do, so the content (not only the
                # subject line) reflects the type being sent.
                email_result = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email, rec_type
                )
                if not email_result.get('success'):
                    failed_emails.append({
                        'type': rec_type,
                        'error': email_result.get('error', 'Failed to generate email')
                    })
                    continue
                # Customize subject based on recommendation type
                custom_subject = self._get_custom_subject(rec_type, customer_id, ai_insights)
                email_result['subject'] = custom_subject
                # Send email
                if ai_engine.send_email(recipient_email, email_result):
                    sent_emails.append({
                        'type': rec_type,
                        'subject': custom_subject,
                        'recommendations_count': len(recommendations),
                        'timestamp': datetime.now().isoformat()
                    })
                    logger.info(f"β {rec_type} email sent successfully")
                else:
                    failed_emails.append({
                        'type': rec_type,
                        'error': 'Failed to send email'
                    })
            except Exception as e:
                failed_emails.append({
                    'type': rec_type,
                    'error': str(e)
                })
                logger.error(f"β Error with {rec_type}: {e}")
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'requested_emails': email_count,
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'total_sent': len(sent_emails),
            'total_failed': len(failed_emails),
            'ai_insights': ai_insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error in multi-AI recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def get_chromadb_recommendations(customer_id):
    """Return similarity-, preference- and behaviour-based recommendations for a customer.

    Each recommendation list is produced by the AI engine and then passed
    through ``add_ai_explanations``.

    Args:
        customer_id: Identifier of the customer to recommend for.

    Returns:
        JSON with the three recommendation lists keyed by type, the AI
        insights and the combined count; a JSON error with status 404 when
        no customer data is found, or 500 when the engine is unavailable or
        an exception is raised.
    """
    logger.info(f"π ChromaDB recommendations for customer {customer_id}")
    try:
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # The preferences depend only on analysis_data and ai_insights, so
        # they are extracted once instead of once per use (previously up to
        # five identical calls per request).
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        viewed_properties = analysis_data['data']['properties']
        # Get different types of recommendations from ChromaDB
        raw_recommendations = {
            'similarity_based': ai_engine.get_similarity_based_recommendations(
                viewed_properties, preferences, count=5
            ),
            'preference_based': ai_engine.get_preference_based_recommendations(
                preferences, viewed_properties, count=5
            ),
            'behavioral_based': ai_engine.get_behavioral_recommendations(
                analysis_data, ai_insights, count=5
            )
        }
        # Add AI explanations
        recommendations = {
            rec_type: ai_engine.add_ai_explanations(recs, preferences, ai_insights)
            for rec_type, recs in raw_recommendations.items()
        }
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recommendations': recommendations,
            'ai_insights': ai_insights,
            'total_recommendations': sum(len(recs) for recs in recommendations.values()),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error getting ChromaDB recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def test_sendgrid_connection():
    """Run the AI engine's SendGrid test email and report the outcome as JSON.

    Returns:
        JSON whose ``success`` mirrors the engine's test result, with the
        raw result under ``details``; a JSON error with status 500 when the
        engine is unavailable or an exception is raised.
    """
    logger.info("π§ͺ Testing SendGrid connection")
    try:
        engine = self.get_ai_engine()
        if engine:
            # Ask the engine to send its test message.
            outcome = engine.test_sendgrid_email('[email protected]')
            payload = {
                'success': outcome.get('success', False),
                'message': 'SendGrid connection test completed',
                'details': outcome,
                'timestamp': datetime.now().isoformat()
            }
            return jsonify(payload)
        unavailable = {
            'success': False,
            'error': 'AI engine not available'
        }
        return jsonify(unavailable), 500
    except Exception as exc:
        logger.error(f"β Error testing SendGrid: {exc}")
        failure = {
            'success': False,
            'error': str(exc)
        }
        return jsonify(failure), 500
def get_email_analysis_basis(customer_id):
    """Explain what each recommendation email type is based on for a customer.

    Combines the customer's analysis data with the AI engine's insights,
    extracted preferences and behavioural patterns.

    Args:
        customer_id: Identifier of the customer to describe.

    Returns:
        JSON with ``analysis_basis`` (preferences, insights, patterns, up to
        five viewed properties, lead qualification, engagement level and
        conversion probability) and ``email_triggers`` (one description per
        email type). Fields missing from the analysis data are returned as
        ``None`` / empty rather than failing the request. A JSON error with
        status 404 is returned when no customer data is found, or 500 when
        the engine is unavailable or an exception is raised.
    """
    logger.info(f"π Getting email analysis basis for customer {customer_id}")
    try:
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Extract customer preferences
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        # Analyze behavioral patterns
        behavioral_patterns = ai_engine.analyze_behavioral_patterns(analysis_data, ai_insights)
        # Read nested fields defensively: other handlers in this file build
        # 'conversion_probability' with an empty-dict default, so a hard
        # ['final_probability'] lookup could fail the whole request.
        customer_section = analysis_data.get('data') or {}
        analytics_section = customer_section.get('analytics') or {}
        conversion = analytics_section.get('conversion_probability') or {}
        price_range = preferences.get('price_range', {})
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'analysis_basis': {
                'customer_preferences': preferences,
                'ai_insights': ai_insights,
                'behavioral_patterns': behavioral_patterns,
                'viewed_properties': (customer_section.get('properties') or [])[:5],  # Sample properties
                'lead_qualification': customer_section.get('lead_qualification'),
                'engagement_level': analytics_section.get('engagement_level'),
                'conversion_probability': conversion.get('final_probability')
            },
            'email_triggers': {
                'property_based': 'Based on property types you viewed most',
                'price_based': f"Based on your price range βΉ{price_range.get('min_price', 0)} - βΉ{price_range.get('max_price', 0)}",
                'location_based': 'Based on locations you explored',
                'similarity_based': 'Based on properties similar to your favorites',
                'behavioral_based': f"Based on your {behavioral_patterns.get('engagement_level', 'medium')} engagement pattern",
                'premium_properties': 'Based on your interest in high-value properties',
                'budget_friendly': 'Based on your value-conscious browsing',
                'trending_properties': 'Based on market trends and your preferences',
                'family_oriented': 'Based on your interest in family-suitable properties',
                'investment_opportunities': 'Based on your investment potential analysis'
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error getting email analysis basis: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def preview_all_email_content(customer_id):
    """Generate preview content for each of the ten recommendation email types.

    For every type, recommendations are requested from the AI engine
    (topped up from a ChromaDB similarity search when fewer than ten come
    back) and the personalized email is generated. ``send_email`` is not
    called here.

    Args:
        customer_id: Identifier of the customer to build previews for.

    Returns:
        JSON with one preview entry per email type (an entry carries an
        ``error`` field when its generation failed); a JSON error with
        status 404 when no customer data is found, or 500 when the engine is
        unavailable or an exception is raised.
    """
    logger.info(f"ποΈ Previewing all email content for customer {customer_id}")
    try:
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        insights = engine.analyze_user_behavior_with_ai(analysis_data)

        # Make sure a ChromaDB collection exists before it is queried.
        if getattr(engine, 'properties_collection', None) is None:
            logger.info("π ChromaDB not initialized, initializing...")
            engine.initialize_chromadb()

        # Populate the collection when it is missing or empty.
        try:
            if not engine.properties_collection:
                logger.warning("β οΈ ChromaDB collection is None, auto-fetching properties...")
                engine.auto_fetch_and_store_properties()
            else:
                stored_count = engine.properties_collection.count()
                logger.info(f"π ChromaDB has {stored_count} properties")
                if stored_count == 0:
                    logger.info("π ChromaDB is empty, auto-fetching properties...")
                    engine.auto_fetch_and_store_properties()
        except Exception as count_error:
            logger.warning(f"β οΈ Could not check ChromaDB count: {count_error}")
            logger.info("π Auto-fetching properties as fallback...")
            engine.auto_fetch_and_store_properties()

        def failed_preview(kind, message):
            """Build the preview entry used when a type could not be generated."""
            return {
                'email_type': kind,
                'subject': self._get_custom_subject(kind, customer_id, insights),
                'error': message,
                'recommendations_count': 0,
                'properties_included': []
            }

        def property_summary(rec):
            """Reduce one recommendation to the fields shown in the preview."""
            return {
                'name': rec.get('property_name', rec.get('name', rec.get('title', 'Property'))),
                'price': rec.get('price', rec.get('marketValue', rec.get('amount', 0))),
                'type': rec.get('property_type', rec.get('type', rec.get('propertyTypeName', 'N/A'))),
                'score': rec.get('ai_score', rec.get('similarity_score', 0))
            }

        email_types = (
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities',
        )
        previews = []
        for email_type in email_types:
            try:
                # Type-specific recommendations from the engine.
                recommendations = engine.get_multi_ai_property_recommendations(
                    analysis_data, insights, email_type, count=12
                )
                logger.info(f"π€ AI Model for {email_type} preview: {len(recommendations)} properties")

                if len(recommendations) < 10:
                    # Top up from a generic similarity search, skipping ids
                    # already present and never growing beyond 12 entries.
                    extras = engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 12
                    )
                    known_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations}
                    for candidate in extras:
                        candidate_id = str(candidate.get('id', candidate.get('propertyId', '')))
                        if candidate_id in known_ids or len(recommendations) >= 12:
                            continue
                        recommendations.append(candidate)
                        known_ids.add(candidate_id)

                generated = engine.generate_personalized_email(
                    analysis_data, insights, recommendations, '[email protected]', email_type
                )
                if not generated.get('success'):
                    previews.append(
                        failed_preview(email_type, generated.get('error', 'Failed to generate content'))
                    )
                    continue

                personalization = generated.get('personalization', {})
                previews.append({
                    'email_type': email_type,
                    'subject': generated.get('subject', self._get_custom_subject(email_type, customer_id, insights)),
                    'html_content': generated.get('html_content', ''),
                    'text_content': generated.get('text_content', ''),
                    'recommendations_count': len(recommendations),
                    'properties_included': [property_summary(rec) for rec in recommendations[:3]],
                    'ai_insights': {
                        'personality_type': personalization.get('personality_type', 'N/A'),
                        'decision_style': insights.get('recommendation_strategy', 'N/A'),
                        'urgency_level': generated.get('personalization', {}).get('urgency_level', 'N/A'),
                        'peak_activity': insights.get('peak_time', 'N/A')
                    }
                })
            except Exception as exc:
                logger.error(f"β Error generating {email_type}: {exc}")
                previews.append(failed_preview(email_type, str(exc)))

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'email_previews': previews,
            'total_emails': len(previews),
            'ai_insights': insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as exc:
        logger.error(f"β Error previewing email content: {exc}")
        return jsonify({
            'success': False,
            'error': str(exc)
        }), 500
def test_send_all_10_emails(customer_id):
    """Generate and send one email for each of the ten recommendation types.

    The optional JSON request body may carry ``email`` (the recipient). A
    missing or non-JSON body is tolerated and the default recipient is used.
    For every type, recommendations are requested from the AI engine (topped
    up from a ChromaDB similarity search when fewer than ten come back), the
    personalized email is generated, its subject is replaced with the
    type-specific subject, and it is sent.

    Args:
        customer_id: Identifier of the customer to send the test emails for.

    Returns:
        JSON with ``test_results`` (attempted/sent/failed counts and the
        per-type details); a JSON error with status 404 when no customer
        data is found, or 500 when the engine is unavailable or an exception
        is raised.
    """
    logger.info(f"π§ͺ Testing all 10 email types for customer {customer_id}")
    try:
        # silent=True: a missing/invalid JSON body yields None instead of raising.
        data = request.get_json(silent=True) or {}
        recipient_email = data.get('email', '[email protected]')
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404
        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500
        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Ensure ChromaDB has properties before generating recommendations
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                collection_count = ai_engine.properties_collection.count()
                if collection_count == 0:
                    logger.info("π ChromaDB is empty, auto-fetching properties...")
                    ai_engine.auto_fetch_and_store_properties()
            except Exception as count_error:
                logger.warning(f"β οΈ Could not check ChromaDB count: {count_error}")
                # Try to fetch anyway
                ai_engine.auto_fetch_and_store_properties()
        else:
            logger.warning("β οΈ ChromaDB not initialized, auto-initializing...")
            ai_engine.initialize_chromadb()
            ai_engine.auto_fetch_and_store_properties()
        # Email types with a description of the strategy reported for each
        email_strategies = {
            'property_based': "Get properties matching customer's preferred property types",
            'price_based': "Get properties within customer's budget range",
            'location_based': "Get properties in customer's preferred locations",
            'similarity_based': "Get properties similar to customer's viewed properties",
            'behavioral_based': 'Get properties based on customer behavior patterns',
            'premium_properties': 'Get high-end luxury properties',
            'budget_friendly': 'Get value-for-money properties',
            'trending_properties': 'Get currently trending market properties',
            'family_oriented': 'Get family-suitable properties with amenities',
            'investment_opportunities': 'Get properties with high ROI potential'
        }
        sent_emails = []
        failed_emails = []
        for email_type, strategy in email_strategies.items():
            try:
                logger.info(f"π§ Processing {email_type} email with strategy: {strategy}")
                # Use specialized AI models for each email type - request 15 to ensure we get at least 10
                recommendations = ai_engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=15
                )
                logger.info(f"π€ AI Model for {email_type} retrieved {len(recommendations)} properties")
                # If we don't have enough properties, try a fallback query
                if len(recommendations) < 10:
                    logger.warning(f"β οΈ Only {len(recommendations)} properties for {email_type}, trying fallback...")
                    fallback_recommendations = ai_engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 15
                    )
                    # Merge without duplicates
                    seen_ids = {str(r.get('id', r.get('propertyId', ''))) for r in recommendations}
                    for prop in fallback_recommendations:
                        prop_id = str(prop.get('id', prop.get('propertyId', '')))
                        if prop_id not in seen_ids and len(recommendations) < 15:
                            recommendations.append(prop)
                            seen_ids.add(prop_id)
                    logger.info(f"π After fallback: {len(recommendations)} properties for {email_type}")
                # Generate personalized email with email type
                email_content = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email, email_type
                )
                # A result that is not a dict, or that does not report
                # success, is a generation failure. Previously such a result
                # was replaced by {'success': True}, which discarded any
                # generated content and sent an email carrying only a subject.
                if not isinstance(email_content, dict) or not email_content.get('success'):
                    generation_error = 'Failed to generate email content'
                    if isinstance(email_content, dict):
                        generation_error = email_content.get('error', generation_error)
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': generation_error
                    })
                    continue
                # Customize subject based on email type
                custom_subject = self._get_custom_subject(email_type, customer_id, ai_insights)
                email_content['subject'] = custom_subject
                # Send email
                if ai_engine.send_email(recipient_email, email_content):
                    sent_emails.append({
                        'type': email_type,
                        'subject': custom_subject,
                        'strategy': strategy,
                        'recommendations_count': len(recommendations),
                        'properties_from_chromadb': True,
                        'timestamp': datetime.now().isoformat(),
                        'sample_properties': [
                            {
                                'name': rec.get('property_name', 'Property'),
                                'price': rec.get('price', 0),
                                'type': rec.get('property_type', 'N/A'),
                                'ai_score': rec.get('ai_score', 0)
                            } for rec in recommendations[:3]
                        ]
                    })
                    logger.info(f"β {email_type} email sent successfully")
                else:
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': 'Failed to send email via SendGrid'
                    })
            except Exception as e:
                failed_emails.append({
                    'type': email_type,
                    'strategy': strategy,
                    'error': str(e)
                })
                logger.error(f"β Error with {email_type}: {e}")
        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'test_results': {
                'total_attempted': len(email_strategies),
                'total_sent': len(sent_emails),
                'total_failed': len(failed_emails),
                'sent_emails': sent_emails,
                'failed_emails': failed_emails
            },
            'ai_insights': ai_insights,
            'chromadb_status': 'Active - Properties retrieved from vector database',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error testing all 10 emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def check_chromadb_status():
    """Report the state of the AI engine's ChromaDB property collection.

    The JSON response takes one of these shapes:
      * HTTP 500 with an error when no AI engine is available or an
        unexpected exception is raised;
      * ``chromadb_initialized: False`` when the engine has no (or a falsy)
        ``properties_collection``;
      * collection name, property count and a ``ready``/``empty`` status
        when the collection can be queried;
      * ``success: False`` with the error text when querying the collection
        raises.
    """
    logger.info("π Checking ChromaDB status")
    try:
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Covers both "attribute missing" and "attribute present but falsy".
        if not getattr(engine, 'properties_collection', None):
            return jsonify({
                'success': True,
                'chromadb_initialized': False,
                'property_count': 0,
                'message': 'ChromaDB not initialized'
            })

        try:
            collection = engine.properties_collection
            total = collection.count()
            label = collection.name
            status = 'ready' if total > 0 else 'empty'
            return jsonify({
                'success': True,
                'chromadb_initialized': True,
                'collection_name': label,
                'property_count': total,
                'status': status,
                'message': f'ChromaDB collection "{label}" has {total} properties',
                'timestamp': datetime.now().isoformat()
            })
        except Exception as count_error:
            # The collection exists but could not be read.
            return jsonify({
                'success': False,
                'chromadb_initialized': True,
                'error': f'Collection access error: {count_error}',
                'message': 'ChromaDB collection may be corrupted'
            })
    except Exception as e:
        logger.error(f"β Error checking ChromaDB status: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def list_saved_emails():
    """Return metadata for every ``*.html`` file in ``./saved_emails``.

    Each entry carries the file name, its path, an email type taken from the
    part of the file name before the first underscore (``unknown`` when
    there is no underscore), the file's modification time formatted as
    ``YYYY-MM-DD HH:MM:SS`` under ``created_time``, and its size in KB.
    Entries are ordered newest first. A missing directory yields an empty
    list; any error yields HTTP 500.
    """
    try:
        import os
        import glob
        from datetime import datetime

        emails_dir = "./saved_emails"
        if not os.path.exists(emails_dir):
            return jsonify({'saved_emails': [], 'message': 'No saved emails directory found'})

        def describe(path):
            # Build the listing entry for a single saved email file.
            name = os.path.basename(path)
            info = os.stat(path)
            stamp = datetime.fromtimestamp(info.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
            prefix = name.split('_')[0] if '_' in name else 'unknown'
            return {
                'filename': name,
                'filepath': path,
                'email_type': prefix.replace('_', ' ').title(),
                'created_time': stamp,
                'size_kb': round(info.st_size / 1024, 2)
            }

        html_files = glob.glob(os.path.join(emails_dir, "*.html"))
        # The timestamp format sorts chronologically as plain text.
        saved_emails = sorted(
            (describe(path) for path in html_files),
            key=lambda entry: entry['created_time'],
            reverse=True,
        )
        return jsonify({
            'saved_emails': saved_emails,
            'total_count': len(saved_emails),
            'message': f'Found {len(saved_emails)} saved email files'
        })
    except Exception as e:
        logger.error(f"β Error listing saved emails: {e}")
        return jsonify({'error': str(e)}), 500
def view_saved_email(filename):
    """Serve one saved email HTML file from ``./saved_emails``.

    ``filename`` is caller-supplied (presumably taken from the request URL -
    confirm against the route), so the resolved path is checked to lie inside
    the saved-emails directory before anything is served. Names that escape
    the directory (``../``, absolute paths, symlinks pointing elsewhere) get
    the same 404 as a missing file, as do directories.

    Args:
        filename: Name of the saved email file to serve.

    Returns:
        The Flask file response on success, or a ``(message, status)`` tuple:
        404 when the file is missing or outside the directory, 500 on an
        unexpected error.
    """
    try:
        import os

        emails_dir = os.path.realpath("./saved_emails")
        requested = os.path.realpath(os.path.join(emails_dir, filename))

        # realpath collapses ".." and symlinks, so a prefix test on the
        # resolved path is a reliable containment check.
        inside_dir = requested.startswith(emails_dir + os.sep)
        if not inside_dir or not os.path.isfile(requested):
            return "Email file not found", 404

        from flask import send_file
        # Serve the exact absolute path that was just validated.
        return send_file(requested, mimetype='text/html')
    except Exception as e:
        logger.error(f"β Error serving email file: {e}")
        return f"Error loading email: {str(e)}", 500
def update_debt_financing():
    """Record a customer's debt financing details.

    Expects a JSON body with ``customer_id``, ``debt_amount`` and
    ``financing_type``; ``interest_rate``, ``term_months``,
    ``monthly_payment`` and ``status`` are optional. The record is sent to
    the backend with a PUT to ``/api/DebtFinancing``. If the backend call
    raises or returns nothing, a success payload flagged ``fallback: True``
    is returned instead, echoing the record.

    Responds with HTTP 400 for an empty body or a missing required field and
    HTTP 500 for unexpected errors.
    """
    logger.info("π° Debt financing update request")
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({
                'success': False,
                'error': 'No data provided'
            }), 400

        # Report the first required field that is absent, in declaration order.
        missing = next(
            (name for name in ('customer_id', 'debt_amount', 'financing_type')
             if name not in payload),
            None,
        )
        if missing is not None:
            return jsonify({
                'success': False,
                'error': f'Missing required field: {missing}'
            }), 400

        debt_record = {
            'customer_id': payload.get('customer_id'),
            'debt_amount': payload.get('debt_amount'),
            'financing_type': payload.get('financing_type'),
            'interest_rate': payload.get('interest_rate', 0),
            'term_months': payload.get('term_months', 0),
            'monthly_payment': payload.get('monthly_payment', 0),
            'status': payload.get('status', 'pending'),
            'updated_at': datetime.now().isoformat(),
            'created_at': datetime.now().isoformat()
        }

        try:
            backend_response = self.make_api_request(
                '/api/DebtFinancing',
                method='PUT',
                data=debt_record,
                use_fallback=False
            )
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed for debt financing: {e}")
            backend_response = None

        if backend_response:
            return jsonify(backend_response)

        # Backend unavailable: answer with a locally built success payload.
        logger.info("π Using fallback for debt financing update")
        return jsonify({
            'success': True,
            'message': 'Debt financing updated successfully (fallback mode)',
            'data': debt_record,
            'fallback': True
        })
    except Exception as e:
        logger.error(f"β Error updating debt financing: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to update debt financing: {str(e)}'
        }), 500
def send_plain_text_email():
    """Relay a plain-text email to the backend ``/api/Email/sendPlainText`` endpoint.

    ``toEmail``, ``subject`` and ``body`` are read from the JSON body when
    the request is JSON, otherwise from the query string. A missing value or
    an address without ``@`` yields HTTP 400. Backend outcomes map to:
    status 200 -> success payload, any other status -> 500 with the backend
    text, timeout -> 504, connection error -> 503, other errors -> 500.
    """
    logger.info("π§ Plain text email send request")
    try:
        fields = request.get_json() if request.is_json else request.args
        to_email = fields.get('toEmail')
        subject = fields.get('subject')
        body = fields.get('body')

        # Reject the first empty parameter, checked in this fixed order.
        for label, value in (('toEmail', to_email), ('subject', subject), ('body', body)):
            if not value:
                return jsonify({
                    'success': False,
                    'error': f'{label} parameter is required'
                }), 400

        if '@' not in to_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address format'
            }), 400

        external_api_url = f"{self.config['backend_url']}/api/Email/sendPlainText"
        try:
            logger.info(f"π Sending email via external API: {external_api_url}")
            # The email fields travel as query parameters of the POST.
            response = requests.post(
                external_api_url,
                params={'toEmail': to_email, 'subject': subject, 'body': body},
                headers=self.config['ngrok_headers'],
                timeout=30
            )
            if response.status_code != 200:
                logger.error(f"β External email API failed: {response.status_code} - {response.text}")
                return jsonify({
                    'success': False,
                    'error': f'Email service failed: {response.status_code}',
                    'details': response.text
                }), 500

            logger.info(f"β Email sent successfully to {to_email}")
            return jsonify({
                'success': True,
                'message': 'Email sent successfully',
                'recipient': to_email,
                'subject': subject,
                'timestamp': datetime.now().isoformat()
            })
        except requests.exceptions.Timeout:
            logger.error("β° Email API request timeout")
            return jsonify({
                'success': False,
                'error': 'Email service timeout'
            }), 504
        except requests.exceptions.ConnectionError:
            logger.error("π Email API connection error")
            return jsonify({
                'success': False,
                'error': 'Email service unavailable'
            }), 503
        except Exception as e:
            logger.error(f"β Email API error: {e}")
            return jsonify({
                'success': False,
                'error': f'Email service error: {str(e)}'
            }), 500
    except Exception as e:
        logger.error(f"β Error sending plain text email: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to send email: {str(e)}'
        }), 500
def trigger_email_automation(customer_id):
    """Send the requested automated emails for one customer.

    The JSON body must contain ``email`` (the recipient) and may contain
    ``email_type``: ``all`` (default), ``new_properties``,
    ``recommendations`` or ``peak_time``. Lead analysis data is taken from
    the cache or rebuilt from the backend, AI insights are derived from it,
    and each selected email sender is invoked in turn. The response lists
    the per-email results alongside the insights.

    Responds with HTTP 400 without a recipient, 404 when the backend has no
    customer data, and 500 when the AI engine is unavailable or an error
    occurs.
    """
    logger.info(f"π§ Triggering email automation for customer {customer_id}")
    try:
        payload = request.get_json() or {}
        email_type = payload.get('email_type', 'all')
        recipient_email = payload.get('email')
        if not recipient_email:
            return jsonify({
                'success': False,
                'error': 'Email address is required'
            }), 400

        # Same cache key as the main lead analysis.
        analysis_data = self.get_cached_data(f'lead_analysis_{customer_id}')
        if not analysis_data:
            logger.info(f"π Fetching fresh data for email automation - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({
                    'success': False,
                    'error': 'No customer data found'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        ai_insights = engine.analyze_user_behavior_with_ai(analysis_data)

        # Each sender runs when its own type or 'all' was requested.
        senders = (
            ('new_properties', self._send_new_properties_email),
            ('recommendations', self._send_ai_recommendations_email),
            ('peak_time', self._send_peak_time_engagement_email),
        )
        results = [
            send(customer_id, recipient_email, analysis_data, ai_insights)
            for kind, send in senders
            if email_type in ('all', kind)
        ]

        return jsonify({
            'success': True,
            'message': f'Email automation completed for customer {customer_id}',
            'customer_id': customer_id,
            'recipient': recipient_email,
            'email_type': email_type,
            'ai_insights': ai_insights,
            'results': results,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error in email automation: {e}")
        return jsonify({
            'success': False,
            'error': f'Email automation failed: {str(e)}'
        }), 500
def _send_new_properties_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer a list of newly added properties matching their preferences.

    Preferences come from ``_analyze_user_preferences(analysis_data)`` and
    the matching properties from ``_get_new_properties_for_user``; at most
    five are listed in the email body.

    Args:
        customer_id: Used for logging only.
        recipient_email: Address the email is sent to.
        analysis_data: Lead analysis payload handed to the preference analysis.
        ai_insights: AI behaviour insights. Personality type, urgency level
            and peak time are echoed in the result, with the same keys and
            defaults the sibling email senders use.

    Returns:
        The dict from ``_send_email_via_api`` extended with ``type``,
        ``properties_count`` and ``ai_insights``; or a ``success: False``
        dict when no property matches or an exception is raised.
    """
    logger.info(f"π‘ Sending new properties email for customer {customer_id}")
    try:
        user_preferences = self._analyze_user_preferences(analysis_data)
        new_properties = self._get_new_properties_for_user(user_preferences)
        if not new_properties:
            return {
                'type': 'new_properties',
                'success': False,
                'message': 'No new properties matching user preferences'
            }

        subject = "π‘ New Properties Just Added - Perfect Matches for You!"
        body_lines = [
            "Dear Valued Customer,",
            "",
            "π Great news! We've just added new properties that match your preferences perfectly!",
            "",
            "π YOUR PREFERENCES (Based on Your Activity):",
            f"β’ Preferred Property Type: {user_preferences.get('property_type', 'Villa')}",
            f"β’ Budget Range: ${user_preferences.get('min_budget', 0):,} - ${user_preferences.get('max_budget', 0):,}",
            f"β’ Preferred Locations: {', '.join(user_preferences.get('locations', ['Dubai']))}",
            f"β’ Bedrooms: {user_preferences.get('bedrooms', '3+')}",
            "",
            "π NEWLY ADDED PROPERTIES:"
        ]

        # List at most five properties, numbered from 1.
        for i, prop in enumerate(new_properties[:5], 1):
            body_lines.extend([
                f"{i}. {prop.get('title', 'New Property')}",
                f" π Location: {prop.get('location', 'N/A')}",
                f" π° Price: ${prop.get('price', 0):,}",
                f" π Type: {prop.get('propertyTypeName', 'Villa')}",
                f" ποΈ Bedrooms: {prop.get('bedrooms', 'N/A')}",
                f" β Match Score: {prop.get('match_score', 95):.0f}%",
                ""
            ])

        body_lines.extend([
            "β‘ ACT FAST!",
            "These properties are fresh on the market and likely to be in high demand.",
            "",
            "π‘ NEXT STEPS:",
            "β’ Schedule a viewing immediately",
            "β’ Contact our team for more details",
            "β’ Get pre-approval for faster processing",
            "",
            "Best regards,",
            "Your Property Match Team"
        ])
        email_body = "\n".join(body_lines)

        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'new_properties'
        result['properties_count'] = len(new_properties)

        # Report the caller's insights rather than fixed placeholders. A
        # non-dict value falls back to the defaults so that a successfully
        # sent email is never reported as failed.
        insights = ai_insights if isinstance(ai_insights, dict) else {}
        result['ai_insights'] = {
            'personality_type': insights.get('ai_personality_type', 'Analytical'),
            'urgency_level': insights.get('urgency_level', 'Medium'),
            'peak_time': insights.get('peak_time', 'Evening')
        }
        return result
    except Exception as e:
        logger.error(f"β Error sending new properties email: {e}")
        return {
            'type': 'new_properties',
            'success': False,
            'error': str(e)
        }
def _send_ai_recommendations_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer AI-generated property recommendations.

    Up to five recommendations are requested from the AI engine and listed
    in the body together with a behaviour summary read from ``ai_insights``.
    An extra call-to-action paragraph is added when the urgency level is
    ``High``.

    Returns:
        The dict from ``_send_email_via_api`` extended with ``type``,
        ``ai_insights`` and ``recommendations_count``; or a
        ``success: False`` dict when the AI engine is unavailable or an
        exception is raised.
    """
    logger.info(f"π€ Sending AI recommendations email for customer {customer_id}")
    try:
        engine = self.get_ai_engine()
        if not engine:
            return {
                'type': 'ai_recommendations',
                'success': False,
                'error': 'AI engine not available'
            }

        picks = engine.get_enhanced_ai_recommendations(analysis_data, ai_insights, count=5)

        persona = ai_insights.get('ai_personality_type', 'Analytical')
        urgency = ai_insights.get('urgency_level', 'Medium')
        style = ai_insights.get('recommendation_strategy', 'Data-driven')
        active_period = ai_insights.get('peak_time', 'Evening')

        subject = "π€ AI-Powered Recommendations - Tailored Just for You!"
        lines = [
            "Dear Valued Customer,",
            "",
            "π§ Our AI has analyzed your browsing behavior and preferences to bring you these personalized recommendations:",
            "",
            "π YOUR BEHAVIOR ANALYSIS:",
            f"β’ Personality Type: {persona}",
            f"β’ Decision Making Style: {style}",
            f"β’ Engagement Level: {ai_insights.get('engagement_level', 'High')}",
            f"β’ Urgency Level: {urgency}",
            f"β’ Peak Activity Time: {active_period}",
            "",
            "π― AI RECOMMENDATIONS:"
        ]

        # Only a non-empty list of recommendations is rendered.
        if picks and isinstance(picks, list):
            for position, item in enumerate(picks[:5], 1):
                heading = item.get('property_name', item.get('title', 'Recommended Property'))
                score = item.get('ai_score', item.get('match_score', 0))
                lines += [
                    f"{position}. {heading}",
                    f" π Location: {item.get('location', 'N/A')}",
                    f" π° Price: ${item.get('price', 0):,}",
                    f" π― AI Match Score: {score:.1f}%",
                    f" π‘ Why Recommended: {item.get('recommendation_reason', 'Matches your preferences')}",
                    ""
                ]

        if urgency == 'High':
            lines += [
                "π₯ URGENT RECOMMENDATION:",
                "Based on your high engagement, we recommend acting quickly on these properties!",
                ""
            ]

        lines += [
            "π‘ PERSONALIZED NEXT STEPS:",
            "β’ Review properties that match your personality type",
            "β’ Schedule viewings during your peak activity time",
            "β’ Contact our AI-powered assistant for more details",
            "",
            "Best regards,",
            "AI-Powered Property Recommendation System"
        ]

        outcome = self._send_email_via_api(recipient_email, subject, "\n".join(lines))
        outcome['type'] = 'ai_recommendations'
        outcome['ai_insights'] = {
            'personality_type': persona,
            'decision_making_style': style,
            'urgency_level': urgency,
            'peak_time': active_period
        }
        outcome['recommendations_count'] = len(picks) if picks else 0
        return outcome
    except Exception as e:
        logger.error(f"β Error sending AI recommendations email: {e}")
        return {
            'type': 'ai_recommendations',
            'success': False,
            'error': str(e)
        }
def _send_peak_time_engagement_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer viewing suggestions tied to their peak activity time.

    Peak time, engagement level, personality type and urgency level are read
    from ``ai_insights`` (with defaults). The body lists the suggested
    viewing and action windows for that peak time and up to three entries
    from ``_get_time_sensitive_properties``.

    Returns:
        The dict from ``_send_email_via_api`` extended with ``type``,
        ``peak_time``, ``engagement_level`` and ``ai_insights``; or a
        ``success: False`` dict when an exception is raised.
    """
    logger.info(f"β° Sending peak time engagement email for customer {customer_id}")
    try:
        active_period = ai_insights.get('peak_time', 'Evening')
        engagement = ai_insights.get('engagement_level', 'High')
        insight_summary = {
            'personality_type': ai_insights.get('ai_personality_type', 'Analytical'),
            'urgency_level': ai_insights.get('urgency_level', 'Medium'),
            'peak_time': active_period
        }

        subject = "β° Perfect Time to Explore Properties - Based on Your Activity Pattern!"
        lines = [
            "Dear Valued Customer,",
            "",
            f"β° We've noticed you're most active during {active_period} hours!",
            f"Your engagement level: {engagement}",
            "",
            "π OPTIMAL VIEWING TIME:",
            f"β’ Your Peak Activity: {active_period}",
            f"β’ Best Time for Property Tours: {self._get_optimal_viewing_time(active_period)}",
            f"β’ Recommended Action Time: {self._get_recommended_action_time(active_period)}",
            "",
            "π― TIME-SENSITIVE OPPORTUNITIES:"
        ]

        # At most three listings, numbered from 1.
        listings = self._get_time_sensitive_properties(analysis_data)
        for position, item in enumerate(listings[:3], 1):
            lines += [
                f"{position}. {item.get('title', 'Time-Sensitive Property')}",
                f" π Location: {item.get('location', 'N/A')}",
                f" π° Price: ${item.get('price', 0):,}",
                f" β° Best Viewing Time: {item.get('optimal_time', active_period)}",
                f" π¨ Urgency: {item.get('urgency', 'Medium')}",
                ""
            ]

        lines += [
            "π‘ PEAK TIME STRATEGY:",
            f"β’ Schedule viewings during {active_period} for maximum engagement",
            "β’ Take advantage of your high focus period",
            "β’ Make decisions when you're most alert",
            "",
            "π IMMEDIATE ACTION:",
            "β’ Call now to schedule a viewing",
            "β’ Get priority booking during your peak time",
            "β’ Receive personalized assistance",
            "",
            "Best regards,",
            "Your Peak Time Property Team"
        ]

        outcome = self._send_email_via_api(recipient_email, subject, "\n".join(lines))
        outcome['type'] = 'peak_time_engagement'
        outcome['peak_time'] = active_period
        outcome['engagement_level'] = engagement
        outcome['ai_insights'] = insight_summary
        return outcome
    except Exception as e:
        logger.error(f"β Error sending peak time engagement email: {e}")
        return {
            'type': 'peak_time_engagement',
            'success': False,
            'error': str(e)
        }
def _send_email_via_api(self, recipient_email: str, subject: str, body: str) -> Dict:
    """POST a plain-text email to the backend ``/api/Email/sendPlainText`` endpoint.

    The recipient, subject and body are sent as query parameters with a
    30-second timeout. This method does not raise: failures are reported in
    the returned dict.

    Returns:
        ``{'success': True, 'message', 'recipient', 'subject'}`` on HTTP 200;
        otherwise ``{'success': False, 'error', ...}`` (with ``details`` set
        to the response text for a non-200 status).
    """
    try:
        endpoint_url = f"{self.config['backend_url']}/api/Email/sendPlainText"
        logger.info(f"π Sending email via external API to {recipient_email}")
        response = requests.post(
            endpoint_url,
            params={
                'toEmail': recipient_email,
                'subject': subject,
                'body': body
            },
            headers=self.config['ngrok_headers'],
            timeout=30
        )

        if response.status_code != 200:
            logger.error(f"β External email API failed: {response.status_code} - {response.text}")
            return {
                'success': False,
                'error': f'Email service failed: {response.status_code}',
                'details': response.text
            }

        logger.info(f"β Email sent successfully to {recipient_email}")
        return {
            'success': True,
            'message': 'Email sent successfully',
            'recipient': recipient_email,
            'subject': subject
        }
    except Exception as e:
        logger.error(f"β Email API error: {e}")
        return {
            'success': False,
            'error': f'Email service error: {str(e)}'
        }
def _analyze_user_preferences(self, analysis_data: Dict) -> Dict: | |
"""Analyze user preferences from tracking data""" | |
try: | |
# Extract preferences from analysis data | |
preferences = { | |
'property_type': 'Villa', # Default | |
'min_budget': 500000, | |
'max_budget': 2000000, | |
'locations': ['Dubai', 'Abu Dhabi'], | |
'bedrooms': '3+', | |
'price_range': '500K-2M' | |
} | |
# Analyze from tracking data if available | |
if 'data' in analysis_data and 'analytics' in analysis_data['data']: | |
analytics = analysis_data['data']['analytics'] | |
# Extract property type preferences | |
if 'property_types' in analytics: | |
most_viewed = max(analytics['property_types'].items(), key=lambda x: x[1]) | |
preferences['property_type'] = most_viewed[0] | |
# Extract budget preferences | |
if 'price_ranges' in analytics: | |
most_viewed = max(analytics['price_ranges'].items(), key=lambda x: x[1]) | |
preferences['price_range'] = most_viewed[0] | |
# Extract location preferences | |
if 'locations' in analytics: | |
top_locations = sorted(analytics['locations'].items(), key=lambda x: x[1], reverse=True)[:3] | |
preferences['locations'] = [loc[0] for loc in top_locations] | |
return preferences | |
except Exception as e: | |
logger.error(f"β Error analyzing user preferences: {e}") | |
return { | |
'property_type': 'Villa', | |
'min_budget': 500000, | |
'max_budget': 2000000, | |
'locations': ['Dubai'], | |
'bedrooms': '3+' | |
} | |
def _get_new_properties_for_user(self, user_preferences: Dict) -> List[Dict]: | |
"""Get newly added properties that match user preferences""" | |
try: | |
# This would typically fetch from a database of new properties | |
# For now, we'll generate mock new properties based on preferences | |
property_type = user_preferences.get('property_type', 'Villa') | |
locations = user_preferences.get('locations', ['Dubai']) | |
min_budget = user_preferences.get('min_budget', 500000) | |
max_budget = user_preferences.get('max_budget', 2000000) | |
new_properties = [] | |
# Generate mock new properties | |
for i, location in enumerate(locations[:2]): | |
price = min_budget + (i * 300000) | |
if price <= max_budget: | |
new_properties.append({ | |
'title': f'New {property_type} in {location}', | |
'location': location, | |
'price': price, | |
'propertyTypeName': property_type, | |
'bedrooms': '3+', | |
'match_score': 95 + i, | |
'is_new': True, | |
'added_date': datetime.now().strftime('%Y-%m-%d') | |
}) | |
return new_properties | |
except Exception as e: | |
logger.error(f"β Error getting new properties: {e}") | |
return [] | |
def _get_optimal_viewing_time(self, peak_time: str) -> str: | |
"""Get optimal viewing time based on peak activity""" | |
time_mapping = { | |
'Morning': '9:00 AM - 11:00 AM', | |
'Afternoon': '2:00 PM - 4:00 PM', | |
'Evening': '6:00 PM - 8:00 PM', | |
'Night': '7:00 PM - 9:00 PM' | |
} | |
return time_mapping.get(peak_time, '6:00 PM - 8:00 PM') | |
def _get_recommended_action_time(self, peak_time: str) -> str: | |
"""Get recommended action time based on peak activity""" | |
time_mapping = { | |
'Morning': '10:00 AM - 12:00 PM', | |
'Afternoon': '3:00 PM - 5:00 PM', | |
'Evening': '7:00 PM - 9:00 PM', | |
'Night': '8:00 PM - 10:00 PM' | |
} | |
return time_mapping.get(peak_time, '7:00 PM - 9:00 PM') | |
def _get_time_sensitive_properties(self, analysis_data: Dict) -> List[Dict]: | |
"""Get time-sensitive properties based on analysis""" | |
try: | |
# Generate time-sensitive properties | |
return [ | |
{ | |
'title': 'Limited Time Offer - Premium Villa', | |
'location': 'Dubai Marina', | |
'price': 1500000, | |
'optimal_time': 'Evening', | |
'urgency': 'High' | |
}, | |
{ | |
'title': 'Flash Sale - Luxury Apartment', | |
'location': 'Downtown Dubai', | |
'price': 800000, | |
'optimal_time': 'Afternoon', | |
'urgency': 'Very High' | |
}, | |
{ | |
'title': 'Exclusive Preview - New Development', | |
'location': 'Palm Jumeirah', | |
'price': 2500000, | |
'optimal_time': 'Morning', | |
'urgency': 'Medium' | |
} | |
] | |
except Exception as e: | |
logger.error(f"β Error getting time-sensitive properties: {e}") | |
return [] | |
# Helper methods for multi-AI system | |
def _get_analysis_data_parallel(self, customer_id: int) -> Dict:
    """Return lead analysis data for a customer, from cache or rebuilt.

    A cached entry under ``lead_analysis_<customer_id>`` is returned as is.
    Otherwise the backend is queried; if that yields nothing and the mock
    data service is available, mock data is used instead. The raw data is
    processed, analysed, cached and returned. Despite the name, the steps in
    this method run one after another.

    Returns:
        The analysis dict, or ``None`` when no data could be obtained or an
        error occurs.
    """
    try:
        cache_key = f'lead_analysis_{customer_id}'
        cached = self.get_cached_data(cache_key)
        if cached:
            logger.info(f"π Returning cached analysis data for customer {customer_id}")
            return cached

        # A backend failure is tolerated here so the mock fallback can run.
        try:
            raw = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed: {e}")
            raw = None

        if not raw and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            raw = mock_data_service.get_customer_data(customer_id)

        if not raw:
            return None

        processed = self.process_lead_data(raw, customer_id)
        analytics = self.generate_analytics(raw, customer_id)
        fresh = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': analytics,
                'properties': processed.get('properties', []),
                'summary': processed.get('summary', {})
            }
        }
        self.cache_data(cache_key, fresh)
        return fresh
    except Exception as e:
        logger.error(f"β Error getting analysis data for customer {customer_id}: {e}")
        return None
def _get_custom_subject(self, recommendation_type: str, customer_id: int, ai_insights: Dict) -> str: | |
"""Generate custom email subject based on recommendation type""" | |
personality_type = ai_insights.get('personality_type', 'Analytical') | |
urgency_level = ai_insights.get('urgency_level', 'Medium') | |
subjects = { | |
'property_based': f"π‘ Perfect Properties Matched to Your Preferences - Customer {customer_id}", | |
'price_based': f"π° Properties in Your Budget Range - Exclusive Deals for Customer {customer_id}", | |
'location_based': f"π Prime Locations Just for You - Customer {customer_id}", | |
'similarity_based': f"π Properties Similar to Your Favorites - Customer {customer_id}", | |
'behavioral_based': f"π§ AI-Analyzed Recommendations Based on Your Behavior - Customer {customer_id}", | |
'premium_properties': f"β Premium Properties Collection - Customer {customer_id}", | |
'budget_friendly': f"π΅ Best Value Properties for Smart Investors - Customer {customer_id}", | |
'trending_properties': f"π Trending Properties in Hot Markets - Customer {customer_id}", | |
'family_oriented': f"π¨βπ©βπ§βπ¦ Family-Perfect Properties - Customer {customer_id}", | |
'investment_opportunities': f"πΌ Investment Opportunities with High ROI - Customer {customer_id}" | |
} | |
base_subject = subjects.get(recommendation_type, f"π€ AI Recommendations for Customer {customer_id}") | |
# Add urgency indicators for high urgency | |
if urgency_level == 'High': | |
base_subject = f"β‘ URGENT: {base_subject}" | |
elif urgency_level == 'Very High': | |
base_subject = f"π₯ HOT DEALS: {base_subject}" | |
return base_subject | |
# Include all the original analysis methods from api_service.py | |
def clean_for_json(self, obj):
    """Return a copy of ``obj`` that is safe to serialize as JSON.

    Dicts and lists are walked recursively. Along the way:
      * numpy arrays become (recursively cleaned) Python lists, so NaN/inf
        values inside arrays are sanitized too;
      * numpy scalars become the matching Python scalar (numpy integers stay
        integers rather than turning into floats);
      * ``+inf`` -> ``999999999``, ``-inf`` -> ``-999999999``, ``NaN`` -> ``0``.

    Every other value, including tuples, is returned unchanged. numpy is
    optional: when it cannot be imported only the plain-Python rules apply.
    """
    try:
        import numpy as np
    except ImportError:
        np = None

    def clean(value):
        if isinstance(value, dict):
            return {key: clean(item) for key, item in value.items()}
        if isinstance(value, list):
            return [clean(item) for item in value]

        if np is not None:
            if isinstance(value, np.ndarray):
                # Re-enter with the plain list so its elements get cleaned.
                return clean(value.tolist())
            if isinstance(value, np.generic):
                # Any numpy scalar -> matching Python scalar.
                value = value.item()

        if isinstance(value, float):
            if math.isnan(value):
                return 0
            if math.isinf(value):
                return 999999999 if value > 0 else -999999999
        return value

    return clean(obj)
def make_api_request(self, endpoint: str, method: str = 'GET', data: Dict = None, use_fallback: bool = True) -> Any:
    """Call the backend API, retrying transient failures, with optional fallback data.

    Args:
        endpoint: Path appended to ``self.config['backend_url']``.
        method: HTTP verb; ``GET``, ``POST`` and ``PUT`` are supported
            (case-insensitive).
        data: JSON body for ``POST``/``PUT``; not sent for ``GET``.
        use_fallback: When true, a failed request returns
            ``self.get_fallback_data(endpoint, method, data)`` instead of
            ``None``.

    Returns:
        The decoded JSON body of a 200 response; otherwise the fallback data
        or ``None``, depending on ``use_fallback``.

    Up to ``max_retries`` (config, default 3) extra attempts are made after
    status 429/502/503/504, timeouts, connection errors and other
    exceptions, waiting ``retry_delay * attempt_number`` seconds in between.
    An unsupported ``method`` is rejected before the loop: it can never
    succeed, so retrying it would only add sleep time before the same
    failure.
    """
    max_retries = self.config.get('max_retries', 3)
    retry_delay = self.config.get('retry_delay', 1)
    verb = str(method).upper()

    if verb in ('GET', 'POST', 'PUT'):
        total_attempts = max_retries + 1
    else:
        logger.error(f"API request error: Unsupported HTTP method: {method}")
        total_attempts = 0

    for attempt in range(total_attempts):
        has_retries_left = attempt < max_retries
        # Linear backoff: the wait grows with the attempt number.
        wait = retry_delay * (attempt + 1)
        try:
            url = f"{self.config['backend_url']}{endpoint}"
            logger.info(f"π Making {method} request to: {url} (attempt {attempt + 1}/{max_retries + 1})")
            headers = self.config['ngrok_headers']
            # The final attempt gets a longer timeout.
            timeout = 30 if has_retries_left else 60

            if verb == 'GET':
                response = requests.get(url, headers=headers, timeout=timeout)
            elif verb == 'POST':
                response = requests.post(url, headers=headers, json=data, timeout=timeout)
            else:  # 'PUT' - the verb was validated above
                response = requests.put(url, headers=headers, json=data, timeout=timeout)

            if response.status_code == 200:
                logger.info(f"β API request successful on attempt {attempt + 1}")
                return response.json()

            if response.status_code in [429, 502, 503, 504] and has_retries_left:
                # Rate limiting and gateway errors are treated as transient.
                logger.warning(f"β οΈ API request failed with {response.status_code}, retrying in {wait}s...")
                time.sleep(wait)
                continue

            # Any other status is final: stop retrying.
            logger.error(f"β API request failed: {response.status_code} - {response.text}")
            break
        except requests.exceptions.Timeout as e:
            logger.warning(f"β° Request timeout on attempt {attempt + 1}: {e}")
            if has_retries_left:
                time.sleep(wait)
                continue
            logger.error(f"β All retry attempts failed due to timeout")
            break
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"π Connection error on attempt {attempt + 1}: {e}")
            if has_retries_left:
                time.sleep(wait)
                continue
            logger.error(f"β All retry attempts failed due to connection error")
            break
        except Exception as e:
            logger.error(f"β API request error on attempt {attempt + 1}: {e}")
            if has_retries_left:
                time.sleep(wait)
                continue
            break

    # Every failure path ends here; a successful request returned above.
    if use_fallback:
        logger.info(f"π All API attempts failed, using fallback data for endpoint: {endpoint}")
        return self.get_fallback_data(endpoint, method, data)
    return None
def get_fallback_data(self, endpoint: str, method: str, data: Dict = None) -> Any:
    """Build a stand-in response for an endpoint whose API requests all failed.

    Endpoints ending in ``/customer/<id>`` are answered from the mock data
    service when it was imported successfully. ``PropertyLeadQualification``
    endpoints get an empty list and every other endpoint gets a
    "temporarily unavailable" dict. ``method`` and ``data`` are accepted but
    not used here.

    Returns:
        The fallback payload, or ``None`` if producing it raised.
    """
    try:
        customer_id = None
        if '/customer/' in endpoint:
            # Whatever follows the last "/customer/" must be accepted by int();
            # otherwise the endpoint is treated as not customer-specific.
            trailing_segment = endpoint.split('/customer/')[-1]
            try:
                customer_id = int(trailing_segment)
            except (ValueError, IndexError):
                customer_id = None

        if MOCK_DATA_AVAILABLE and customer_id:
            logger.info(f"π Using mock data fallback for customer {customer_id}")
            return mock_data_service.get_customer_data(customer_id)

        if 'PropertyLeadQualification' in endpoint:
            return []

        # DebtFinancing and all remaining endpoints share the same payload.
        return {'success': False, 'message': 'Service temporarily unavailable', 'fallback': True}
    except Exception as e:
        logger.error(f"β Fallback data generation failed: {e}")
        return None
# All the original analysis methods from api_service.py are included below | |
def process_lead_data(self, data: List[Dict], customer_id: int) -> Dict:
    """Summarise a customer's property views and enrich each property record.

    Args:
        data: Property records. The keys read here are ``viewCount``,
            ``totalDuration``, ``price``, ``propertyTypeName`` and
            ``lastViewedAt``; missing keys fall back to defaults.
        customer_id: Echoed back in the result.

    Returns:
        Dict with ``customer_id``, ``properties`` (shallow copies of the input
        records with ``engagement_score``, ``price_per_sqm``,
        ``view_frequency`` and ``last_viewed_days_ago`` added) and a
        ``summary`` holding the totals, the average price, a count per
        property type, the overall engagement score and ``last_activity``
        (the greatest ``lastViewedAt`` value, or ``None`` if there is none).
    """
    if not data:
        return {
            'customer_id': customer_id,
            'properties': [],
            'summary': {
                'total_properties': 0,
                'total_views': 0,
                'total_duration': 0,
                'average_price': 0,
                'property_types': {},
                'engagement_score': 0,
                'last_activity': None
            }
        }

    # Summary totals
    total_views = sum(prop.get('viewCount', 0) for prop in data)
    total_duration = sum(prop.get('totalDuration', 0) for prop in data)
    total_price = sum(prop.get('price', 0) for prop in data)

    # How many viewed properties fall under each property type
    property_types = {}
    for prop in data:
        prop_type = prop.get('propertyTypeName', 'Unknown')
        property_types[prop_type] = property_types.get(prop_type, 0) + 1

    engagement_score = self.calculate_engagement_score(data)

    # Copy each record so the caller's dicts are not mutated
    enhanced_properties = []
    for prop in data:
        enhanced_prop = prop.copy()
        enhanced_prop['engagement_score'] = self.calculate_property_engagement(prop)
        enhanced_prop['price_per_sqm'] = self.estimate_price_per_sqm(prop.get('price', 0))
        enhanced_prop['view_frequency'] = self.calculate_view_frequency(prop)
        enhanced_prop['last_viewed_days_ago'] = self.days_since_last_view(prop.get('lastViewedAt'))
        enhanced_properties.append(enhanced_prop)

    # Records that were never viewed carry no timestamp; skipping them keeps
    # max() from comparing None with a string (TypeError). The values are
    # compared as strings, not as parsed datetimes.
    viewed_timestamps = [prop.get('lastViewedAt') for prop in data if prop.get('lastViewedAt')]
    last_activity = max(viewed_timestamps, default=None)

    return {
        'customer_id': customer_id,
        'properties': enhanced_properties,
        'summary': {
            'total_properties': len(data),
            'total_views': total_views,
            'total_duration': total_duration,
            'average_price': total_price / len(data),
            'property_types': property_types,
            'engagement_score': engagement_score,
            'last_activity': last_activity
        }
    }
def generate_analytics(self, lead_data: List[Dict], customer_id: int) -> Dict:
    """Assemble the full analytics report for one customer's lead data.

    Returns ``{'customer_id': ..., 'analytics': {}}`` when ``lead_data`` is
    empty. Otherwise the data is run through ``process_lead_data`` and each
    analysis helper contributes one key of the returned dict.
    """
    if not lead_data:
        return {'customer_id': customer_id, 'analytics': {}}

    processed = self.process_lead_data(lead_data, customer_id)
    qualification = self.generate_detailed_lead_qualification(processed)
    summary = processed['summary']

    # Some helpers work on the processed structure, others on the raw records.
    report = {'customer_id': customer_id, 'lead_qualification': qualification}
    report['engagement_level'] = self.categorize_engagement(summary['engagement_score'])
    report['preferred_property_types'] = self.get_preferred_property_types(summary['property_types'])
    report['price_preferences'] = self.analyze_price_preferences(lead_data)
    report['viewing_patterns'] = self.analyze_viewing_patterns(lead_data)
    report['property_analysis'] = self.analyze_properties_detailed(lead_data)
    report['recommendations'] = self.generate_recommendations(processed)
    report['risk_assessment'] = self.assess_risk(processed)
    report['opportunity_score'] = self.calculate_opportunity_score(processed)
    report['lead_timeline'] = self.generate_lead_timeline(lead_data)
    report['conversion_probability'] = self.calculate_conversion_probability(processed)
    return report
# Add placeholder methods for the analysis functions | |
def generate_detailed_lead_qualification(self, processed_data: Dict) -> Dict:
    """Score a lead out of 100 and label it HOT, WARM, LUKEWARM or COLD.

    The score is the sum of five factors:

    * engagement: 30% of the summary engagement score, capped at 30
    * property diversity: 5 points per distinct property type, capped at 15
    * recent activity: 20/15/10/5 points for activity within 1/3/7/30 days
    * price consistency: up to 15 points, higher when the positive prices
      viewed are close together (omitted when no property has a price)
    * viewing depth: 5 points per property viewed more than once, capped at 20

    Args:
        processed_data: Dict with a ``summary`` dict and a ``properties``
            list, as returned by ``process_lead_data``.

    Returns:
        Dict with ``lead_score`` (rounded to one decimal), ``lead_status``,
        ``status_color``, ``status_description``, ``max_possible_score``,
        the per-factor breakdown under ``factors`` and selected summary figures.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']
    lead_score = 0
    factors = {}

    # 1. Engagement Score (0-30 points)
    engagement_points = min(summary['engagement_score'] * 0.3, 30)
    lead_score += engagement_points
    factors['engagement'] = {
        'score': summary['engagement_score'],
        'points': engagement_points,
        'max_points': 30,
        'description': f"Based on {summary['total_views']} views and {formatDuration(summary['total_duration'])} total viewing time"
    }

    # 2. Property Diversity (0-15 points)
    property_types = len(summary['property_types'])
    diversity_points = min(property_types * 5, 15)
    lead_score += diversity_points
    factors['property_diversity'] = {
        'score': property_types,
        'points': diversity_points,
        'max_points': 15,
        'description': f"Viewed {property_types} different property types"
    }

    # 3. Recent Activity (0-20 points)
    days_since = None
    recent_activity_points = 0
    last_activity_raw = summary.get('last_activity')
    if last_activity_raw:
        try:
            last_activity = datetime.fromisoformat(last_activity_raw.replace('Z', '+00:00'))
            # A "Z"/offset timestamp parses as timezone-aware, and subtracting
            # it from a naive now() raises TypeError. Taking "now" in the
            # timestamp's own tzinfo (None for naive values) works for both.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            # Unparseable timestamp: the factor is reported as unknown.
            days_since = None
    if days_since is not None:
        for max_days, points in ((1, 20), (3, 15), (7, 10), (30, 5)):
            if days_since <= max_days:
                recent_activity_points = points
                break
    lead_score += recent_activity_points
    factors['recent_activity'] = {
        'score': days_since if days_since is not None else 999,
        'points': recent_activity_points,
        'max_points': 20,
        'description': f"Last activity was {days_since if days_since is not None else 'unknown'} days ago"
    }

    # 4. Price Range Consistency (0-15 points)
    # Records without a price are ignored instead of raising KeyError.
    prices = [p.get('price') or 0 for p in properties]
    prices = [price for price in prices if price > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        consistency_points = price_consistency * 15
        lead_score += consistency_points
        factors['price_consistency'] = {
            'score': round(price_consistency * 100, 1),
            'points': consistency_points,
            'max_points': 15,
            'description': f"Price range consistency: {round(price_consistency * 100, 1)}%"
        }

    # 5. Viewing Depth (0-20 points)
    deep_views = sum(1 for p in properties if (p.get('viewCount') or 0) > 1)
    depth_points = min(deep_views * 5, 20)
    lead_score += depth_points
    factors['viewing_depth'] = {
        'score': deep_views,
        'points': depth_points,
        'max_points': 20,
        'description': f"{deep_views} properties viewed multiple times"
    }

    # Determine lead status
    if lead_score >= 70:
        lead_status = "HOT"
        status_color = "#e74c3c"
        status_description = "High probability of conversion. Immediate follow-up recommended."
    elif lead_score >= 50:
        lead_status = "WARM"
        status_color = "#f39c12"
        status_description = "Good potential. Regular follow-up needed."
    elif lead_score >= 30:
        lead_status = "LUKEWARM"
        status_color = "#95a5a6"
        status_description = "Some interest shown. Nurturing required."
    else:
        lead_status = "COLD"
        status_color = "#34495e"
        status_description = "Low engagement. Re-engagement campaign needed."

    return {
        'lead_score': round(lead_score, 1),
        'lead_status': lead_status,
        'status_color': status_color,
        'status_description': status_description,
        'max_possible_score': 100,
        'factors': factors,
        'summary': {
            'total_properties_viewed': summary['total_properties'],
            'total_views': summary['total_views'],
            'total_duration': summary['total_duration'],
            'average_price': summary['average_price'],
            'engagement_score': summary['engagement_score']
        }
    }
def calculate_engagement_score(self, properties: List[Dict]) -> float:
    """Return the mean per-property engagement score, or 0.0 for no properties.

    Each property contributes up to 50 points for views (10 per view) plus up
    to 50 points for viewing time (``totalDuration`` divided by 1000).
    """
    if not properties:
        return 0.0

    per_property = [
        min(item.get('viewCount', 0) * 10, 50) + min(item.get('totalDuration', 0) / 1000, 50)
        for item in properties
    ]
    return sum(per_property) / len(properties)
def calculate_property_engagement(self, property_data: Dict) -> float:
    """Return the engagement score of one property.

    The score is up to 50 points for views (10 per view) plus up to 50 points
    for viewing time (``totalDuration`` divided by 1000).
    """
    view_component = min(property_data.get('viewCount', 0) * 10, 50)
    duration_component = min(property_data.get('totalDuration', 0) / 1000, 50)
    return view_component + duration_component
def estimate_price_per_sqm(self, price: float) -> float:
    """Return ``price / 150`` for a positive price, otherwise 0.

    NOTE(review): 150 is presumably an assumed typical area in square
    metres; the estimate does not use the property's real size.
    """
    if price > 0:
        return price / 150
    return 0
def calculate_view_frequency(self, property_data: Dict) -> str:
    """Translate a property's ``viewCount`` into a frequency label.

    0 -> "No views", 1 -> "Single view", up to 3 -> "Low frequency",
    up to 7 -> "Medium frequency", anything above -> "High frequency".
    """
    count = property_data.get('viewCount', 0)
    if count == 0:
        return "No views"
    if count == 1:
        return "Single view"
    for ceiling, label in ((3, "Low frequency"), (7, "Medium frequency")):
        if count <= ceiling:
            return label
    return "High frequency"
def days_since_last_view(self, last_viewed: str) -> int:
    """Return the whole days elapsed since ``last_viewed``.

    Args:
        last_viewed: ISO-8601 timestamp; a trailing ``Z`` is accepted.

    Returns:
        Days since the timestamp (never negative), or 999 when the value is
        empty or cannot be parsed.
    """
    if not last_viewed:
        return 999
    try:
        last_view_date = datetime.fromisoformat(last_viewed.replace('Z', '+00:00'))
    except (ValueError, TypeError, AttributeError):
        return 999
    # A "Z"/offset timestamp parses as timezone-aware, and subtracting it from
    # a naive now() raises TypeError, which previously made every UTC
    # timestamp come back as 999. Take "now" in the timestamp's own tzinfo
    # (None for naive values) so both kinds work.
    now = datetime.now(last_view_date.tzinfo)
    return max(0, (now - last_view_date).days)
def categorize_engagement(self, score: float) -> str:
    """Map an engagement score to a label from "Very Low" to "Very High".

    The bands start at 20, 40, 60 and 80; a score below 20 is "Very Low".
    """
    bands = ((80, "Very High"), (60, "High"), (40, "Medium"), (20, "Low"))
    for floor, label in bands:
        if score >= floor:
            return label
    return "Very Low"
def get_preferred_property_types(self, property_types: Dict) -> List[str]:
    """Return up to three property type names, most frequently viewed first.

    ``property_types`` maps a type name to its count. Types with equal
    counts keep the order they have in the dict.
    """
    if not property_types:
        return []
    ranked_names = sorted(property_types, key=property_types.get, reverse=True)
    return ranked_names[:3]
def analyze_price_preferences(self, properties: List[Dict]) -> Dict:
    """Return min, max, average and range of the positive prices viewed.

    Properties with a missing or non-positive ``price`` are ignored. An empty
    dict is returned when no property has a positive price.
    """
    if not properties:
        return {}

    positive_prices = [
        amount
        for amount in [entry.get('price', 0) for entry in properties]
        if amount > 0
    ]
    if not positive_prices:
        return {}

    lowest = min(positive_prices)
    highest = max(positive_prices)
    return {
        'min_price': lowest,
        'max_price': highest,
        'avg_price': sum(positive_prices) / len(positive_prices),
        'price_range': highest - lowest
    }
def analyze_viewing_patterns(self, properties: List[Dict]) -> Dict:
    """Count views per part of the day, based on each property's last view.

    A property's whole ``viewCount`` is attributed to the hour of its
    ``lastViewedAt`` timestamp: 06-11 is morning, 12-17 is afternoon and every
    other hour is evening. The hour is taken as written in the timestamp; no
    timezone conversion is applied.

    Returns:
        ``{}`` for no properties; otherwise the three counts plus
        ``peak_viewing_time``, which is ``'evening'`` unless another period
        has strictly more views.
    """
    if not properties:
        return {}

    view_totals = {'morning': 0, 'afternoon': 0, 'evening': 0}
    for prop in properties:
        last_viewed = prop.get('lastViewedAt')
        if not last_viewed:
            continue
        try:
            hour = datetime.fromisoformat(last_viewed.replace('Z', '+00:00')).hour
            if 6 <= hour < 12:
                period = 'morning'
            elif 12 <= hour < 18:
                period = 'afternoon'
            else:
                period = 'evening'
            view_totals[period] += prop.get('viewCount', 0)
        except (ValueError, TypeError, AttributeError):
            # Best effort: skip records with an unparseable timestamp or a
            # non-numeric view count, without also swallowing
            # KeyboardInterrupt/SystemExit the way a bare except would.
            continue

    morning_views = view_totals['morning']
    afternoon_views = view_totals['afternoon']
    evening_views = view_totals['evening']
    if morning_views > max(afternoon_views, evening_views):
        peak_viewing_time = 'morning'
    elif afternoon_views > evening_views:
        peak_viewing_time = 'afternoon'
    else:
        peak_viewing_time = 'evening'

    return {
        'morning_views': morning_views,
        'afternoon_views': afternoon_views,
        'evening_views': evening_views,
        'peak_viewing_time': peak_viewing_time
    }
def analyze_properties_detailed(self, properties: List[Dict]) -> Dict:
    """Return the number of properties under ``analyzed_properties``.

    Only the count is reported here; no per-property analysis is performed.
    """
    property_count = len(properties)
    return {'analyzed_properties': property_count}
def generate_recommendations(self, processed_data: Dict) -> List[str]:
    """Return follow-up suggestions triggered by the summary figures.

    Each rule is checked independently against ``processed_data['summary']``,
    so the result may hold anywhere from zero to four messages.
    """
    summary = processed_data['summary']
    rules = (
        (summary['engagement_score'] < 30,
         "Low engagement detected. Consider follow-up communication."),
        (summary['total_views'] < 5,
         "Limited property exploration. Suggest more property options."),
        (summary['average_price'] > 20000000,  # 20M
         "High-value customer. Consider premium properties."),
        (len(summary['property_types']) > 3,
         "Diverse property interests. Offer variety in recommendations."),
    )
    return [message for applies, message in rules if applies]
def assess_risk(self, processed_data: Dict) -> Dict:
    """Add up risk points from the summary and classify the total.

    Very low engagement adds 30 points, fewer than three views adds 20 and an
    average price above 50M adds 15. The level is "High" above 50 points,
    "Medium" above 25 and "Low" otherwise.
    """
    summary = processed_data['summary']
    checks = (
        (summary['engagement_score'] < 20, "Very low engagement", 30),
        (summary['total_views'] < 3, "Limited property exploration", 20),
        (summary['average_price'] > 50000000, "Very high price range", 15),  # 50M
    )
    triggered = [(label, weight) for failed, label, weight in checks if failed]
    risk_score = sum(weight for _, weight in triggered)

    if risk_score > 50:
        risk_level = 'High'
    elif risk_score > 25:
        risk_level = 'Medium'
    else:
        risk_level = 'Low'

    return {
        'risk_score': min(risk_score, 100),
        'risk_level': risk_level,
        'risk_factors': [label for label, _ in triggered]
    }
def calculate_opportunity_score(self, processed_data: Dict) -> float:
    """Return an opportunity score capped at 100.

    Starts from the summary engagement score and adds 20 for an average price
    above 20M, 10 for more than two property types, and 15 or 10 for activity
    within the last 7 or 30 days respectively.
    """
    summary = processed_data['summary']

    # Base score from engagement
    score = summary['engagement_score']

    # Bonus for high-value properties
    if summary['average_price'] > 20000000:
        score += 20

    # Bonus for multiple property types
    if len(summary['property_types']) > 2:
        score += 10

    # Bonus for recent activity
    last_activity_raw = summary.get('last_activity')
    if last_activity_raw:
        try:
            last_activity = datetime.fromisoformat(last_activity_raw.replace('Z', '+00:00'))
            # A "Z"/offset timestamp parses as timezone-aware, and subtracting
            # it from a naive now() raises TypeError, which used to suppress
            # this bonus. Take "now" in the timestamp's own tzinfo instead.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            # Unparseable timestamp: no recency bonus.
            days_since = None
        if days_since is not None:
            if days_since <= 7:
                score += 15
            elif days_since <= 30:
                score += 10

    return min(score, 100)
def generate_lead_timeline(self, properties: List[Dict]) -> List[Dict]:
    """Return one timeline entry per viewed property, newest first.

    Properties without a parseable ``lastViewedAt`` are left out. The other
    fields are optional: a missing name or type becomes ``'Unknown'`` and a
    missing number becomes 0, so an incomplete record is no longer silently
    dropped from the timeline.
    """
    timeline = []
    for prop in properties:
        last_viewed = prop.get('lastViewedAt')
        if not last_viewed:
            continue
        try:
            view_date = datetime.fromisoformat(last_viewed.replace('Z', '+00:00'))
        except (ValueError, TypeError, AttributeError):
            # Best effort: a malformed timestamp excludes only that record.
            continue
        timeline.append({
            'date': view_date.isoformat(),
            'property_name': prop.get('propertyName', 'Unknown'),
            'property_type': prop.get('propertyTypeName', 'Unknown'),
            'price': prop.get('price', 0),
            'views': prop.get('viewCount', 0),
            'duration': prop.get('totalDuration', 0),
            'engagement_score': prop.get('engagement_score', 0)
        })

    # The entries are ordered by their ISO-8601 text, not by parsed datetimes.
    timeline.sort(key=lambda entry: entry['date'], reverse=True)
    return timeline
def calculate_conversion_probability(self, processed_data: Dict) -> Dict:
    """Estimate the conversion probability (0-100) for a processed lead.

    The base value is the summary engagement score capped at 100. It is then
    raised by up to 10 for price consistency, 15/10/5 for activity within
    1/3/7 days, 3 per property type (max 15) and 5 per property viewed more
    than once (max 20), and the total is capped at 100.

    Args:
        processed_data: Dict with a ``summary`` dict and a ``properties``
            list, as returned by ``process_lead_data``.

    Returns:
        Dict with ``base_probability``, the individual ``adjustments``,
        ``total_adjustment``, ``final_probability`` and a ``confidence_level``
        derived from how many properties were viewed.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']

    # Base probability
    base_probability = min(summary['engagement_score'], 100)

    # Adjustments based on factors
    adjustments = {}

    # Price range consistency; records without a price are ignored instead of
    # raising KeyError.
    prices = [p.get('price') or 0 for p in properties]
    prices = [price for price in prices if price > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        adjustments['price_consistency'] = price_consistency * 10

    # Recent activity
    last_activity_raw = summary.get('last_activity')
    if last_activity_raw:
        try:
            last_activity = datetime.fromisoformat(last_activity_raw.replace('Z', '+00:00'))
            # A "Z"/offset timestamp parses as timezone-aware, and subtracting
            # it from a naive now() raises TypeError, which used to drop this
            # adjustment. Take "now" in the timestamp's own tzinfo instead.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            # Unparseable timestamp: no recency adjustment.
            days_since = None
        if days_since is not None:
            for max_days, bonus in ((1, 15), (3, 10), (7, 5)):
                if days_since <= max_days:
                    adjustments['recent_activity'] = bonus
                    break

    # Property diversity
    property_types = len(summary['property_types'])
    adjustments['property_diversity'] = min(property_types * 3, 15)

    # Deep engagement
    deep_views = sum(1 for p in properties if (p.get('viewCount') or 0) > 1)
    adjustments['deep_engagement'] = min(deep_views * 5, 20)

    # Calculate final probability
    total_adjustment = sum(adjustments.values())
    final_probability = min(base_probability + total_adjustment, 100)

    return {
        'base_probability': base_probability,
        'adjustments': adjustments,
        'total_adjustment': total_adjustment,
        'final_probability': final_probability,
        'confidence_level': 'High' if len(properties) >= 5 else 'Medium' if len(properties) >= 3 else 'Low'
    }
def get_cached_data(self, key: str) -> Optional[Any]:
    """Return the cached value for ``key`` if it exists and has not expired.

    An entry is fresh while its age is below ``self.config['cache_duration']``
    seconds. An expired entry is removed and ``None`` is returned, as it is
    for an unknown key.
    """
    missing = object()
    # Read both dicts with get() rather than "check, then index": the cleanup
    # thread may remove the key between the two steps, which would raise
    # KeyError.
    stored_at = self.cache_timestamps.get(key)
    value = self.cache.get(key, missing)
    if stored_at is None or value is missing:
        return None

    if time.time() - stored_at < self.config['cache_duration']:
        return value

    # Expired: pop() tolerates the cleanup thread having removed it already.
    self.cache.pop(key, None)
    self.cache_timestamps.pop(key, None)
    return None
def cache_data(self, key: str, data: Any):
    """Store ``data`` under ``key`` and record when it was stored."""
    stored_at = time.time()
    self.cache[key] = data
    self.cache_timestamps[key] = stored_at
def start_cache_cleanup(self):
    """Start a daemon thread that removes expired cache entries every minute.

    An entry is expired once it is older than ``self.config['cache_duration']``
    seconds. Errors inside a cleanup pass are logged and the loop continues.
    """
    def cleanup_cache():
        while True:
            try:
                current_time = time.time()
                # Iterate over a snapshot: other threads add and remove keys,
                # and iterating the live dict view while its size changes
                # raises RuntimeError.
                expired_keys = [
                    key for key, timestamp in list(self.cache_timestamps.items())
                    if current_time - timestamp > self.config['cache_duration']
                ]
                for key in expired_keys:
                    self.cache.pop(key, None)
                    self.cache_timestamps.pop(key, None)
            except Exception as e:
                logger.error(f"Cache cleanup error: {e}")
            time.sleep(60)  # Clean up every minute, after success or failure

    cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True)
    cleanup_thread.start()
def run(self, host: str = '0.0.0.0', port: int = 5000, debug: bool = False):
    """Log a startup banner, then start the Flask server.

    The endpoint list in the banner is static text; it is not derived from
    the routes actually registered on the app.

    Args:
        host: Interface passed to ``Flask.run``.
        port: Port passed to ``Flask.run``.
        debug: Debug flag passed to ``Flask.run``.
    """
    logger.info(f"π Starting Enhanced Lead Qualification API Service with AI on {host}:{port}")
    logger.info(f"π Backend URL: {self.config['backend_url']}")

    banner_lines = (
        "π€ AI Features: Enabled",
        "π API Endpoints:",
        " - GET /api/health",
        " - GET /api/lead-analysis/<customer_id>",
        " - GET /api/ai-analysis/<customer_id>",
        " - POST /api/ai-recommendations/<customer_id>",
        " - POST /api/multi-ai-recommendations/<customer_id>",
        " - GET /api/chromadb-recommendations/<customer_id>",
        " - GET /api/ai-status/<customer_id>",
        " - POST /api/batch-ai-analysis",
        " - GET /api/fetch-all-properties",
        " - POST /api/clear-cache",
        " - POST /api/test-email",
        " - POST /api/automated-email",
        " - GET /api/email-status",
        " π NEW EMAIL TESTING ENDPOINTS:",
        " - GET /api/test-sendgrid-connection",
        " - GET /api/email-analysis-basis/<customer_id>",
        " - GET /api/email-content-preview/<customer_id>",
        " - POST /api/test-all-10-emails/<customer_id>",
        " - GET /api/chromadb-status",
    )
    for banner_line in banner_lines:
        logger.info(banner_line)

    self.app.run(host=host, port=port, debug=debug)
if __name__ == '__main__':
    # run() binds to 0.0.0.0 by default, and Flask's debug mode enables the
    # interactive Werkzeug debugger, which lets anyone who can reach the port
    # execute arbitrary Python. Debug mode is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of being always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    api_service = EnhancedLeadQualificationAPI()
    api_service.run(debug=debug_enabled)