"""
MCP Client for connecting to Modal backend
"""

import json
import os
import time
import uuid
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin

import requests


class MCPClient:
    """Client for communicating with Modal MCP backend.

    The client works in one of two transport modes:

    * remote: requests are sent over HTTP to ``modal_url``; when the backend
      is unreachable the client falls back to in-memory local responses.
    * local (``modal_url == 'local://mock'``): no network traffic at all,
      every call is answered by ``_get_local_response``.
    """

    # Endpoint -> Modal function name for endpoints that map one-to-one.
    _ENDPOINT_TO_FUNCTION = {
        '/health': 'health',
        '/emails': 'emails',
        '/labels': 'labels',
        '/rules/preview': 'rules_preview',
        '/rules/apply': 'rules_apply',
        '/rules': 'rules',
        '/rules/save': 'rules',
        '/sessions': 'sessions',
        '/auth/gmail/url': 'auth',
        '/auth/gmail/callback': 'auth',
    }

    # Default per-request timeout in seconds (kept short for responsiveness).
    _DEFAULT_TIMEOUT = 15

    def __init__(self, modal_url: str, session_token: Optional[str] = None):
        """
        Initialize MCP client

        Args:
            modal_url: Base URL of the Modal backend or 'local://mock' for local mode
            session_token: Optional session token for authenticated requests;
                a random one is generated when omitted
        """
        self.local_mode = modal_url == 'local://mock'
        self.base_url = 'local://mock' if self.local_mode else modal_url.rstrip('/')
        self.session_token = session_token or self._generate_session_token()
        self.headers = {
            'Content-Type': 'application/json',
            'X-Session-Token': self.session_token
        }
        self.mode = 'mock'  # Default to mock mode
        self._mock_sessions: Dict[str, Dict[str, Any]] = {}  # Session data kept in memory

        # Modal URL components; only meaningful when is_modal_url is True.
        self.workspace = ""
        self.app_name = ""
        self.env = ""  # No environment suffix in deployed Modal URLs

        # Expected pattern: https://<workspace>--<app-name>.modal.run
        self.is_modal_url = (
            not self.local_mode
            and "--" in modal_url
            and "modal.run" in modal_url
        )
        if self.is_modal_url:
            # Parse the already-stripped base URL and drop any path so that a
            # trailing slash or path segment cannot leak into the app name.
            host = self.base_url.replace("https://", "").replace("http://", "")
            host = host.split("/", 1)[0]
            parts = host.split("--")
            if len(parts) >= 2:
                self.workspace = parts[0]
                self.app_name = parts[1].replace(".modal.run", "")
            else:
                # Fallback if URL doesn't match expected pattern
                self.is_modal_url = False

    def _generate_session_token(self) -> str:
        """Generate a random session token (a UUID4 string) for demo users"""
        return str(uuid.uuid4())

    def set_mode(self, mode: str):
        """Set the mode (mock or gmail) and mirror it in the X-Mode header

        Raises:
            ValueError: if mode is neither 'mock' nor 'gmail'
        """
        if mode not in ('mock', 'gmail'):
            raise ValueError("Mode must be 'mock' or 'gmail'")
        self.mode = mode
        self.headers['X-Mode'] = mode

    def set_gmail_token(self, oauth_token: str):
        """Set Gmail OAuth token for authenticated requests and switch to gmail mode"""
        self.headers['Authorization'] = f'Bearer {oauth_token}'
        # Go through set_mode so the X-Mode header never disagrees with self.mode.
        self.set_mode('gmail')

    def _construct_modal_endpoint_url(self, endpoint: str, method: Optional[str] = None) -> str:
        """Construct the Modal-specific URL for an endpoint

        Args:
            endpoint: Logical endpoint path such as '/rules/preview'
            method: HTTP method of the request; when omitted, the method of
                the most recent ``_request`` call (if any) is used

        Returns:
            Full URL to send the request to
        """
        if not self.is_modal_url:
            # Fallback to regular path-based URL.
            # NOTE(review): urljoin with an absolute endpoint path replaces any
            # path prefix present on base_url - confirm that is intended.
            return urljoin(self.base_url, endpoint)

        if method is None:
            method = getattr(self, '_current_method', None)

        function_name = self._ENDPOINT_TO_FUNCTION.get(endpoint)
        path_suffix = ""

        if not function_name:
            if endpoint.startswith('/rules/'):
                # All rules operations go to 'rules'; PATCH and DELETE keep
                # the part of the path after '/rules'.
                function_name = 'rules'
                if method in ('PATCH', 'DELETE'):
                    path_suffix = endpoint[len('/rules'):]
            elif endpoint.startswith('/sessions/'):
                # All sessions operations go to 'sessions'; GET and DELETE
                # keep the part of the path after '/sessions'.
                function_name = 'sessions'
                if method in ('GET', 'DELETE'):
                    path_suffix = endpoint[len('/sessions'):]
            elif endpoint.startswith('/auth/'):
                function_name = 'auth'
            else:
                # Fallback: convert path to function name
                function_name = endpoint.strip("/").replace("/", "_")

        # Modal URLs use hyphens where function names use underscores
        function_name_with_hyphens = function_name.replace('_', '-')

        base_url = f"https://{self.workspace}--{self.app_name}-{function_name_with_hyphens}.modal.run"
        return base_url + path_suffix

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make a request to the Modal backend with unified error handling

        Args:
            method: HTTP method
            endpoint: Logical endpoint path
            **kwargs: Passed through to ``requests.request`` (e.g. params, json)

        Returns:
            The decoded JSON body on success. On HTTP errors a dict with
            'success': False, 'status_code' and 'error' (merged with any JSON
            error object sent by the backend). On connection failure or
            timeout, the local mock response for the same call.
        """
        # If in local mode, always use local responses
        if self.local_mode:
            return self._get_local_response(endpoint, method, kwargs)

        # Still recorded so _construct_modal_endpoint_url(endpoint) without an
        # explicit method keeps working for existing callers.
        self._current_method = method
        url = self._construct_modal_endpoint_url(endpoint, method)

        # For GET /sessions/{session_id}, Modal expects session_id as query param
        if self.is_modal_url and method == 'GET' and endpoint.startswith('/sessions/'):
            session_id = endpoint.split('/')[-1]
            kwargs.setdefault('params', {})['session_id'] = session_id

        # Let callers override the timeout without a duplicate-keyword error.
        kwargs.setdefault('timeout', self._DEFAULT_TIMEOUT)

        try:
            response = requests.request(
                method=method,
                url=url,
                headers=self.headers,
                **kwargs
            )
            response.raise_for_status()
            if not response.content:
                # Successful response without a body (e.g. 204 No Content)
                return {'success': True, 'status_code': response.status_code}
            try:
                return response.json()
            except ValueError:
                return {
                    'success': False,
                    'status_code': response.status_code,
                    'error': 'Backend returned a response that is not valid JSON'
                }

        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            # Backend is down or unreachable, use full local mock as fallback
            print(f"MCPClient: Connection failed ('{e}'). Engaging fallback mode.")
            return self._get_local_response(endpoint, method, kwargs)

        except requests.exceptions.HTTPError as e:
            # Backend is up but returned an error (4xx, 5xx)
            print(f"MCPClient: HTTP Error: {e.response.status_code}")
            try:
                # Try to parse a JSON error response from the backend
                error_details = e.response.json()
            except ValueError:
                error_details = {'error': e.response.text or 'An unknown HTTP error occurred.'}
            if not isinstance(error_details, dict):
                # A JSON list/string cannot be merged with ** below
                error_details = {'details': error_details}

            return {
                'success': False,
                'status_code': e.response.status_code,
                'error': self._get_user_friendly_error(e.response.status_code),
                **error_details
            }

        except requests.exceptions.RequestException as e:
            # Other request-related errors
            print(f"MCPClient: General request exception: {e}")
            return {
                'error': f'A network error occurred: {e}',
                'success': False,
                'status_code': None
            }

    def _get_user_friendly_error(self, status_code: int) -> str:
        """Get user-friendly error message based on HTTP status code"""
        error_messages = {
            400: "Invalid request - please check your input",
            401: "Authentication required - please log in",
            403: "Access denied",
            404: "Resource not found",
            429: "Please slow down - too many requests",
            500: "Server issue - please try again",
            502: "Server temporarily unavailable",
            503: "Server busy - please try again later"
        }
        return error_messages.get(status_code, f"Error {status_code} occurred")

    @staticmethod
    def _load_json(path: str) -> Any:
        """Read and decode a UTF-8 JSON file; raises OSError or ValueError on failure"""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _filter_emails(
        emails: List[Dict[str, Any]],
        query: Any,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Return emails whose subject or sender name contains query (case-insensitive)"""
        needle = str(query or '').lower()
        matches = [
            email for email in emails
            if needle in str(email.get('subject') or '').lower()
            or needle in str(email.get('from_name') or '').lower()
        ]
        return matches if limit is None else matches[:limit]

    def _get_local_response(self, endpoint: str, method: str, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Provide session-aware local responses for all endpoints

        Args:
            endpoint: Logical endpoint path
            method: HTTP method
            kwargs: The keyword arguments the request would have been sent
                with; 'json' and 'params' are read from it

        Returns:
            A mock response dict; unknown endpoints yield 'success': False
            with 'fallback': True
        """
        # Initialize session data if needed
        session_id = self.session_token
        if session_id not in self._mock_sessions:
            self._mock_sessions[session_id] = {
                'emails': self._load_initial_emails(),
                'rules': [],
                'rule_counter': 0
            }
        session_data = self._mock_sessions[session_id]

        payload = kwargs.get('json') or {}
        params = kwargs.get('params') or {}

        if '/rules/preview' in endpoint and method == 'POST':
            # Simulate rule preview
            return {
                'success': True,
                'affected_emails': self._get_mock_preview_emails(),
                'statistics': {
                    'matched_count': 5,
                    'total_scanned': 20
                }
            }

        elif '/rules/apply' in endpoint and method == 'POST':
            # Simulate rule application
            return {
                'success': True,
                'statistics': {
                    'processed_count': 5,
                    'moved_count': 3,
                    'labeled_count': 2
                },
                'updated_emails': self._get_mock_updated_emails()
            }

        elif endpoint in ('/rules', '/rules/save') and method == 'POST':
            # Simulate saving a rule. save_rule() posts to '/rules/save', so
            # both spellings must be accepted. Work on a copy so the caller's
            # dict is not modified, as with a real (serialized) request.
            rule = dict(payload.get('rule') or {})
            rule.setdefault('rule_id', f'rule_{session_data["rule_counter"]}')
            # The 'Z' suffix denotes UTC, so format UTC time rather than local time
            rule.setdefault('created_at', time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()))
            rule.setdefault('status', 'pending')
            session_data['rules'].append(rule)
            session_data['rule_counter'] += 1
            return {
                'success': True,
                'rule_id': rule['rule_id'],
                'message': 'Rule saved successfully'
            }

        elif '/rules' in endpoint and method == 'GET' and 'rule_id' not in endpoint:
            # Return saved rules for this session
            return {
                'rules': session_data['rules'],
                'count': len(session_data['rules']),
                'success': True
            }

        elif '/rules' in endpoint and method == 'PATCH':
            # Handle rule status update
            rule_id = endpoint.split('/')[-1]
            update_data = payload.get('update') or {}
            for rule in session_data['rules']:
                if rule.get('rule_id') == rule_id:
                    rule.update(update_data)
                    return {
                        'success': True,
                        'rule': rule,
                        'message': f'Rule {rule_id} updated'
                    }
            return {'success': False, 'error': 'Rule not found'}

        elif '/rules' in endpoint and method == 'DELETE':
            # Simulate rule deletion (succeeds even when the id is unknown)
            rule_id = endpoint.split('/')[-1]
            session_data['rules'] = [
                r for r in session_data['rules'] if r.get('rule_id') != rule_id
            ]
            return {
                'success': True,
                'message': 'Rule deleted successfully'
            }

        elif '/emails/search' in endpoint and method == 'GET':
            # Legacy search endpoint: searches the email file, 'query' param
            try:
                emails = self._load_json('data/emails.json')
            except (OSError, ValueError) as e:
                return {'emails': [], 'error': f'Could not load emails: {e}', 'success': False}
            return {
                'emails': self._filter_emails(emails, params.get('query', ''), 10),
                'success': True
            }

        elif '/emails' in endpoint and method == 'GET':
            # Return emails from session data; search_emails() sends its query
            # as the 'search' param on this endpoint, so honor it here.
            emails = session_data['emails']
            if params.get('search'):
                emails = self._filter_emails(emails, params['search'], params.get('max_results'))
            return {
                'emails': emails,
                'next_page_token': None,
                'total_estimate': len(emails),
                'success': True
            }

        elif '/labels' in endpoint and method == 'GET':
            # Return mock labels
            return {
                'labels': [
                    {'id': 'INBOX', 'name': 'Inbox'},
                    {'id': 'WORK', 'name': 'Work'},
                    {'id': 'PERSONAL', 'name': 'Personal'},
                    {'id': 'ARCHIVE', 'name': 'Archive'}
                ],
                'success': True
            }

        elif '/auth/gmail/url' in endpoint:
            # Return mock OAuth URL
            return {
                'auth_url': 'https://accounts.google.com/oauth/authorize?mock=true',
                'success': True
            }

        elif '/auth/gmail/callback' in endpoint:
            # Mock OAuth callback
            return {
                'access_token': 'mock_access_token',
                'user_info': {
                    'email': 'demo@example.com',
                    'name': 'Demo User'
                },
                'success': True
            }

        # Default fallback for unknown endpoints
        return {
            'error': f'Endpoint {endpoint} not available in offline mode',
            'success': False,
            'fallback': True
        }

    def _get_mock_preview_emails(self) -> List[Dict[str, Any]]:
        """Get mock preview emails for local testing (at most 10, [] if the file is unusable)"""
        try:
            return self._load_json('data/preview_emails.json')[:10]
        except (OSError, ValueError) as e:
            print(f"MCPClient: Could not load preview emails: {e}")
            return []

    def _get_mock_updated_emails(self) -> List[Dict[str, Any]]:
        """Get mock updated emails after rule application ([] if the file is unusable)"""
        try:
            emails = self._load_json('data/emails.json')
        except (OSError, ValueError) as e:
            print(f"MCPClient: Could not load emails for update simulation: {e}")
            return []

        # Simulate the first five emails being moved or labeled, alternating
        for index, email in enumerate(emails[:5]):
            if index % 2 == 0:
                email['folder'] = 'Work'
            else:
                email['labels'] = email.get('labels', []) + ['Processed']
        return emails

    # Email operations
    def list_emails(
        self,
        folder: str = 'inbox',
        page_size: int = 20,
        page_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        List emails from the specified folder

        Args:
            folder: Email folder/label to list
            page_size: Number of emails per page
            page_token: Token for pagination

        Returns:
            Dict with emails, next_page_token, and total_estimate
        """
        params = {
            'folder': folder,
            'page_size': page_size,
            'mode': self.mode
        }
        if page_token:
            params['page_token'] = page_token

        return self._request('GET', '/emails', params=params)

    def search_emails(
        self,
        query: str,
        folder: Optional[str] = None,
        max_results: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Search emails with the given query

        Args:
            query: Search query
            folder: Optional folder to search within
            max_results: Maximum number of results

        Returns:
            List of matching emails
        """
        params = {
            'search': query,
            'max_results': max_results,
            'mode': self.mode
        }
        if folder:
            params['folder'] = folder

        response = self._request('GET', '/emails', params=params)
        return response.get('emails', [])

    def get_labels(self) -> List[Dict[str, Any]]:
        """
        Get available email labels/folders

        Returns:
            List of label dictionaries
        """
        response = self._request('GET', '/labels', params={'mode': self.mode})
        return response.get('labels', [])

    # Rule operations
    def preview_rule(
        self,
        rule: Dict[str, Any],
        sample_size: int = 10
    ) -> Dict[str, Any]:
        """
        Preview the effects of a rule without applying it

        Args:
            rule: Rule dictionary
            sample_size: Number of sample emails to show

        Returns:
            Dict with matched emails and statistics
        """
        return self._request('POST', '/rules/preview', json={
            'rule': rule,
            'sample_size': sample_size,
            'mode': self.mode
        })

    def apply_rule(
        self,
        rule: Dict[str, Any],
        preview: bool = False
    ) -> Dict[str, Any]:
        """
        Apply a rule to emails

        Args:
            rule: Rule dictionary
            preview: If True, only preview without applying

        Returns:
            Dict with results
        """
        return self._request('POST', '/rules/apply', json={
            'rule': rule,
            'preview': preview,
            'mode': self.mode
        })

    def save_rule(self, rule: Dict[str, Any]) -> Dict[str, Any]:
        """
        Save a rule for the current user/session

        Args:
            rule: Rule dictionary to save

        Returns:
            Dict with saved rule ID
        """
        return self._request('POST', '/rules/save', json={
            'rule': rule,
            'mode': self.mode
        })

    def update_rule_status(self, rule_id: str, new_status: str) -> Dict[str, Any]:
        """Update the status of a rule"""
        return self._request('PATCH', f'/rules/{rule_id}', json={
            'update': {'status': new_status}
        })

    def archive_rule(self, rule_id: str) -> Dict[str, Any]:
        """Archive (reject) a rule"""
        return self.update_rule_status(rule_id, 'rejected')

    def reactivate_rule(self, rule_id: str) -> Dict[str, Any]:
        """Reactivate an archived rule"""
        return self.update_rule_status(rule_id, 'pending')

    def get_rules(self) -> List[Dict[str, Any]]:
        """
        Get saved rules for the current user/session

        Returns:
            List of saved rules
        """
        response = self._request('GET', '/rules', params={'mode': self.mode})
        return response.get('rules', [])

    def delete_rule(self, rule_id: str) -> Dict[str, Any]:
        """
        Delete a saved rule

        Args:
            rule_id: ID of the rule to delete

        Returns:
            Dict with success status
        """
        return self._request('DELETE', f'/rules/{rule_id}')

    # Authentication
    def get_gmail_auth_url(self) -> str:
        """
        Get Gmail OAuth authorization URL

        Returns:
            OAuth URL for Gmail authorization ('' if the response has none)
        """
        response = self._request('GET', '/auth/gmail/url')
        return response.get('auth_url', '')

    def exchange_gmail_code(self, code: str) -> Dict[str, Any]:
        """
        Exchange Gmail OAuth code for tokens

        Args:
            code: OAuth authorization code

        Returns:
            Dict with access token and user info
        """
        return self._request('POST', '/auth/gmail/callback', json={'code': code})

    def _load_initial_emails(self) -> List[Dict[str, Any]]:
        """Load initial emails from file for demo mode

        Falls back to a single built-in demo email when 'data/emails.json'
        cannot be read or decoded.
        """
        try:
            return self._load_json('data/emails.json')
        except (OSError, ValueError) as e:
            print(f"Warning: Could not load initial emails: {e}")
            # Return some default demo emails
            return [
                {
                    "id": "msg_001",
                    "from_email": "demo@example.com",
                    "from_name": "Demo User",
                    "subject": "Welcome to Email Rule Agent!",
                    "snippet": "This is a demo email to get you started...",
                    "body": "This is a demo email to get you started with the Email Rule Agent.",
                    "date": "2024-01-15T10:00:00Z",
                    "labels": ["INBOX"],
                    "folder": "INBOX"
                }
            ]