# grady / crew.py
# Last commit: "Update crew.py" (ff22768, verified) by bstraehle
# File size: 14.9 kB
import os
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
#from crewai_tools import (
# SerperDevTool,
# WebsiteSearchTool
#)
from google import genai
from google.genai import types
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register
from util import get_final_answer
## LLMs
# The manager and the specialist agents run on an OpenAI model; every tool
# (and the final-answer step) runs on a Gemini model. The per-role constants
# are kept separate so each role can be re-pointed independently.
_OPENAI_MODEL = "gpt-4.1-mini"
_GEMINI_MODEL = "gemini-2.5-flash-preview-04-17"

MANAGER_MODEL = _OPENAI_MODEL
AGENT_MODEL = _OPENAI_MODEL

FINAL_ANSWER_MODEL = _GEMINI_MODEL
WEB_SEARCH_MODEL = _GEMINI_MODEL
IMAGE_ANALYSIS_MODEL = _GEMINI_MODEL
AUDIO_ANALYSIS_MODEL = _GEMINI_MODEL
VIDEO_ANALYSIS_MODEL = _GEMINI_MODEL
YOUTUBE_ANALYSIS_MODEL = _GEMINI_MODEL
DOCUMENT_ANALYSIS_MODEL = _GEMINI_MODEL
CODE_GENERATION_MODEL = _GEMINI_MODEL
CODE_EXECUTION_MODEL = _GEMINI_MODEL

# LLM evaluation
# Tracing is configured at import time: the Phoenix API key must be present in
# the environment (a missing key raises KeyError), and the client header and
# collector endpoint are exported before the tracer provider is registered.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ.update(
    PHOENIX_CLIENT_HEADERS=f"api_key={PHOENIX_API_KEY}",
    PHOENIX_COLLECTOR_ENDPOINT="https://app.phoenix.arize.com",
)

tracer_provider = register(project_name="gaia", auto_instrument=True)

CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
def _ask_gemini_about_file(model, question, file_path):
    """Upload a file to Gemini and ask a model a question about it.

    Args:
        model (str): Name of the Gemini model to query
        question (str): Question about the file
        file_path (str): Path of the file to upload
    Returns:
        str: The text of the model response
    """
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    uploaded = client.files.upload(file=file_path)
    response = client.models.generate_content(
        model=model,
        contents=[uploaded, question],
    )
    return response.text


def _run_gemini_code_execution(client, model, contents):
    """Query a Gemini model with the code execution tool enabled.

    Args:
        client: The ``genai.Client`` to use
        model (str): Name of the Gemini model to query
        contents (list): Prompt parts (question and, optionally, an uploaded file)
    Returns:
        str: Output of the first code execution result in the response, or
            the response text when the response has no code execution result
    """
    response = client.models.generate_content(
        model=model,
        contents=contents,
        config=types.GenerateContentConfig(
            tools=[types.Tool(code_execution=types.ToolCodeExecution)]
        ),
    )

    for part in response.candidates[0].content.parts:
        if part.code_execution_result is not None:
            return part.code_execution_result.output

    # No part carried a code execution result: hand back the plain text
    # instead of implicitly returning None to the calling agent.
    return response.text


def _specialist_agent(role, goal, backstory, tools, max_iter=2):
    """Build a non-delegating worker agent running on ``AGENT_MODEL``."""
    return Agent(
        role=role,
        goal=goal,
        backstory=backstory,
        allow_delegation=False,
        llm=AGENT_MODEL,
        max_iter=max_iter,
        tools=tools,
        verbose=False,
    )


