import os from crewai import Agent, Crew, Process, Task from crewai.tools import tool from crewai_tools import ( CodeInterpreterTool, SerperDevTool, VisionTool, WebsiteSearchTool, YoutubeVideoSearchTool ) from openinference.instrumentation.crewai import CrewAIInstrumentor from phoenix.otel import register from util import get_final_answer PLANNING_MODEL = "gpt-4.5-preview" PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"] os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}" os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com" tracer_provider = register( auto_instrument=True, project_name="gaia" ) CrewAIInstrumentor().instrument(tracer_provider=tracer_provider) def run_crew(question): # Tools python_coding_tool = CodeInterpreterTool() image_search_tool = VisionTool() web_search_tool = SerperDevTool() web_rag_tool = WebsiteSearchTool() youtube_search_tool = YoutubeVideoSearchTool() # Agents image_search_agent = Agent( role="Image Search Agent", goal="Search an image for question \"{topic}\".", backstory="As an expert image search assistant, you search an image for the question.", allow_delegation=False, max_iter=1, tools=[image_search_tool], verbose=True ) python_coding_agent = Agent( role="Python Coding Agent", goal="Write and execute Python code to solve question \"{topic}\".", backstory="As an expert Python coding assistant, you write and execute Python for the question.", allow_delegation=False, max_iter=1, tools=[python_coding], verbose=True, ) web_search_agent = Agent( role="Web Search Agent", goal="Search the web for question \"{topic}\" and scrape the most relevant web page.", backstory="As an expert web search assistant, you search the web for the question and scrape the most relevant web page.", allow_delegation=False, max_iter=1, tools=[web_search_tool, web_rag_tool], verbose=True ) youtube_search_agent = Agent( role="YouTube Search Agent", goal="Search a YouTube video for question \"{topic}\".", backstory="As an expert YouTube search assistant, you search a YouTube video for the question.", allow_delegation=False, max_iter=1, tools=[youtube_search_tool], verbose=True ) # Tasks image_search_task = Task( agent=image_search_agent, description="Search an image for question \"{topic}\"", expected_output="Content to help answer the image related question." ) python_coding_task = Task( agent=python_coding_agent, description="Write and execute Python code to solve question \"{topic}\".", expected_output="Content to help answer the Phyton coding related question." ) web_search_task = Task( agent=web_search_agent, description="Search the web for question \"{topic}\" and scrape the most relevant web page.", expected_output="Content to help answer the web related question." ) youtube_search_task = Task( agent=youtube_search_agent, description="Search a YouTube video for question \"{topic}\"", expected_output="Content to help answer the YouTube related question." ) # Crew crew = Crew( agents=[image_search_agent, python_coding_agent, web_search_agent, youtube_search_agent], #planning=True, #planning_llm=PLANNING_MODEL, process=Process.sequential, tasks=[image_search_task, python_coding_task, web_search_task, youtube_search_task], verbose=True ) answer = crew.kickoff(inputs={"topic": question}) return get_final_answer(question, str(answer))