File size: 11,370 Bytes
c4045a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6df2faf
c4045a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6df2faf
c4045a3
 
 
 
 
 
 
6df2faf
 
 
 
 
c4045a3
6df2faf
 
c4045a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7ba58a
c4045a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7ba58a
c4045a3
e7ba58a
 
 
c4045a3
 
 
 
 
 
e7ba58a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import datetime
# Imports required for Google Sheets integration
import gspread
import random
import time
import functools
from gspread.exceptions import SpreadsheetNotFound, APIError
from oauth2client.service_account import ServiceAccountCredentials
import pandas as pd
import json
import gradio as gr
import os

GSERVICE_ACCOUNT_INFO = {
  "type": "service_account",
  "project_id": "txagent",
  "private_key_id": "cc1a12e427917244a93faf6f19e72b589a685e65",
  "private_key": None,
  "client_email": "[email protected]",
  "client_id": "108950722202634464257",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/shanghua%40txagent.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}
GSHEET_NAME = "TxAgent_data_collection"

GSheet_API_KEY = os.environ.get("GSheets_Shanghua_PrivateKey")
if GSheet_API_KEY is None:
    print("GSheet_API_KEY not found in environment variables. Please set it.")
else:
    GSheet_API_KEY = GSheet_API_KEY.replace("\\n", "\n")
    GSERVICE_ACCOUNT_INFO["private_key"] = GSheet_API_KEY

