gaia / crew.py
bstraehle's picture
Update crew.py
a0c3d4c verified
raw
history blame
10.9 kB
import base64
import os

import cv2
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from crewai_tools import (
    CodeInterpreterTool,
    SerperDevTool,
    WebsiteSearchTool
)
from openai import OpenAI
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register

from util import get_final_answer, get_img_b64
# Model identifiers, one per role in the pipeline.

MANAGER_MODEL = "gpt-4.1"
AGENT_MODEL = "gpt-4.1-mini"
FINAL_ANSWER_MODEL = "gpt-4.5-preview"
AUDIO_MODEL = "gpt-4o-transcribe"
IMAGE_MODEL = "gpt-4.1"
VIDEO_MODEL = "gpt-4.1-mini"

# Tracing / LLM evaluation (Phoenix).
# The API key is mandatory: a missing variable raises KeyError at import time.

PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

# Both variables are exported before register() is called.
os.environ.update(
    {
        "PHOENIX_CLIENT_HEADERS": f"api_key={PHOENIX_API_KEY}",
        "PHOENIX_COLLECTOR_ENDPOINT": "https://app.phoenix.arize.com",
    }
)

tracer_provider = register(project_name="gaia", auto_instrument=True)

CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
def run_crew(question, file_path):
    """Answer a question with a manager agent that can delegate to specialists.

    Builds the custom and built-in tools, one agent per tool family, a manager
    agent and a single manager task, runs the crew, and then passes the crew's
    answer through ``get_final_answer``.

    Args:
        question: The question to answer.
        file_path: Path of a file attached to the question, or a falsy value
            when there is none. The path is appended to the question; for
            ``.py`` files the file contents are appended as well.

    Returns:
        The value returned by ``get_final_answer`` for the (possibly extended)
        question and the crew's initial answer.
    """

    # Helpers

    def _transcribe(audio_path, prompt):
        """Transcribe the file at ``audio_path``, passing ``prompt`` to the model."""
        # The context manager closes the handle even when the API call raises.
        with open(audio_path, "rb") as audio_file:
            transcription = OpenAI().audio.transcriptions.create(
                file=audio_file,
                model=AUDIO_MODEL,
                prompt=prompt
            )
        return transcription.text

    def _read_frames_b64(video_path):
        """Return every decodable frame of the video as a base64 JPEG string."""
        video = cv2.VideoCapture(video_path)
        frames = []
        try:
            while video.isOpened():
                success, frame = video.read()
                if not success:
                    break
                encoded, buffer = cv2.imencode(".jpg", frame)
                if encoded:
                    frames.append(base64.b64encode(buffer).decode("utf-8"))
        finally:
            # Release the capture even if decoding or encoding fails midway.
            video.release()
        return frames

    # Custom tools
    # NOTE: the tool docstrings are kept as-is on purpose; the @tool decorator
    # receives them along with the function.

    @tool("Audio Analysis Tool")
    def audio_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an audio file.
        Args:
        question (str): Question about the audio file
        file_path (str): Path of the audio file
        Returns:
        str: Answer to the question about the audio file
        Raises:
        FileNotFoundError: If the audio file does not exist
        RuntimeError: If processing fails"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        try:
            return _transcribe(file_path, question)
        except Exception as e:
            raise RuntimeError(f"Failed to process audio: {str(e)}") from e

    @tool("Image Analysis Tool")
    def image_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an image file.
        Args:
        question (str): Question about the image file
        file_path (str): Path of the image file
        Returns:
        str: Answer to the question about the image file
        Raises:
        FileNotFoundError: If the image file does not exist
        RuntimeError: If processing fails"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Image file not found: {file_path}")

        try:
            # Get image
            img_b64 = get_img_b64(file_path)

            # OpenAI
            client = OpenAI()
            completion = client.chat.completions.create(
                messages=[{"role": "user",
                           "content": [{"type": "text", "text": question},
                                       {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}]}],
                model=IMAGE_MODEL
            )
            return completion.choices[0].message.content
        except Exception as e:
            raise RuntimeError(f"Failed to process image: {str(e)}") from e

    @tool("YouTube Audio Analysis Tool")
    def youtube_audio_analysis_tool(question: str, url: str) -> str:
        """Answer an audio question about a YouTube video.
        Args:
        question (str): Audio question about YouTube video
        url (str): YouTube URL
        Returns:
        str: Answer to the audio question about YouTube video
        Raises:
        RuntimeError: If processing fails"""
        # YouTube (hack to deal with access issues): only one video is
        # supported, served from a local copy.
        if not url.endswith("1htKBjuUWec"):
            raise RuntimeError(f"Failed to process audio: unsupported YouTube URL: {url}")
        file_path = "data/1htKBjuUWec.mp4"

        try:
            return _transcribe(file_path, question)
        except Exception as e:
            raise RuntimeError(f"Failed to process audio: {str(e)}") from e

    @tool("YouTube Image Analysis Tool")
    def youtube_image_analysis_tool(question: str, url: str) -> str:
        """Answer an image question about a YouTube video.
        Args:
        question (str): Image question about YouTube video
        url (str): YouTube URL
        Returns:
        str: Answer to the image question about YouTube video
        Raises:
        RuntimeError: If processing fails"""
        # YouTube (hack to deal with access issues): only one video is
        # supported, served from a local copy.
        if not url.endswith("L1vXCYZAYYM"):
            raise RuntimeError(f"Failed to process video: unsupported YouTube URL: {url}")
        file_path = "data/L1vXCYZAYYM.mp4"

        try:
            # Get video
            # NOTE(review): every decoded frame is sent to the model; for long
            # videos this may exceed request limits - consider sampling frames.
            base64_frames = _read_frames_b64(file_path)
            if not base64_frames:
                # Without frames the model would only see the text question.
                raise RuntimeError(f"No frames could be read from {file_path}")

            # OpenAI
            client = OpenAI()
            response = client.responses.create(
                input=[{"role": "user",
                        "content": [{"type": "input_text", "text": question},
                                    *[{"type": "input_image", "image_url": f"data:image/jpeg;base64,{frame}"} for frame in base64_frames]]}],
                model=VIDEO_MODEL
            )
            return response.output_text
        except Exception as e:
            raise RuntimeError(f"Failed to process video: {str(e)}") from e

    # Built-in tools

    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    python_coding_tool = CodeInterpreterTool()

    # Agents

    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web to help answer question \"{question}\", then scrape the most relevant web page.",
        backstory="As an expert web search assistant, you search the web to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[web_search_tool, web_rag_tool],
        verbose=False
    )

    audio_analysis_agent = Agent(
        role="Audio Analysis Agent",
        goal="Analyze audio to help answer question \"{question}\"",
        backstory="As an expert audio analysis assistant, you analyze the audio to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[audio_analysis_tool],
        verbose=False
    )

    image_analysis_agent = Agent(
        role="Image Analysis Agent",
        goal="Analyze image to help answer question \"{question}\"",
        backstory="As an expert image analysis assistant, you analyze the image to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[image_analysis_tool],
        verbose=False
    )

    youtube_audio_analysis_agent = Agent(
        role="YouTube Audio Analysis Agent",
        goal="Analyze YouTube video to help answer audio question \"{question}\"",
        backstory="As an expert YouTube audio analysis assistant, you analyze the video to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[youtube_audio_analysis_tool],
        verbose=False
    )

    youtube_image_analysis_agent = Agent(
        role="YouTube Image Analysis Agent",
        goal="Analyze YouTube video to help answer image question \"{question}\"",
        backstory="As an expert YouTube image analysis assistant, you analyze the video to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[youtube_image_analysis_tool],
        verbose=False
    )

    python_coding_agent = Agent(
        role="Python Coding Agent",
        goal="Write and/or execute Python code to help answer question \"{question}\"",
        backstory="As an expert Python coding assistant, you write and/or execute Python code to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=10,
        tools=[python_coding_tool],
        verbose=False
    )

    manager_agent = Agent(
        role="Manager Agent",
        goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
             "If there is no good coworker, delegate to the Python Coding Agent to implement a tool for the task. "
             "Question: \"{question}\"",
        backstory="As an expert manager assistant, you answer the question.",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=5,
        verbose=True
    )

    # Task

    manager_task = Task(
        agent=manager_agent,
        description="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. Question: \"{question}\"",
        expected_output="The answer to the question."
    )

    # Crew

    crew = Crew(
        agents=[web_search_agent,
                audio_analysis_agent,
                image_analysis_agent,
                youtube_audio_analysis_agent,
                youtube_image_analysis_agent,
                python_coding_agent],
        manager_agent=manager_agent,
        tasks=[manager_task],
        verbose=True
    )

    # Process

    if file_path:
        question = f"{question} File path: {file_path}."

        if file_path.endswith(".py"):
            # Python source is UTF-8 by default, so do not rely on the
            # platform's locale encoding when inlining the file.
            with open(file_path, "r", encoding="utf-8") as file:
                question = f"{question} File data:\n{file.read()}"

    initial_answer = crew.kickoff(inputs={"question": question})
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer))

    print(f"Question: {question}")
    print(f"Initial answer: {initial_answer}")
    print(f"Final answer: {final_answer}")

    return final_answer