"""Google Sheets integration helpers for TxAgent data collection.

Provides:
  * ``exponential_backoff_gspread`` — retry decorator for rate-limited gspread calls,
  * ``read_sheet_to_df``            — load a whole sheet into a pandas DataFrame,
  * ``append_to_sheet``             — append a row, creating/sharing the sheet if needed,
  * ``format_chat``                 — convert an agent response trace into Gradio chat messages.

NOTE: authentication happens at import time (see ``client`` below), so importing
this module requires network access and a valid private key in the environment.
"""

import datetime
import functools
import json
import os
import random
import time

import gradio as gr
import gspread
import pandas as pd
from gspread.exceptions import APIError, SpreadsheetNotFound
from oauth2client.service_account import ServiceAccountCredentials

# NOTE(security/review): service-account metadata (project id, key id, client
# email) is hard-coded in source; only the private key itself comes from the
# environment. Consider moving the whole credential blob out of source control.
GSERVICE_ACCOUNT_INFO = {
    "type": "service_account",
    "project_id": "txagent",
    "private_key_id": "cc1a12e427917244a93faf6f19e72b589a685e65",
    "private_key": None,  # filled in below from the environment
    "client_email": "shanghua@txagent.iam.gserviceaccount.com",
    "client_id": "108950722202634464257",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com",
    "universe_domain": "googleapis.com",
}

# Default spreadsheet that the helpers below read from / append to.
GSHEET_NAME = "TxAgent_data_collection"

# The PEM private key is stored in the env with literal "\n" sequences; restore
# real newlines before handing it to the credentials loader.
GSheet_API_KEY = os.environ.get("GSheets_Shanghua_PrivateKey")
if GSheet_API_KEY is None:
    print("GSheet_API_KEY not found in environment variables. Please set it.")
else:
    GSheet_API_KEY = GSheet_API_KEY.replace("\\n", "\n")
    GSERVICE_ACCOUNT_INFO["private_key"] = GSheet_API_KEY


def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64,
                                base_delay_sec=1, target_exception=APIError):
    """Decorator implementing exponential backoff for gspread API calls.

    Retries the wrapped function when it raises ``target_exception`` (defaults
    to ``APIError``) and the error looks like a Google Sheets rate-limit
    response (HTTP 429). Any other error is re-raised immediately.

    Args:
        max_retries (int): Maximum number of retry attempts.
        max_backoff_sec (int): Cap on the delay between retries, in seconds.
        base_delay_sec (int): Initial delay in seconds for the first retry.
        target_exception (Exception): Exception type to inspect for 429s.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:  # loop until success or max retries exceeded
                try:
                    return func(*args, **kwargs)
                except target_exception as e:
                    # gspread's APIError embeds the HTTP status in its string
                    # representation, so detect 429s by substring match.
                    error_message = str(e)
                    is_rate_limit_error = "[429]" in error_message and (
                        "Quota exceeded" in error_message
                        or "Too Many Requests" in error_message
                    )
                    if not is_rate_limit_error:
                        # Different APIError (e.g. 403/404): fail fast.
                        print(f"Non-rate-limit APIError encountered in {func.__name__}: {e}")
                        raise e
                    retries += 1
                    if retries > max_retries:
                        print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {e}")
                        raise e  # exhausted retries — surface the last error
                    # Exponential backoff with 0-1s of random jitter, capped.
                    backoff_delay = min(
                        max_backoff_sec,
                        base_delay_sec * (2 ** (retries - 1)) + random.uniform(0, 1),
                    )
                    print(
                        f"Rate limit hit for {func.__name__} (Attempt {retries}/{max_retries}). "
                        f"Retrying in {backoff_delay:.2f} seconds. Error: {e}"
                    )
                    time.sleep(backoff_delay)
                except Exception as e:
                    # Unexpected (non-API) failure: log and propagate.
                    print(f"An unexpected error occurred in {func.__name__}: {e}")
                    raise e
        return wrapper
    return decorator


# 2) Initialize Google Sheets client.
# Scopes required for reading/writing sheets and creating files on Drive.
scope = [
    "https://spreadsheets.google.com/feeds",
    "https://www.googleapis.com/auth/drive",
]

# Authenticate immediately on import (network side effect — see module docstring).
creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope)
client = gspread.authorize(creds)


@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def read_sheet_to_df(custom_sheet_name=None, sheet_index=0):
    """Read all data from a Google Sheet into a pandas DataFrame.

    Parameters:
        custom_sheet_name (str): Name of the Google Sheet to open.
            If None, uses GSHEET_NAME.
        sheet_index (int): Index of the worksheet within the spreadsheet
            (default 0, the first sheet).

    Returns:
        pandas.DataFrame: Sheet data with the first row used as headers,
        or None if the spreadsheet/worksheet does not exist.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME

    try:
        spreadsheet = client.open(custom_sheet_name)
    except SpreadsheetNotFound:  # consistent with the top-of-file import
        return None

    # Older gspread raised IndexError for an out-of-range worksheet index;
    # current versions return None instead. Handle both.
    try:
        worksheet = spreadsheet.get_worksheet(sheet_index)
    except IndexError:
        return None
    if worksheet is None:
        return None

    # First row becomes the header, remaining rows become records.
    data = worksheet.get_all_records()
    return pd.DataFrame(data)


