import os

from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from crewai_tools import (
    SerperDevTool,
    WebsiteSearchTool,
    YoutubeVideoSearchTool,
)
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register

from util import get_final_answer

# Phoenix tracing configuration. Indexing os.environ directly means a missing
# PHOENIX_API_KEY raises KeyError as soon as this module is imported.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)

CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)


def run_crew(question):
    """Run a two-agent crew on *question* and return the final answer.

    A web search agent and a YouTube search agent are each given one task,
    and the crew runs those tasks sequentially with planning enabled. The
    question is supplied to the crew as the ``topic`` input, which is the
    placeholder used in the agent goals and task descriptions. The crew
    output is converted to ``str`` and handed to ``get_final_answer``
    together with the original question.

    Args:
        question: The question to research.

    Returns:
        Whatever ``get_final_answer(question, str(answer))`` returns.
    """
    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    youtube_search_tool = YoutubeVideoSearchTool()

    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web for question \"{topic}\" and scrape the most relevant web page.",
        backstory="As an expert web search assistant, you search the web for the question and scrape the most relevant web page.",
        allow_delegation=False,
        tools=[web_search_tool, web_rag_tool],
        verbose=True
    )

    youtube_search_agent = Agent(
        role="YouTube Search Agent",
        goal="Search a YouTube video for question \"{topic}\".",
        backstory="As an expert YouTube search assistant, you search a YouTube video.",
        allow_delegation=False,
        # Previously this agent was handed the web agent's tools and the
        # YouTube tool created above was never used.
        tools=[youtube_search_tool],
        verbose=True
    )

    web_search = Task(
        agent=web_search_agent,
        description="Search the web for question \"{topic}\" and scrape the most relevant web page.",
        expected_output="Content to help answer the question."
    )

    youtube_search = Task(
        agent=youtube_search_agent,
        description="Search a YouTube video for question \"{topic}\"",
        expected_output="Content to help answer the question."
    )

    crew = Crew(
        agents=[web_search_agent, youtube_search_agent],
        planning=True,
        process=Process.sequential,
        tasks=[web_search, youtube_search],
        verbose=True
    )

    answer = crew.kickoff(inputs={"topic": question})

    return get_final_answer(question, str(answer))