AI-Email-Organizer / components /simple_agent.py
olety's picture
Initial deployment
028cd37 verified
"""
Simple agent implementation without CrewAI overhead
Now with natural language rule extraction!
"""
import os
import json
import re
import logging
import uuid
from typing import List, Dict, Any, Optional, Callable, Tuple
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage, AIMessage
from .models import EmailRule, EmailRuleList, RuleCondition, RuleAction, RuleParameters
# Set up logging.
# NOTE(review): basicConfig runs at import time and configures the root logger
# at DEBUG level (when no handlers are installed yet) — confirm this is wanted
# outside local debugging.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Global email context: module-level holder that process_chat_message rebinds
# (via ``global``) to its ``emails`` argument on every call.
email_context = []
def get_llm(use_thinking=False):
    """Build the OpenRouter-backed chat model used by this module.

    Args:
        use_thinking: When true, select the ``:thinking`` variant of the
            Gemini model (intended for complex reasoning tasks); otherwise
            the standard variant is used.

    Returns:
        A ``ChatOpenAI`` client pointed at the OpenRouter API base URL.

    Raises:
        ValueError: If the ``OPENROUTER_API_KEY`` environment variable is
            unset or empty.
    """
    openrouter_key = os.getenv('OPENROUTER_API_KEY')
    if not openrouter_key:
        raise ValueError(
            "OpenRouter API key not configured. "
            "Please set OPENROUTER_API_KEY in HuggingFace Spaces secrets."
        )

    # Both variants share the same base model id; only the suffix differs.
    if use_thinking:
        selected_model = 'google/gemini-2.5-flash-preview-05-20:thinking'
    else:
        selected_model = 'google/gemini-2.5-flash-preview-05-20'

    # Attribution headers sent with every request.
    request_headers = {
        "HTTP-Referer": os.getenv('HF_SPACE_URL', 'http://localhost:7860'),
        "X-Title": "Email Rule Agent",
    }

    return ChatOpenAI(
        openai_api_key=openrouter_key,
        openai_api_base="https://openrouter.ai/api/v1",
        model_name=selected_model,
        temperature=0.7,
        streaming=True,
        default_headers=request_headers,
    )
def _summarize_emails_for_prompt(emails, limit=30, preview_chars=150):
    """Reduce raw email dicts to the small from/subject/preview form sent to the LLM."""
    summaries = []
    for email in emails[:limit]:
        # A missing or None body is treated as empty so slicing never fails.
        body = email.get('body') or ''
        preview = (body[:preview_chars] + '...') if len(body) > preview_chars else body
        summaries.append({
            'from': email.get('from_name', ''),
            'subject': email.get('subject', ''),
            'preview': preview,
        })
    return summaries


def _condition_has_usable_value(condition):
    """Return True when a condition carries a non-blank string or a non-empty list value."""
    value = condition.get('value')
    if isinstance(value, str):
        return bool(value.strip())
    if isinstance(value, list):
        return bool(value)
    return False


def _format_proposed_rule(rule):
    """Normalise one loosely-structured rule dict from the LLM into the UI rule shape."""
    conditions = [
        {
            "field": c.get('field', 'from'),
            "operator": c.get('operator', 'contains'),
            "value": c.get('value', ''),
            "case_sensitive": False,
        }
        for c in rule.get('conditions', [])
        if _condition_has_usable_value(c)
    ]
    actions = [
        {
            "type": a.get('type', 'move'),
            # Accept either a nested "parameters" dict or flat folder/label/template keys.
            "parameters": a.get('parameters', {
                "folder": a.get('folder', 'inbox'),
                "label": a.get('label', ''),
                "template": a.get('template', ''),
            }),
        }
        for a in rule.get('actions', [])
    ]
    return {
        "name": rule.get('name', 'Unnamed Rule'),
        "description": rule.get('description', 'Automatically organize emails'),
        "conditions": conditions,
        "actions": actions,
        "confidence": rule.get('confidence', 0.8),
        "rule_id": f"rule_{uuid.uuid4().hex[:8]}",
    }