def run_crew(question, file_path):
    """Answer a question with a manager agent and a crew of specialist agents.

    The crew's initial answer is post-processed by ``get_final_answer`` and
    the question, initial answer and final answer are printed.

    Args:
        question (str): The question to answer
        file_path (str): Optional path of a file the question refers to; when
            truthy it is appended to the question text
    Returns:
        The value returned by ``get_final_answer``
    """

    # Tools

    @tool("Web Search Tool")
    def web_search_tool(question: str) -> str:
        """Search the web to answer a question.

        Args:
            question (str): Question to answer
        Returns:
            str: Answer to the question
        Raises:
            RuntimeError: If processing fails"""
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            response = client.models.generate_content(
                model=WEB_SEARCH_MODEL,
                contents=question,
                config=types.GenerateContentConfig(
                    # The google_search tool field takes a GoogleSearch config
                    # (GoogleSearchRetrieval belongs to google_search_retrieval).
                    tools=[types.Tool(google_search=types.GoogleSearch())]
                ),
            )
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Image Analysis Tool")
    def image_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an image file.

        Args:
            question (str): Question about an image file
            file_path (str): The image file path
        Returns:
            str: Answer to the question about the image file
        Raises:
            RuntimeError: If processing fails"""
        try:
            return _ask_gemini_about_file(IMAGE_ANALYSIS_MODEL, question, file_path)
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Audio Analysis Tool")
    def audio_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about an audio file.

        Args:
            question (str): Question about an audio file
            file_path (str): The audio file path
        Returns:
            str: Answer to the question about the audio file
        Raises:
            RuntimeError: If processing fails"""
        try:
            return _ask_gemini_about_file(AUDIO_ANALYSIS_MODEL, question, file_path)
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Video Analysis Tool")
    def video_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about a video file.

        Args:
            question (str): Question about a video file
            file_path (str): The video file path
        Returns:
            str: Answer to the question about the video file
        Raises:
            RuntimeError: If processing fails"""
        try:
            return _ask_gemini_about_file(VIDEO_ANALYSIS_MODEL, question, file_path)
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("YouTube Analysis Tool")
    def youtube_analysis_tool(question: str, url: str) -> str:
        """Answer a question about a YouTube video.

        Args:
            question (str): Question about a YouTube video
            url (str): The YouTube video URL
        Returns:
            str: Answer to the question about the YouTube video
        Raises:
            RuntimeError: If processing fails"""
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            response = client.models.generate_content(
                model=YOUTUBE_ANALYSIS_MODEL,
                contents=types.Content(
                    parts=[
                        types.Part(file_data=types.FileData(file_uri=url)),
                        types.Part(text=question),
                    ]
                ),
            )
            # Return the text, like every other tool, not the response object.
            return response.text
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Document Analysis Tool")
    def document_analysis_tool(question: str, file_path: str) -> str:
        """Answer a question about a document file. Supported document types include:
        .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js

        Args:
            question (str): Question about a document file
            file_path (str): The document file path
        Returns:
            str: Answer to the question about the document file
        Raises:
            RuntimeError: If processing fails"""
        try:
            return _ask_gemini_about_file(DOCUMENT_ANALYSIS_MODEL, question, file_path)
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Code Generation Tool")
    def code_generation_tool(question: str) -> str:
        """Generate and execute Python code to answer a question.

        Args:
            question (str): Question to answer
        Returns:
            str: Answer to the question
        Raises:
            RuntimeError: If processing fails"""
        try:
            # No file is involved here; the tool only needs the question.
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            return _run_gemini_code_execution(client, CODE_GENERATION_MODEL, [question])
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    @tool("Code Execution Tool")
    def code_execution_tool(question: str, file_path: str) -> str:
        """Execute a Python code file to answer a question.

        Args:
            question (str): Question to answer
            file_path (str): The Python code file path
        Returns:
            str: Answer to the question
        Raises:
            RuntimeError: If processing fails"""
        try:
            client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
            uploaded = client.files.upload(file=file_path)
            return _run_gemini_code_execution(
                client, CODE_EXECUTION_MODEL, [uploaded, question]
            )
        except Exception as e:
            raise RuntimeError(f"Processing failed: {e}") from e

    # Agents

    web_search_agent = _specialist_agent(
        role="Web Search Agent",
        goal="Search the web to help answer question \"{question}\"",
        backstory="As an expert web search assistant, you search the web to help answer the question.",
        tools=[web_search_tool],
    )

    image_analysis_agent = _specialist_agent(
        role="Image Analysis Agent",
        goal="Analyze image file to help answer question \"{question}\"",
        backstory="As an expert image analysis assistant, you analyze the image file to help answer the question.",
        tools=[image_analysis_tool],
    )

    audio_analysis_agent = _specialist_agent(
        role="Audio Analysis Agent",
        goal="Analyze audio file to help answer question \"{question}\"",
        backstory="As an expert audio analysis assistant, you analyze the audio file to help answer the question.",
        tools=[audio_analysis_tool],
    )

    video_analysis_agent = _specialist_agent(
        role="Video Analysis Agent",
        goal="Analyze video file to help answer question \"{question}\"",
        backstory="As an expert video analysis assistant, you analyze the video file to help answer the question.",
        tools=[video_analysis_tool],
    )

    youtube_analysis_agent = _specialist_agent(
        role="YouTube Analysis Agent",
        goal="Analyze YouTube video to help answer question \"{question}\"",
        backstory="As an expert YouTube analysis assistant, you analyze the video to help answer the question.",
        tools=[youtube_analysis_tool],
    )

    document_analysis_agent = _specialist_agent(
        role="Document Analysis Agent",
        goal="Analyze document of type .txt, .csv, .xml, .rtf, .pdf, .md, .html, .css, .js to help answer question \"{question}\"",
        backstory="As an expert document analysis assistant, you analyze the document to help answer the question.",
        tools=[document_analysis_tool],
    )

    code_generation_agent = _specialist_agent(
        role="Code Generation Agent",
        goal="Generate Python code and execute it to help answer question \"{question}\"",
        backstory="As an expert Python code generation assistant, you generate and execute code to help answer the question.",
        # The generation agent gets the generation tool (it does not require a file).
        tools=[code_generation_tool],
        max_iter=3,
    )

    code_execution_agent = _specialist_agent(
        role="Code Execution Agent",
        goal="Execute Python code file to help answer question \"{question}\"",
        backstory="As an expert Python code execution assistant, you execute the code file to help answer the question.",
        tools=[code_execution_tool],
        max_iter=3,
    )

    manager_agent = Agent(
        role="Manager Agent",
        # The fallback coworker is named by its actual role so the manager
        # can find it among the crew's agents.
        goal="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. "
             "If there is no good coworker, delegate to the Code Generation Agent to implement a tool for the task. "
             "Question: \"{question}\"",
        backstory="As an expert manager assistant, you answer the question.",
        allow_delegation=True,
        llm=MANAGER_MODEL,
        max_iter=5,
        verbose=True,
    )

    # Task

    manager_task = Task(
        agent=manager_agent,
        description="Try to answer the following question. If needed, delegate to one or more of your coworkers for help. Question: \"{question}\"",
        expected_output="The answer to the question.",
    )

    # Crew

    # NOTE(review): no `process` argument is passed although `manager_agent`
    # is set; confirm whether Process.hierarchical was intended.
    crew = Crew(
        agents=[
            web_search_agent,
            image_analysis_agent,
            audio_analysis_agent,
            video_analysis_agent,
            youtube_analysis_agent,
            document_analysis_agent,
            code_generation_agent,
            code_execution_agent,
        ],
        manager_agent=manager_agent,
        tasks=[manager_task],
        verbose=True,
    )

    # Process

    if file_path:
        # Make the file location visible to the agents through the prompt.
        question = f"{question} File path: {file_path}."

    initial_answer = crew.kickoff(inputs={"question": question})
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(initial_answer))

    print(f"Question: {question}")
    print(f"Initial answer: {initial_answer}")
    print(f"Final answer: {final_answer}")

    return final_answer