import os import time import datetime from langgraph.graph import StateGraph, END # from langgraph.checkpoint.memory import MemorySaver from .utils.views import print_agent_output from ..memory.research import ResearchState from .utils.utils import sanitize_filename # Import agent classes from . import \ WriterAgent, \ EditorAgent, \ PublisherAgent, \ ResearchAgent, \ HumanAgent class ChiefEditorAgent: """Agent responsible for managing and coordinating editing tasks.""" def __init__(self, task: dict, websocket=None, stream_output=None, tone=None, headers=None): self.task = task self.websocket = websocket self.stream_output = stream_output self.headers = headers or {} self.tone = tone self.task_id = self._generate_task_id() self.output_dir = self._create_output_directory() def _generate_task_id(self): # Currently time based, but can be any unique identifier return int(time.time()) def _create_output_directory(self): output_dir = "./outputs/" + \ sanitize_filename( f"run_{self.task_id}_{self.task.get('query')[0:40]}") os.makedirs(output_dir, exist_ok=True) return output_dir def _initialize_agents(self): return { "writer": WriterAgent(self.websocket, self.stream_output, self.headers), "editor": EditorAgent(self.websocket, self.stream_output, self.headers), "research": ResearchAgent(self.websocket, self.stream_output, self.tone, self.headers), "publisher": PublisherAgent(self.output_dir, self.websocket, self.stream_output, self.headers), "human": HumanAgent(self.websocket, self.stream_output, self.headers) } def _create_workflow(self, agents): workflow = StateGraph(ResearchState) # Add nodes for each agent workflow.add_node("browser", agents["research"].run_initial_research) workflow.add_node("planner", agents["editor"].plan_research) workflow.add_node("researcher", agents["editor"].run_parallel_research) workflow.add_node("writer", agents["writer"].run) workflow.add_node("publisher", agents["publisher"].run) workflow.add_node("human", agents["human"].review_plan) # Add edges self._add_workflow_edges(workflow) return workflow def _add_workflow_edges(self, workflow): workflow.add_edge('browser', 'planner') workflow.add_edge('planner', 'human') workflow.add_edge('researcher', 'writer') workflow.add_edge('writer', 'publisher') workflow.set_entry_point("browser") workflow.add_edge('publisher', END) # Add human in the loop workflow.add_conditional_edges( 'human', lambda review: "accept" if review['human_feedback'] is None else "revise", {"accept": "researcher", "revise": "planner"} ) def init_research_team(self): """Initialize and create a workflow for the research team.""" agents = self._initialize_agents() return self._create_workflow(agents) async def _log_research_start(self): message = f"Starting the research process for query '{self.task.get('query')}'..." if self.websocket and self.stream_output: await self.stream_output("logs", "starting_research", message, self.websocket) else: print_agent_output(message, "MASTER") async def run_research_task(self, task_id=None): """ Run a research task with the initialized research team. Args: task_id (optional): The ID of the task to run. Returns: The result of the research task. """ research_team = self.init_research_team() chain = research_team.compile() await self._log_research_start() config = { "configurable": { "thread_id": task_id, "thread_ts": datetime.datetime.utcnow() } } result = await chain.ainvoke({"task": self.task}, config=config) return result