|
import os |
|
|
|
from crewai import Agent, Crew, Process, Task |
|
from crewai.tools import tool |
|
from crewai_tools import ( |
|
SerperDevTool, |
|
WebsiteSearchTool, |
|
YoutubeVideoSearchTool |
|
) |
|
from openinference.instrumentation.crewai import CrewAIInstrumentor |
|
from phoenix.otel import register |
|
from util import get_final_answer |
|
|
|
# Model identifier kept as a module-level constant; it is not referenced by
# the code visible in this part of the file.
PLANNING_MODEL = "gpt-4.5-preview"

# Indexing os.environ directly raises KeyError at import time when the key is
# missing, so a misconfigured environment fails before any tracing is set up.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

# These variables are set before register() is called.
# NOTE(review): presumably phoenix.otel reads them to authenticate and to
# locate the collector -- confirm against the Phoenix documentation.
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

# Tracer provider for the "gaia" project, shared with the instrumentor below.
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia",
)

# Explicitly attach the CrewAI instrumentor to the provider created above.
# NOTE(review): auto_instrument=True may already cover CrewAI -- confirm
# whether this explicit call is still required.
CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
|
|
|
def run_crew(question: str):
    """Run a two-agent CrewAI crew for *question* and return the final answer.

    A web search agent (Serper search plus website search tools) and a
    YouTube search agent (YouTube video search tool) are each given one task
    and executed with ``Process.sequential``. The question is supplied to the
    crew as the ``topic`` input, which fills the ``{topic}`` placeholder in
    the agent goals and task descriptions.

    Args:
        question: The question text to research.

    Returns:
        Whatever ``get_final_answer(question, str(answer))`` returns, where
        ``answer`` is the result of ``crew.kickoff``.
    """
    # Tools are created per call rather than shared at module level.
    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    youtube_search_tool = YoutubeVideoSearchTool()

    # Both agents are limited to a single iteration and may not delegate.
    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web for question \"{topic}\" and scrape the most relevant web page.",
        backstory="As an expert web search assistant, you search the web for the question and scrape the most relevant web page.",
        allow_delegation=False,
        max_iter=1,
        tools=[web_search_tool, web_rag_tool],
        verbose=True,
    )

    youtube_search_agent = Agent(
        role="YouTube Search Agent",
        goal="Search a YouTube video for question \"{topic}\".",
        backstory="As an expert YouTube search assistant, you search a YouTube video for the question.",
        allow_delegation=False,
        max_iter=1,
        tools=[youtube_search_tool],
        verbose=True,
    )

    # One task per agent; the descriptions carry the same {topic} placeholder.
    web_search = Task(
        agent=web_search_agent,
        description="Search the web for question \"{topic}\" and scrape the most relevant web page.",
        expected_output="Content to help answer the web related question.",
    )

    youtube_search = Task(
        agent=youtube_search_agent,
        description="Search a YouTube video for question \"{topic}\"",
        expected_output="Content to help answer the YouTube related question.",
    )

    # Sequential process: tasks are listed web search first, then YouTube.
    crew = Crew(
        agents=[web_search_agent, youtube_search_agent],
        process=Process.sequential,
        tasks=[web_search, youtube_search],
        verbose=True,
    )

    answer = crew.kickoff(inputs={"topic": question})

    # The crew result is stringified before being handed to the helper.
    return get_final_answer(question, str(answer))