def analyze_emails_and_propose_rules(emails: List[Dict[str, Any]], focus_area: str = "all types") -> Dict[str, Any]:
    """Analyze emails and propose organization rules.

    At most the first 30 emails are summarised (sender name, subject and a
    150-character body preview) and sent to the LLM. Structured output is
    attempted first; if that raises for any reason, a plain completion is
    requested and the first ``{...}`` span in the reply is parsed as JSON.

    Args:
        emails: Email dicts; the keys read here are ``from_name``,
            ``subject`` and ``body``.
        focus_area: Free-text hint inserted into the prompt.

    Returns:
        A dict with ``success`` (bool), ``message`` (str) and ``rules``
        (list of rule dicts). Failures are reported through
        ``success=False`` rather than raised.
    """
    if not emails:
        return {
            'success': False,
            'message': "No emails available to analyze.",
            'rules': []
        }

    email_summaries = _summarize_emails_for_prompt(emails)
    samples_json = json.dumps(email_summaries, indent=2)

    # This must be an f-string: the doubled braces in the example below are
    # f-string escapes, so the model is shown valid single-brace JSON.
    prompt = f"""Analyze these {len(email_summaries)} email samples and propose 3-5 organization rules.
Focus area: {focus_area}
Available folders: inbox, reading, work, archive
You can suggest new folders if needed.
Email samples:
{samples_json}
Look for patterns like:
- Newsletters (subject contains "newsletter", from contains "news")
- Work emails (from specific domains)
- Social media notifications
- Marketing/promotional emails
- Personal emails from friends/family
Create practical rules with clear patterns.
Each condition MUST have a non-empty value extracted from the email samples.
Example good rule:
{{
"name": "Tech Newsletters",
"conditions": [
{{"field": "from", "operator": "contains", "value": "techcrunch"}}
],
"actions": [{{"type": "move", "parameters": {{"folder": "reading"}}}}]
}}
DO NOT create rules with empty values like "" in conditions.
Return a JSON object with a 'rules' array containing the email rules."""

    try:
        llm = get_llm(use_thinking=False)

        # Try to use structured output if available
        try:
            structured_llm = llm.with_structured_output(EmailRuleList)
            response = structured_llm.invoke([
                SystemMessage(content="You are an email pattern analyzer. Create 3-5 practical organization rules based on the email patterns you observe."),
                HumanMessage(content=prompt)
            ])

            # Response is an EmailRuleList object
            formatted_rules = []
            for rule in response.rules:
                formatted_rule = rule.model_dump()
                # Ensure rule_id is set with UUID
                if not formatted_rule.get('rule_id'):
                    formatted_rule['rule_id'] = f"rule_{uuid.uuid4().hex[:8]}"
                formatted_rules.append(formatted_rule)

            return {
                'success': True,
                'rules': formatted_rules,
                'message': f"Found {len(formatted_rules)} patterns in your emails!"
            }
        except Exception as struct_error:
            # Fallback to regex extraction if structured output not supported
            logger.warning(f"Structured output not available, falling back to regex: {struct_error}")

        response = llm.invoke([
            SystemMessage(content="You are an email pattern analyzer. Always respond with valid JSON. Extract actual values from the email content - never use empty strings."),
            HumanMessage(content=prompt)
        ])

        # Extract JSON from response (greedy: first "{" through last "}")
        json_match = re.search(r'\{[\s\S]*\}', response.content)
        if not json_match:
            return {
                'success': False,
                'message': "Could not parse rule suggestions.",
                'rules': []
            }

        parsed = json.loads(json_match.group())

        # Format rules for the UI
        formatted_rules = []
        for rule in parsed.get('rules', []):
            formatted_rule = _format_proposed_rule(rule)
            if not formatted_rule['conditions']:
                # Every condition had an empty value, so nothing is left to
                # match on; skip the rule instead of proposing one with no conditions.
                logger.warning(f"Skipping proposed rule without usable conditions: {formatted_rule['name']}")
                continue
            formatted_rules.append(formatted_rule)

        return {
            'success': True,
            'rules': formatted_rules,
            'message': f"Found {len(formatted_rules)} patterns in your emails!"
        }
    except Exception as e:
        logger.error(f"Error analyzing emails: {e}")
        return {
            'success': False,
            'message': f"Error analyzing emails: {str(e)}",
            'rules': []
        }
