grady / crew.py
bstraehle's picture
Update crew.py
ec1b07b verified
raw
history blame
14.4 kB
# References:
# https://docs.crewai.com/introduction
# https://ai.google.dev/gemini-api/docs
import os
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from google import genai
from google.genai import types
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register
from util import get_final_answer
## LLMs
MANAGER_MODEL = "gpt-4.1-mini"
AGENT_MODEL = "gpt-4.1-mini"
FINAL_ANSWER_MODEL = "gemini-2.5-flash-preview-04-17"
WEB_SEARCH_MODEL = "gemini-2.5-flash-preview-04-17"
IMAGE_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
AUDIO_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
VIDEO_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
YOUTUBE_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
DOCUMENT_ANALYSIS_MODEL = "gemini-2.5-flash-preview-04-17"
CODE_GENERATION_MODEL = "gemini-2.5-flash-preview-04-17"
CODE_EXECUTION_MODEL = "gemini-2.5-flash-preview-04-17"
# LLM evaluation
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"
tracer_provider = register(
auto_instrument=True,
project_name="gaia"
)
CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
def run_crew(question, file_path):
# Tools
@tool("Web Search Tool")
def web_search_tool(question: str) -> str:
"""Search the web to answer a question.
Args:
question (str): Question to answer
Returns:
str: Answer to the question
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
response = client.models.generate_content(
model=WEB_SEARCH_MODEL,
contents=question,
config=types.GenerateContentConfig(
tools=[types.Tool(google_search=types.GoogleSearchRetrieval())]
)
)
return response.text
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Image Analysis Tool")
def image_analysis_tool(question: str, file_path: str) -> str:
"""Answer a question about an image file.
Args:
question (str): Question about an image file
file_path (str): The image file path
Returns:
str: Answer to the question about the image file
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=IMAGE_ANALYSIS_MODEL,
contents=[file, question]
)
return response.text
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Audio Analysis Tool")
def audio_analysis_tool(question: str, file_path: str) -> str:
"""Answer a question about an audio file.
Args:
question (str): Question about an audio file
file_path (str): The audio file path
Returns:
str: Answer to the question about the audio file
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=AUDIO_ANALYSIS_MODEL,
contents=[file, question]
)
return response.text
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Video Analysis Tool")
def video_analysis_tool(question: str, file_path: str) -> str:
"""Answer a question about a video file.
Args:
question (str): Question about a video file
file_path (str): The video file path
Returns:
str: Answer to the question about the video file
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=VIDEO_ANALYSIS_MODEL,
contents=[file, question]
)
return response.text
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("YouTube Analysis Tool")
def youtube_analysis_tool(question: str, url: str) -> str:
"""Answer a question about a YouTube video.
Args:
question (str): Question about a YouTube video
url (str): The YouTube video URL
Returns:
str: Answer to the question about the YouTube video
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
return client.models.generate_content(
model=YOUTUBE_ANALYSIS_MODEL,
contents=types.Content(
parts=[types.Part(file_data=types.FileData(file_uri=url)),
types.Part(text=question)]
)
)
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Document Analysis Tool")
def document_analysis_tool(question: str, file_path: str) -> str:
"""Answer a question about a document file. Supported document types include:
.xlxs, .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js, .py
Args:
question (str): Question about a document file
file_path (str): The document file path
Returns:
str: Answer to the question about the document file
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=DOCUMENT_ANALYSIS_MODEL,
contents=[file, question]
)
return response.text
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Code Generation Tool")
def code_generation_tool(question: str) -> str:
"""Generate and execute Python code to answer a question.
Args:
question (str): Question to answer
Returns:
str: Answer to the question
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=CODE_GENERATION_MODEL,
contents=[question],
config=types.GenerateContentConfig(
tools=[types.Tool(code_execution=types.ToolCodeExecution)]
),
)
for part in response.candidates[0].content.parts:
if part.code_execution_result is not None:
return part.code_execution_result.output
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
@tool("Code Execution Tool")
def code_execution_tool(question: str, file_path: str) -> str:
"""Execute a Python code file to answer a question.
Args:
question (str): Question to answer
file_path (str): The Python code file path
Returns:
str: Answer to the question
Raises:
RuntimeError: If processing fails"""
try:
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
file = client.files.upload(file=file_path)
response = client.models.generate_content(
model=CODE_EXECUTION_MODEL,
contents=[file, question],
config=types.GenerateContentConfig(
tools=[types.Tool(code_execution=types.ToolCodeExecution)]
),
)
for part in response.candidates[0].content.parts:
if part.code_execution_result is not None:
return part.code_execution_result.output
except Exception as e:
raise RuntimeError(f"Processing failed: {str(e)}")
# Agents
web_search_agent = Agent(
role="Web Search Agent",
goal="Search the web to help answer question \"{question}\"",
backstory="As an expert web search assistant, you search the web to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[web_search_tool],
verbose=False
)
image_analysis_agent = Agent(
role="Image Analysis Agent",
goal="Analyze image file to help answer question \"{question}\"",
backstory="As an expert image analysis assistant, you analyze the image file to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[image_analysis_tool],
verbose=False
)
audio_analysis_agent = Agent(
role="Audio Analysis Agent",
goal="Analyze audio file to help answer question \"{question}\"",
backstory="As an expert audio analysis assistant, you analyze the audio file to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[audio_analysis_tool],
verbose=False
)
video_analysis_agent = Agent(
role="Video Analysis Agent",
goal="Analyze video file to help answer question \"{question}\"",
backstory="As an expert video analysis assistant, you analyze the video file to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[video_analysis_tool],
verbose=False
)
youtube_analysis_agent = Agent(
role="YouTube Analysis Agent",
goal="Analyze YouTube video to help answer question \"{question}\"",
backstory="As an expert YouTube analysis assistant, you analyze the video to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[youtube_analysis_tool],
verbose=False
)
document_analysis_agent = Agent(
role="Document Analysis Agent",
goal="Analyze document of type .xlxs, .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js, .py to help answer question \"{question}\"",
backstory="As an expert document analysis assistant, you analyze the document to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=2,
tools=[document_analysis_tool],
verbose=False
)
code_generation_agent = Agent(
role="Code Generation Agent",
goal="Generate Python code and execute it to help answer question \"{question}\"",
backstory="As an expert Python code generation assistant, you generate and execute code to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=3,
tools=[code_execution_tool],
verbose=False
)
code_execution_agent = Agent(
role="Code Execution Agent",
goal="Execute Python code file to help answer question \"{question}\"",
backstory="As an expert Python code execution assistant, you execute the code file to help answer the question.",
allow_delegation=False,
llm=AGENT_MODEL,
max_iter=3,
tools=[code_execution_tool],
verbose=False
)
manager_agent = Agent(
role="Manager Agent",
goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
"If there is no good coworker, delegate to the Python Coding Agent to implement a tool for the task. "
"Question: \"{question}\"",
backstory="As an expert manager assistant, you answer the question.",
allow_delegation=True,
llm=MANAGER_MODEL,
max_iter=5,
verbose=True
)
# Task
manager_task = Task(
agent=manager_agent,
description="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. Question: \"{question}\"",
expected_output="The answer to the question."
)
# Crew
crew = Crew(
agents=[web_search_agent,
image_analysis_agent,
audio_analysis_agent,
video_analysis_agent,
youtube_analysis_agent,
document_analysis_agent,
code_generation_agent,
code_execution_agent],
manager_agent=manager_agent,
tasks=[manager_task],
verbose=True
)
# Process
if file_path:
question = f"{question} File path: {file_path}."
initial_answer = crew.kickoff(inputs={"question": question})
final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer))
print(f"Question: {question}")
print(f"Initial answer: {initial_answer}")
print(f"Final answer: {final_answer}")
return final_answer