"""Hierarchical CrewAI crew that answers a question with specialist agents.

Importing this module has side effects: it reads ``PHOENIX_API_KEY`` from the
environment (raising ``KeyError`` if it is missing), sets the Phoenix client
environment variables, registers a tracer provider for the ``gaia`` project,
and instruments CrewAI with it.
"""

import os

from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from crewai_tools import (
    CodeInterpreterTool,
    SerperDevTool,
    VisionTool,
    WebsiteSearchTool,
    YoutubeVideoSearchTool,
)
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register

from util import get_final_answer

MANAGER_MODEL = "gpt-4.5-preview"
AGENT_MODEL = "gpt-4.5-preview"

# Tracing setup: the API key must be present in the environment at import time.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

tracer_provider = register(
    auto_instrument=True,
    project_name="gaia",
)

CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)


def run_crew(question, file_name):
    """Run the hierarchical crew on *question* and return the final answer.

    Builds four specialist agents (image analysis, Python coding, web search,
    video analysis), one task per specialist, and a manager agent that
    coordinates them via ``Process.hierarchical``. The crew's result is then
    passed, together with the question, to ``get_final_answer``.

    Progress (the question, the initial answer and the final answer) is
    printed to stdout.

    Args:
        question: The question text. It is substituted for the ``{topic}``
            placeholder in every agent goal and task description.
        file_name: Name of an accompanying file, or a falsy value when there
            is none. When truthy, ``" File name: data/<file_name>."`` is
            appended to the question before the crew runs.

    Returns:
        Whatever ``get_final_answer(question, str(initial_answer))`` returns.
    """
    # Tools
    python_coding_tool = CodeInterpreterTool()
    image_search_tool = VisionTool()
    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    youtube_search_tool = YoutubeVideoSearchTool()

    # Agents
    # NOTE: unlike the other specialists, the image agent passes no
    # ``max_iter`` and is the only agent with verbose output enabled.
    image_analysis_agent = Agent(
        role="Image Analysis Agent",
        goal="Analyze image to help answer question \"{topic}\". ",
        backstory=(
            "As an expert image analysis assistant, you analyze the image "
            "to help answer the question. "
        ),
        allow_delegation=False,
        llm=AGENT_MODEL,
        tools=[image_search_tool],
        verbose=True,
    )

    python_coding_agent = Agent(
        role="Python Coding Agent",
        goal=(
            "Write and/or execute Python code to help answer question "
            "\"{topic}\". "
        ),
        backstory=(
            "As an expert Python coding assistant, you write and/or execute "
            "Python code to help answer the question. "
        ),
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=1,
        tools=[python_coding_tool],
        verbose=False,
    )

    video_analysis_agent = Agent(
        role="Video Analysis Agent",
        goal="Analyze video to help answer question \"{topic}\". ",
        # The stray line break that used to sit inside this literal has been
        # removed so the prompt matches the sibling backstories.
        backstory=(
            "As an expert video analysis assistant, you analyze the video "
            "to help answer the question. "
        ),
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=1,
        tools=[youtube_search_tool],
        verbose=False,
    )

    web_search_agent = Agent(
        role="Web Search Agent",
        goal=(
            "Search the web to help answer question \"{topic}\", then scrape "
            "the most relevant web page. "
        ),
        backstory=(
            "As an expert web search assistant, you search the web to help "
            "answer the question. "
        ),
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=1,
        tools=[web_search_tool, web_rag_tool],
        verbose=False,
    )

    # The manager is the only agent allowed to delegate and has no tools.
    manager_agent = Agent(
        role="Manager",
        goal=(
            "Try to answer the following question. If needed, delegate to "
            "**one** of your coworkers, image_analysis_agent, "
            "python_coding_agent, web_search_agent, or video_analysis_agent "
            "for help. Question: \"{topic}\" "
        ),
        backstory="As an expert manager assistant, you answer the question. ",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=5,
        tools=[],
        verbose=False,
    )

    # Tasks
    image_analysis_task = Task(
        agent=image_analysis_agent,
        description="Analyze image to help answer question \"{topic}\". ",
        expected_output="Content to help answer the question. ",
    )

    python_coding_task = Task(
        agent=python_coding_agent,
        description=(
            "Write and/or execute Python code to help answer question "
            "\"{topic}\". "
        ),
        expected_output="Content to help answer the question. ",
    )

    video_analysis_task = Task(
        agent=video_analysis_agent,
        description="Analyze video to help answer question \"{topic}\". ",
        expected_output="Content to help answer the question. ",
    )

    web_search_task = Task(
        agent=web_search_agent,
        description=(
            "Search the web to help answer question \"{topic}\", then scrape "
            "the most relevant web page. "
        ),
        expected_output="Content to help answer the question. ",
    )

    # Crew
    crew = Crew(
        agents=[
            image_analysis_agent,
            python_coding_agent,
            web_search_agent,
            video_analysis_agent,
        ],
        manager_agent=manager_agent,
        process=Process.hierarchical,
        tasks=[
            image_analysis_task,
            python_coding_task,
            web_search_task,
            video_analysis_task,
        ],
        verbose=True,
    )

    # Tell the agents where the accompanying file lives, if there is one.
    if file_name:
        question = f"{question} File name: data/{file_name}."

    print("###")
    print(f"Question: {question}")
    print("###")

    initial_answer = crew.kickoff(inputs={"topic": question})

    print("###")
    print(f"Initial answer: {initial_answer}")

    final_answer = get_final_answer(question, str(initial_answer))

    print(f"Final answer: {final_answer}")
    print("###")

    return final_answer