"""Streaming version of the agent that yields tokens as they come."""

import hashlib
import json
import os
import re
from typing import Any, Dict, Generator, List

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.callbacks.base import BaseCallbackHandler


class StreamingCallbackHandler(BaseCallbackHandler):
    """Callback handler that records tokens as the LLM streams them."""

    def __init__(self):
        # Every token seen so far, in arrival order.
        self.tokens: List[str] = []
        # The most recently received token.
        self.current_token: str = ""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Called by LangChain each time the LLM generates a new token."""
        self.current_token = token
        self.tokens.append(token)


def analyze_emails_streaming(
    emails: List[Dict[str, Any]],
    focus_area: str = "all types",
) -> Generator[Dict[str, Any], None, None]:
    """Analyze emails with a streaming LLM and yield progress events.

    Streams the model's response token by token, then attempts to extract a
    structured rules block from the full text.

    Args:
        emails: Email dicts; keys ``from_name``, ``subject`` and ``body`` are
            read if present (others are ignored). Only the first 30 are used.
        focus_area: Free-text hint injected into the prompt to steer which
            kinds of rules the model proposes.

    Yields:
        Event dicts discriminated by their ``'type'`` key:
        - ``'token'``: one streamed token plus the accumulated response.
        - ``'rules'``: parsed and normalized rules from the final response.
        - ``'parse_error'``: a JSON-looking block was found but malformed.
        - ``'no_rules'``: no JSON block found in the response.
        - ``'error'``: no input, missing API key, or an unexpected failure.
    """
    if not emails:
        yield {
            'type': 'error',
            'content': "No emails available to analyze."
        }
        return

    # Build compact summaries — cap at 30 emails and 150 preview chars to
    # keep the prompt within a reasonable token budget.
    email_summaries = []
    for e in emails[:30]:
        body = e.get('body', '')  # hoisted: avoid three dict lookups per email
        email_summaries.append({
            'from': e.get('from_name', ''),
            'subject': e.get('subject', ''),
            'preview': body[:150] + '...' if len(body) > 150 else body
        })

    prompt = f"""Analyze these {len(email_summaries)} email samples and propose 3-5 organization rules.

Focus area: {focus_area}

Email samples:
{json.dumps(email_summaries, indent=2)}

Create practical rules and explain your reasoning. 
Then provide a JSON block with this structure:
{{
  "rules": [
    {{
      "name": "Rule name",
      "description": "What this rule does",
      "confidence": 0.8,
      "conditions": [{{"field": "from", "operator": "contains", "value": "example.com"}}],
      "actions": [{{"type": "move", "folder": "work"}}]
    }}
  ]
}}"""

    try:
        api_key = os.getenv('OPENROUTER_API_KEY')
        if not api_key:
            yield {'type': 'error', 'content': 'API key not configured'}
            return

        # Streaming LLM via the OpenRouter OpenAI-compatible endpoint.
        llm = ChatOpenAI(
            openai_api_key=api_key,
            openai_api_base="https://openrouter.ai/api/v1",
            model_name='google/gemini-2.5-flash-preview-05-20',
            temperature=0.7,
            streaming=True
        )

        # Callback handler keeps its own copy of the token stream.
        callback = StreamingCallbackHandler()

        messages = [
            SystemMessage(content="You are an email pattern analyzer. Explain your analysis, then provide JSON."),
            HumanMessage(content=prompt)
        ]

        # Relay each streamed chunk to the caller as a 'token' event while
        # accumulating the full response for post-hoc rule extraction.
        full_response = ""
        for chunk in llm.stream(messages, config={"callbacks": [callback]}):
            token = chunk.content
            full_response += token
            yield {
                'type': 'token',
                'content': token,
                'full_response': full_response
            }

        # Greedy match grabs the outermost {...} span — the model is asked to
        # end its answer with one JSON block.
        json_match = re.search(r'\{[\s\S]*\}', full_response)
        if json_match:
            try:
                parsed = json.loads(json_match.group())
                rules = parsed.get('rules', [])

                # Normalize each rule; 'name' and 'description' are required
                # (a missing key raises KeyError -> 'parse_error' below).
                formatted_rules = []
                for idx, rule in enumerate(rules):
                    name = rule['name']
                    # Deterministic id: built-in hash() varies per process
                    # (PYTHONHASHSEED), so use a stable md5 digest instead.
                    digest = hashlib.md5(name.encode('utf-8')).hexdigest()[:12]
                    formatted_rules.append({
                        "name": name,
                        "description": rule['description'],
                        "conditions": rule.get('conditions', []),
                        "actions": rule.get('actions', []),
                        "confidence": rule.get('confidence', 0.8),
                        "rule_id": f"rule_{digest}_{idx}"
                    })

                yield {
                    'type': 'rules',
                    'rules': formatted_rules,
                    'full_response': full_response
                }
            except (json.JSONDecodeError, KeyError, TypeError):
                # Narrowed from a bare except: these are the actual failure
                # modes (invalid JSON, missing required key, non-dict rule).
                yield {
                    'type': 'parse_error',
                    'full_response': full_response
                }
        else:
            yield {
                'type': 'no_rules',
                'full_response': full_response
            }

    except Exception as e:
        # Boundary handler: surface any unexpected failure (network, auth,
        # SDK errors) to the consumer as an 'error' event.
        yield {
            'type': 'error',
            'content': str(e)
        }