import os
import json
import asyncio
from pathlib import Path
from typing import List, Dict, Any

import gradio as gr
import requests
import pandas as pd

from agents.llama_index_agent import GaiaAgent
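
# OptimizedGaiaAgent below assumes GaiaAgent (agents/llama_index_agent.py)
# exposes an async `run(question)` coroutine returning a LlamaIndex-style
# response object; see the unpacking in process_question().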


DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
CACHE_DIR = "cache"
CACHE_FILE = os.path.join(CACHE_DIR, "agent_cache.json")
MAX_CONCURRENT_REQUESTS = 3
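# MAX_CONCURRENT_REQUESTS caps simultaneous model calls; raise it with care,
# since provider rate limits are the usual bottleneck.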

CLAUDE = {
    "model_provider": "anthropic",
    "model_name": "claude-3-7-sonnet-latest"
}
OPENAI = {
    "model_provider": "openai",
    "model_name": "gpt-4o"
}
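
# Pass OPENAI (or any dict with the same keys) instead of CLAUDE to switch
# providers; the dict is expanded into GaiaAgent keyword arguments.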


class OptimizedGaiaAgent:
    """
    Enhanced GAIA agent with caching and asynchronous processing capabilities.
    """
    def __init__(
        self,
        model_config=CLAUDE,
        use_cache=True,
        cache_file=CACHE_FILE,
        max_concurrent=MAX_CONCURRENT_REQUESTS
    ):
        """
        Initialize the optimized agent.

        Args:
            model_config: Dictionary with model_provider and model_name
            use_cache: Whether to use caching
            cache_file: Path to the cache file
            max_concurrent: Maximum number of concurrent requests
        """
        self.agent = GaiaAgent(**model_config)
        self.model_config = model_config
        self.use_cache = use_cache
        self.cache_file = cache_file
        self.cache = self._load_cache() if use_cache else {}
        self.max_concurrent = max_concurrent
        # The semaphore limits how many questions are processed concurrently.
        self.semaphore = asyncio.Semaphore(max_concurrent)

        print(f"OptimizedGaiaAgent initialized with {model_config['model_provider']} {model_config['model_name']}")
        if use_cache:
            print(f"Cache loaded with {len(self.cache)} answers")

    def _load_cache(self) -> Dict[str, str]:
        """Load cached answers from file."""
        # Ensure the cache directory exists (dirname is empty for bare filenames).
        cache_dir = os.path.dirname(self.cache_file)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)

        cache_path = Path(self.cache_file)
        if cache_path.exists():
            try:
                with open(cache_path, 'r') as f:
                    return json.load(f)
            except Exception as e:
                # A corrupt or unreadable cache file falls back to an empty cache.
                print(f"Error loading cache: {e}")
                return {}
        return {}

    def _save_cache(self) -> None:
        """Save cached answers to file."""
        try:
            with open(self.cache_file, 'w') as f:
                json.dump(self.cache, f, indent=2)
        except Exception as e:
            print(f"Error saving cache: {e}")

    def _get_cache_key(self, question: str) -> str:
        """Generate a consistent key for the cache."""
        return question.strip()
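
    # The raw stripped question text doubles as the cache key, which keeps the
    # JSON cache human-readable. If key size ever matters, a digest would work
    # as well (requires `import hashlib`):
    #     hashlib.sha256(question.strip().encode("utf-8")).hexdigest()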

    async def process_question(self, task_id: str, question: str) -> Dict[str, Any]:
        """
        Process a single question, using cache if available.

        Args:
            task_id: ID of the task/question
            question: The question text

        Returns:
            Dictionary with task_id, question, answer, and metadata
        """
        cache_key = self._get_cache_key(question)

        if self.use_cache and cache_key in self.cache:
            print(f"🔄 Cache hit for task {task_id[:8]}...")
            return {
                "task_id": task_id,
                "question": question,
                "submitted_answer": self.cache[cache_key],
                "cached": True,
                "error": False
            }

        # The semaphore keeps at most `max_concurrent` model calls in flight.
        async with self.semaphore:
            print(f"⚙️ Processing task {task_id[:8]}...")
            try:
                response = await self.agent.run(question)
                # Assumes the agent returns a response whose final block holds
                # the answer text (LlamaIndex AgentWorkflow-style output).
                answer = response.response.blocks[-1].text

                if self.use_cache:
                    self.cache[cache_key] = answer
                    # Write the cache in a worker thread so the blocking file
                    # I/O does not stall the event loop.
                    await asyncio.to_thread(self._save_cache)

                return {
                    "task_id": task_id,
                    "question": question,
                    "submitted_answer": answer,
                    "cached": False,
                    "error": False
                }
            except Exception as e:
                error_message = f"ERROR: {str(e)}"
                print(f"❌ Error processing task {task_id[:8]}: {error_message}")
                return {
                    "task_id": task_id,
                    "question": question,
                    "submitted_answer": error_message,
                    "cached": False,
                    "error": True
                }
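
    # Cache writes happen inside the semaphore, so at most `max_concurrent`
    # tasks can race on the file. If that ever becomes a problem, a lock would
    # serialize them; a minimal sketch (lock created in __init__):
    #     self._cache_lock = asyncio.Lock()
    #     async with self._cache_lock:
    #         await asyncio.to_thread(self._save_cache)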

    async def process_all(
        self,
        questions_data: List[Dict[str, Any]],
        progress_callback=None
    ) -> List[Dict[str, Any]]:
        """
        Process all questions, with progress reporting.

        Args:
            questions_data: List of question dictionaries
            progress_callback: Function to call with progress updates

        Returns:
            List of results with answers and metadata
        """
        valid_questions = [
            item for item in questions_data
            if item.get("task_id") and item.get("question") is not None
        ]

        if not valid_questions:
            print("No valid questions to process.")
            return []

        total = len(valid_questions)
        print(f"Processing {total} questions with {self.max_concurrent} concurrent tasks...")

        results = []

        tasks = [
            self.process_question(item["task_id"], item["question"])
            for item in valid_questions
        ]
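
        # Alternative: `results = await asyncio.gather(*tasks)` is simpler but
        # reports nothing until everything finishes; as_completed() below lets
        # the progress bar advance as each task completes.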

        if progress_callback:
            progress_callback(0, desc="Starting processing...")

        # as_completed() yields tasks in completion order, not submission order.
        for i, task in enumerate(asyncio.as_completed(tasks)):
            result = await task
            results.append(result)

            if progress_callback:
                progress_callback((i + 1) / total, desc=f"Processed {i + 1}/{total} questions")

        # Restore the server's original question order before returning.
        id_to_result = {result["task_id"]: result for result in results}
        ordered_results = [
            id_to_result.get(
                item["task_id"],
                {"task_id": item["task_id"], "question": item.get("question"), "submitted_answer": "ERROR: Processing failed", "error": True}
            )
            for item in valid_questions
        ]

        return ordered_results


class BasicAgent:
    """
    Optimized agent wrapper for the GAIA benchmark.
    """
    def __init__(
        self,
        model_provider="anthropic",
        model_name="claude-3-7-sonnet-latest",
        api_key=None,
        use_cache=True,
        max_concurrent=MAX_CONCURRENT_REQUESTS
    ):
        """
        Initialize the BasicAgent with caching and async capabilities.

        Args:
            model_provider: LLM provider to use
            model_name: Specific model to use
            api_key: Optional API key
            use_cache: Whether to use caching
            max_concurrent: Maximum concurrent requests
        """
        # api_key is forwarded (possibly as None); this assumes GaiaAgent
        # accepts an api_key keyword argument.
        model_config = {
            "model_provider": model_provider,
            "model_name": model_name,
            "api_key": api_key
        }

        self.agent = OptimizedGaiaAgent(
            model_config=model_config,
            use_cache=use_cache,
            max_concurrent=max_concurrent
        )
        print(f"BasicAgent initialized with {model_provider} {model_name}.")

    async def process_async(self, questions_data, progress_callback=None):
        """Process questions asynchronously with progress reporting."""
        return await self.agent.process_all(questions_data, progress_callback)

    def __call__(self, question: str) -> str:
        """
        Process a single question (for compatibility with the original interface).
        This method is synchronous for backward compatibility.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        async def agentic_main():
            result = await self.agent.process_question("single", question)
            return result["submitted_answer"]

        final_answer = asyncio.run(agentic_main())
        print(f"Agent returning answer: {final_answer}")
        return final_answer
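
    # asyncio.run() starts a fresh event loop, so __call__ must not be invoked
    # from code already running inside a loop (e.g. a Jupyter cell); use
    # process_async() from async contexts instead.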


async def async_run_and_submit_all(
    profile: gr.OAuthProfile | None,
    progress=gr.Progress()
) -> tuple:
    """
    Asynchronous version of run_and_submit_all.
    Fetches questions, processes them concurrently, and submits answers.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        progress(0, desc="Initializing agent...")
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    progress(0.1, desc="Fetching questions...")
    try:
        # requests is blocking, so run it in the default executor to keep the
        # event loop free while the HTTP call is in flight.
        async def fetch_questions():
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                lambda: requests.get(questions_url, timeout=15)
            )

        response = await fetch_questions()
        response.raise_for_status()
        questions_data = response.json()

        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None

        print(f"Fetched {len(questions_data)} questions.")
        progress(0.2, desc=f"Successfully fetched {len(questions_data)} questions.")

    except requests.exceptions.JSONDecodeError as e:
        # Caught before RequestException, which it subclasses; the reverse
        # order would make this handler unreachable.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    print(f"Processing {len(questions_data)} questions...")
    try:
        def update_progress(value, desc=""):
            # Map agent progress (0-1) onto the 0.2-0.8 band of the overall bar.
            progress(0.2 + (value * 0.6), desc=desc)

        results = await agent.process_async(questions_data, update_progress)

        answers_payload = [
            {"task_id": result["task_id"], "submitted_answer": result["submitted_answer"]}
            for result in results
        ]

        results_log = [
            {"Task ID": result["task_id"], "Question": result["question"], "Submitted Answer": result["submitted_answer"]}
            for result in results
        ]

        progress(0.8, desc=f"Processed all {len(results)} questions. Preparing submission...")

    except Exception as e:
        print(f"Error during question processing: {e}")
        return f"Error during question processing: {e}", None

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame([])

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    progress(0.9, desc="Submitting answers...")

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        # As with the fetch, keep the blocking POST off the event loop.
        async def submit_answers():
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                lambda: requests.post(submit_url, json=submission_data, timeout=60)
            )

        response = await submit_answers()
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )

        print("Submission successful.")
        progress(1.0, desc="Complete!")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df

    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df


def run_and_submit_all(profile: gr.OAuthProfile | None, progress=gr.Progress()):
    """Synchronous wrapper for the async function."""
    return asyncio.run(async_run_and_submit_all(profile, progress))
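
# Recent Gradio versions can also bind async handlers directly, in which case
# this wrapper could be dropped; it is kept so the click handler stays a plain
# synchronous function.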


with gr.Blocks() as demo:
    gr.Markdown("# Optimized GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Clone this space, then modify the code to define your agent's logic, its tools, and the necessary packages.
        2. Log in to your Hugging Face account using the button below. Your HF username is used for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, process them, and see your score.

        This implementation features:
        - **Caching**: Answers are saved to avoid reprocessing the same questions
        - **Asynchronous Processing**: Questions are processed concurrently for better performance
        - **Progress Tracking**: See real-time progress as questions are processed
        """
    )

    with gr.Row():
        gr.LoginButton()
        clear_cache_button = gr.Button("Clear Cache")

    run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    def clear_cache():
        if os.path.exists(CACHE_FILE):
            try:
                os.remove(CACHE_FILE)
                return f"Cache cleared successfully! ({CACHE_FILE})"
            except Exception as e:
                return f"Error clearing cache: {e}"
        return "No cache file found."

    clear_cache_button.click(
        fn=clear_cache,
        outputs=status_output
    )

    # No `inputs` are wired here: Gradio injects the OAuth profile
    # automatically because run_and_submit_all declares a
    # `gr.OAuthProfile | None` parameter.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )


if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Optimized Agent Evaluation...")
    demo.launch(debug=True, share=False)