abtsousa committed
Commit 1f19061 · Parent: 689ccd6

Refactor BasicAgent to support async processing and rate limiting; implement concurrent question handling.

Files changed (1)
  1. app.py +113 -30
app.py CHANGED
@@ -1,11 +1,35 @@
 import os
 import gradio as gr
 import requests
-import inspect
 import pandas as pd
+from langchain_openai import ChatOpenAI
+from os import getenv
+from dotenv import load_dotenv
+from typing import Annotated
+from pydantic import SecretStr
+
+from typing_extensions import TypedDict
+
+from langgraph.graph import StateGraph, START, END
+from langgraph.graph.message import add_messages
+import asyncio  # Added for async processing
+import time  # Added for rate limiting
+
+from langchain_community.tools import WikipediaQueryRun
+from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
+from langgraph.prebuilt import tools_condition
+from langgraph.checkpoint.memory import MemorySaver
+from langgraph.prebuilt import create_react_agent
+
 # Phoenix imports
 from phoenix.otel import register
 import logging
+
+from agent.agent import get_agent
+from agent.config import create_agent_config
+
+load_dotenv()
+
 # --- Constants ---
 DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
 APP_NAME = "OracleBot"
@@ -25,16 +49,85 @@ start_phoenix()

 # --- Basic Agent Definition ---
 # ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------
+
+# class State(TypedDict):
+#     # Messages have the type "list". The `add_messages` function
+#     # in the annotation defines how this state key should be updated
+#     # (in this case, it appends messages to the list, rather than overwriting them)
+#     messages: Annotated[list, add_messages]
+
 class BasicAgent:
     def __init__(self):
-        print("BasicAgent initialized.")
-    def __call__(self, question: str) -> str:
-        print(f"Agent received question (first 50 chars): {question[:50]}...")
-        fixed_answer = "This is a default answer."
-        print(f"Agent returning fixed answer: {fixed_answer}")
-        return fixed_answer
-
-def run_and_submit_all( profile: gr.OAuthProfile | None):
+        self.agent = get_agent()
+        self._last_request_time = 0
+        self._request_lock = asyncio.Lock()
+
+    async def __call__(self, question: str) -> str:
+        print(f"Agent received question: {question}")
+
+        # Rate limiting: ensure at least 1 second between requests
+        async with self._request_lock:
+            current_time = time.time()
+            time_since_last_request = current_time - self._last_request_time
+            if time_since_last_request < 1.0:
+                sleep_time = 1.0 - time_since_last_request
+                print(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
+                await asyncio.sleep(sleep_time)
+
+            self._last_request_time = time.time()
+
+        # Create configuration like in main.py
+        config = create_agent_config(app_name=APP_NAME)
+
+        # Call the agent with the question and config (like main.py)
+        answer = await self.agent.ainvoke(
+            {"messages": [{"role": "user", "content": question}]},
+            config=config
+        )
+
+        print(f"Agent returning answer: {answer}")
+
+        # Extract content from the last message in the response
+        if "messages" in answer and answer["messages"]:
+            last_message = answer["messages"][-1]
+            if hasattr(last_message, 'content'):
+                content = last_message.content
+            else:
+                content = str(last_message)
+        else:
+            content = str(answer)
+
+        return str(content) if content is not None else ""
+
+# Simplified concurrent processor: launch all tasks immediately and await them together
+async def process_questions(agent: BasicAgent, questions_data: list):
+    print(f"Running agent on {len(questions_data)} questions concurrently (simple fan-out)...")
+
+    async def handle(item: dict):
+        task_id = item.get("task_id")
+        question_text = item.get("question")
+        if not task_id or question_text is None:
+            print(f"Skipping item with missing task_id or question: {item}")
+            return None
+        try:
+            submitted_answer = await agent(question_text)
+            return {
+                "log": {"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer},
+                "payload": {"task_id": task_id, "submitted_answer": submitted_answer},
+            }
+        except Exception as e:
+            print(f"Error running agent on task {task_id}: {e}")
+            return {
+                "log": {"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"},
+                "payload": {"task_id": task_id, "submitted_answer": f"AGENT ERROR: {e}"},
+            }
+
+    results = await asyncio.gather(*(handle(item) for item in questions_data))
+    results_log = [r["log"] for r in results if r]
+    answers_payload = [r["payload"] for r in results if r]
+    return results_log, answers_payload
+
+async def run_and_submit_all( profile: gr.OAuthProfile | None):
     """
     Fetches all questions, runs the BasicAgent on them, submits all answers,
     and displays the results.
@@ -73,34 +166,24 @@ def run_and_submit_all( profile: gr.OAuthProfile | None):
             print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
         print(f"Fetched {len(questions_data)} questions.")
-    except requests.exceptions.RequestException as e:
-        print(f"Error fetching questions: {e}")
-        return f"Error fetching questions: {e}", None
     except requests.exceptions.JSONDecodeError as e:
         print(f"Error decoding JSON response from questions endpoint: {e}")
         print(f"Response text: {response.text[:500]}")
         return f"Error decoding server response for questions: {e}", None
+    except requests.exceptions.RequestException as e:
+        print(f"Error fetching questions: {e}")
+        return f"Error fetching questions: {e}", None
     except Exception as e:
         print(f"An unexpected error occurred fetching questions: {e}")
         return f"An unexpected error occurred fetching questions: {e}", None

-    # 3. Run your Agent
-    results_log = []
-    answers_payload = []
-    print(f"Running agent on {len(questions_data)} questions...")
-    for item in questions_data:
-        task_id = item.get("task_id")
-        question_text = item.get("question")
-        if not task_id or question_text is None:
-            print(f"Skipping item with missing task_id or question: {item}")
-            continue
-        try:
-            submitted_answer = agent(question_text)
-            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
-            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
-        except Exception as e:
-            print(f"Error running agent on task {task_id}: {e}")
-            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
+    # 3. Run your Agent concurrently (simple gather)
+    results_log, answers_payload = await process_questions(agent, questions_data)
+
+    # Remove everything before "FINAL ANSWER: " in submitted answers
+    for answer in answers_payload:
+        if "submitted_answer" in answer:
+            answer["submitted_answer"] = answer["submitted_answer"].split("FINAL ANSWER: ", 1)[-1].strip()

     if not answers_payload:
         print("Agent did not produce any answers to submit.")
@@ -208,4 +291,4 @@ if __name__ == "__main__":
     print("-"*(60 + len(" App Starting ")) + "\n")

     print("Launching Gradio Interface for Basic Agent Evaluation...")
-    demo.launch(debug=True, share=False)
+    demo.launch(debug=True, share=False)
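
For reference, the pattern this commit introduces can be reduced to a short, self-contained sketch: an async __call__ that spaces out request starts behind an asyncio.Lock, fanned out over all questions with asyncio.gather, followed by the same "FINAL ANSWER: " post-processing. The RateLimitedAgent class, the stubbed answer, and the sample questions below are illustrative stand-ins rather than the commit's code; the real BasicAgent delegates to the LangGraph agent returned by agent.agent.get_agent(), which is not part of this diff.

import asyncio
import time

class RateLimitedAgent:
    """Illustrative stand-in with the same shape as BasicAgent above (model call is stubbed)."""

    def __init__(self, min_interval: float = 1.0):
        self.min_interval = min_interval
        self._last_request_time = 0.0
        self._request_lock = asyncio.Lock()

    async def __call__(self, question: str) -> str:
        # Serialize only the rate-limit bookkeeping, so calls still overlap
        # once they have passed the one-second gate.
        async with self._request_lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.min_interval:
                await asyncio.sleep(self.min_interval - elapsed)
            self._last_request_time = time.time()
        # Stub standing in for `await self.agent.ainvoke(...)` in the real agent.
        await asyncio.sleep(0.5)
        return f"FINAL ANSWER: stub reply to {question!r}"

async def main():
    agent = RateLimitedAgent()
    questions = [{"task_id": str(i), "question": f"Question {i}?"} for i in range(3)]

    async def handle(item: dict) -> dict:
        answer = await agent(item["question"])
        # Mirror the diff's post-processing: keep only what follows "FINAL ANSWER: ".
        return {"task_id": item["task_id"],
                "submitted_answer": answer.split("FINAL ANSWER: ", 1)[-1].strip()}

    payload = await asyncio.gather(*(handle(item) for item in questions))
    print(payload)

if __name__ == "__main__":
    asyncio.run(main())

Run as a plain script, this prints one payload entry per question, with request starts spaced roughly one second apart by the lock while the stubbed agent calls themselves run concurrently.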