|
|
import base64
import os

import cv2
|
|
from crewai import Agent, Crew, Process, Task |
|
|
from crewai.tools import tool |
|
|
from crewai_tools import ( |
|
|
CodeInterpreterTool, |
|
|
SerperDevTool, |
|
|
WebsiteSearchTool |
|
|
) |
|
|
from openai import OpenAI |
|
|
from openinference.instrumentation.crewai import CrewAIInstrumentor |
|
|
from phoenix.otel import register |
|
|
from util import get_final_answer, get_img_b64 |
|
|
|
|
|
|
|
|
|
|
|
# Model assignment per role: a stronger model plans/delegates (manager) and a
# separate one extracts the final answer; cheaper "mini" models handle the
# individual agent steps and video analysis.
MANAGER_MODEL = "gpt-4.1"
AGENT_MODEL = "gpt-4.1-mini"
FINAL_ANSWER_MODEL = "gpt-4.5-preview"
AUDIO_MODEL = "gpt-4o-transcribe"  # transcription-capable model
IMAGE_MODEL = "gpt-4.1"
VIDEO_MODEL = "gpt-4.1-mini"
|
|
|
|
|
|
|
|
|
|
|
# Arize Phoenix observability setup. Fails fast with a KeyError if
# PHOENIX_API_KEY is not set in the environment.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

# Phoenix reads credentials/endpoint from these environment variables.
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

# Register an OpenTelemetry tracer provider that exports traces to the
# "gaia" Phoenix project.
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)

# Instrument CrewAI so every agent/task/tool invocation is traced.
CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
|
|
|
|
|
def run_crew(question, file_path):
    """Answer a question with a manager-led CrewAI crew, then post-process the answer.

    Args:
        question (str): The question to answer.
        file_path (str): Path of a file attached to the question, or a falsy
            value when there is no attachment.

    Returns:
        The final answer produced by get_final_answer() from the crew's
        initial answer.
    """
|
@tool("Audio Analysis Tool")
def audio_analysis_tool(question: str, file_path: str) -> str:
    """Answer a question about an audio file.

    Args:
        question (str): Question about the audio file
        file_path (str): Path of the audio file

    Returns:
        str: Answer to the question about the audio file

    Raises:
        FileNotFoundError: If the audio file does not exist
        RuntimeError: If processing fails"""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Audio file not found: {file_path}")

    try:
        client = OpenAI()

        # Context manager guarantees the file handle is closed even when the
        # transcription request raises (the original leaked the handle).
        with open(file_path, "rb") as audio_file:
            # The question is passed as the transcription prompt to steer the
            # model toward the relevant content.
            transcript = client.audio.transcriptions.create(
                file=audio_file,
                model=AUDIO_MODEL,
                prompt=question
            )

        return transcript.text
    except Exception as e:
        # Chain the cause so the underlying API/IO error stays visible.
        raise RuntimeError(f"Failed to process audio: {str(e)}") from e
|
|
|
|
|
@tool("Image Analysis Tool")
def image_analysis_tool(question: str, file_path: str) -> str:
    """Answer a question about an image file.

    Args:
        question (str): Question about the image file
        file_path (str): Path of the image file

    Returns:
        str: Answer to the question about the image file

    Raises:
        FileNotFoundError: If the image file does not exist
        RuntimeError: If processing fails"""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Image file not found: {file_path}")

    try:
        # Base64-encode the image for inline transport in the chat request.
        img_b64 = get_img_b64(file_path)

        client = OpenAI()

        # NOTE(review): the data URL hard-codes image/jpeg regardless of the
        # actual file type — confirm get_img_b64 emits JPEG bytes.
        completion = client.chat.completions.create(
            messages=[{"role": "user",
                       "content": [{"type": "text", "text": question},
                                   {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}]}],
            model=IMAGE_MODEL
        )

        return completion.choices[0].message.content
    except Exception as e:
        # Chain the cause so the underlying API/IO error stays visible.
        raise RuntimeError(f"Failed to process image: {str(e)}") from e
