""" | |
Simple agent implementation without CrewAI overhead | |
Now with natural language rule extraction! | |
""" | |
import os | |
import json | |
import re | |
import logging | |
import uuid | |
from typing import List, Dict, Any, Optional, Callable, Tuple | |
from langchain_openai import ChatOpenAI | |
from langchain.schema import HumanMessage, SystemMessage, AIMessage | |
from .models import EmailRule, EmailRuleList, RuleCondition, RuleAction, RuleParameters | |
# Set up logging | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
# Global email context | |
email_context = [] | |

def get_llm(use_thinking=False):
    """Get a configured OpenRouter LLM."""
    api_key = os.getenv('OPENROUTER_API_KEY')
    if not api_key:
        raise ValueError(
            "OpenRouter API key not configured. "
            "Please set OPENROUTER_API_KEY in HuggingFace Spaces secrets."
        )

    # Use the thinking model for complex reasoning tasks
    model = 'google/gemini-2.5-flash-preview-05-20:thinking' if use_thinking else 'google/gemini-2.5-flash-preview-05-20'

    return ChatOpenAI(
        openai_api_key=api_key,
        openai_api_base="https://openrouter.ai/api/v1",
        model_name=model,
        temperature=0.7,
        streaming=True,
        default_headers={
            "HTTP-Referer": os.getenv('HF_SPACE_URL', 'http://localhost:7860'),
            "X-Title": "Email Rule Agent"
        }
    )

def analyze_emails_and_propose_rules(emails: List[Dict[str, Any]], focus_area: str = "all types") -> Dict[str, Any]:
    """Analyze emails and propose organization rules."""
    if not emails:
        return {
            'success': False,
            'message': "No emails available to analyze.",
            'rules': []
        }

    # Prepare email summaries
    email_summaries = []
    for e in emails[:30]:  # Limit to 30 emails
        email_summaries.append({
            'from': e.get('from_name', ''),
            'subject': e.get('subject', ''),
            'preview': (e.get('body', '')[:150] + '...') if len(e.get('body', '')) > 150 else e.get('body', '')
        })

    prompt = f"""Analyze these {len(email_summaries)} email samples and propose 3-5 organization rules.
Focus area: {focus_area}
Available folders: inbox, reading, work, archive
You can suggest new folders if needed.
Email samples:
{json.dumps(email_summaries, indent=2)}
Look for patterns like:
- Newsletters (subject contains "newsletter", from contains "news")
- Work emails (from specific domains)
- Social media notifications
- Marketing/promotional emails
- Personal emails from friends/family
Create practical rules with clear patterns.
Each condition MUST have a non-empty value extracted from the email samples.
Example good rule:
{{
  "name": "Tech Newsletters",
  "conditions": [
    {{"field": "from", "operator": "contains", "value": "techcrunch"}}
  ],
  "actions": [{{"type": "move", "parameters": {{"folder": "reading"}}}}]
}}
DO NOT create rules with empty values like "" in conditions.
Return a JSON object with a 'rules' array containing the email rules."""

    try:
        llm = get_llm(use_thinking=False)

        # Try to use structured output if available
        try:
            structured_llm = llm.with_structured_output(EmailRuleList)
            response = structured_llm.invoke([
                SystemMessage(content="You are an email pattern analyzer. Create 3-5 practical organization rules based on the email patterns you observe."),
                HumanMessage(content=prompt)
            ])

            # Response is an EmailRuleList object
            formatted_rules = []
            for rule in response.rules:
                formatted_rule = rule.model_dump()
                # Ensure rule_id is set with a UUID
                if not formatted_rule.get('rule_id'):
                    formatted_rule['rule_id'] = f"rule_{uuid.uuid4().hex[:8]}"
                formatted_rules.append(formatted_rule)

            return {
                'success': True,
                'rules': formatted_rules,
                'message': f"Found {len(formatted_rules)} patterns in your emails!"
            }
        except Exception as struct_error:
            # Fall back to regex extraction if structured output is not supported
            logger.warning(f"Structured output not available, falling back to regex: {struct_error}")
            response = llm.invoke([
                SystemMessage(content="You are an email pattern analyzer. Always respond with valid JSON. Extract actual values from the email content - never use empty strings."),
                HumanMessage(content=prompt)
            ])

            # Extract JSON from the response
            response_text = response.content
            json_match = re.search(r'\{[\s\S]*\}', response_text)
            if json_match:
                parsed = json.loads(json_match.group())
                rules = parsed.get('rules', [])

                # Format rules for the UI, dropping conditions with empty values
                formatted_rules = []
                for rule in rules:
                    formatted_rule = {
                        "name": rule.get('name', 'Unnamed Rule'),
                        "description": rule.get('description', 'Automatically organize emails'),
                        "conditions": [{
                            "field": c.get('field', 'from'),
                            "operator": c.get('operator', 'contains'),
                            "value": c.get('value', ''),
                            "case_sensitive": False
                        } for c in rule.get('conditions', [])
                            if (isinstance(c.get('value'), str) and c.get('value').strip())
                            or (isinstance(c.get('value'), list) and c.get('value'))],
                        "actions": [{
                            "type": a.get('type', 'move'),
                            "parameters": a.get('parameters', {
                                "folder": a.get('folder', 'inbox'),
                                "label": a.get('label', ''),
                                "template": a.get('template', '')
                            })
                        } for a in rule.get('actions', [])],
                        "confidence": rule.get('confidence', 0.8),
                        "rule_id": f"rule_{uuid.uuid4().hex[:8]}"
                    }
                    formatted_rules.append(formatted_rule)

                return {
                    'success': True,
                    'rules': formatted_rules,
                    'message': f"Found {len(formatted_rules)} patterns in your emails!"
                }
            else:
                return {
                    'success': False,
                    'message': "Could not parse rule suggestions.",
                    'rules': []
                }
    except Exception as e:
        logger.error(f"Error analyzing emails: {e}")
        return {
            'success': False,
            'message': f"Error analyzing emails: {str(e)}",
            'rules': []
        }

def detect_rule_intent(message: str) -> bool:
    """Check whether the message contains intent to create a rule."""
    rule_indicators = [
        # Direct commands
        r'\b(move|archive|label|organize|put|send|flag|mark)\b.*\b(all|these|those|emails?|messages?)\b',
        r'\b(i want|i need|please|can you|could you)\b.*\b(move|archive|organize)',
        r'\ball\s+(my\s+)?(newsletter|marketing|promotional|work|personal)',
        r'\b(delete|remove|trash)\b.*\b(all|these|emails)',  # Will convert to archive
        # Draft patterns
        r'\b(draft|reply|respond)\b.*\b(polite|professional|saying|acknowledging)\b',
        r'\b(send|write|create)\b.*\b(reply|response|acknowledgment)\b',
        r'\b(draft|create|write)\b.*\b(replies?|responses?)\b.*\b(for|to)\b',
        r'\bfor\s+(emails?\s+)?from\s+.*\bdraft\b',
        r'\backnowledge\b.*\b(emails?|messages?)\b',
        # Specific patterns
        r'emails?\s+from\s+\w+',
        r'put\s+.*\s+in(to)?\s+',
        r'archive\s+(all\s+)?.*emails?',
        r'unsubscribe|spam|junk',
    ]

    message_lower = message.lower()
    return any(re.search(pattern, message_lower) for pattern in rule_indicators)
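
# Illustrative examples of how detect_rule_intent() is expected to behave against
# the patterns above (assumed inputs, not part of an original test suite):
#   detect_rule_intent("Archive all marketing emails")            -> True
#   detect_rule_intent("Please move emails from uber to travel")  -> True
#   detect_rule_intent("What's on my calendar today?")            -> False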

def resolve_context_references(
    text: str,
    conversation_history: List[Dict],
    emails: List[Dict[str, Any]]
) -> Tuple[str, Dict[str, Any]]:
    """Resolve contextual references like 'them' or 'those' using conversation history."""
    context_info = {
        'mentioned_senders': [],
        'mentioned_categories': [],
        'mentioned_subjects': []
    }

    # Look for recent context in the conversation
    for msg in conversation_history[-5:]:  # Last 5 messages
        content = msg.get('content', '').lower()

        # Check for email category references
        if 'newsletter' in content:
            context_info['mentioned_categories'].append('newsletter')
        if 'marketing' in content or 'promotional' in content:
            context_info['mentioned_categories'].append('marketing')
        if 'work' in content or 'colleague' in content:
            context_info['mentioned_categories'].append('work')

        # Extract any email addresses mentioned
        email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
        found_emails = re.findall(email_pattern, content)
        context_info['mentioned_senders'].extend(found_emails)

    # Replace contextual references
    resolved_text = text
    if re.search(r'\b(them|those|these)\b', text.lower()) and not re.search(r'(emails?|messages?)', text.lower()):
        # Add clarification based on context
        if context_info['mentioned_categories']:
            category = context_info['mentioned_categories'][-1]
            resolved_text = text.replace('them', f'{category} emails')
            resolved_text = resolved_text.replace('those', f'those {category} emails')
            resolved_text = resolved_text.replace('these', f'these {category} emails')

    return resolved_text, context_info
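
# Illustrative example (assumed inputs, not from a test suite):
#   history = [{"role": "assistant", "content": "I see several newsletter emails in your inbox."}]
#   resolve_context_references("archive them", history, [])
#   -> ("archive newsletter emails",
#       {"mentioned_senders": [], "mentioned_categories": ["newsletter"], "mentioned_subjects": []})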

def extract_rule_from_natural_language(
    user_message: str,
    conversation_history: List[Dict],
    emails: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Extract a rule from a natural language request."""
    # Resolve context
    resolved_message, context_info = resolve_context_references(
        user_message, conversation_history, emails
    )

    # Convert delete requests to archive
    if re.search(r'\b(delete|remove|trash)\b', resolved_message.lower()):
        resolved_message = re.sub(
            r'\b(delete|remove|trash)\b',
            'archive',
            resolved_message,
            flags=re.IGNORECASE
        )

    # Use the LLM to extract rule components
    llm = get_llm(use_thinking=False)

    # Include recent conversation context (last 2 exchanges)
    recent_context = ""
    if conversation_history:
        for msg in conversation_history[-4:]:
            role = "User" if msg.get('role') == 'user' else "Assistant"
            recent_context += f"{role}: {msg.get('content', '')}\n"

    prompt = f"""Extract email rule components from this request: "{resolved_message}"
Recent conversation:
{recent_context}
Context from conversation:
- Mentioned categories: {context_info['mentioned_categories']}
- Mentioned senders: {context_info['mentioned_senders']}
IMPORTANT: If this is NOT a request to create, modify, or apply an email rule, return the word "NOTARULE" instead of JSON.
Common rule requests include:
- "Move [emails] to [folder]"
- "Archive [type of emails]"
- "Please move all Uber receipts to travel folder"
- Confirmations like "yes", "ok", "sure" after discussing a rule
Email rule requests typically:
- Ask to move, archive, label, organize, or draft replies for emails
- Mention email senders, subjects, or content patterns
- Describe actions to take on emails
Available actions: move (to folder), archive, label, draft (a reply)
Available folders: inbox, reading, work, personal, receipts, travel, archive
If the user is confirming a rule (saying "yes", "ok", "sure" after you offered to create one), extract the rule from the conversation.
Return JSON in this EXACT format:
{{
  "name": "Short descriptive name for the rule",
  "description": "What this rule does",
  "conditions": [
    {{"field": "from", "operator": "contains", "value": "uber"}}
  ],
  "actions": [
    {{"type": "move", "parameters": {{"folder": "travel"}}}}
  ],
  "confidence": 0.9
}}
Examples:
- "Move emails from Uber to travel folder" → {{"name": "Uber to Travel", "conditions": [{{"field": "from", "operator": "contains", "value": "uber"}}], "actions": [{{"type": "move", "parameters": {{"folder": "travel"}}}}]}}
- User says "Yes" after "Shall I move Uber receipts to travel?" → Extract the Uber rule from context"""

    try:
        # Try to use structured output
        try:
            structured_llm = llm.with_structured_output(EmailRule)
            response = structured_llm.invoke([
                SystemMessage(content="You are a rule extraction expert. Extract email rule components from the user's request."),
                HumanMessage(content=prompt)
            ])

            # Response is an EmailRule object
            formatted_rule = response.model_dump()
            # Ensure rule_id is set with a UUID
            if not formatted_rule.get('rule_id'):
                formatted_rule['rule_id'] = f"rule_{uuid.uuid4().hex[:8]}"
            return formatted_rule
        except Exception as struct_error:
            # Fall back to regex extraction
            logger.warning(f"Structured output not available for single rule, falling back to regex: {struct_error}")
            response = llm.invoke([
                SystemMessage(content="You are a rule extraction expert. Return either valid JSON for a rule or 'NOTARULE' if this is not a rule request."),
                HumanMessage(content=prompt)
            ])

            # Check whether the model declined to produce a rule
            if "NOTARULE" in response.content:
                return None

            # Extract JSON
            json_match = re.search(r'\{[\s\S]*\}', response.content)
            if json_match:
                rule_data = json.loads(json_match.group())
                logger.info(f"LLM returned JSON: {rule_data}")

                # Format for the UI, with generic defaults for missing fields
                formatted_rule = {
                    "name": rule_data.get('name', 'Unnamed Rule'),
                    "description": rule_data.get('description', 'Automatically organize matching emails'),
                    "conditions": [{
                        "field": c.get('field', 'from'),
                        "operator": c.get('operator', 'contains'),
                        "value": c.get('value', ''),
                        "case_sensitive": False
                    } for c in rule_data.get('conditions', [])],
                    "actions": [{
                        "type": a.get('type', 'move'),
                        "parameters": a.get('parameters', {
                            "folder": a.get('folder', 'inbox'),
                            "label": a.get('label', ''),
                            "template": a.get('template', '')
                        })
                    } for a in rule_data.get('actions', [])],
                    "confidence": rule_data.get('confidence', 0.9),
                    "rule_id": f"rule_{uuid.uuid4().hex[:8]}"
                }
                return formatted_rule

            # No JSON found in the fallback response
            return None
    except Exception as e:
        logger.error(f"Error extracting rule: {e}")
        logger.error(f"Response content was: {response.content if 'response' in locals() else 'No response'}")
        return None

def process_chat_message(
    user_message: str,
    emails: List[Dict[str, Any]],
    conversation_history: Optional[List[Dict]] = None,
    callback: Optional[Callable] = None,
    rule_state: Optional[Dict] = None
) -> Dict[str, Any]:
    """Process a user message with an improved priority flow."""
    logger.info(f"=== AGENT CALLED ===\nMessage: {user_message[:50]}...\nEmails: {len(emails) if emails else 0}")

    global email_context
    email_context = emails

    if not conversation_history:
        conversation_history = []

    try:
        # PRIORITY 1: Always try to extract a rule from natural language first
        logger.info("Checking if this is a rule request...")

        # Show a thinking indicator
        if callback:
            callback({'type': 'analyzer_feedback', 'content': '🤔 Understanding your request...'})

        # Extract a rule from natural language - let Gemini decide whether it's a rule
        extracted_rule = extract_rule_from_natural_language(
            user_message, conversation_history, emails
        )

        if extracted_rule:
            logger.info(f"Rule extracted successfully: {extracted_rule}")

            # Check if it's a delete request that was converted to archive
            if 'delete' in user_message.lower() or 'remove' in user_message.lower():
                response = "✅ I'll archive those emails for you (for safety, I archive instead of delete). Check the rules panel to preview the rule!"
            else:
                response = "✅ I've created a rule based on your request. Check the rules panel to preview how it will organize your emails!"

            # Add a hidden JSON marker so the UI can extract the rule
            response += f"\n<!-- RULES_JSON_START\n{json.dumps([extracted_rule])}\nRULES_JSON_END -->"

            return {
                'response': response,
                'rules': [extracted_rule],
                'thinking_process': []
            }
        else:
            logger.info("No rule extracted - falling through to general conversation")

        # PRIORITY 2: Check for an analysis request
        analyze_keywords = ['analyze', 'suggest', 'patterns', 'help me organize',
                            'what rules', 'find patterns', 'inbox analysis']
        should_analyze = any(keyword in user_message.lower() for keyword in analyze_keywords)

        if should_analyze:
            logger.info("Analysis requested - analyzing email patterns")
            if callback:
                callback({'type': 'analyzer_feedback', 'content': f'🔍 Analyzing {len(emails)} emails for patterns...'})

            result = analyze_emails_and_propose_rules(emails)

            if result['success'] and result['rules']:
                # Simple response - rules are shown in the panel
                response = f"📊 I analyzed your {len(emails)} emails and found {len(result['rules'])} patterns! Check the rules panel to see my suggestions."
                # Add a hidden JSON marker
                response += f"\n<!-- RULES_JSON_START\n{json.dumps(result['rules'])}\nRULES_JSON_END -->"

                return {
                    'response': response,
                    'rules': result['rules'],
                    'thinking_process': []
                }
            else:
                return {
                    'response': result['message'],
                    'rules': [],
                    'thinking_process': []
                }

        # PRIORITY 3: General conversation
        # Simple greetings (match whole words, but allow multi-word phrases)
        greetings = ['hi', 'hello', 'hey', 'good morning', 'good afternoon']
        message_lower = user_message.lower()
        message_words = message_lower.split()
        is_greeting = any(
            greeting in message_lower if ' ' in greeting else greeting in message_words
            for greeting in greetings
        )

        if is_greeting:
            response = """👋 Hi there! I'm your email organization assistant. I can help you:
• **Create rules from natural language** - Just tell me what you want!
  Example: "Archive all marketing emails"
• **Analyze your inbox** - I'll find patterns and suggest smart rules
  Example: "Analyze my emails and suggest organization rules"
What would you like to do?"""
            return {
                'response': response,
                'rules': [],
                'thinking_process': []
            }

        # Other general conversation
        llm = get_llm(use_thinking=False)
        messages = [
            SystemMessage(content="""You are a helpful email organization assistant. Be concise and friendly.
If the user seems to want email organization help, mention you can:
1. Create rules from their natural language requests (e.g., "archive all newsletters")
2. Analyze their inbox for patterns"""),
            HumanMessage(content=user_message)
        ]

        # Add recent conversation history for context
        if conversation_history:
            for msg in conversation_history[-4:]:  # Last 4 messages
                if msg.get('role') == 'user':
                    messages.insert(-1, HumanMessage(content=msg['content']))
                else:
                    messages.insert(-1, AIMessage(content=msg['content']))

        response = llm.invoke(messages)

        return {
            'response': response.content,
            'rules': [],
            'thinking_process': []
        }
    except Exception as e:
        logger.error(f"Error in process_chat_message: {e}")
        return {
            'response': f"I encountered an error: {str(e)}. Please try again.",
            'rules': [],
            'thinking_process': [f"Error: {str(e)}"]
        }

def extract_rules_from_output(output: str) -> List[Dict[str, Any]]:
    """Extract JSON rules from an output string."""
    try:
        # Look for the hidden JSON markers added by process_chat_message()
        hidden_json = re.search(r'<!-- RULES_JSON_START\s*(.*?)\s*RULES_JSON_END -->', output, re.DOTALL)
        if hidden_json:
            json_content = hidden_json.group(1).strip()
            return json.loads(json_content)
    except (json.JSONDecodeError, AttributeError):
        pass
    return []
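
# A minimal, illustrative smoke test (an assumption, not part of the original
# module's test suite). It only exercises the pure-Python helpers above, so it
# runs without an OPENROUTER_API_KEY or any LLM calls. Because this module uses
# a relative import, run it in package context, e.g. `python -m <package>.agent`.
if __name__ == "__main__":
    sample_request = "Please move all newsletter emails to the reading folder"
    print("Rule intent detected:", detect_rule_intent(sample_request))

    # Hypothetical agent output containing the hidden JSON marker format
    sample_output = (
        "Done!\n"
        "<!-- RULES_JSON_START\n"
        '[{"name": "Newsletters to Reading", "rule_id": "rule_demo1234"}]\n'
        "RULES_JSON_END -->"
    )
    print("Extracted rules:", extract_rules_from_output(sample_output))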