"""Answer a question with a CrewAI crew of specialised agents.

A manager agent receives the question and may delegate to web-search,
audio, image, YouTube and Python-coding agents. The crew's initial answer is
post-processed by ``get_final_answer``.
"""

import os
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from crewai_tools import (
    CodeInterpreterTool,
    SerperDevTool,
    WebsiteSearchTool
)
from openai import OpenAI
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register
from pytubefix import YouTube
from util import get_final_answer, get_img_b64, get_imgs_b64

## LLMs

MANAGER_MODEL = "gpt-4.1"
AGENT_MODEL = "gpt-4.1"

FINAL_ANSWER_MODEL = "gpt-4.5-preview"

AUDIO_MODEL = "gpt-4o-transcribe"
IMAGE_MODEL = "gpt-4.1"
VIDEO_MODEL = "gpt-4.1-mini"

# LLM evaluation

PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)

CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)


def _transcribe_audio(question, file_path):
    """Transcribe the audio file at file_path, using question as the prompt."""
    client = OpenAI()

    # Open the file in a context manager so the handle is closed even when
    # the API call raises.
    with open(file_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(
            file=audio_file,
            model=AUDIO_MODEL,
            prompt=question
        )

    return transcript.text


def run_crew(question, file_path):
    """Run the crew on a question and return the post-processed answer.

    Args:
        question: The question to answer.
        file_path: Path of a file that accompanies the question; may be
            empty/None. The path is appended to the question, and for
            ``.py`` files the file content is appended as well.

    Returns:
        The value returned by ``get_final_answer`` for the crew's initial
        answer.

    Side effects:
        Prints the (augmented) question, the initial answer and the final
        answer.
    """
    # Custom tools
    #
    # NOTE: the tool docstrings below are kept verbatim; the `@tool`
    # decorator exposes them to the agents as tool descriptions.

    @tool("Audio Analysis Tool")
    def audio_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an audio file.

        Args:
            question (str): Question about the audio file
            file_path (str): Path of the audio file

        Returns:
            str: Answer to the question about the audio file

        Raises:
            FileNotFoundError: If the audio file does not exist
            RuntimeError: If processing fails"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        try:
            return _transcribe_audio(question, file_path)
        except Exception as e:
            raise RuntimeError(f"Failed to process audio: {e}") from e

    @tool("Image Analysis Tool")
    def image_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an image file.

        Args:
            question (str): Question about the image file
            file_path (str): Path of the image file

        Returns:
            str: Answer to the question about the image file

        Raises:
            FileNotFoundError: If the image file does not exist
            RuntimeError: If processing fails"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Image file not found: {file_path}")

        try:
            # Get image
            img_b64 = get_img_b64(file_path)

            # OpenAI
            client = OpenAI()

            completion = client.chat.completions.create(
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {"type": "image_url",
                         "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}
                    ]
                }],
                model=IMAGE_MODEL
            )

            return completion.choices[0].message.content
        except Exception as e:
            raise RuntimeError(f"Failed to process image: {e}") from e

    @tool("YouTube Audio Analysis Tool")
    def youtube_audio_analysis_tool(question: str, url: str) -> str:
        """Answer an audio question about a YouTube video.

        Args:
            question (str): Audio question about YouTube video
            url (str): YouTube URL

        Returns:
            str: Answer to the audio question about YouTube video

        Raises:
            RuntimeError: If processing fails"""
        # HACK: YouTube downloads are disabled because of access issues; only
        # a pre-downloaded video is supported. The intended path was:
        #   file_path = "audio.mp4"
        #   yt = YouTube(url, use_oauth=True, allow_oauth_cache=True)
        #   stream = yt.streams.filter(only_audio=True).first()
        #   stream.download(filename=file_path)
        if not url.endswith("1htKBjuUWec"):
            raise RuntimeError(
                f"Failed to process audio: unsupported YouTube URL: {url}"
            )

        file_path = "data/1htKBjuUWec.mp4"

        try:
            return _transcribe_audio(question, file_path)
        except Exception as e:
            raise RuntimeError(f"Failed to process audio: {e}") from e

    @tool("YouTube Image Analysis Tool")
    def youtube_image_analysis_tool(question: str, url: str) -> str:
        """Answer an image question about a YouTube video.

        Args:
            question (str): Image question about YouTube video
            url (str): YouTube URL

        Returns:
            str: Answer to the image question about YouTube video

        Raises:
            RuntimeError: If processing fails"""
        # HACK: YouTube downloads are disabled because of access issues; only
        # a pre-downloaded video is supported. The intended path was:
        #   file_path = "video.mp4"
        #   yt = YouTube(url, use_oauth=True, allow_oauth_cache=True)
        #   stream = yt.streams.get_highest_resolution()
        #   stream.download(filename=file_path)
        if not url.endswith("L1vXCYZAYYM"):
            raise RuntimeError(
                f"Failed to process video: unsupported YouTube URL: {url}"
            )

        file_path = "data/L1vXCYZAYYM.mp4"

        try:
            # Get images
            imgs_b64 = get_imgs_b64(file_path)

            # OpenAI
            client = OpenAI()

            # One text part followed by one image part per extracted image.
            response = client.responses.create(
                input=[{
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": (question)},
                        *[
                            {"type": "input_image",
                             "image_url": f"data:image/jpeg;base64,{img_b64}"}
                            for img_b64 in imgs_b64
                        ]
                    ]
                }],
                model=VIDEO_MODEL
            )

            return response.output_text
        except Exception as e:
            raise RuntimeError(f"Failed to process video: {e}") from e

    # Built-in tools

    web_search_tool = SerperDevTool()
    web_rag_tool = WebsiteSearchTool()
    python_coding_tool = CodeInterpreterTool()

    # Agents
    #
    # The "{question}" placeholders are plain (non-f) strings on purpose:
    # they are filled from the `inputs` passed to `crew.kickoff` below.

    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web to help answer question \"{question}\", then scrape the most relevant web page.",
        backstory="As an expert web search assistant, you search the web to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[web_search_tool, web_rag_tool],
        verbose=False
    )

    audio_analysis_agent = Agent(
        role="Audio Analysis Agent",
        goal="Analyze audio to help answer question \"{question}\"",
        backstory="As an expert audio analysis assistant, you analyze the audio to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[audio_analysis_tool],
        verbose=False
    )

    image_analysis_agent = Agent(
        role="Image Analysis Agent",
        goal="Analyze image to help answer question \"{question}\"",
        backstory="As an expert image analysis assistant, you analyze the image to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[image_analysis_tool],
        verbose=False
    )

    youtube_audio_analysis_agent = Agent(
        role="YouTube Audio Analysis Agent",
        goal="Analyze YouTube video to help answer audio question \"{question}\"",
        backstory="As an expert YouTube audio analysis assistant, you analyze the video to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[youtube_audio_analysis_tool],
        verbose=False
    )

    youtube_image_analysis_agent = Agent(
        role="YouTube Image Analysis Agent",
        goal="Analyze YouTube video to help answer image question \"{question}\"",
        backstory="As an expert YouTube image analysis assistant, you analyze the video to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[youtube_image_analysis_tool],
        verbose=False
    )

    python_coding_agent = Agent(
        role="Python Coding Agent",
        goal="Write and/or execute Python code to help answer question \"{question}\"",
        backstory="As an expert Python coding assistant, you write and/or execute Python code to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=5,
        tools=[python_coding_tool],
        verbose=False
    )

    manager_agent = Agent(
        role="Manager Agent",
        goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
             "If there is no good coworker, delegate to the Python Coding Agent to implement a tool for the task. "
             "Question: \"{question}\"",
        backstory="As an expert manager assistant, you answer the question.",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=10,
        verbose=True
    )

    # Task

    manager_task = Task(
        agent=manager_agent,
        description="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. Question: \"{question}\"",
        expected_output="The answer to the question."
    )

    # Crew
    #
    # NOTE(review): no `process=` is passed although `Process` is imported and
    # a `manager_agent` is supplied - confirm whether `Process.hierarchical`
    # was intended.

    crew = Crew(
        agents=[web_search_agent,
                audio_analysis_agent,
                image_analysis_agent,
                youtube_audio_analysis_agent,
                youtube_image_analysis_agent,
                python_coding_agent],
        manager_agent=manager_agent,
        tasks=[manager_task],
        verbose=True
    )

    # Process

    if file_path:
        question = f"{question} File path: {file_path}."

        if file_path.endswith(".py"):
            # Inline Python attachments so the agents can read the code
            # without a file tool; read explicitly as UTF-8 rather than the
            # platform default encoding.
            with open(file_path, "r", encoding="utf-8") as file:
                question = f"{question} File data:\n{file.read()}"

    initial_answer = crew.kickoff(inputs={"question": question})
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer))

    print(f"Question: {question}")
    print(f"Initial answer: {initial_answer}")
    print(f"Final answer: {final_answer}")

    return final_answer