|
|
|
|
|
@tool("YouTube Audio Analysis Tool")
def youtube_audio_analysis_tool(question: str, url: str) -> str:
    """Answer an audio question about a YouTube video.

    Args:
        question (str): Audio question about YouTube video
        url (str): YouTube URL

    Returns:
        str: Answer to the audio question about YouTube video

    Raises:
        RuntimeError: If processing fails"""
    # Offline lookup: only a known, pre-downloaded video is supported.
    # Downloading arbitrary YouTube audio is not implemented here.
    if url.endswith("1htKBjuUWec"):
        file_path = "data/1htKBjuUWec.mp4"
    else:
        # The original raised a bare RuntimeError() with no message; include
        # the URL so the failure is diagnosable.
        raise RuntimeError(f"Unsupported YouTube URL: {url}")

    try:
        client = OpenAI()

        # Context manager guarantees the file handle is closed even when the
        # transcription request raises (the original leaked the handle).
        with open(file_path, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                file=audio_file,
                model=AUDIO_MODEL,
                prompt=question
            )

        return transcription.text
    except Exception as e:
        # Chain the cause so the underlying API/IO error stays visible.
        raise RuntimeError(f"Failed to process audio: {str(e)}") from e
|
|
|
|
|
@tool("YouTube Image Analysis Tool")
def youtube_image_analysis_tool(question: str, url: str) -> str:
    """Answer an image question about a YouTube video.

    Args:
        question (str): Image question about YouTube video
        url (str): YouTube URL

    Returns:
        str: Answer to the image question about YouTube video

    Raises:
        RuntimeError: If processing fails"""
    # Offline lookup: only a known, pre-downloaded video is supported.
    # Downloading arbitrary YouTube videos is not implemented here.
    if url.endswith("L1vXCYZAYYM"):
        file_path = "data/L1vXCYZAYYM.mp4"
    else:
        # The original raised a bare RuntimeError() with no message; include
        # the URL so the failure is diagnosable.
        raise RuntimeError(f"Unsupported YouTube URL: {url}")

    try:
        video = cv2.VideoCapture(file_path)
        base64_frames = []
        try:
            # Decode every frame to a base64-encoded JPEG. NOTE(review): ALL
            # frames are sent to the model; for long videos this may exceed
            # request limits — consider sampling every Nth frame.
            while video.isOpened():
                success, frame = video.read()
                if not success:
                    break
                _, buffer = cv2.imencode(".jpg", frame)
                # base64 was referenced without being imported in the original
                # file (NameError at runtime); the import is now at file top.
                base64_frames.append(base64.b64encode(buffer).decode("utf-8"))
        finally:
            # Release the capture even if decoding/encoding raises.
            video.release()

        client = OpenAI()

        response = client.responses.create(
            input=[{"role": "user",
                    "content": [{"type": "input_text", "text": (question)},
                                *[{"type": "input_image", "image_url": f"data:image/jpeg;base64,{frame}"} for frame in base64_frames]]}],
            model=VIDEO_MODEL
        )

        return response.output_text
    except Exception as e:
        # Chain the cause so the underlying API/decoding error stays visible.
        raise RuntimeError(f"Failed to process video: {str(e)}") from e