def detect_rule_intent(message: str) -> bool:
    """Report whether *message* looks like a request to create an email rule.

    The lower-cased message is searched against a fixed set of regular
    expressions; a single match anywhere in the text is sufficient.
    """
    # Direct commands ("delete"-style verbs are later converted to archive)
    command_patterns = (
        r'\b(move|archive|label|organize|put|send|flag|mark)\b.*\b(all|these|those|emails?|messages?)\b',
        r'\b(i want|i need|please|can you|could you)\b.*\b(move|archive|organize)',
        r'\ball\s+(my\s+)?(newsletter|marketing|promotional|work|personal)',
        r'\b(delete|remove|trash)\b.*\b(all|these|emails)',
    )
    # Requests to draft a reply
    draft_patterns = (
        r'\b(draft|reply|respond)\b.*\b(polite|professional|saying|acknowledging)\b',
        r'\b(send|write|create)\b.*\b(reply|response|acknowledgment)\b',
        r'\b(draft|create|write)\b.*\b(replies?|responses?)\b.*\b(for|to)\b',
        r'\bfor\s+(emails?\s+)?from\s+.*\bdraft\b',
        r'\backnowledge\b.*\b(emails?|messages?)\b',
    )
    # Narrower phrasings
    specific_patterns = (
        r'emails?\s+from\s+\w+',
        r'put\s+.*\s+in(to)?\s+',
        r'archive\s+(all\s+)?.*emails?',
        r'unsubscribe|spam|junk',
    )

    lowered = message.lower()
    for indicator in command_patterns + draft_patterns + specific_patterns:
        if re.search(indicator, lowered):
            return True
    return False
def resolve_context_references(
    text: str,
    conversation_history: List[Dict],
    emails: List[Dict[str, Any]]
) -> Tuple[str, Dict[str, Any]]:
    """Resolve contextual references like 'them', 'those' using conversation history.

    The last five history messages are scanned for category keywords and
    email addresses. If ``text`` contains a standalone "them"/"those"/"these"
    and does not itself mention emails or messages, each such pronoun is
    expanded with the most recently recorded category.

    Args:
        text: The user's current message.
        conversation_history: Message dicts; only the ``content`` key is read.
        emails: Accepted for interface compatibility; not used here.

    Returns:
        ``(resolved_text, context_info)`` where ``context_info`` holds the
        ``mentioned_senders``, ``mentioned_categories`` and
        ``mentioned_subjects`` lists (the last is never populated here).
    """
    context_info = {
        'mentioned_senders': [],
        'mentioned_categories': [],
        'mentioned_subjects': []
    }

    # Look for recent context in conversation
    for msg in (conversation_history or [])[-5:]:  # Last 5 messages
        # Missing or None content is treated as empty text.
        content = str(msg.get('content') or '').lower()

        # Check for email references (plain substring checks)
        if 'newsletter' in content:
            context_info['mentioned_categories'].append('newsletter')
        if 'marketing' in content or 'promotional' in content:
            context_info['mentioned_categories'].append('marketing')
        if 'work' in content or 'colleague' in content:
            context_info['mentioned_categories'].append('work')

        # Extract email addresses mentioned
        email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
        context_info['mentioned_senders'].extend(re.findall(email_pattern, content))

    # Replace contextual references
    resolved_text = text
    lowered = text.lower()
    pronoun_pattern = r'\b(them|those|these)\b'
    has_pronoun = re.search(pronoun_pattern, lowered)
    already_explicit = re.search(r'(emails?|messages?)', lowered)

    if has_pronoun and not already_explicit and context_info['mentioned_categories']:
        category = context_info['mentioned_categories'][-1]

        def _expand(match):
            word = match.group(0)
            if word.lower() == 'them':
                return f'{category} emails'
            # "those"/"these" keep the original word (and its casing).
            return f'{word} {category} emails'

        # Use the same word-bounded, case-insensitive pattern as the detection
        # above, so "Them" is expanded and words like "theme" are left alone.
        resolved_text = re.sub(pronoun_pattern, _expand, text, flags=re.IGNORECASE)

    return resolved_text, context_info
