import os

from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from google import genai
from google.genai import types
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register

from util import get_final_answer

## LLMs
MANAGER_MODEL = "gpt-4.1-mini"
AGENT_MODEL = "gpt-4.1-mini"
FINAL_ANSWER_MODEL = "gemini-2.5-flash-preview-04-17"
WEB_SEARCH_MODEL = "gemini-2.5-flash-preview-04-17"
IMAGE_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
AUDIO_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
VIDEO_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
YOUTUBE_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
DOCUMENT_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
CODE_GENERATION_MODEL = "gemini-2.5-flash-preview-04-17"
CODE_EXECUTION_MODEL = "gemini-2.5-flash-preview-04-17"

# LLM evaluation: register an Arize Phoenix tracer and auto-instrument CrewAI
# so every agent/task/tool span is exported to the hosted Phoenix collector.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)
CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)


def run_crew(question, file_path):
    """Answer a question with a manager-led CrewAI crew of specialist agents.

    The manager agent delegates to specialist agents (web search, image/audio/
    video/YouTube/document analysis, code generation/execution), each backed by
    a Gemini-powered tool. The crew's raw answer is then post-processed by
    ``get_final_answer`` into the final short-form answer.

    Args:
        question (str): The question to answer.
        file_path (str): Optional path to an attached file ("" / None if absent);
            appended to the question so agents can pass it to file-based tools.

    Returns:
        str: The final answer to the question.
    """
    # Tools

    @tool("Web Search Tool")
    def web_search_tool(question: str) -> str:
        """Search the web to answer a question.

        Args:
            question (str): Question to answer

        Returns:
            str: Answer to the question

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            response = client.models.generate_content(
                model=WEB_SEARCH_MODEL,
                contents=question,
                config=types.GenerateContentConfig(
                    # Gemini 2.x uses the `google_search` tool; the legacy
                    # GoogleSearchRetrieval is only supported by 1.5 models.
                    tools=[types.Tool(google_search=types.GoogleSearch())]
                )
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Image Analysis Tool")
    def image_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an image file.

        Args:
            question (str): Question about an image file
            file_path (str): The image file path

        Returns:
            str: Answer to the question about the image file

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            file = client.files.upload(file=file_path)
            response = client.models.generate_content(
                model=IMAGE_ANALYSIS_MODEL,
                contents=[file, question]
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Audio Analysis Tool")
    def audio_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an audio file.

        Args:
            question (str): Question about an audio file
            file_path (str): The audio file path

        Returns:
            str: Answer to the question about the audio file

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            file = client.files.upload(file=file_path)
            response = client.models.generate_content(
                model=AUDIO_ANALYSIS_MODEL,
                contents=[file, question]
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Video Analysis Tool")
    def video_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about a video file.

        Args:
            question (str): Question about a video file
            file_path (str): The video file path

        Returns:
            str: Answer to the question about the video file

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            file = client.files.upload(file=file_path)
            response = client.models.generate_content(
                model=VIDEO_ANALYSIS_MODEL,
                contents=[file, question]
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("YouTube Analysis Tool")
    def youtube_analysis_tool(question: str, url: str) -> str:
        """Answer a question about a YouTube video.

        Args:
            question (str): Question about a YouTube video
            url (str): The YouTube video URL

        Returns:
            str: Answer to the question about the YouTube video

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            response = client.models.generate_content(
                model=YOUTUBE_ANALYSIS_MODEL,
                contents=types.Content(
                    parts=[
                        types.Part(file_data=types.FileData(file_uri=url)),
                        types.Part(text=question)
                    ]
                )
            )
            # Return the answer text, not the raw response object, to honor
            # the declared -> str contract (matches the other tools).
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Document Analysis Tool")
    def document_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about a document file.

        Supported document types include:
        .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js

        Args:
            question (str): Question about a document file
            file_path (str): The document file path

        Returns:
            str: Answer to the question about the document file

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            file = client.files.upload(file=file_path)
            response = client.models.generate_content(
                model=DOCUMENT_ANALYSIS_MODEL,
                contents=[file, question]
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Code Generation Tool")
    def code_generation_tool(question: str) -> str:
        """Generate and execute Python code to answer a question.

        Args:
            question (str): Question to answer

        Returns:
            str: Answer to the question

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            # Note: no file upload here — this tool takes only a question.
            # (A stray upload of the outer file_path was removed; it was
            # unused and failed whenever no file accompanied the question.)
            response = client.models.generate_content(
                model=CODE_GENERATION_MODEL,
                contents=[question],
                config=types.GenerateContentConfig(
                    # Instantiate the tool config; passing the bare class is
                    # not a valid tool declaration.
                    tools=[types.Tool(code_execution=types.ToolCodeExecution())]
                ),
            )
            for part in response.candidates[0].content.parts:
                if part.code_execution_result is not None:
                    return part.code_execution_result.output
            # No code was executed: fall back to the model's text so the tool
            # still returns a str instead of implicitly returning None.
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    @tool("Code Execution Tool")
    def code_execution_tool(question: str, file_path: str) -> str:
        """Execute a Python code file to answer a question.

        Args:
            question (str): Question to answer
            file_path (str): The Python code file path

        Returns:
            str: Answer to the question

        Raises:
            RuntimeError: If processing fails
        """
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            file = client.files.upload(file=file_path)
            response = client.models.generate_content(
                model=CODE_EXECUTION_MODEL,
                contents=[file, question],
                config=types.GenerateContentConfig(
                    tools=[types.Tool(code_execution=types.ToolCodeExecution())]
                ),
            )
            for part in response.candidates[0].content.parts:
                if part.code_execution_result is not None:
                    return part.code_execution_result.output
            # No code was executed: fall back to the model's text so the tool
            # still returns a str instead of implicitly returning None.
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {str(e)}")

    # Agents
    web_search_agent = Agent(
        role="Web Search Agent",
        goal="Search the web to help answer question \"{question}\"",
        backstory="As an expert web search assistant, you search the web to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[web_search_tool],
        verbose=False
    )

    image_analysis_agent = Agent(
        role="Image Analysis Agent",
        goal="Analyze image file to help answer question \"{question}\"",
        backstory="As an expert image analysis assistant, you analyze the image file to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[image_analysis_tool],
        verbose=False
    )

    audio_analysis_agent = Agent(
        role="Audio Analysis Agent",
        goal="Analyze audio file to help answer question \"{question}\"",
        backstory="As an expert audio analysis assistant, you analyze the audio file to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[audio_analysis_tool],
        verbose=False
    )

    video_analysis_agent = Agent(
        role="Video Analysis Agent",
        goal="Analyze video file to help answer question \"{question}\"",
        backstory="As an expert video analysis assistant, you analyze the video file to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[video_analysis_tool],
        verbose=False
    )

    youtube_analysis_agent = Agent(
        role="YouTube Analysis Agent",
        goal="Analyze YouTube video to help answer question \"{question}\"",
        backstory="As an expert YouTube analysis assistant, you analyze the video to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[youtube_analysis_tool],
        verbose=False
    )

    document_analysis_agent = Agent(
        role="Document Analysis Agent",
        goal="Analyze document of type .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js to help answer question \"{question}\"",
        backstory="As an expert document analysis assistant, you analyze the document to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=2,
        tools=[document_analysis_tool],
        verbose=False
    )

    code_generation_agent = Agent(
        role="Code Generation Agent",
        goal="Generate Python code and execute it to help answer question \"{question}\"",
        backstory="As an expert Python code generation assistant, you generate and execute code to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        # Fixed: this agent must carry the generation tool, not the
        # execution tool (copy-paste wiring bug).
        tools=[code_generation_tool],
        verbose=False
    )

    code_execution_agent = Agent(
        role="Code Execution Agent",
        goal="Execute Python code file to help answer question \"{question}\"",
        backstory="As an expert Python code execution assistant, you execute the code file to help answer the question.",
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=3,
        tools=[code_execution_tool],
        verbose=False
    )

    manager_agent = Agent(
        role="Manager Agent",
        goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
             # Fixed: the coworker is the Code Generation Agent; there is no
             # "Python Coding Agent" in this crew, so delegation would fail.
             "If there is no good coworker, delegate to the Code Generation Agent to implement a tool for the task. "
             "Question: \"{question}\"",
        backstory="As an expert manager assistant, you answer the question.",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=5,
        verbose=True
    )

    # Task
    manager_task = Task(
        agent=manager_agent,
        description="Try to answer the following question. "
                    "If needed, delegate to one or more of your coworkers for help. "
                    "Question: \"{question}\"",
        expected_output="The answer to the question."
    )

    # Crew: the manager agent coordinates; it is passed separately and must
    # not also appear in the worker agents list.
    crew = Crew(
        agents=[web_search_agent,
                image_analysis_agent,
                audio_analysis_agent,
                video_analysis_agent,
                youtube_analysis_agent,
                document_analysis_agent,
                code_generation_agent,
                code_execution_agent],
        manager_agent=manager_agent,
        tasks=[manager_task],
        verbose=True
    )

    # Process
    if file_path:
        # Surface the attachment path in the question so the manager can
        # forward it to the file-based tools.
        question = f"{question} File path: {file_path}."

    initial_answer = crew.kickoff(inputs={"question": question})
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer))

    print(f"Question: {question}")
    print(f"Initial answer: {initial_answer}")
    print(f"Final answer: {final_answer}")

    return final_answer