import datetime
# Imports required for Google Sheets integration
import gspread
import random
import time
import functools
from gspread.exceptions import SpreadsheetNotFound, APIError
from oauth2client.service_account import ServiceAccountCredentials
import pandas as pd
import json
import gradio as gr
import os
# Google service-account identity used to authenticate with the Sheets API.
# "private_key" is deliberately None here; the secret is injected below from
# an environment variable so it never lives in source control.
GSERVICE_ACCOUNT_INFO = {
    "type": "service_account",
    "project_id": "txagent",
    "private_key_id": "cc1a12e427917244a93faf6f19e72b589a685e65",
    "private_key": None,
    "client_email": "[email protected]",
    "client_id": "108950722202634464257",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com",
    "universe_domain": "googleapis.com"
}
# Default spreadsheet used by read_sheet_to_df / append_to_sheet below.
GSHEET_NAME = "TxAgent_data_collection"
# The PEM private key is stored in the env var with literal "\n" escape
# sequences; convert them back into real newlines before handing the key
# to the OAuth client. If the variable is unset, private_key stays None
# and authentication (performed at import time below) will fail.
GSheet_API_KEY = os.environ.get("GSheets_Shanghua_PrivateKey")
if GSheet_API_KEY is None:
    print("GSheet_API_KEY not found in environment variables. Please set it.")
else:
    GSheet_API_KEY = GSheet_API_KEY.replace("\\n", "\n")
    GSERVICE_ACCOUNT_INFO["private_key"] = GSheet_API_KEY
# Exponential backoff retry decorator for gspread API calls.
def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64, base_delay_sec=1, target_exception=APIError):
    """
    Decorator implementing exponential backoff with jitter for gspread API calls.

    The wrapped function is retried when it raises ``target_exception`` and the
    error text identifies a Google Sheets rate-limit (HTTP 429) response. Any
    other error — a different APIError status, or an unexpected exception —
    is logged and re-raised immediately.

    Args:
        max_retries (int): Maximum number of retry attempts before giving up.
        max_backoff_sec (int): Upper bound on the delay between retries, in seconds.
        base_delay_sec (int): Delay before the first retry, in seconds (doubled each attempt).
        target_exception (type): Exception class to inspect for rate-limit errors.

    Returns:
        callable: A decorator wrapping a function with the retry logic.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:  # loop until success or max retries exceeded
                try:
                    return func(*args, **kwargs)
                except target_exception as e:
                    # gspread's APIError embeds the HTTP status code in its
                    # string form, so detect 429 rate limits by substring.
                    error_message = str(e)
                    is_rate_limit_error = "[429]" in error_message and (
                        "Quota exceeded" in error_message or "Too Many Requests" in error_message
                    )
                    if not is_rate_limit_error:
                        # Other API errors (e.g. 403 Forbidden, 404 Not Found)
                        # are not transient; surface them immediately.
                        print(f"Non-rate-limit APIError encountered in {func.__name__}: {e}")
                        raise  # bare raise preserves the original traceback
                    retries += 1
                    if retries > max_retries:
                        print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {e}")
                        raise  # re-raise with traceback after exhausting retries
                    # Exponential backoff (base * 2^(n-1)) capped at max_backoff_sec,
                    # plus 0-1s of random jitter to de-synchronize concurrent clients.
                    backoff_delay = min(
                        max_backoff_sec,
                        base_delay_sec * (2 ** (retries - 1)) + random.uniform(0, 1),
                    )
                    print(
                        f"Rate limit hit for {func.__name__} (Attempt {retries}/{max_retries}). "
                        f"Retrying in {backoff_delay:.2f} seconds. Error: {e}"
                    )
                    time.sleep(backoff_delay)
                except Exception as e:
                    # Unexpected non-API errors: log for visibility and propagate.
                    print(f"An unexpected error occurred in {func.__name__}: {e}")
                    raise
        return wrapper
    return decorator
#2) Initialize Google Sheets client
# OAuth scopes: Sheets feed access plus Drive access (Drive scope is needed
# so append_to_sheet can create and share new spreadsheets, not just read).
scope = [
    "https://spreadsheets.google.com/feeds",
    "https://www.googleapis.com/auth/drive",
]
# Authenticate immediately on import.
# NOTE(review): this runs as a module import side effect; if the private-key
# env var above was missing, GSERVICE_ACCOUNT_INFO["private_key"] is None and
# credential construction here is expected to fail — confirm desired behavior.
creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope)
client = gspread.authorize(creds)
@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def read_sheet_to_df(custom_sheet_name=None, sheet_index=0):
    """
    Read all data from a Google Sheet into a pandas DataFrame.

    Parameters:
        custom_sheet_name (str): The name of the Google Sheet to open. If None, uses GSHEET_NAME.
        sheet_index (int): Index of the worksheet within the spreadsheet (default is 0, the first sheet).

    Returns:
        pandas.DataFrame: DataFrame containing the sheet data, with the first
        row used as headers, or None if the spreadsheet or worksheet is not found.
    """
    # Determine which sheet name to use
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME
    # A missing spreadsheet is an expected condition: signal with None rather
    # than propagating. Uses the SpreadsheetNotFound imported at module top,
    # consistent with append_to_sheet.
    try:
        spreadsheet = client.open(custom_sheet_name)
    except SpreadsheetNotFound:
        return None
    # Select the desired worksheet. Depending on the gspread version, an
    # out-of-range index either raises or returns None — handle both.
    try:
        worksheet = spreadsheet.get_worksheet(sheet_index)
    except IndexError:
        return None
    if worksheet is None:
        return None
    # Fetch all data: first row as header, remaining as records
    data = worksheet.get_all_records()
    # Convert to DataFrame
    return pd.DataFrame(data)
@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None, add_header_when_create_sheet=False):
    """
    Append a new row to a Google Sheet. If 'custom_row' is provided, append that row.
    Otherwise, append a default row constructed from the provided user_data.
    Ensures that each value is aligned with the correct column header.

    Parameters:
        user_data (dict): Used when custom_row_dict is None; must contain
            "question", "final_answer" and "trace" keys, which form the
            default row together with a timestamp.
        custom_row_dict (dict): Optional mapping of header name -> cell value;
            values are aligned to the sheet's header row, missing headers get "".
        custom_sheet_name (str): Spreadsheet to open (created and shared if
            missing); defaults to GSHEET_NAME.
        add_header_when_create_sheet (bool): When True and the sheet is new or
            empty, first write a header row derived from the row dict's keys.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME
    try:
        # Try to open the spreadsheet by name
        spreadsheet = client.open(custom_sheet_name)
        is_new = False
    except SpreadsheetNotFound:
        # If it doesn't exist, create it
        spreadsheet = client.create(custom_sheet_name)
        # Optionally, share the new spreadsheet with designated emails
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        is_new = True
        print("Spreadsheet ID:", spreadsheet.id)
    # Access the first worksheet
    sheet = spreadsheet.sheet1
    # Check if the sheet has any rows yet
    existing_values = sheet.get_all_values()
    is_empty = (existing_values == [[]]) #indicates empty spreadsheet that was cleared in the past
    # --- Always ensure header row is present and get headers ---
    if (is_new or is_empty) and add_header_when_create_sheet:
        # headers come from the keys of our row dict
        if custom_row_dict is not None:
            headers = list(custom_row_dict.keys())
        else:
            headers = list(user_data.keys())
        sheet.append_row(headers)
    else:
        # Read headers from the first row of the sheet
        headers = sheet.row_values(1) if sheet.row_count > 0 else []
    # --- Build row aligned to headers ---
    if custom_row_dict is not None:
        # Ensure all values are aligned to headers, fill missing with ""
        custom_row = [custom_row_dict.get(header, "") for header in headers]
    else:
        # Construct the default row with a timestamp and user_data fields
        custom_row = [str(datetime.datetime.now()), user_data["question"], user_data["final_answer"], user_data["trace"]]
    # Append the custom or default row to the sheet
    sheet.append_row(custom_row)