@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None,
                    add_header_when_create_sheet=False):
    """Append a new row to a Google Sheet, creating the sheet if needed.

    If ``custom_row_dict`` is provided its values are aligned to the sheet's
    header row; otherwise a default row is built from ``user_data`` with a
    timestamp plus the "question", "final_answer" and "trace" fields.

    Parameters:
        user_data (dict): Must contain "question", "final_answer", "trace"
            when ``custom_row_dict`` is not given.
        custom_row_dict (dict): Header-name -> value mapping for the new row.
        custom_sheet_name (str): Sheet to open/create; defaults to GSHEET_NAME.
        add_header_when_create_sheet (bool): Write a header row first when the
            sheet is new or empty.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME

    try:
        spreadsheet = client.open(custom_sheet_name)
        is_new = False
    except SpreadsheetNotFound:
        # Create the sheet and share it with the maintainers so it is
        # reachable outside the service account's Drive.
        spreadsheet = client.create(custom_sheet_name)
        spreadsheet.share('shanghuagao@gmail.com', perm_type='user', role='writer')
        spreadsheet.share('rzhu@college.harvard.edu', perm_type='user', role='writer')
        is_new = True
        print("Spreadsheet ID:", spreadsheet.id)

    sheet = spreadsheet.sheet1

    # An empty sheet yields [] from get_all_values(); a sheet whose contents
    # were cleared in the past can yield [[]]. Treat both as empty.
    # (The original only checked == [[]] and so missed brand-new sheets.)
    existing_values = sheet.get_all_values()
    is_empty = existing_values in ([], [[]])

    # --- Ensure a header row exists and determine the headers ---
    if (is_new or is_empty) and add_header_when_create_sheet:
        # Headers come from the keys of whichever row dict we were given.
        source = custom_row_dict if custom_row_dict is not None else user_data
        headers = list(source.keys())
        sheet.append_row(headers)
    else:
        # Read headers from the first row of the sheet.
        headers = sheet.row_values(1) if sheet.row_count > 0 else []

    # --- Build the row aligned to the headers ---
    if custom_row_dict is not None:
        if headers:
            # Align values to headers; fill missing columns with "".
            custom_row = [custom_row_dict.get(header, "") for header in headers]
        else:
            # No header row available (sheet empty and header writing disabled):
            # fall back to the dict's own ordering rather than appending an
            # empty row, which the original did silently.
            custom_row = list(custom_row_dict.values())
    else:
        # Default row: timestamp plus the expected user_data fields.
        custom_row = [
            str(datetime.datetime.now()),
            user_data["question"],
            user_data["final_answer"],
            user_data["trace"],
        ]

    sheet.append_row(custom_row)


def format_chat(response, tool_database_labels):
    """Convert an agent response trace into Gradio ChatMessage lists.

    Parameters:
        response (list[dict]): Messages with "role" ("assistant"/"tool"),
            "content", and (for assistant messages) a JSON-encoded
            "tool_calls" field.
        tool_database_labels (dict): Maps a database label to the list of
            tool names belonging to that database.

    Returns:
        tuple: (final_answer_messages, reasoning_messages, chat_history).
    """
    chat_history = []
    last_tool_calls = []  # tool_calls from the most recent assistant message

    for msg in response:
        if msg["role"] == "assistant":
            content = msg.get("content", "")
            # Remember this assistant message's tool calls so the following
            # "tool" messages can be paired with them.
            last_tool_calls = json.loads(msg.get("tool_calls", "[]"))
            chat_history.append(gr.ChatMessage(role="assistant", content=content))
        elif msg["role"] == "tool":
            # Pair each tool response with the pending tool calls.
            for tool_call in last_tool_calls:
                name = tool_call.get("name", "")
                args = tool_call.get("arguments", {})

                # Determine icon + title; annotate with a database label when
                # the tool belongs to one of the known databases.
                database_label = ""
                if name == "Tool_RAG":
                    title = "🧰 Tool RAG"
                else:
                    title = f"🛠️ {name}"
                    for db_label, tool_list in tool_database_labels.items():
                        if name in tool_list:
                            title = f"🛠️ {name}\n(**Info** {db_label} [Click to view])"
                            database_label = " (" + db_label + ")"
                            break

                # Normalize the tool response content: re-serialize valid JSON,
                # pass anything else through verbatim.
                raw = msg.get("content", "")
                try:
                    parsed = json.loads(raw)
                    pretty = json.dumps(parsed)
                except json.JSONDecodeError:
                    pretty = raw

                # One ChatMessage per tool call, with the call arguments in
                # metadata.log and the response as the message content.
                chat_history.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"Tool Response{database_label}:\n{pretty}",
                        metadata={
                            "title": title,
                            "log": json.dumps(args),
                            "status": 'done',
                        },
                    )
                )
            # Clear after rendering so later tool messages aren't re-paired.
            last_tool_calls = []

    # Defensive guard (added): an empty trace has no final answer to extract.
    if not chat_history:
        return [], [gr.ChatMessage(role="assistant", content="No reasoning was conducted.")], chat_history

    # Replace the [FinalAnswer] marker in the last message with an "Answer" heading.
    last_msg = chat_history[-1]
    if isinstance(last_msg.content, str) and "[FinalAnswer]" in last_msg.content:
        last_msg.content = last_msg.content.replace("[FinalAnswer]", "\n**Answer:**\n")

    # The final answer is everything after the heading in the last message.
    final_answer_messages = [
        gr.ChatMessage(
            role="assistant",
            content=chat_history[-1].content.split("\n**Answer:**\n")[-1].strip(),
        )
    ]

    assistant_count = sum(1 for msg in chat_history if msg.role == "assistant")
    if assistant_count == 1:
        # Only one assistant message means no intermediate reasoning happened.
        reasoning_messages = [gr.ChatMessage(role="assistant", content="No reasoning was conducted.")]
    else:
        # Include ALL messages in reasoning_messages, including the last one.
        reasoning_messages = chat_history.copy()

    return final_answer_messages, reasoning_messages, chat_history