|
|
|
|
|
|
|
|
|
|
|
# Shared CrewAI tools: Serper web search, website RAG scraping, and a
# sandboxed Python code interpreter.
web_search_tool = SerperDevTool()
web_rag_tool = WebsiteSearchTool()
python_coding_tool = CodeInterpreterTool()
|
|
|
|
|
|
|
|
|
|
|
# Worker agent: searches the web, then RAG-queries the most relevant page.
# max_iter bounds tool-use loops (and therefore cost).
web_search_agent = Agent(
    role="Web Search Agent",
    goal="Search the web to help answer question \"{question}\", then scrape the most relevant web page.",
    backstory="As an expert web search assistant, you search the web to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=3,
    tools=[web_search_tool, web_rag_tool],
    verbose=False
)
|
|
|
|
|
# Worker agent: transcribes/analyzes a local audio file via audio_analysis_tool.
audio_analysis_agent = Agent(
    role="Audio Analysis Agent",
    goal="Analyze audio to help answer question \"{question}\"",
    backstory="As an expert audio analysis assistant, you analyze the audio to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=3,
    tools=[audio_analysis_tool],
    verbose=False
)
|
|
|
|
|
# Worker agent: answers questions about a local image via image_analysis_tool.
image_analysis_agent = Agent(
    role="Image Analysis Agent",
    goal="Analyze image to help answer question \"{question}\"",
    backstory="As an expert image analysis assistant, you analyze the image to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=3,
    tools=[image_analysis_tool],
    verbose=False
)
|
|
|
|
|
# Worker agent: answers audio questions about (pre-downloaded) YouTube videos.
youtube_audio_analysis_agent = Agent(
    role="YouTube Audio Analysis Agent",
    goal="Analyze YouTube video to help answer audio question \"{question}\"",
    backstory="As an expert YouTube audio analysis assistant, you analyze the video to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=3,
    tools=[youtube_audio_analysis_tool],
    verbose=False
)
|
|
|
|
|
# Worker agent: answers visual questions about (pre-downloaded) YouTube videos.
youtube_image_analysis_agent = Agent(
    role="YouTube Image Analysis Agent",
    goal="Analyze YouTube video to help answer image question \"{question}\"",
    backstory="As an expert YouTube image analysis assistant, you analyze the video to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=3,
    tools=[youtube_image_analysis_tool],
    verbose=False
)
|
|
|
|
|
# Worker agent: writes/runs Python for computational questions. Gets a higher
# iteration budget (10) than the other workers since code often needs retries.
python_coding_agent = Agent(
    role="Python Coding Agent",
    goal="Write and/or execute Python code to help answer question \"{question}\"",
    backstory="As an expert Python coding assistant, you write and/or execute Python code to help answer the question.",
    allow_delegation=False,
    llm=AGENT_MODEL,
    max_iter=10,
    tools=[python_coding_tool],
    verbose=False
)
|
|
|
|
|
# Manager: the only agent allowed to delegate; runs on the stronger
# MANAGER_MODEL and logs verbosely for traceability.
manager_agent = Agent(
    role="Manager Agent",
    goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
    "If there is no good coworker, delegate to the Python Coding Agent to implement a tool for the task. "
    "Question: \"{question}\"",
    backstory="As an expert manager assistant, you answer the question.",
    allow_delegation=True,
    llm=MANAGER_MODEL,
    max_iter=5,
    verbose=True
)
|
|
|
|
|
|
|
|
|
|
|
# Single top-level task; "{question}" is interpolated by CrewAI from the
# kickoff inputs.
manager_task = Task(
    agent=manager_agent,
    description="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. Question: \"{question}\"",
    expected_output="The answer to the question."
)
|
|
|
|
|
|
|
|
|
|
|
# Crew wiring: worker agents plus the delegating manager.
# NOTE(review): `Process` is imported at file top but never passed here —
# confirm whether process=Process.hierarchical was intended alongside
# manager_agent.
crew = Crew(
    agents=[web_search_agent,
    audio_analysis_agent,
    image_analysis_agent,
    youtube_audio_analysis_agent,
    youtube_image_analysis_agent,
    python_coding_agent],
    manager_agent=manager_agent,
    tasks=[manager_task],
    verbose=True
)
|
|
|
|
|
|
|
|
|
|
|
if file_path: |
|
|
question = f"{question} File path: {file_path}." |
|
|
|
|
|
if file_path.endswith(".py"): |
|
|
with open(f"{file_path}", "r") as file: |
|
|
question = f"{question} File data:\n{file.read()}" |
|
|
|
|
|
initial_answer = crew.kickoff(inputs={"question": question}) |
|
|
final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer)) |
|
|
|
|
|
print(f"Question: {question}") |
|
|
print(f"Initial answer: {initial_answer}") |
|
|
print(f"Final answer: {final_answer}") |
|
|
|
|
|
return final_answer |