# NOTE: Hugging Face file-viewer residue removed ("shgao's picture / update /
# 9b0e7e4 / raw / history blame / 11.8 kB") — it was not valid Python.
import datetime
# Imports required for Google Sheets integration
import gspread
import random
import time
import functools
from gspread.exceptions import SpreadsheetNotFound, APIError
from oauth2client.service_account import ServiceAccountCredentials
import pandas as pd
import json
import gradio as gr
import os
# Google service-account identity for the Sheets API.
# NOTE(review): project metadata (project_id, private_key_id, client_email)
# is committed in plain text here; the private key itself is injected from
# the environment below. Consider loading the entire credential from a
# secret store instead of hard-coding any of it.
GSERVICE_ACCOUNT_INFO = {
"type": "service_account",
"project_id": "txagent",
"private_key_id": "cc1a12e427917244a93faf6f19e72b589a685e65",
"private_key": None,  # filled in from the GSheets_Shanghua_PrivateKey env var below
"client_email": "[email protected]",
"client_id": "108950722202634464257",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
# Default spreadsheet used by read_sheet_to_df / append_to_sheet.
GSHEET_NAME = "TxAgent_data_collection"
# The PEM private key arrives with literal "\n" sequences (typical for env
# vars); convert them back to real newlines before use.
GSheet_API_KEY = os.environ.get("GSheets_Shanghua_PrivateKey")
if GSheet_API_KEY is None:
    print("GSheet_API_KEY not found in environment variables. Please set it.")
else:
    GSheet_API_KEY = GSheet_API_KEY.replace("\\n", "\n")
    GSERVICE_ACCOUNT_INFO["private_key"] = GSheet_API_KEY
# Exponential backoff retry decorator
def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64, base_delay_sec=1, target_exception=APIError):
    """
    Retry decorator for gspread API calls using exponential backoff.

    The wrapped callable is retried whenever it raises ``target_exception``
    and the error text identifies a Google Sheets rate-limit response
    (HTTP 429). Any other error is logged and re-raised immediately.

    Args:
        max_retries (int): Maximum number of retry attempts.
        max_backoff_sec (int): Cap on the delay between retries, in seconds.
        base_delay_sec (int): Delay before the first retry, in seconds.
        target_exception (Exception): Exception type that may represent a
            rate-limit condition (defaults to gspread's APIError).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            # Keep calling until the function succeeds or retries run out.
            while True:
                try:
                    return func(*args, **kwargs)
                except target_exception as err:
                    # gspread embeds the HTTP status in the error string, so
                    # detect the 429 "quota exceeded" case by inspecting it.
                    text = str(err)
                    rate_limited = "[429]" in text and (
                        "Quota exceeded" in text or "Too Many Requests" in text
                    )
                    if not rate_limited:
                        # Other API failures (403 Forbidden, 404 Not Found, ...)
                        # are not retryable — surface them right away.
                        print(f"Non-rate-limit APIError encountered in {func.__name__}: {err}")
                        raise err
                    attempt += 1
                    if attempt > max_retries:
                        print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {err}")
                        raise err
                    # Exponential delay, capped, plus 0-1 s of random jitter.
                    delay = min(max_backoff_sec, base_delay_sec * (2 ** (attempt - 1)) + random.uniform(0, 1))
                    print(
                        f"Rate limit hit for {func.__name__} (Attempt {attempt}/{max_retries}). "
                        f"Retrying in {delay:.2f} seconds. Error: {err}"
                    )
                    time.sleep(delay)
                except Exception as err:
                    # Anything unexpected is logged and propagated unchanged.
                    print(f"An unexpected error occurred in {func.__name__}: {err}")
                    raise err
        return wrapper
    return decorator
#2) Initialize Google Sheets client
# OAuth scopes granting access to Sheets (feeds) and Drive (file lookup/creation).
scope = [
    "https://spreadsheets.google.com/feeds",
    "https://www.googleapis.com/auth/drive",
]
# Authenticate immediately on import.
# NOTE(review): this is an import-time side effect — importing this module
# builds credentials from GSERVICE_ACCOUNT_INFO and authorizes the gspread
# client; it will fail if the private key was not set from the environment.
creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope)
client = gspread.authorize(creds)
@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def read_sheet_to_df(custom_sheet_name=None, sheet_index=0):
    """
    Read all data from a Google Sheet into a pandas DataFrame.

    Parameters:
        custom_sheet_name (str): The name of the Google Sheet to open.
            If None, uses GSHEET_NAME.
        sheet_index (int): Index of the worksheet within the spreadsheet
            (default is 0, the first sheet).

    Returns:
        pandas.DataFrame: DataFrame containing the sheet data, with the first
            row used as headers, or None if the spreadsheet/worksheet is not
            found.
    """
    # Determine which sheet name to use
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME
    # Open the spreadsheet (use the imported exception name, consistent with
    # append_to_sheet below)
    try:
        spreadsheet = client.open(custom_sheet_name)
    except SpreadsheetNotFound:
        return None
    # Select the desired worksheet
    try:
        worksheet = spreadsheet.get_worksheet(sheet_index)
    except IndexError:
        return None
    # Older gspread versions return None (instead of raising) for an
    # out-of-range index; without this check get_all_records() would crash
    # with an AttributeError.
    if worksheet is None:
        return None
    # Fetch all data: first row as header, remaining rows as records
    data = worksheet.get_all_records()
    # Convert to DataFrame
    return pd.DataFrame(data)
@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None, add_header_when_create_sheet=False):
    """
    Append a new row to a Google Sheet, creating the spreadsheet if needed.

    If 'custom_row_dict' is provided, its values are appended aligned to the
    sheet's header row (missing headers become ""). Otherwise a default row
    of [timestamp, question, final_answer, trace] is built from 'user_data'.

    Args:
        user_data (dict | None): Must contain "question", "final_answer" and
            "trace" keys when custom_row_dict is None.
        custom_row_dict (dict | None): Mapping of header name -> cell value.
        custom_sheet_name (str | None): Spreadsheet name; defaults to GSHEET_NAME.
        add_header_when_create_sheet (bool): When True and the sheet is newly
            created or empty, write a header row (the row dict's keys) first.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME
    try:
        # Try to open the spreadsheet by name
        spreadsheet = client.open(custom_sheet_name)
        is_new = False
    except SpreadsheetNotFound:
        # If it doesn't exist, create it
        spreadsheet = client.create(custom_sheet_name)
        # A freshly created spreadsheet is only visible to the service
        # account, so share it with the maintainers' accounts.
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        is_new = True
        print("Spreadsheet ID:", spreadsheet.id)
    # Access the first worksheet
    sheet = spreadsheet.sheet1
    # Check if the sheet has any rows yet
    existing_values = sheet.get_all_values()
    # NOTE(review): `[[]]` is treated as the marker of a spreadsheet that was
    # cleared in the past — confirm this matches the gspread version in use.
    is_empty = (existing_values == [[]]) #indicates empty spreadsheet that was cleared in the past
    # --- Always ensure header row is present and get headers ---
    if (is_new or is_empty) and add_header_when_create_sheet:
        # headers come from the keys of our row dict
        if custom_row_dict is not None:
            headers = list(custom_row_dict.keys())
        else:
            headers = list(user_data.keys())
        sheet.append_row(headers)
    else:
        # Read headers from the first row of the sheet
        headers = sheet.row_values(1) if sheet.row_count > 0 else []
    # --- Build row aligned to headers ---
    if custom_row_dict is not None:
        # Ensure all values are aligned to headers, fill missing with ""
        custom_row = [custom_row_dict.get(header, "") for header in headers]
    else:
        # Construct the default row with a timestamp and user_data fields
        custom_row = [str(datetime.datetime.now()), user_data["question"], user_data["final_answer"], user_data["trace"]]
    # Append the custom or default row to the sheet
    sheet.append_row(custom_row)
def format_chat(response, tool_database_labels):
    """
    Render an agent message trace as Gradio ChatMessages.

    Args:
        response (list[dict]): Message dicts with a "role" key ("assistant"
            or "tool"), a "content" string, and — on assistant messages — an
            optional JSON-encoded "tool_calls" list.
        tool_database_labels (dict): Maps a database label to a list of tool
            names; used to annotate tool titles with their source database.

    Returns:
        tuple: (final_answer_messages, reasoning_messages, chat_history).
            When the last message contains no "[FinalAnswer]" marker,
            final_answer_messages is empty and reasoning_messages is the full
            chat history. (Previously that case crashed with an
            UnboundLocalError because both names were only assigned inside
            the "[FinalAnswer]" branch.)
    """
    chat_history = []
    # Tool calls announced by the most recent assistant message; consumed
    # (and cleared) when the matching tool responses arrive.
    last_tool_calls = []
    for msg in response:
        if msg["role"] == "assistant":
            content = msg.get("content", "")
            # Extract tool_calls from this assistant message (if any)
            last_tool_calls = json.loads(msg.get("tool_calls", "[]"))
            # Emit the assistant's main message
            chat_history.append(gr.ChatMessage(role="assistant", content=content))
        elif msg["role"] == "tool":
            # Pair each pending tool call with this tool response.
            for tool_call in last_tool_calls:
                name = tool_call.get("name", "")
                args = tool_call.get("arguments", {})
                # Determine icon + title
                database_label = ""
                if name == "Tool_RAG":
                    title = "🧰 Tool RAG"
                else:
                    title = f"🛠️ {name}"
                # Annotate the title when the tool belongs to a known database.
                for db_label, tool_list in tool_database_labels.items():
                    if name in tool_list:
                        title = f"🛠️ {name}\n(**Info** {db_label} [Click to view])"
                        database_label = " (" + db_label + ")"
                        break
                # Normalize the tool response: re-serialize valid JSON,
                # otherwise fall back to the raw text.
                raw = msg.get("content", "")
                try:
                    pretty = json.dumps(json.loads(raw))
                except json.JSONDecodeError:
                    pretty = raw
                # One ChatMessage per call: arguments go in metadata.log, the
                # tool response body in content, under a collapsible title.
                chat_history.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"Tool Response{database_label}:\n{pretty}",
                        metadata={
                            "title": title,
                            "log": json.dumps(args),
                            "status": 'done'
                        }
                    )
                )
            # All pending calls rendered; don't reuse them for later responses.
            last_tool_calls = []
    # Safe defaults so the function always returns a well-defined 3-tuple,
    # even when no "[FinalAnswer]" marker is present.
    final_answer_messages = []
    reasoning_messages = list(chat_history)
    if chat_history:
        last_msg = chat_history[-1]
        if isinstance(last_msg.content, str) and "[FinalAnswer]" in last_msg.content:
            last_msg.content = last_msg.content.replace("[FinalAnswer]", "\n**Answer:**\n")
            final_answer_messages = [
                gr.ChatMessage(
                    role="assistant",
                    content=last_msg.content.split("\n**Answer:**\n")[-1].strip(),
                )
            ]
            assistant_count = sum(1 for m in chat_history if m.role == "assistant")
            if assistant_count == 1:
                # A single assistant message means no intermediate reasoning.
                reasoning_messages = [gr.ChatMessage(role="assistant", content="No reasoning was conducted.")]
            else:
                # Include ALL messages in reasoning_messages, including the last one
                reasoning_messages = chat_history.copy()
    return final_answer_messages, reasoning_messages, chat_history