Spaces:
Running
Running
import os | |
import time | |
import datetime | |
from langgraph.graph import StateGraph, END | |
# from langgraph.checkpoint.memory import MemorySaver | |
from .utils.views import print_agent_output | |
from ..memory.research import ResearchState | |
from .utils.utils import sanitize_filename | |
# Import agent classes | |
from . import \ | |
WriterAgent, \ | |
EditorAgent, \ | |
PublisherAgent, \ | |
ResearchAgent, \ | |
HumanAgent | |
class ChiefEditorAgent:
    """Agent responsible for managing and coordinating editing tasks.

    Builds a ``StateGraph`` over ``ResearchState`` whose nodes are methods of
    the research, editor, writer, publisher and human agents, then compiles
    and runs that graph for a single task.
    """

    def __init__(self, task: dict, websocket=None, stream_output=None, tone=None, headers=None):
        """Store the run configuration and create the output directory.

        Note that constructing the agent has a filesystem side effect: the
        output directory is created immediately.

        Args:
            task: Task description. Its ``"query"`` entry (if any) is used in
                the output directory name and in the start-up log message.
            websocket: Optional connection object forwarded to every agent
                and to ``stream_output``.
            stream_output: Optional awaitable callable used for log
                streaming; it is awaited as
                ``stream_output("logs", "starting_research", message, websocket)``.
            tone: Forwarded to the research agent.
            headers: Optional headers forwarded to every agent; an empty
                dict is used when omitted or falsy.
        """
        self.task = task
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}
        self.tone = tone
        self.task_id = self._generate_task_id()
        self.output_dir = self._create_output_directory()

    def _generate_task_id(self) -> int:
        """Return an identifier for this run.

        Currently time based (whole seconds since the epoch), but can be any
        unique identifier. Two agents created within the same second get the
        same ID.
        """
        return int(time.time())

    def _output_dir_label(self) -> str:
        """Return the unsanitized directory name for this run.

        The name is ``run_<task_id>_<query prefix>`` where the prefix is the
        first 40 characters of the task's query. A missing or ``None`` query
        is treated as an empty string instead of raising ``TypeError``.
        """
        query = self.task.get('query') or ""
        return f"run_{self.task_id}_{query[0:40]}"

    def _create_output_directory(self) -> str:
        """Create the output directory for this run and return its path.

        The directory lives under ``./outputs/``; its name is passed through
        ``sanitize_filename``. An already existing directory is reused.
        """
        output_dir = "./outputs/" + sanitize_filename(self._output_dir_label())
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    def _initialize_agents(self) -> dict:
        """Instantiate every agent taking part in the workflow.

        Returns:
            A dict keyed by ``"writer"``, ``"editor"``, ``"research"``,
            ``"publisher"`` and ``"human"``.
        """
        return {
            "writer": WriterAgent(self.websocket, self.stream_output, self.headers),
            "editor": EditorAgent(self.websocket, self.stream_output, self.headers),
            "research": ResearchAgent(self.websocket, self.stream_output, self.tone, self.headers),
            "publisher": PublisherAgent(self.output_dir, self.websocket, self.stream_output, self.headers),
            "human": HumanAgent(self.websocket, self.stream_output, self.headers),
        }

    def _create_workflow(self, agents):
        """Build the (uncompiled) state graph from the given agents.

        Args:
            agents: Mapping as returned by ``_initialize_agents``.

        Returns:
            The ``StateGraph`` with all nodes and edges registered.
        """
        workflow = StateGraph(ResearchState)

        # Add nodes for each agent. The editor agent backs two nodes:
        # planning and the parallel research step.
        workflow.add_node("browser", agents["research"].run_initial_research)
        workflow.add_node("planner", agents["editor"].plan_research)
        workflow.add_node("researcher", agents["editor"].run_parallel_research)
        workflow.add_node("writer", agents["writer"].run)
        workflow.add_node("publisher", agents["publisher"].run)
        workflow.add_node("human", agents["human"].review_plan)

        # Add edges
        self._add_workflow_edges(workflow)
        return workflow

    def _add_workflow_edges(self, workflow):
        """Register the entry point and all edges on ``workflow``.

        Resulting flow: browser -> planner -> human, then either back to
        planner (revise) or on to researcher -> writer -> publisher -> END.
        """
        workflow.add_edge('browser', 'planner')
        workflow.add_edge('planner', 'human')
        workflow.add_edge('researcher', 'writer')
        workflow.add_edge('writer', 'publisher')
        workflow.set_entry_point("browser")
        workflow.add_edge('publisher', END)

        # Add human in the loop: no feedback means the plan is accepted and
        # research proceeds; any feedback sends the plan back to the planner.
        workflow.add_conditional_edges(
            'human',
            lambda review: "accept" if review['human_feedback'] is None else "revise",
            {"accept": "researcher", "revise": "planner"}
        )

    def init_research_team(self):
        """Initialize and create a workflow for the research team."""
        agents = self._initialize_agents()
        return self._create_workflow(agents)

    async def _log_research_start(self):
        """Announce the start of the research run.

        The message is streamed through ``stream_output`` when both a
        websocket and a stream callback are configured; otherwise it is
        printed via ``print_agent_output``.
        """
        message = f"Starting the research process for query '{self.task.get('query')}'..."
        if self.websocket and self.stream_output:
            await self.stream_output("logs", "starting_research", message, self.websocket)
        else:
            print_agent_output(message, "MASTER")

    async def run_research_task(self, task_id=None):
        """
        Run a research task with the initialized research team.

        Args:
            task_id (optional): The ID of the task to run. It is passed to
                the graph as the ``thread_id`` config value.

        Returns:
            The result of the research task, i.e. whatever the compiled
            graph's ``ainvoke`` returns.
        """
        research_team = self.init_research_team()
        chain = research_team.compile()

        await self._log_research_start()

        config = {
            "configurable": {
                "thread_id": task_id,
                # NOTE(review): utcnow() yields a naive datetime and is
                # deprecated since Python 3.12; kept as-is so the config
                # value seen by the graph does not change.
                "thread_ts": datetime.datetime.utcnow()
            }
        }

        result = await chain.ainvoke({"task": self.task}, config=config)
        return result