#Exponential backoff retry decorator
def exponential_backoff_gspread(max_retries=30, max_backoff_sec=64, base_delay_sec=1, target_exception=APIError):
    """
    Decorator to implement exponential backoff for gspread API calls.

    Retries a function call if it raises a specific exception (defaults to APIError)
    that matches the Google Sheets API rate limit error (HTTP 429).

    Args:
        max_retries (int): Maximum number of retry attempts.
        max_backoff_sec (int): Maximum delay between retries in seconds.
        base_delay_sec (int): Initial delay in seconds for the first retry.
        target_exception (Exception): The specific exception type to catch.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True: # Loop indefinitely until success or max retries exceeded
                try:
                    # Attempt to execute the wrapped function
                    return func(*args, **kwargs)
                except target_exception as e:
                    # Check if the error is the specific 429 Quota Exceeded error
                    # We parse the string representation as gspread's APIError includes the status code there.
                    error_message = str(e)
                    is_rate_limit_error = "[429]" in error_message and (
                        "Quota exceeded" in error_message or "Too Many Requests" in error_message
                    )

                    if is_rate_limit_error:
                        retries += 1
                        if retries > max_retries:
                            print(f"Max retries ({max_retries}) exceeded for {func.__name__}. Last error: {e}")
                            raise e # Re-raise the last exception after exhausting retries

                        # Calculate exponential backoff delay with random jitter (0-1 second)
                        backoff_delay = min(max_backoff_sec, base_delay_sec * (2 ** (retries - 1)) + random.uniform(0, 1))

                        print(
                            f"Rate limit hit for {func.__name__} (Attempt {retries}/{max_retries}). "
                            f"Retrying in {backoff_delay:.2f} seconds. Error: {e}"
                        )
                        time.sleep(backoff_delay)
                    else:
                        # If it's a different kind of APIError (e.g., 403 Forbidden, 404 Not Found), re-raise immediately.
                        print(f"Non-rate-limit APIError encountered in {func.__name__}: {e}")
                        raise e
                except Exception as e:
                    # Catch any other unexpected exceptions during the function execution
                    print(f"An unexpected error occurred in {func.__name__}: {e}")
                    raise e # Re-raise unexpected errors
        return wrapper
    return decorator

#2) Initialize Google Sheets client
# Define the scopes
scope = [
    "https://spreadsheets.google.com/feeds",
    "https://www.googleapis.com/auth/drive",
]

# Authenticate immediately on import
creds = ServiceAccountCredentials.from_json_keyfile_dict(GSERVICE_ACCOUNT_INFO, scope)
client = gspread.authorize(creds)

@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def read_sheet_to_df(custom_sheet_name=None, sheet_index=0):
    """
    Read all data from a Google Sheet into a pandas DataFrame.

    Parameters:
        custom_sheet_name (str): The name of the Google Sheet to open. If None, uses GSHEET_NAME.
        sheet_index (int): Index of the worksheet within the spreadsheet (default is 0, the first sheet).

    Returns:
        pandas.DataFrame: DataFrame containing the sheet data, with the first row used as headers.
    """

    # Determine which sheet name to use
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME

    # Open the spreadsheet
    try:
        spreadsheet = client.open(custom_sheet_name)
    except gspread.SpreadsheetNotFound:
        return None

    # Select the desired worksheet
    try:
        worksheet = spreadsheet.get_worksheet(sheet_index)
    except IndexError:
        return None

    # Fetch all data: first row as header, remaining as records
    data = worksheet.get_all_records()

    # Convert to DataFrame
    df = pd.DataFrame(data)

    return df

@exponential_backoff_gspread(max_retries=30, max_backoff_sec=64)
def append_to_sheet(user_data=None, custom_row_dict=None, custom_sheet_name=None, add_header_when_create_sheet=False):
    """
    Append a new row to a Google Sheet. If 'custom_row' is provided, append that row.
    Otherwise, append a default row constructed from the provided user_data.
    Ensures that each value is aligned with the correct column header.
    """
    if custom_sheet_name is None:
        custom_sheet_name = GSHEET_NAME
    
    try:
        # Try to open the spreadsheet by name
        spreadsheet = client.open(custom_sheet_name)
        is_new = False
    except SpreadsheetNotFound:
        # If it doesn't exist, create it
        spreadsheet = client.create(custom_sheet_name)
        # Optionally, share the new spreadsheet with designated emails
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        spreadsheet.share('[email protected]', perm_type='user', role='writer')
        is_new = True

    print("Spreadsheet ID:", spreadsheet.id)
    # Access the first worksheet
    sheet = spreadsheet.sheet1

    # Check if the sheet has any rows yet
    existing_values = sheet.get_all_values()
    is_empty = (existing_values == [[]]) #indicates empty spreadsheet that was cleared in the past

    # --- Always ensure header row is present and get headers ---
    if (is_new or is_empty) and add_header_when_create_sheet:
        # headers come from the keys of our row dict
        if custom_row_dict is not None:
            headers = list(custom_row_dict.keys())
        else:
            headers = list(user_data.keys())
        sheet.append_row(headers)
    else:
        # Read headers from the first row of the sheet
        headers = sheet.row_values(1) if sheet.row_count > 0 else []

    # --- Build row aligned to headers ---
    if custom_row_dict is not None:
        # Ensure all values are aligned to headers, fill missing with ""
        custom_row = [custom_row_dict.get(header, "") for header in headers]
    else:
        # Construct the default row with a timestamp and user_data fields
        custom_row = [str(datetime.datetime.now()), user_data["question"], user_data["final_answer"], user_data["trace"]]
    
    # Append the custom or default row to the sheet
    sheet.append_row(custom_row)

def format_chat(response, tool_database_labels):
    chat_history = []
    # Keep track of the last assistant message's tool_calls
    last_tool_calls = []

    for msg in response:
        if msg["role"] == "assistant":
            content = msg.get("content", "")
            # Extract tool_calls from this assistant message (if any)
            last_tool_calls = json.loads(msg.get("tool_calls", "[]"))
            # Emit the assistant's main message
            chat_history.append(
                gr.ChatMessage(role="assistant", content=content)
            )

        elif msg["role"] == "tool":
            # For each tool response, we pair it with the corresponding call
            for i, tool_call in enumerate(last_tool_calls):
                name = tool_call.get("name", "")
                args = tool_call.get("arguments", {})

                # Determine icon + title
                database_label = ""
                if name == "Tool_RAG":
                    title = "🧰 Tool RAG"
                else:
                    title = f"🛠️ {name}"
                    for db_label, tool_list in tool_database_labels.items():
                        if name in tool_list:
                            title = f"🛠️ {name}\n(**Info** {db_label} [Click to view])"
                            database_label = " (" + db_label + ")"
                            break

                # Parse and pretty-print the tool response content
                raw = msg.get("content", "")
                try:
                    parsed = json.loads(raw)
                    pretty = json.dumps(parsed)
                except json.JSONDecodeError:
                    pretty = raw

                # Add as a single ChatMessage with metadata.title and metadata.log.
                # Display the arguments as the first part of the content, clearly separated from the response,
                # and display the tool response content as contiguous text.
                chat_history.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"Tool Response{database_label}:\n{pretty}",
                        metadata={
                            "title": title,
                            "log":  json.dumps(args),
                            "status": 'done'
                        }
                    )
                )

            # Clear after rendering
            last_tool_calls = []
    # Post-process: replace [FinalAnswer] with "\n***Answer:**\n" in the last message's content if present
    # Insert "**Rasoning:**\n" at the beginning of the first assistant message if [FinalAnswer] is in the last assistant message
    if chat_history:
        last_msg = chat_history[-1]
        if isinstance(last_msg.content, str) and "[FinalAnswer]" in last_msg.content:
            # Find the first assistant message
            for msg in chat_history:
                if msg.role == "assistant" and isinstance(msg.content, str):
                    msg.content = "**Reasoning:**\n" + msg.content
                    break
    if chat_history:
        last_msg = chat_history[-1]
        if isinstance(last_msg.content, str) and "[FinalAnswer]" in last_msg.content:
            last_msg.content = last_msg.content.replace("[FinalAnswer]", "\n**Answer:**\n")
    return chat_history