# LangGraphMultiAgent / MultiAgentSupervisorResearcherCoder
# Repository page header (scraped): "aparnavellala's picture"
# Commit message: Create MultiAgentSupervisorResearcherCoder
# Commit 34a64af (verified)
import sys
# Print the interpreter version first so the runtime environment is visible
# in the script output.
print("Python version")
print(sys.version)
from typing import Annotated, Sequence, TypedDict
import operator
import functools
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_experimental.tools import PythonREPLTool
from langchain.agents import create_openai_tools_agent
from langchain_huggingface import HuggingFacePipeline
from langgraph.graph import StateGraph, END
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# ---------------------------------------------------------------------------
# Language model setup: tokenizer, causal LM, text-generation pipeline
# ---------------------------------------------------------------------------
# Checkpoint identifier handed to both from_pretrained() calls below.
# Alternative checkpoints kept for reference (swap the assignment to use one):
#   meta-llama/Llama-3.2-1B
#   deepseek-ai/DeepSeek-R1-Distill-Qwen-32B
#   deepseek-ai/deepseek-llm-7b-chat
#   deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
#   microsoft/Phi-3.5-mini-instruct
#   Qwen/Qwen2.5-7B-Instruct-1M
name = "openai-community/gpt2"

tokenizer = AutoTokenizer.from_pretrained(name, truncation=True)
# Use the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(name)

pipe = pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    device_map="auto",
    max_new_tokens=500,  # cap on newly generated tokens per call
)
print("pipeline is created")

# LangChain-compatible wrapper around the raw pipeline; the worker agents are
# built on this object, while the supervisor calls `pipe` directly.
llm = HuggingFacePipeline(pipeline=pipe)
# Worker names the supervisor can route to, and the full list of routing
# choices (the terminal "FINISH" option comes first).
members = ["Researcher", "Coder"]
options = ["FINISH", *members]

# Supervisor instructions; the "{members}" placeholder is filled in by the
# partial() call on the prompt template below.
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the following workers: {members}."
    " Given the following user request, respond with the workers to act next. Each worker will perform a task"
    " and respond with their results and status. When all workers are finished, respond with FINISH."
)

# Prompt layout: instructions, then the conversation so far, then the routing
# question listing the allowed choices.
_routing_question = (
    "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"
)
_supervisor_messages = [
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
    ("system", _routing_question),
]
prompt = ChatPromptTemplate.from_messages(_supervisor_messages).partial(
    options=str(options),
    members=", ".join(members),
)
print("Prompt Template created")
# Supervisor routing logic
def route_tool_response(llm_response):
    """Pick the next workflow step named in the supervisor model's reply.

    "FINISH" takes priority. Otherwise the first entry of the module-level
    ``members`` list that occurs in the text wins. Both checks are plain,
    case-sensitive containment tests.

    Args:
        llm_response: Text produced by the supervisor model.

    Returns:
        "FINISH", a worker name from ``members``, or "Unknown" when neither
        appears in the text.
    """
    if "FINISH" in llm_response:
        return "FINISH"
    # Workers are considered in the order they are listed in `members`.
    mentioned = (worker for worker in members if worker in llm_response)
    return next(mentioned, "Unknown")
def supervisor_chain(state):
    """Ask the supervisor model which worker should act next.

    The conversation in ``state["messages"]`` is rendered through the
    module-level ``prompt`` and sent to the text-generation pipeline ``pipe``.
    The reply is turned into a routing decision by ``route_tool_response``.

    Args:
        state: Graph state mapping; its optional "messages" entry holds the
            conversation so far (defaults to an empty list).

    Returns:
        ``{"next": decision}`` where ``decision`` is one of ``options``.

    Raises:
        RuntimeError: If calling the pipeline or reading its output fails.
            The underlying exception is attached as ``__cause__``.
    """
    messages = state.get("messages", [])
    user_prompt = prompt.format(messages=messages)
    try:
        # return_full_text=False makes the pipeline return only the newly
        # generated continuation.  By default the prompt is echoed back in
        # "generated_text", and the prompt itself contains "FINISH" and every
        # worker name, so routing on the full text would always yield "FINISH".
        llm_response = pipe(
            user_prompt,
            max_new_tokens=500,
            return_full_text=False,
        )[0]["generated_text"]
    except Exception as e:
        # Keep the RuntimeError contract but preserve the original traceback.
        raise RuntimeError(f"LLM processing error: {e}") from e

    next_action = route_tool_response(llm_response)
    if next_action not in options:
        # route_tool_response() reports "Unknown" when the reply names neither
        # a worker nor FINISH.  The graph has no route for that value, so end
        # the run instead of failing on an unmapped destination.
        next_action = "FINISH"
    return {"next": next_action}