def format_chat(response, tool_database_labels):
    """
    Convert a raw agent message trace into a list of gr.ChatMessage objects.

    Assistant messages are emitted as-is; each tool message is paired with the
    tool calls announced by the most recent assistant message and rendered as
    a titled metadata block (with the call arguments in the log). If the last
    rendered message contains the [FinalAnswer] marker, the first assistant
    message gets a "**Reasoning:**" heading and the marker is replaced by an
    "**Answer:**" heading.
    """
    rendered = []
    pending_calls = []  # tool_calls announced by the most recent assistant turn
    for message in response:
        role = message["role"]
        if role == "assistant":
            # Remember this turn's tool calls so the following tool responses
            # can be labeled with the call that produced them.
            pending_calls = json.loads(message.get("tool_calls", "[]"))
            rendered.append(
                gr.ChatMessage(role="assistant", content=message.get("content", ""))
            )
        elif role == "tool":
            # Normalize the tool response body: re-serialize valid JSON,
            # otherwise show the raw text unchanged.
            raw_content = message.get("content", "")
            try:
                shown = json.dumps(json.loads(raw_content))
            except json.JSONDecodeError:
                shown = raw_content
            # Pair this response with every pending call from the assistant.
            for call in pending_calls:
                tool_name = call.get("name", "")
                call_args = call.get("arguments", {})
                # Pick the title; a database match overrides it and adds a suffix.
                db_suffix = ""
                title = "🧰 Tool RAG" if tool_name == "Tool_RAG" else f"🛠️ {tool_name}"
                for db_label, tool_list in tool_database_labels.items():
                    if tool_name in tool_list:
                        title = f"🛠️ {tool_name}\n(**Info** {db_label} [Click to view])"
                        db_suffix = " (" + db_label + ")"
                        break
                rendered.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"Tool Response{db_suffix}:\n{shown}",
                        metadata={
                            "title": title,
                            "log": json.dumps(call_args),
                            "status": 'done'
                        }
                    )
                )
            # Calls are consumed once rendered.
            pending_calls = []
    # Post-process when the trace ends with a final-answer marker: label the
    # first assistant message as reasoning, then swap the marker for a heading.
    if rendered:
        final = rendered[-1]
        if isinstance(final.content, str) and "[FinalAnswer]" in final.content:
            for message in rendered:
                if message.role == "assistant" and isinstance(message.content, str):
                    message.content = "**Reasoning:**\n" + message.content
                    break
            final.content = final.content.replace("[FinalAnswer]", "\n**Answer:**\n")
    return rendered