File size: 4,454 Bytes
fb293f8 59f5166 00aced7 ee397c8 1c7a0a7 15b8627 1c7a0a7 eea8c7f fe0c112 1c7a0a7 f06be5d 230d96d d1de168 ce7387d c70f203 2bbc3a8 86f2a58 230d96d df58f18 230d96d 3bd69bd 5028b6b 7ddb52b f407c48 7bc23ab e026826 7da0809 7349562 909d3c0 35828ac 1c7a0a7 fd399dc 7da0809 eea8c7f b6591d5 b40cc33 b6591d5 eea8c7f 69b460d b40cc33 7349562 eea8c7f 1c7a0a7 eea8c7f b40cc33 b6591d5 eea8c7f 3981c3e fd399dc 6245998 70df3c4 eea8c7f fd399dc 6bf14de 3981c3e b40cc33 fd399dc f855987 1c7a0a7 7f78cfc fd399dc 35828ac 3981c3e b40cc33 fd399dc f855987 35828ac 7da0809 a412583 b40cc33 fd399dc c70f203 bfd09a6 4453360 bec1a98 c70f203 4453360 7da0809 b40cc33 b05f917 a412583 fd399dc b05f917 4453360 7da0809 35828ac 0b498b7 fd399dc a412583 f155629 c70f203 ab1c2b7 1e57e78 06d1985 1e57e78 fe96564 1e57e78 1312508 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from crewai_tools import (
CodeInterpreterTool,
SerperDevTool,
VisionTool,
WebsiteSearchTool,
YoutubeVideoSearchTool
)
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register
from util import get_final_answer
# Model identifiers: one for the delegating manager agent, one shared by the
# specialist agents.
MANAGER_MODEL = "gpt-4.1"
AGENT_MODEL = "gpt-4.1"

# Phoenix tracing configuration. The key is read with a subscript lookup at
# module level, so a missing PHOENIX_API_KEY raises KeyError on import.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

# Both variables are exported before register() is called below.
# NOTE(review): presumably register() picks them up from the environment --
# confirm against the phoenix.otel documentation.
os.environ.update(
    {
        "PHOENIX_CLIENT_HEADERS": f"api_key={PHOENIX_API_KEY}",
        "PHOENIX_COLLECTOR_ENDPOINT": "https://app.phoenix.arize.com",
    }
)

# Traces are recorded under the "gaia" project; the resulting provider is
# handed explicitly to the CrewAI instrumentor.
tracer_provider = register(project_name="gaia", auto_instrument=True)
CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
def _build_crew():
    """Assemble the crew: four specialist agents and one manager agent.

    Each specialist is given only the tool(s) matching its role and has
    delegation disabled; the manager has delegation enabled and owns the
    single task.

    Returns:
        The configured ``Crew`` instance, ready for ``kickoff``.
    """
    # Tools
    image_analysis_tool = VisionTool()
    python_coding_tool = CodeInterpreterTool()
    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    youtube_analysis_tool = YoutubeVideoSearchTool()

    # Agents
    # The "{topic}" placeholder in goals/descriptions is filled from the
    # ``inputs`` dict passed to ``crew.kickoff``.
    image_analysis_agent = Agent(
        role="Image Analysis Agent",
        goal="Analyze image to help answer question \"{topic}\". ",
        backstory="As an expert image analysis assistant, you analyze the image to help answer the question. ",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[image_analysis_tool],
        verbose=False
    )

    python_coding_agent = Agent(
        role="Python Coding Agent",
        goal="Write and/or execute Python code to help answer question \"{topic}\". ",
        backstory="As an expert Python coding assistant, you write and/or execute Python code to help answer the question. ",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=5,
        tools=[python_coding_tool],
        verbose=False
    )

    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web to help answer question \"{topic}\", then scrape the most relevant web page. ",
        backstory="As an expert web search assistant, you search the web to help answer the question. ",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[web_search_tool, web_rag_tool],
        verbose=False
    )

    youtube_analysis_agent = Agent(
        role="YouTube Analysis Agent",
        goal="Analyze YouTube video to help answer question \"{topic}\". ",
        backstory="As an expert YouTube video analysis assistant, you analyze the video to help answer the question. ",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[youtube_analysis_tool],
        verbose=False
    )

    manager_agent = Agent(
        role="Manager Agent",
        goal="Try to answer the following question. If needed, delegate to **one** of your coworkers, Image Analysis Agent, Python Coding Agent, Web Search Agent, or YouTube Analysis Agent for help. "
             "If there is no good coworker, delegate to the Python Coding Agent to implement a tool for the task. "
             "Question: \"{topic}\" ",
        backstory="As an expert manager assistant, you answer the question. ",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=5,
        verbose=True
    )

    # Tasks
    manager_task = Task(
        agent=manager_agent,
        description="Try to answer the following question. If needed, delegate to **one** of your coworkers, Image Analysis Agent, Python Coding Agent, Web Search Agent, or YouTube Analysis Agent for help. Question: \"{topic}\" ",
        expected_output="The answer to the question. "
    )

    # Crew
    # NOTE(review): no ``process`` argument is passed, so the library default
    # applies -- confirm this is the intended mode when ``manager_agent`` is set.
    return Crew(
        agents=[image_analysis_agent, python_coding_agent, web_search_agent, youtube_analysis_agent],
        manager_agent=manager_agent,
        tasks=[manager_task],
        verbose=True
    )


def _add_file_context(question, file_name):
    """Append the attached file's path (and, for ``.py`` files, its source) to the question.

    Args:
        question: The question text.
        file_name: Name of a file under ``data/``; any falsy value means
            there is no attachment and the question is returned unchanged.

    Returns:
        The question text, extended with the file reference when one is given.

    Raises:
        OSError: If a ``.py`` attachment cannot be opened.
        UnicodeDecodeError: If a ``.py`` attachment is not valid UTF-8.
    """
    if not file_name:
        return question

    question = f"{question} File name: data/{file_name}. "

    if file_name.endswith(".py"):
        # Decode explicitly as UTF-8 (the default Python source encoding)
        # instead of relying on the platform's locale encoding.
        with open(f"data/{file_name}", "r", encoding="utf-8") as file:
            question = f"{question} File data:\n{file.read()}"

    return question


def run_crew(question, file_name):
    """Answer a question with the manager/specialist crew and post-process the result.

    Builds the crew, optionally enriches the question with a reference to
    (and, for Python files, the contents of) an attached file, runs the crew
    with the question as the ``topic`` input, then passes the crew's output
    through ``get_final_answer``. The question and both answers are printed.

    Args:
        question: The question text.
        file_name: Name of an attached file under ``data/``, or a falsy value
            when there is no attachment.

    Returns:
        Whatever ``get_final_answer`` returns for the (possibly enriched)
        question and the stringified crew output.

    Raises:
        OSError: If a ``.py`` attachment cannot be opened.
        UnicodeDecodeError: If a ``.py`` attachment is not valid UTF-8.
    """
    # The crew is built before the file is read, preserving the original order.
    crew = _build_crew()

    # Processing
    question = _add_file_context(question, file_name)

    print(f"Question: {question}")

    initial_answer = crew.kickoff(inputs={"topic": question})

    print(f"Initial answer: {initial_answer}")

    final_answer = get_final_answer(question, str(initial_answer))

    print(f"Final answer: {final_answer}")

    return final_answer