# AgentState definition
class AgentState(TypedDict):
    """State mapping handed from node to node in the workflow graph."""

    # Conversation so far.  The operator.add annotation pairs the field with
    # sequence concatenation (presumably used by LangGraph to append each
    # node's new messages rather than overwrite the list -- confirm).
    messages: Annotated[Sequence[BaseMessage], operator.add]

    # Routing decision produced by the supervisor node and read by the
    # conditional edge that leaves it.
    next: str
# Tools made available to the workers: web search (up to 5 results per query)
# for the researcher, a Python REPL for the coder.
tavily_tool = TavilySearchResults(max_results=5)
python_repl_tool = PythonREPLTool()


def _worker_prompt(system_text):
    """Build a worker prompt: system message, conversation, agent scratchpad."""
    return ChatPromptTemplate.from_messages(
        [
            SystemMessage(content=system_text),
            MessagesPlaceholder(variable_name="messages"),
            # Placeholder the agent fills with its intermediate tool calls.
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )


# NOTE(review): create_openai_tools_agent targets models with OpenAI-style
# tool calling; confirm the HuggingFacePipeline-backed `llm` supports it.
research_agent = create_openai_tools_agent(
    llm=llm,
    tools=[tavily_tool],
    prompt=_worker_prompt("You are a web researcher."),
)
print("Created agents with their respective prompts")

code_agent = create_openai_tools_agent(
    llm=llm,
    tools=[python_repl_tool],
    prompt=_worker_prompt("You may generate safe Python code for analysis."),
)
print("create_openai_tools_agent")
# Create the workflow
# AgentExecutor is imported here (the file already imports mid-script below)
# so this section brings everything it needs into scope itself.
from langchain.agents import AgentExecutor


def agent_node(state, agent, name):
    """Run one worker on the current state and report its answer as a message.

    Args:
        state: Current graph state (an ``AgentState`` mapping).
        agent: Executor exposing ``invoke(state)`` that returns a mapping
            with an "output" entry.
        name: Worker name recorded on the resulting message.

    Returns:
        A state update whose "messages" list holds the worker's output.
    """
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}


workflow = StateGraph(AgentState)

# Nodes.  The runnables returned by create_openai_tools_agent only plan the
# next step: they require an "intermediate_steps" input and emit agent
# actions, not state updates, so they cannot serve as graph nodes directly.
# AgentExecutor runs the tool loop; agent_node turns the final output into a
# state update that AgentState can absorb.
research_executor = AgentExecutor(agent=research_agent, tools=[tavily_tool])
code_executor = AgentExecutor(agent=code_agent, tools=[python_repl_tool])
workflow.add_node(
    "Researcher",
    functools.partial(agent_node, agent=research_executor, name="Researcher"),
)
workflow.add_node(
    "Coder",
    functools.partial(agent_node, agent=code_executor, name="Coder"),
)
workflow.add_node("supervisor", supervisor_chain)

# Every worker hands control back to the supervisor when it is done.
for member in members:
    workflow.add_edge(member, "supervisor")

# The supervisor's "next" value selects the following node; FINISH ends the run.
workflow.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {k: k for k in members} | {"FINISH": END},
)

# Define entry point
workflow.set_entry_point("supervisor")
print(workflow)

# Compile the workflow
graph = workflow.compile()

# Render the compiled graph as a Mermaid PNG.
# NOTE(review): this targets a notebook front end; outside one, display() may
# not show the image -- confirm the intended environment.
from IPython.display import display, Image
display(Image(graph.get_graph().draw_mermaid_png()))

# Initial state: a single user request.
# Alternative request: "Code hello world and print it to the terminal."
initial_state = {
    "messages": [
        HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise."),
    ]
}

# Execute the workflow
result = graph.invoke(initial_state)
print("Workflow Result:", result)