import sys

print("Python version")
print(sys.version)

from typing import Annotated, Sequence, TypedDict

import functools
import operator

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_experimental.tools import PythonREPLTool
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_huggingface import HuggingFacePipeline
from langgraph.graph import StateGraph, END

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Model checkpoint to load from the Hugging Face Hub
name = "openai-community/gpt2"

tokenizer = AutoTokenizer.from_pretrained(name, truncation=True)
tokenizer.pad_token = tokenizer.eos_token  # GPT-2 has no pad token; reuse EOS
model = AutoModelForCausalLM.from_pretrained(name)

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device_map="auto",
    max_new_tokens=500,  # cap on the number of tokens generated per call
)
print("Pipeline created")

# Wrap the transformers pipeline so it can be used as a LangChain runnable
llm = HuggingFacePipeline(pipeline=pipe)
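
# Optional smoke test (a minimal check, not part of the workflow): gpt2's
# output will be rough, but this confirms the wrapped pipeline responds.
print(llm.invoke("Say hello:"))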

members = ["Researcher", "Coder"]
options = ["FINISH"] + members

system_prompt = (
    "You are a supervisor tasked with managing a conversation between the following workers: {members}."
    " Given the following user request, respond with the worker to act next. Each worker will perform a task"
    " and respond with their results and status. When all workers are finished, respond with FINISH."
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        ("system", "Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}"),
    ]
).partial(options=str(options), members=", ".join(members))

print("Prompt template created")

def route_tool_response(llm_response):
    """Parse the LLM response to determine the next step based on routing logic."""
    if "FINISH" in llm_response:
        return "FINISH"
    for member in members:
        if member in llm_response:
            return member
    # Fall back to FINISH so the conditional edges below never receive a key
    # they do not map (returning "Unknown" would raise at runtime).
    return "FINISH"

def supervisor_chain(state):
    """Supervisor node: ask the LLM which worker should act next."""
    messages = state.get("messages", [])
    user_prompt = prompt.format(messages=messages)

    try:
        # return_full_text=False strips the prompt from the pipeline output, so
        # routing only sees newly generated text (the prompt itself contains
        # the word FINISH and would otherwise always match).
        llm_response = pipe(user_prompt, max_new_tokens=500, return_full_text=False)[0]["generated_text"]
    except Exception as e:
        raise RuntimeError(f"LLM processing error: {e}") from e

    next_action = route_tool_response(llm_response)
    return {"next": next_action}

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]  # messages accumulate across nodes
    next: str  # name of the next worker, or FINISH

tavily_tool = TavilySearchResults(max_results=5)  # needs TAVILY_API_KEY in the environment
python_repl_tool = PythonREPLTool()

research_agent = create_openai_tools_agent(
    llm=llm,
    tools=[tavily_tool],
    prompt=ChatPromptTemplate.from_messages(
        [
            SystemMessage(content="You are a web researcher."),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),  # required placeholder
        ]
    ),
)

code_agent = create_openai_tools_agent(
    llm=llm,
    tools=[python_repl_tool],
    prompt=ChatPromptTemplate.from_messages(
        [
            SystemMessage(content="You may generate safe Python code for analysis."),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),  # required placeholder
        ]
    ),
)

print("Created agents with their respective prompts")

workflow = StateGraph(AgentState)

workflow.add_node("Researcher", research_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_chain)

# Every worker reports back to the supervisor after acting
for member in members:
    workflow.add_edge(member, "supervisor")

# The supervisor either routes to the next worker or ends the run on FINISH
workflow.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {k: k for k in members} | {"FINISH": END},
)

workflow.set_entry_point("supervisor")

print(workflow)

graph = workflow.compile()

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

initial_state = {
    "messages": [
        # HumanMessage(content="Code hello world and print it to the terminal.")
        HumanMessage(content='Write code for printing "hello world" in Python. Keep it precise.')
    ]
}

result = graph.invoke(initial_state)
print("Workflow Result:", result)