def _format_extracted_rule(rule_data):
    """Normalise the LLM's loosely-structured single-rule JSON into the UI rule shape."""
    conditions = []
    for c in rule_data.get('conditions', []):
        value = c.get('value')
        # Keep only non-blank string values or non-empty list values; an
        # empty "contains" value would otherwise match every email.
        usable = (
            (isinstance(value, str) and bool(value.strip()))
            or (isinstance(value, list) and bool(value))
        )
        if not usable:
            continue
        conditions.append({
            "field": c.get('field', 'from'),
            "operator": c.get('operator', 'contains'),
            "value": value,
            "case_sensitive": False,
        })

    actions = [
        {
            "type": a.get('type', 'move'),
            # Accept either a nested "parameters" dict or flat folder/label/template keys.
            "parameters": a.get('parameters', {
                "folder": a.get('folder', 'inbox'),
                "label": a.get('label', ''),
                "template": a.get('template', ''),
            }),
        }
        for a in rule_data.get('actions', [])
    ]

    return {
        # Neutral defaults: a missing name/description says nothing about the rule's topic.
        "name": rule_data.get('name', 'Unnamed Rule'),
        "description": rule_data.get('description', 'Automatically organize emails'),
        "conditions": conditions,
        "actions": actions,
        "confidence": rule_data.get('confidence', 0.9),
        "rule_id": f"rule_{uuid.uuid4().hex[:8]}",
    }


