import sys print("Python version") print (sys. version) from typing import Annotated, Sequence, TypedDict import operator import functools from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage from langchain_community.tools.tavily_search import TavilySearchResults from langchain_experimental.tools import PythonREPLTool from langchain.agents import create_openai_tools_agent from langchain_huggingface import HuggingFacePipeline from langgraph.graph import StateGraph, END from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline # SETUP: HuggingFace Model and Pipeline #name = "meta-llama/Llama-3.2-1B" #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" #name="deepseek-ai/deepseek-llm-7b-chat" name="openai-community/gpt2" #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" #name="microsoft/Phi-3.5-mini-instruct" #name="Qwen/Qwen2.5-7B-Instruct-1M" tokenizer = AutoTokenizer.from_pretrained(name,truncation=True) tokenizer.pad_token = tokenizer.eos_token model = AutoModelForCausalLM.from_pretrained(name) pipe = pipeline( "text-generation", model=model, tokenizer=tokenizer, device_map="auto", max_new_tokens=500, # text to generate for outputs ) print ("pipeline is created") # Wrap in LangChain's HuggingFacePipeline llm = HuggingFacePipeline(pipeline=pipe) # Members and Final Options members = ["Researcher", "Coder"] options = ["FINISH"] + members # Supervisor prompt system_prompt = ( "You are a supervisor tasked with managing a conversation between the following workers: {members}." " Given the following user request, respond with the workers to act next. Each worker will perform a task" " and respond with their results and status. When all workers are finished, respond with FINISH." ) # Prompt template required for the workflow prompt = ChatPromptTemplate.from_messages( [ ("system", system_prompt), MessagesPlaceholder(variable_name="messages"), ("system", "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"), ] ).partial(options=str(options), members=", ".join(members)) print ("Prompt Template created") # Supervisor routing logic def route_tool_response(llm_response): """ Parse the LLM response to determine the next step based on routing logic. """ if "FINISH" in llm_response: return "FINISH" for member in members: if member in llm_response: return member return "Unknown" def supervisor_chain(state): """ Supervisor logic to interact with HuggingFacePipeline and decide the next worker. """ messages = state.get("messages", []) user_prompt = prompt.format(messages=messages) try: llm_response = pipe(user_prompt, max_new_tokens=500)[0]["generated_text"] except Exception as e: raise RuntimeError(f"LLM processing error: {e}") next_action = route_tool_response(llm_response) return {"next": next_action} # AgentState definition class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], operator.add] next: str # Create tools tavily_tool = TavilySearchResults(max_results=5) python_repl_tool = PythonREPLTool() # Create agents with their respective prompts research_agent = create_openai_tools_agent( llm=llm, tools=[tavily_tool], prompt=ChatPromptTemplate.from_messages( [ SystemMessage(content="You are a web researcher."), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder ] ), ) print ("Created agents with their respective prompts") code_agent = create_openai_tools_agent( llm=llm, tools=[python_repl_tool], prompt=ChatPromptTemplate.from_messages( [ SystemMessage(content="You may generate safe Python code for analysis."), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder ] ), ) print ("create_openai_tools_agent") # Create the workflow workflow = StateGraph(AgentState) # Nodes workflow.add_node("Researcher", research_agent) # Pass the agent directly (no .run required) workflow.add_node("Coder", code_agent) # Pass the agent directly workflow.add_node("supervisor", supervisor_chain) # Add edges for workflow transitions for member in members: workflow.add_edge(member, "supervisor") workflow.add_conditional_edges( "supervisor", lambda x: x["next"], {k: k for k in members} | {"FINISH": END} # Dynamically map workers to their actions ) # Define entry point workflow.set_entry_point("supervisor") print(workflow) # Compile the workflow graph = workflow.compile() from IPython.display import display, Image display(Image(graph.get_graph().draw_mermaid_png())) # Properly formatted initial state initial_state = { "messages": [ #HumanMessage(content="Code hello world and print it to the terminal.") # Correct format for user input HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise.") # Correct format for user input ] } # Execute the workflow result = graph.invoke(initial_state) print("Workflow Result:", result)