def extract_rule_from_natural_language(
    user_message: str,
    conversation_history: List[Dict],
    emails: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Extract a rule from natural language request.

    Pronouns are resolved against the conversation, delete/remove/trash is
    rewritten to "archive", and the LLM is asked for a single rule.
    Structured output is attempted first; if that raises, a plain completion
    is requested and the first ``{...}`` span in the reply is parsed as JSON.

    Args:
        user_message: The user's current message.
        conversation_history: Message dicts with ``role`` and ``content`` keys.
        emails: Passed through to ``resolve_context_references``.

    Returns:
        A rule dict, or ``None`` when the model answers ``NOTARULE``, no JSON
        is found, no condition has a usable value, or any error occurs
        during extraction.

    Raises:
        ValueError: Propagated from ``get_llm`` when the API key is missing
            (the LLM is created outside the guarded section).
    """
    # Resolve context
    resolved_message, context_info = resolve_context_references(
        user_message, conversation_history, emails
    )

    # Convert delete to archive (re.sub is a no-op when nothing matches)
    resolved_message = re.sub(
        r'\b(delete|remove|trash)\b',
        'archive',
        resolved_message,
        flags=re.IGNORECASE
    )

    # Use LLM to extract rule components
    llm = get_llm(use_thinking=False)

    # Include recent conversation context: the last 4 messages (2 exchanges)
    recent_context = "".join(
        f"{'User' if msg.get('role') == 'user' else 'Assistant'}: {msg.get('content', '')}\n"
        for msg in (conversation_history or [])[-4:]
    )

    prompt = f"""Extract email rule components from this request: "{resolved_message}"
Recent conversation:
{recent_context}
Context from conversation:
- Mentioned categories: {context_info['mentioned_categories']}
- Mentioned senders: {context_info['mentioned_senders']}
IMPORTANT: If this is NOT a request to create, modify, or apply an email rule, return the word "NOTARULE" instead of JSON.
Common rule requests include:
- "Move [emails] to [folder]"
- "Archive [type of emails]"
- "Please move all Uber receipts to travel folder"
- Confirmations like "yes", "ok", "sure" after discussing a rule
Email rule requests typically:
- Ask to move, archive, label, organize, or draft replies for emails
- Mention email senders, subjects, or content patterns
- Describe actions to take on emails
Available actions: move (to folder), archive, label, draft (a reply)
Available folders: inbox, reading, work, personal, receipts, travel, archive
If the user is confirming a rule (saying "yes", "ok", "sure" after you offered to create one), extract the rule from the conversation.
Return JSON in this EXACT format:
{{
"name": "Short descriptive name for the rule",
"description": "What this rule does",
"conditions": [
{{"field": "from", "operator": "contains", "value": "uber"}}
],
"actions": [
{{"type": "move", "parameters": {{"folder": "travel"}}}}
],
"confidence": 0.9
}}
Examples:
- "Move emails from Uber to travel folder" → {{"name": "Uber to Travel", "conditions": [{{"field": "from", "operator": "contains", "value": "uber"}}], "actions": [{{"type": "move", "parameters": {{"folder": "travel"}}}}]}}
- User says "Yes" after "Shall I move Uber receipts to travel?" → Extract the Uber rule from context"""

    # Raw text of the fallback reply, kept only so failures can be logged.
    raw_reply = None
    try:
        # Try to use structured output
        # NOTE(review): this path has no way to express NOTARULE — every
        # message yields an EmailRule unless the call raises. Confirm that
        # is intended, since callers treat any returned rule as a request.
        try:
            structured_llm = llm.with_structured_output(EmailRule)
            response = structured_llm.invoke([
                SystemMessage(content="You are a rule extraction expert. Extract email rule components from the user's request."),
                HumanMessage(content=prompt)
            ])

            # Response is an EmailRule object
            formatted_rule = response.model_dump()
            # Ensure rule_id is set with UUID
            if not formatted_rule.get('rule_id'):
                formatted_rule['rule_id'] = f"rule_{uuid.uuid4().hex[:8]}"
            return formatted_rule
        except Exception as struct_error:
            # Fallback to regex extraction
            logger.warning(f"Structured output not available for single rule, falling back to regex: {struct_error}")

        response = llm.invoke([
            SystemMessage(content="You are a rule extraction expert. Return either valid JSON for a rule or 'NOTARULE' if this is not a rule request."),
            HumanMessage(content=prompt)
        ])
        raw_reply = response.content

        # Check if it's not a rule
        if "NOTARULE" in raw_reply:
            return None

        # Extract JSON (greedy: first "{" through last "}")
        json_match = re.search(r'\{[\s\S]*\}', raw_reply)
        if not json_match:
            return None

        rule_data = json.loads(json_match.group())
        logger.info(f"LLM returned JSON: {rule_data}")

        # Format for UI - with defaults for missing fields
        formatted_rule = _format_extracted_rule(rule_data)
        if not formatted_rule['conditions']:
            # Nothing left to match on, so treat the message as not a rule.
            logger.warning("Extracted rule has no usable conditions; ignoring it")
            return None
        return formatted_rule
    except Exception as e:
        logger.error(f"Error extracting rule: {e}")
        logger.error(f"Response content was: {raw_reply if raw_reply is not None else 'No response'}")
        return None
def _chat_result(response, rules=None, thinking_process=None):
    """Assemble the response dict shape returned by process_chat_message."""
    return {
        'response': response,
        'rules': rules if rules is not None else [],
        'thinking_process': thinking_process if thinking_process is not None else [],
    }


def process_chat_message(
    user_message: str,
    emails: List[Dict[str, Any]],
    conversation_history: Optional[List[Dict]] = None,
    callback: Optional[Callable] = None,
    rule_state: Optional[Dict] = None
) -> Dict[str, Any]:
    """Process a user message with improved priority flow.

    Order of handling:
      1. Try to extract a rule from the message via the LLM.
      2. Otherwise, if the message contains an analysis keyword, analyse the
         inbox and propose rules.
      3. Otherwise answer a greeting with a canned introduction, or fall back
         to a general LLM reply that includes the last four history messages.

    Responses that carry rules also embed them as JSON inside an HTML comment
    delimited by ``RULES_JSON_START`` / ``RULES_JSON_END``.

    Args:
        user_message: The user's current message.
        emails: Email dicts; also stored in the module-level ``email_context``.
        conversation_history: Prior message dicts with ``role``/``content`` keys.
        callback: Optional callable invoked with progress dicts of the form
            ``{'type': 'analyzer_feedback', 'content': ...}``.
        rule_state: Accepted for interface compatibility; not used here.

    Returns:
        A dict with ``response`` (str), ``rules`` (list) and
        ``thinking_process`` (list). Errors are reported in ``response``
        rather than raised.
    """
    global email_context

    # Guard once so a None/empty email list never breaks the messages below.
    email_count = len(emails) if emails else 0
    logger.info(f"=== AGENT CALLED ===\nMessage: {user_message[:50]}...\nEmails: {email_count}")

    email_context = emails

    if not conversation_history:
        conversation_history = []

    try:
        lowered = user_message.lower()

        # PRIORITY 1: Always try to extract rule from natural language first
        logger.info("Checking if this is a rule request...")

        # Show thinking indicator
        if callback:
            callback({'type': 'analyzer_feedback', 'content': '🤔 Understanding your request...'})

        # Extract rule from natural language - let Gemini decide if it's a rule
        extracted_rule = extract_rule_from_natural_language(
            user_message, conversation_history, emails
        )

        if extracted_rule:
            logger.info(f"Rule extracted successfully: {extracted_rule}")

            # Same verbs the extractor rewrites to "archive"
            if re.search(r'\b(delete|remove|trash)\b', lowered):
                response = "✅ I'll archive those emails for you (for safety, I archive instead of delete). Check the rules panel to preview the rule!"
            else:
                response = "✅ I've created a rule based on your request. Check the rules panel to preview how it will organize your emails!"

            # Add hidden JSON marker
            response += f"\n<!-- RULES_JSON_START\n{json.dumps([extracted_rule])}\nRULES_JSON_END -->"
            return _chat_result(response, rules=[extracted_rule])

        logger.info("No rule extracted - falling through to general conversation")

        # PRIORITY 2: Check for analysis request
        analyze_keywords = ['analyze', 'suggest', 'patterns', 'help me organize',
                            'what rules', 'find patterns', 'inbox analysis']
        should_analyze = any(keyword in lowered for keyword in analyze_keywords)

        if should_analyze:
            logger.info("Analysis requested - analyzing email patterns")
            if callback:
                callback({'type': 'analyzer_feedback', 'content': f'📊 Analyzing {email_count} emails for patterns...'})

            result = analyze_emails_and_propose_rules(emails)

            if result['success'] and result['rules']:
                # Simple response - rules shown in panel
                response = f"📊 I analyzed your {email_count} emails and found {len(result['rules'])} patterns! Check the rules panel to see my suggestions."
                # Add hidden JSON marker
                response += f"\n<!-- RULES_JSON_START\n{json.dumps(result['rules'])}\nRULES_JSON_END -->"
                return _chat_result(response, rules=result['rules'])

            return _chat_result(result['message'])

        # PRIORITY 3: General conversation
        # Simple greetings: whole-word match so multi-word greetings and
        # trailing punctuation ("good morning", "hi!") are recognised.
        is_greeting = re.search(r'\b(?:hi|hello|hey|good morning|good afternoon)\b', lowered) is not None

        if is_greeting:
            response = """👋 Hi there! I'm your email organization assistant. I can help you:
• **Create rules from natural language** - Just tell me what you want!
Example: "Archive all marketing emails"
• **Analyze your inbox** - I'll find patterns and suggest smart rules
Example: "Analyze my emails and suggest organization rules"
What would you like to do?"""
            return _chat_result(response)

        # Other general conversation
        llm = get_llm(use_thinking=False)

        messages = [
            SystemMessage(content="""You are a helpful email organization assistant. Be concise and friendly.
If the user seems to want email organization help, mention you can:
1. Create rules from their natural language requests (e.g., "archive all newsletters")
2. Analyze their inbox for patterns""")
        ]

        # Add recent conversation history for context: system prompt first,
        # then the last 4 history messages in order, then the current message.
        for msg in conversation_history[-4:]:
            content = msg.get('content', '')
            if msg.get('role') == 'user':
                messages.append(HumanMessage(content=content))
            else:
                messages.append(AIMessage(content=content))
        messages.append(HumanMessage(content=user_message))

        response = llm.invoke(messages)
        return _chat_result(response.content)

    except Exception as e:
        logger.error(f"Error in process_chat_message: {e}")
        return _chat_result(
            f"I encountered an error: {str(e)}. Please try again.",
            thinking_process=[f"Error: {str(e)}"],
        )
def extract_rules_from_output(output: str) -> List[Dict[str, Any]]:
    """Extract JSON rules from output string.

    Looks for a hidden HTML comment of the form
    ``<!-- RULES_JSON_START ... RULES_JSON_END -->`` and decodes the JSON
    between the markers.

    Args:
        output: Text that may contain the hidden rules marker. Non-string
            or empty input yields an empty list.

    Returns:
        The decoded list of rule dicts. A single JSON object is wrapped in a
        one-element list; a missing marker, malformed JSON, or any other
        payload type yields ``[]``.
    """
    if not isinstance(output, str) or not output:
        return []

    # Look for hidden JSON markers
    hidden_json = re.search(r'<!-- RULES_JSON_START\s*(.*?)\s*RULES_JSON_END -->', output, re.DOTALL)
    if hidden_json is None:
        return []

    try:
        parsed = json.loads(hidden_json.group(1).strip())
    except json.JSONDecodeError as e:
        # Best-effort extraction: report the problem but never raise to the caller.
        logger.warning(f"Could not decode rules JSON embedded in output: {e}")
        return []

    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # Honour the declared list return type for a lone rule object.
        return [parsed]
    return []