Spaces:
Sleeping
Sleeping
# Copyright 2025 Jesus Vilela Jato. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# https://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# --- Imports --- | |
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
import json | |
import logging | |
from typing import Optional, List, Dict, Any, Tuple, Union, Type, TYPE_CHECKING, Annotated | |
import hashlib | |
from urllib.parse import urlparse | |
import mimetypes | |
import subprocess # For yt-dlp | |
import io # For BytesIO with PIL | |
# --- Global Variables for Startup Status --- | |
# Module-level placeholders describing startup status; nothing in this
# section fills them in beyond these initial values.
missing_vars_startup_list_global = []
agent_pre_init_status_msg_global = "Agent status will be determined at startup."

# File Processing Libs
# Each optional dependency is probed once at import time. The matching
# *_AVAILABLE flag is what the tool functions consult before doing any work.
try:
    from PyPDF2 import PdfReader
except ImportError:
    PYPDF2_AVAILABLE = False
    print("WARNING: PyPDF2 not found, PDF tool will be disabled.")
else:
    PYPDF2_AVAILABLE = True

try:
    from PIL import Image
    import pytesseract
except ImportError:
    PIL_TESSERACT_AVAILABLE = False
    print("WARNING: Pillow or Pytesseract not found, OCR tool will be disabled.")
else:
    PIL_TESSERACT_AVAILABLE = True

try:
    import whisper
except ImportError:
    WHISPER_AVAILABLE = False
    print("WARNING: OpenAI Whisper not found, Audio Transcription tool will be disabled.")
else:
    WHISPER_AVAILABLE = True
# Google GenAI SDK types | |
from google.genai.types import HarmCategory, HarmBlockThreshold | |
from google.ai import generativelanguage as glm # For FileState enum | |
# LangChain | |
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage | |
from langchain.prompts import PromptTemplate | |
from langchain.tools import BaseTool, tool as lc_tool_decorator | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain.agents import AgentExecutor, create_react_agent | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_experimental.tools import PythonREPLTool | |
# LangGraph Conditional Imports | |
if TYPE_CHECKING: | |
from langgraph.graph import StateGraph as StateGraphAliasedForHinting | |
from langgraph.prebuilt import ToolNode as ToolExecutorAliasedForHinting | |
from typing_extensions import TypedDict | |
from langgraph.checkpoint.base import BaseCheckpointSaver | |
# Module-level slots for the optional LangGraph stack and for the shared
# agent/LLM objects. They start empty; the import probe below fills in the
# LangGraph ones when the package is importable.
LANGGRAPH_FLAVOR_AVAILABLE = False
LG_StateGraph: Optional[Type[Any]] = None
LG_ToolExecutor_Class: Optional[Type[Any]] = None
LG_END: Optional[Any] = None
LG_ToolInvocation: Optional[Type[Any]] = None
add_messages: Optional[Any] = None
MemorySaver_Class: Optional[Type[Any]] = None
AGENT_INSTANCE: Optional[Union[AgentExecutor, Any]] = None
TOOLS: List[BaseTool] = []
LLM_INSTANCE: Optional[ChatGoogleGenerativeAI] = None
LANGGRAPH_MEMORY_SAVER: Optional[Any] = None

# google-genai Client SDK
from google import genai as google_genai_sdk
google_genai_client: Optional[google_genai_sdk.Client] = None

try:
    from langgraph.graph import StateGraph, END

    # Prefer ToolNode; fall back to the older ToolExecutor if it is absent.
    try:
        from langgraph.prebuilt import ToolNode
        LG_ToolExecutor_Class = ToolNode
        print("Using langgraph.prebuilt.ToolNode for LangGraph tool execution.")
    except ImportError:
        try:
            from langgraph.prebuilt import ToolExecutor
            LG_ToolExecutor_Class = ToolExecutor
            print("Using langgraph.prebuilt.ToolExecutor (fallback) for LangGraph tool execution.")
        except ImportError as e_lg_exec_inner:
            print(f"Failed to import ToolNode and ToolExecutor from langgraph.prebuilt: {e_lg_exec_inner}")
            LG_ToolExecutor_Class = None

    if LG_ToolExecutor_Class is not None:
        try:
            from langgraph.prebuilt import ToolInvocation as LGToolInvocationActual
        except ImportError:
            try:
                from langgraph.tools import ToolInvocation as LGToolInvocationActual
                print("Imported ToolInvocation from langgraph.tools")
            except ImportError as e_ti:
                print(f"WARNING: Could not import ToolInvocation from langgraph.prebuilt or langgraph.tools: {e_ti}")
                LGToolInvocationActual = None  # type: ignore

        # Compare the class's own name. type(cls).__name__ would be the
        # metaclass name, so the ToolNode case could never match and LangGraph
        # would be disabled whenever ToolInvocation is missing.
        executor_is_tool_node = getattr(LG_ToolExecutor_Class, "__name__", "") == 'ToolNode'
        if LGToolInvocationActual is not None or executor_is_tool_node:
            from langgraph.graph.message import add_messages as lg_add_messages
            from langgraph.checkpoint.memory import MemorySaver as LGMemorySaver
            LANGGRAPH_FLAVOR_AVAILABLE = True
            LG_StateGraph, LG_END, LG_ToolInvocation, add_messages, MemorySaver_Class = \
                StateGraph, END, LGToolInvocationActual, lg_add_messages, LGMemorySaver  # type: ignore
            print("Successfully imported essential LangGraph components.")
        else:
            LANGGRAPH_FLAVOR_AVAILABLE = False
            LG_StateGraph, LG_END, add_messages, MemorySaver_Class = (None,) * 4  # type: ignore
            print("WARNING: LangGraph ToolInvocation not found and may be required by older ToolExecutor. LangGraph agent functionality might be limited or disabled.")
    else:
        LANGGRAPH_FLAVOR_AVAILABLE = False
        LG_StateGraph, LG_END, LG_ToolInvocation, add_messages, MemorySaver_Class = (None,) * 5  # type: ignore
        print("WARNING: No suitable LangGraph tool executor (ToolNode/ToolExecutor) found. LangGraph agent will be disabled.")
except ImportError as e:
    # Any import failure above (including the nested ones) lands here and
    # leaves every LangGraph slot empty.
    LANGGRAPH_FLAVOR_AVAILABLE = False
    LG_StateGraph, LG_ToolExecutor_Class, LG_END, LG_ToolInvocation, add_messages, MemorySaver_Class = (None,) * 6
    print(f"WARNING: Core LangGraph components (like StateGraph, END) not found or import error: {e}. LangGraph agent will be disabled.")
# --- Constants --- | |
# Endpoints and model identifiers used throughout the module.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
GEMINI_MODEL_NAME = "gemini-2.5-pro"
GEMINI_FLASH_MULTIMODAL_MODEL_NAME = "gemini-1.5-flash-latest"
# The scoring endpoint can be overridden through the environment.
SCORING_API_BASE_URL = os.environ.get("SCORING_API_URL", DEFAULT_API_URL)
MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024
# Download directory; created eagerly at import time.
LOCAL_FILE_STORE_PATH = "./Data"
os.makedirs(LOCAL_FILE_STORE_PATH, exist_ok=True)

# --- Global State ---
# Whisper model handle; stays None until the transcription tool loads it.
WHISPER_MODEL: Optional[Any] = None

# --- Environment Variables & API Keys ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

# --- Setup Logging ---
logging.basicConfig(
    level=logging.INFO,
    format=(
        '%(asctime)s - %(levelname)s - %(name)s - '
        '%(module)s:%(lineno)d - %(message)s'
    ),
)
logger = logging.getLogger(__name__)

# --- Initialize google-genai Client SDK ---
if not GOOGLE_API_KEY:
    logger.warning("GOOGLE_API_KEY not found. google-genai SDK Client not initialized.")
else:
    try:
        google_genai_client = google_genai_sdk.Client(api_key=GOOGLE_API_KEY)
        logger.info("google-genai SDK Client initialized successfully.")
    except Exception as e:
        # Construction failure is logged, and the client is left unset.
        logger.error(f"Failed to initialize google-genai SDK Client: {e}")
        google_genai_client = None
# --- Helper Functions (Unchanged) --- | |
def _strip_exact_match_answer(text: Any) -> str:
    """Reduce a model reply to the bare answer string.

    Removes, in order: surrounding whitespace, a leading "final answer:"
    label (case-insensitive), a wrapping Markdown code fence or single
    backticks, and one pair of matching surrounding quotes.

    Args:
        text: The reply; non-string values are converted with ``str()``.

    Returns:
        The cleaned answer with surrounding whitespace stripped.
    """
    if not isinstance(text, str):
        text = str(text)

    # Strip first so a reply that begins with a newline or spaces still has
    # its "Final Answer:" label recognised.
    text = text.strip()
    label = "final answer:"
    if text[:len(label)].lower() == label:
        text = text[len(label):].strip()

    if text.startswith("```") and text.endswith("```"):
        fenced = text[3:-3]
        if "\n" in fenced:
            # Everything up to the first newline is treated as the fence's
            # language tag and dropped.
            fenced = fenced.split("\n", 1)[1]
        text = fenced.strip()
    elif text.startswith("`") and text.endswith("`"):
        text = text[1:-1].strip()

    is_double_quoted = text.startswith('"') and text.endswith('"')
    is_single_quoted = text.startswith("'") and text.endswith("'")
    # A lone quote character is left alone rather than stripped to nothing.
    if (is_double_quoted or is_single_quoted) and len(text) > 1:
        text = text[1:-1]
    return text.strip()
def _is_full_url(url_string: str) -> bool:
    """Return True when *url_string* parses with both a scheme and a network location."""
    try:
        parsed = urlparse(url_string)
    except ValueError:
        # urlparse rejects some malformed inputs; those are not URLs.
        return False
    if not parsed.scheme:
        return False
    return bool(parsed.netloc)
def _is_youtube_url(url: str) -> bool:
    """Return True when *url* points at youtube.com, youtu.be, or a subdomain of either.

    The host is compared on label boundaries, so look-alike domains such as
    ``notyoutube.com`` are rejected, and an explicit port does not prevent a
    match.

    Args:
        url: The URL to classify.

    Returns:
        True for YouTube hosts, False otherwise (including unparsable input).
    """
    try:
        # .hostname drops any port and userinfo and is already lower-cased.
        host = urlparse(url).hostname or ""
    except ValueError:
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtu.be")
    )
def _sanitize_filename(name: str) -> str:
    """Replace every character other than alphanumerics, '.', '_' and '-' with '_'."""
    return "".join(c if c.isalnum() or c in "._-" else "_" for c in name)


def _filename_from_content_disposition(cd_header: str) -> Optional[str]:
    """Return the filename carried by a Content-Disposition header value, if any."""
    from email.message import Message

    # Re-decode as UTF-8 (the header text is treated as latin-1 here) so that
    # raw non-ASCII names survive.
    decoded_cd_header = cd_header.encode('latin-1', 'replace').decode('utf-8', 'replace')
    holder = Message()
    holder['Content-Disposition'] = decoded_cd_header
    # get_filename() handles both filename="..." and the RFC 2231 filename*= form.
    return holder.get_filename() or None


def _download_file(file_identifier: str, task_id_for_file: Optional[str] = None) -> str:
    """Fetch a file into LOCAL_FILE_STORE_PATH and return its local path.

    Resolution order:
      1. YouTube URLs are downloaded as MP3 through ``yt-dlp``.
      2. Other full URLs are downloaded over HTTP.
      3. A bare name with a task id is fetched from the scoring API's
         ``/files/<task_id>`` endpoint.
      4. A bare name without a task id is returned as-is if it exists locally.

    Previously downloaded non-empty files are reused.

    Args:
        file_identifier: URL, YouTube URL, or file name/path.
        task_id_for_file: Optional task id; used as a filename prefix and to
            build the scoring-API URL when *file_identifier* is not a URL.

    Returns:
        The local path on success. On failure, a message that always starts
        with ``"Error:"`` so callers can detect it with one prefix check.
    """
    os.makedirs(LOCAL_FILE_STORE_PATH, exist_ok=True)
    logger.debug(f"Download request: '{file_identifier}', task_id: {task_id_for_file}")

    is_url = _is_full_url(file_identifier)
    name_source = urlparse(file_identifier).path if is_url else file_identifier
    original_filename = os.path.basename(name_source)
    if not original_filename or original_filename == '/':
        # No usable name in the identifier: derive a stable one from its hash.
        original_filename = hashlib.md5(file_identifier.encode()).hexdigest()[:12] + ".download"
    prefix = f"{task_id_for_file}_" if task_id_for_file else ""
    tentative_local_path = os.path.join(
        LOCAL_FILE_STORE_PATH, f"{prefix}{_sanitize_filename(original_filename)}"
    )

    # --- YouTube: delegate to yt-dlp and keep only the extracted MP3 ---
    if is_url and _is_youtube_url(file_identifier):
        logger.info(f"YouTube URL: {file_identifier}. Using yt-dlp.")
        yt_file_hash = hashlib.md5(file_identifier.encode()).hexdigest()[:10]
        yt_filename_base = f"youtube_{prefix}{yt_file_hash}"
        target_mp3_path = os.path.join(LOCAL_FILE_STORE_PATH, yt_filename_base + ".mp3")
        if os.path.exists(target_mp3_path) and os.path.getsize(target_mp3_path) > 0:
            logger.info(f"Cached YouTube MP3: {target_mp3_path}")
            return target_mp3_path
        temp_output_template = os.path.join(LOCAL_FILE_STORE_PATH, yt_filename_base + "_temp.%(ext)s")
        try:
            command = ['yt-dlp', '--quiet', '--no-warnings', '-x', '--audio-format', 'mp3',
                       '--audio-quality', '0', '--max-filesize', str(MAX_FILE_SIZE_BYTES),
                       '-o', temp_output_template, file_identifier]
            logger.info(f"yt-dlp command: {' '.join(command)}")
            process = subprocess.run(command, capture_output=True, text=True, timeout=180, check=False)
            downloaded_temp_file = next(
                (os.path.join(LOCAL_FILE_STORE_PATH, f) for f in os.listdir(LOCAL_FILE_STORE_PATH)
                 if f.startswith(yt_filename_base + "_temp") and f.endswith(".mp3")),
                None,
            )
            if process.returncode == 0 and downloaded_temp_file and os.path.exists(downloaded_temp_file):
                # os.replace overwrites a stale zero-byte target on every platform.
                os.replace(downloaded_temp_file, target_mp3_path)
                logger.info(f"yt-dlp success: {target_mp3_path}")
                return target_mp3_path
            err_msg = process.stderr.strip() if process.stderr else "Unknown yt-dlp error"
            logger.error(f"yt-dlp failed. RC:{process.returncode}. File:{downloaded_temp_file}. Err:{err_msg[:500]}")
            if downloaded_temp_file and os.path.exists(downloaded_temp_file):
                os.remove(downloaded_temp_file)
            return f"Error: yt-dlp failed. Msg:{err_msg[:200]}"
        except Exception as e:
            logger.error(f"yt-dlp exception: {e}", exc_info=True)
            return f"Error: yt-dlp exception: {str(e)[:200]}"

    # --- Decide where to download from (or short-circuit to a local file) ---
    file_url_to_try = file_identifier if is_url else None
    if not file_url_to_try and task_id_for_file:
        file_url_to_try = f"{SCORING_API_BASE_URL.rstrip('/')}/files/{task_id_for_file}"
    elif not file_url_to_try:
        if os.path.exists(file_identifier):
            logger.info(f"Using local file: {file_identifier}")
            return file_identifier
        return f"Error: Not URL, not local, no task_id for '{file_identifier}'."

    if os.path.exists(tentative_local_path) and os.path.getsize(tentative_local_path) > 0:
        logger.info(f"Cached file (pre-CD): {tentative_local_path}")
        return tentative_local_path

    effective_save_path = tentative_local_path
    try:
        # The HF token is only attached for the scoring API and Hugging Face hosts.
        send_token = HUGGINGFACE_TOKEN and any(
            s in file_url_to_try for s in [SCORING_API_BASE_URL, ".hf.space", "huggingface.co"]
        )
        auth_headers = {"Authorization": f"Bearer {HUGGINGFACE_TOKEN}"} if send_token else {}
        logger.info(f"Standard download: {file_url_to_try} (Headers: {list(auth_headers.keys())})")
        with requests.get(file_url_to_try, stream=True, headers=auth_headers, timeout=60) as r:
            r.raise_for_status()

            # Prefer the server-supplied filename when there is one.
            cd_header = r.headers.get('content-disposition')
            filename_from_cd = None
            if cd_header:
                try:
                    filename_from_cd = _filename_from_content_disposition(cd_header)
                except Exception as e_cd:
                    logger.warning(f"CD parse error '{cd_header}': {e_cd}")
            if filename_from_cd:
                sanitized_cd_filename = _sanitize_filename(filename_from_cd)
                effective_save_path = os.path.join(LOCAL_FILE_STORE_PATH, f"{prefix}{sanitized_cd_filename}")
                logger.info(f"Using CD filename: '{sanitized_cd_filename}'. Path: {effective_save_path}")

            # No extension yet: guess one from the media type. Parameters such
            # as "; charset=..." are dropped before the lookup.
            _, current_ext = os.path.splitext(effective_save_path)
            if not current_ext:
                content_type_header = r.headers.get('content-type', '')
                content_type_val = content_type_header.split(';')[0].strip() if content_type_header else ''
                if content_type_val:
                    guessed_ext = mimetypes.guess_extension(content_type_val)
                    if guessed_ext:
                        effective_save_path += guessed_ext
                        logger.info(f"Added guessed ext: {guessed_ext}")

            if (effective_save_path != tentative_local_path
                    and os.path.exists(effective_save_path)
                    and os.path.getsize(effective_save_path) > 0):
                logger.info(f"Cached file (CD name): {effective_save_path}")
                return effective_save_path

            # Stream into a sibling ".part" file and move it into place only
            # once complete, so an interrupted transfer is never mistaken for
            # a cached download on the next call.
            partial_path = effective_save_path + ".part"
            try:
                with open(partial_path, "wb") as f_download:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        f_download.write(chunk)
                os.replace(partial_path, effective_save_path)
            except Exception:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                raise
            logger.info(f"File downloaded to {effective_save_path}")
            return effective_save_path
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code if e.response is not None else "?"
        detail = e.response.text[:100] if e.response is not None else ""
        err_msg = f"HTTP {status} for {file_url_to_try}. Detail: {detail}"
        logger.error(err_msg, exc_info=False)
        # Uses the same "Error:" prefix as every other failure so callers'
        # startswith("Error:") check catches HTTP failures too.
        return f"Error: download failed: {err_msg}"
    except Exception as e:
        logger.error(f"Download error for {file_url_to_try}: {e}", exc_info=True)
        return f"Error: {str(e)[:100]}"
# --- Tool Function Definitions --- | |
def read_pdf_tool(action_input_json_str: str) -> str:
    """Reads text content from a PDF file. Input: JSON '{\"file_identifier\": \"FILENAME_OR_URL\", \"task_id\": \"TASK_ID_IF_GAIA_FILENAME_ONLY\"}'. Returns extracted text."""
    # The docstring above is kept verbatim: it may be surfaced to the model as
    # the tool description.
    #
    # Returns the extracted text (capped at 40000 characters) or a string
    # starting with "Error" describing what went wrong; never raises.
    if not PYPDF2_AVAILABLE:
        return "Error: PyPDF2 not installed."
    try:
        data = json.loads(action_input_json_str)
        file_id, task_id = data.get("file_identifier"), data.get("task_id")
    except Exception as e:
        return f"Error parsing JSON for read_pdf_tool: {e}. Input: {action_input_json_str}"
    if not file_id:
        return "Error: 'file_identifier' missing."

    path = _download_file(file_id, task_id)
    # HTTP failures are reported as "Error downloading: ..." rather than
    # "Error: ...", so both prefixes must be recognised.
    if path.startswith(("Error:", "Error downloading:")):
        return path

    try:
        with open(path, "rb") as f_pdf:
            reader = PdfReader(f_pdf)
            if reader.is_encrypted:
                # Try the empty password. A result equal to 0 is taken to
                # mean "not decrypted" — NOTE(review): confirm against the
                # installed PyPDF2 version.
                try:
                    decrypt_result = reader.decrypt('')
                except Exception:
                    decrypt_result = 0
                if decrypt_result == 0:
                    return f"Error: PDF '{path}' encrypted."
            # extract_text() can yield None for pages without a text layer.
            page_texts = [(page.extract_text() or "") + "\n\n" for page in reader.pages]
        return "".join(page_texts)[:40000]
    except Exception as e:
        return f"Error reading PDF '{path}': {e}"
def ocr_image_tool(action_input_json_str: str) -> str:
    """Extracts text from an image using OCR. Input: JSON '{\"file_identifier\": \"FILENAME_OR_URL\", \"task_id\": \"TASK_ID_IF_GAIA_FILENAME_ONLY\"}'. Returns extracted text."""
    # The docstring above is kept verbatim: it may be surfaced to the model as
    # the tool description.
    #
    # Returns the recognised text (capped at 40000 characters) or a string
    # starting with "Error"; never raises.
    if not PIL_TESSERACT_AVAILABLE:
        return "Error: Pillow/Pytesseract not installed."
    try:
        data = json.loads(action_input_json_str)
        file_id, task_id = data.get("file_identifier"), data.get("task_id")
    except Exception as e:
        return f"Error parsing JSON for ocr_image_tool: {e}. Input: {action_input_json_str}"
    if not file_id:
        return "Error: 'file_identifier' missing."

    path = _download_file(file_id, task_id)
    # HTTP failures are reported as "Error downloading: ..." rather than
    # "Error: ...", so both prefixes must be recognised.
    if path.startswith(("Error:", "Error downloading:")):
        return path

    try:
        # The context manager releases the image's file handle after OCR.
        with Image.open(path) as image:
            return pytesseract.image_to_string(image)[:40000]
    except Exception as e:
        return f"Error OCR'ing '{path}': {e}"
def transcribe_audio_tool(action_input_json_str: str) -> str:
    """Transcribes speech from an audio file (or YouTube URL) to text. Input: JSON '{\"file_identifier\": \"FILENAME_OR_URL_OR_YOUTUBE_URL\", \"task_id\": \"TASK_ID_IF_GAIA_FILENAME_ONLY\"}'. Returns transcript."""
    # The docstring above is kept verbatim: it may be surfaced to the model as
    # the tool description.
    #
    # Returns the transcript (capped at 40000 characters) or a string starting
    # with "Error"; never raises. The Whisper "base" model is loaded on first
    # use and cached in the module-level WHISPER_MODEL.
    global WHISPER_MODEL
    if not WHISPER_AVAILABLE:
        return "Error: Whisper not installed."
    try:
        data = json.loads(action_input_json_str)
        file_id, task_id = data.get("file_identifier"), data.get("task_id")
    except Exception as e:
        return f"Error parsing JSON for transcribe_audio_tool: {e}. Input: {action_input_json_str}"
    if not file_id:
        return "Error: 'file_identifier' missing."

    if WHISPER_MODEL is None:
        try:
            WHISPER_MODEL = whisper.load_model("base")
            logger.info("Whisper 'base' model loaded.")
        except Exception as e:
            logger.error(f"Whisper load failed: {e}")
            return f"Error: Whisper load: {e}"

    path = _download_file(file_id, task_id)
    # HTTP failures are reported as "Error downloading: ..." rather than
    # "Error: ...", so both prefixes must be recognised.
    if path.startswith(("Error:", "Error downloading:")):
        return path

    try:
        result = WHISPER_MODEL.transcribe(path, fp16=False)
        return result["text"][:40000]  # type: ignore
    except Exception as e:
        logger.error(f"Whisper error on '{path}': {e}", exc_info=True)
        return f"Error transcribing '{path}': {e}"
def direct_multimodal_gemini_tool(action_input_json_str: str) -> str:
    """Processes an image file (URL or local path) along with a text prompt using a Gemini multimodal model (gemini-1.5-flash-latest) for tasks like image description, Q&A about the image, or text generation based on the image. Input: JSON '{\"file_identifier\": \"IMAGE_FILENAME_OR_URL\", \"text_prompt\": \"Your question or instruction related to the image.\", \"task_id\": \"TASK_ID_IF_GAIA_FILENAME_ONLY\" (optional)}'. Returns the model's text response."""
    # The docstring above is kept verbatim: it may be surfaced to the model as
    # the tool description.
    #
    # Returns the model's text (capped at 40000 characters) or a string
    # starting with "Error"; never raises.
    global google_genai_client
    if not google_genai_client:
        return "Error: google-genai SDK client not initialized."
    if not PIL_TESSERACT_AVAILABLE:
        return "Error: Pillow (PIL) library not available for image processing."
    try:
        data = json.loads(action_input_json_str)
        file_identifier = data.get("file_identifier")
        text_prompt = data.get("text_prompt", "Describe this image.")
        task_id = data.get("task_id")
        if not file_identifier:
            return "Error: 'file_identifier' for image missing."
        logger.info(f"Direct Multimodal Tool: Processing image '{file_identifier}' with prompt '{text_prompt}'")

        local_image_path = _download_file(file_identifier, task_id)
        # HTTP failures are reported as "Error downloading: ..." rather than
        # "Error: ...", so both prefixes must be recognised.
        if local_image_path.startswith(("Error:", "Error downloading:")):
            return f"Error downloading image for Direct MM Tool: {local_image_path}"

        try:
            pil_image = Image.open(local_image_path)
        except Exception as e_img_open:
            return f"Error opening image file {local_image_path}: {str(e_img_open)}"

        if GEMINI_FLASH_MULTIMODAL_MODEL_NAME.startswith("models/"):
            model_id_for_client = GEMINI_FLASH_MULTIMODAL_MODEL_NAME
        else:
            model_id_for_client = f"models/{GEMINI_FLASH_MULTIMODAL_MODEL_NAME}"

        # The image is closed once the request has been sent, whether or not
        # the call succeeds.
        with pil_image:
            response = google_genai_client.models.generate_content(
                model=model_id_for_client, contents=[pil_image, text_prompt]
            )
        logger.info(f"Direct Multimodal Tool: Response received from {model_id_for_client} received.")

        response_text = response.text
        if response_text is None:
            # No text came back; report that rather than failing on the slice.
            return "Error: Direct Multimodal Tool received no text in the model response."
        return response_text[:40000]
    except json.JSONDecodeError as e_json_mm:
        return f"Error parsing JSON input for Direct MM Tool: {str(e_json_mm)}. Input: {action_input_json_str}"
    except Exception as e_tool_mm:
        logger.error(f"Error in direct_multimodal_gemini_tool: {e_tool_mm}", exc_info=True)
        return f"Error executing Direct Multimodal Tool: {str(e_tool_mm)}"
# --- Agent Prompts --- | |
# System prompt for the LangGraph agent. It is filled in with str.format(),
# so {tools} is the only live placeholder and every literal brace in the
# examples is doubled ({{ and }}).
LANGGRAPH_PROMPT_TEMPLATE_STR = """You are a highly intelligent agent for the GAIA benchmark. | |
Your goal is to provide an EXACT MATCH final answer. No conversational text, explanations, or markdown unless explicitly part of the answer. | |
TOOLS: | |
You have access to the following tools. Use them if necessary. | |
{tools} | |
TOOL USAGE: | |
- To use a tool, your response must include a `tool_calls` attribute in the AIMessage. Each tool call should be a dictionary with "name", "args" (a dictionary of arguments), and "id". | |
- For file tools ('read_pdf_tool', 'ocr_image_tool', 'transcribe_audio_tool', 'direct_multimodal_gemini_tool'): The `args` field must be a dictionary with a single key 'action_input_json_str' whose value is a JSON STRING. Example: {{"action_input_json_str": "{{\\"file_identifier\\": \\"file.pdf\\", \\"task_id\\": \\"123\\"}}"}}. | |
- 'tavily_search_results_json': `args` is like '{{"query": "search query"}}'. | |
- 'python_repl': `args` is like '{{"query": "python code string"}}'. Use print() for output. | |
RESPONSE FORMAT: | |
Final AIMessage should contain ONLY the answer in 'content' and NO 'tool_calls'. If using tools, 'content' can be thought process, with 'tool_calls'. | |
Begin!""" | |
# Prompt for the fallback ReAct agent. It is loaded with
# PromptTemplate.from_template(); {tools} and {tool_names} are bound up front
# via .partial(), leaving {input} and {agent_scratchpad} as the remaining
# placeholders. Literal braces in the JSON example are doubled.
REACT_PROMPT_TEMPLATE_STR = """You are a highly intelligent agent for the GAIA benchmark. | |
Goal: EXACT MATCH answer. No extra text/markdown. | |
Tools: {tools} | |
Process: Question -> Thought -> Action (ONE of [{tool_names}]) -> Action Input -> Observation -> Thought ... -> Final Answer: [exact answer] | |
Tool Inputs: | |
- tavily_search_results_json: Your search query string. | |
- python_repl: Python code string. Use print(). For Excel/CSV, use pandas: import pandas as pd; df = pd.read_excel('./Data/TASKID_filename.xlsx'); print(df.head()) | |
- read_pdf_tool, ocr_image_tool, transcribe_audio_tool, direct_multimodal_gemini_tool: JSON string like '{{"file_identifier": "FILENAME_OR_URL", "task_id": "CURRENT_TASK_ID_IF_FILENAME"}}'. | |
If tool fails or info missing, Final Answer: N/A. Do NOT use unlisted tools. | |
Begin! | |
{input} | |
Thought:{agent_scratchpad}""" | |
# --- Agent Initialization and Response Logic --- | |
def initialize_agent_and_tools(force_reinit=False):
    """Build the LLM, the tool list and the agent, storing them in module globals.

    A LangGraph agent is attempted first when its components were imported.
    If that path is unavailable or fails, a ReAct ``AgentExecutor`` is built
    instead.

    Args:
        force_reinit: When true, rebuild even if ``AGENT_INSTANCE`` is already set.

    Raises:
        ValueError: If ``GOOGLE_API_KEY`` is not set.
        RuntimeError: If neither agent flavour could be constructed.
        Exception: Any error raised while constructing the LLM is logged and
            re-raised.
    """
    global AGENT_INSTANCE, TOOLS, LLM_INSTANCE, LANGGRAPH_FLAVOR_AVAILABLE, LG_StateGraph, LG_ToolExecutor_Class, LG_END, LG_ToolInvocation, add_messages, MemorySaver_Class, LANGGRAPH_MEMORY_SAVER, google_genai_client
    if AGENT_INSTANCE and not force_reinit:
        logger.info("Agent already initialized.")
        return
    logger.info("Initializing agent and tools...")
    if not GOOGLE_API_KEY:
        raise ValueError("GOOGLE_API_KEY not set for LangChain LLM.")

    try:
        LLM_INSTANCE = ChatGoogleGenerativeAI(
            model=GEMINI_MODEL_NAME,
            google_api_key=GOOGLE_API_KEY,
            temperature=0.0,
            timeout=120,
            convert_system_message_to_human=False
        )
        logger.info(f"LangChain LLM (Planner) initialized: {GEMINI_MODEL_NAME} (Using default safety settings, convert_system_message_to_human=False)")
    except Exception as e:
        logger.error(f"LangChain LLM init FAILED: {e}", exc_info=True)
        raise

    def _as_tool(candidate):
        # The file tools are plain functions. Wrap them so they expose .name
        # and .description and can be bound to the LLM. Objects that already
        # are tools pass through unchanged.
        return candidate if isinstance(candidate, BaseTool) else lc_tool_decorator(candidate)

    # --- Tool list: each optional tool is added only if its dependency loaded ---
    TOOLS = []
    if PYPDF2_AVAILABLE:
        TOOLS.append(_as_tool(read_pdf_tool))
    if PIL_TESSERACT_AVAILABLE:
        TOOLS.append(_as_tool(ocr_image_tool))
    if WHISPER_AVAILABLE:
        TOOLS.append(_as_tool(transcribe_audio_tool))
    if google_genai_client and PIL_TESSERACT_AVAILABLE:
        TOOLS.append(_as_tool(direct_multimodal_gemini_tool))
        logger.info("Added 'direct_multimodal_gemini_tool'.")
    else:
        logger.warning("'direct_multimodal_gemini_tool' NOT added (client or PIL missing).")
    try:
        search_tool = TavilySearchResults(max_results=3)
        TOOLS.append(search_tool)
        logger.info("Added 'TavilySearchResults' tool.")
    except Exception as e:
        logger.warning(f"TavilySearchResults init failed: {e}")
    try:
        python_repl = PythonREPLTool(name="python_repl")
        python_repl.description = "Python REPL. print() for output. The input is a single string of code."
        TOOLS.append(python_repl)
    except Exception as e:
        logger.warning(f"PythonREPLTool init failed: {e}")
    logger.info(f"Final tools list for agent: {[t.name for t in TOOLS]}")

    # --- Preferred path: LangGraph agent ---
    if LANGGRAPH_FLAVOR_AVAILABLE and all([LG_StateGraph, LG_ToolExecutor_Class, LG_END, LLM_INSTANCE, add_messages]):
        if not LANGGRAPH_MEMORY_SAVER and MemorySaver_Class:
            LANGGRAPH_MEMORY_SAVER = MemorySaver_Class()
            logger.info("LangGraph MemorySaver initialized.")
        try:
            logger.info(f"Attempting LangGraph init (Tool Executor type: {LG_ToolExecutor_Class.__name__ if LG_ToolExecutor_Class else 'None'})")
            _TypedDict = getattr(__import__('typing_extensions'), 'TypedDict', dict)

            class AgentState(_TypedDict):
                messages: Annotated[List[Any], add_messages]

            base_system_prompt_content_lg = LANGGRAPH_PROMPT_TEMPLATE_STR

            def agent_node(state: AgentState):
                # Prepend the system prompt (with the current tool list) to the history.
                system_message_content = base_system_prompt_content_lg.format(
                    tools="\n".join([f"- {t.name}: {t.description}" for t in TOOLS])
                )
                messages_for_llm = [SystemMessage(content=system_message_content)]
                messages_for_llm.extend(state['messages'])
                logger.debug(f"LangGraph agent_node - messages_for_llm: {messages_for_llm}")
                if not messages_for_llm or not any(isinstance(m, (HumanMessage, ToolMessage)) for m in messages_for_llm):
                    logger.error("LLM call would fail in agent_node: No HumanMessage or ToolMessage found in history.")
                    return {"messages": [AIMessage(content="[ERROR] Agent node: No user input found in messages.")]}
                bound_llm = LLM_INSTANCE.bind_tools(TOOLS)
                response = bound_llm.invoke(messages_for_llm)
                return {"messages": [response]}

            if not LG_ToolExecutor_Class:
                raise ValueError("LG_ToolExecutor_Class is None for LangGraph.")
            tool_executor_instance_lg = LG_ToolExecutor_Class(tools=TOOLS)
            # Compare the class's own name; type(cls).__name__ is the
            # metaclass name and never equals 'ToolNode'.
            executor_is_tool_node = getattr(LG_ToolExecutor_Class, "__name__", "") == 'ToolNode'
            tools_by_name = {t.name: t for t in TOOLS}

            def tool_node(state: AgentState):
                history = state.get('messages')
                last_msg = history[-1] if history and isinstance(history[-1], AIMessage) else None
                if not last_msg or not last_msg.tool_calls:
                    return {"messages": []}
                tool_results = []
                for tc in last_msg.tool_calls:
                    name, args, tc_id = tc.get('name'), tc.get('args'), tc.get('id')
                    if not all([name, isinstance(args, dict), tc_id]):
                        err_msg = f"Invalid tool_call: {tc}"
                        logger.error(err_msg)
                        tool_results.append(ToolMessage(f"Error: {err_msg}", tool_call_id=tc_id or "error_id", name=name or "error_tool"))
                        continue
                    try:
                        logger.info(f"LG Tool Invoking: '{name}' with {args} (ID: {tc_id})")
                        if LG_ToolInvocation and not executor_is_tool_node:
                            # Legacy ToolExecutor takes a ToolInvocation.
                            invocation = LG_ToolInvocation(tool=name, tool_input=args)
                            output_lg = tool_executor_instance_lg.invoke(invocation)  # type: ignore
                        else:
                            # ToolNode is given graph state, not a single
                            # tool-call dict, so the selected tool is run
                            # directly with its arguments instead.
                            selected_tool = tools_by_name.get(name)
                            if selected_tool is None:
                                raise ValueError(f"Unknown tool '{name}'")
                            output_lg = selected_tool.invoke(args)
                        tool_results.append(ToolMessage(content=str(output_lg), tool_call_id=tc_id, name=name))
                    except Exception as e_tool_node_lg:
                        logger.error(f"LG Tool Error ('{name}'): {e_tool_node_lg}", exc_info=True)
                        tool_results.append(ToolMessage(content=f"Error for tool {name}: {str(e_tool_node_lg)}", tool_call_id=tc_id, name=name))
                return {"messages": tool_results}

            workflow_lg = LG_StateGraph(AgentState)  # type: ignore
            workflow_lg.add_node("agent", agent_node)
            workflow_lg.add_node("tools", tool_node)
            workflow_lg.set_entry_point("agent")

            def should_continue_lg(state: AgentState):
                # Loop through the tools node while the model keeps requesting tool calls.
                return "tools" if state['messages'][-1].tool_calls else LG_END

            workflow_lg.add_conditional_edges("agent", should_continue_lg, {"tools": "tools", LG_END: LG_END})  # type: ignore
            workflow_lg.add_edge("tools", "agent")
            AGENT_INSTANCE = workflow_lg.compile(checkpointer=LANGGRAPH_MEMORY_SAVER) if LANGGRAPH_MEMORY_SAVER else workflow_lg.compile()
            logger.info(f"LangGraph compiled (Memory: {LANGGRAPH_MEMORY_SAVER is not None}).")
        except Exception as e_lg_init_main:
            logger.error(f"LangGraph init error: {e_lg_init_main}. Fallback ReAct.", exc_info=True)
            AGENT_INSTANCE = None
    else:
        logger.info("Skipping LangGraph: core components missing or LLM not ready.")
        AGENT_INSTANCE = None

    # --- Fallback path: ReAct agent ---
    if not AGENT_INSTANCE:
        logger.info("Initializing ReAct agent as fallback.")
        try:
            if not LLM_INSTANCE:
                raise ValueError("LLM_INSTANCE is None for ReAct.")
            prompt_react_instance = PromptTemplate.from_template(REACT_PROMPT_TEMPLATE_STR).partial(
                tools="\n".join([f"- {t.name}:{t.description}" for t in TOOLS]),
                tool_names=",".join([t.name for t in TOOLS])
            )
            react_agent_runnable_instance = create_react_agent(LLM_INSTANCE, TOOLS, prompt_react_instance)
            AGENT_INSTANCE = AgentExecutor(agent=react_agent_runnable_instance, tools=TOOLS, verbose=True, handle_parsing_errors=True, max_iterations=15, early_stopping_method="force")
            logger.info("ReAct agent initialized.")
        except Exception as e_react_init_main:
            logger.error(f"ReAct agent init failed: {e_react_init_main}", exc_info=True)
            AGENT_INSTANCE = None

    if not AGENT_INSTANCE:
        raise RuntimeError("CRITICAL: Agent initialization completely failed.")
    logger.info(f"Agent init finished. Active agent type: {type(AGENT_INSTANCE).__name__}")
def get_agent_response(prompt: str, task_id: Optional[str]=None, thread_id: Optional[str]=None) -> str:
    """Run the active agent on *prompt* and return its answer as text.

    If the agent or LLM has not been built yet, initialization is attempted
    first. Failures are reported in the returned string (prefixed with
    ``[ERROR]``, ``[WARN]`` or ``[INFO]``) rather than raised.

    Args:
        prompt: The full prompt text for the agent.
        task_id: Optional task id, used to derive a thread id.
        thread_id: Optional explicit thread id. When omitted it falls back to
            ``gaia_task_<task_id>`` or, failing that, a short hash of the prompt.

    Returns:
        The agent's final answer, or a bracket-prefixed diagnostic string.
    """
    global AGENT_INSTANCE, LLM_INSTANCE
    thread_id_to_use = thread_id or (f"gaia_task_{task_id}" if task_id else hashlib.md5(prompt.encode()).hexdigest()[:8])

    if not AGENT_INSTANCE or not LLM_INSTANCE:
        logger.warning("Agent/LLM not initialized in get_agent_response. Attempting re-initialization.")
        try:
            initialize_agent_and_tools(force_reinit=True)
        except Exception as e_reinit_get:
            logger.error(f"Re-initialization failed: {e_reinit_get}")
            return f"[ERROR] Agent/LLM re-init failed: {str(e_reinit_get)}"
        if not AGENT_INSTANCE or not LLM_INSTANCE:
            return "[ERROR] Agent/LLM still None after re-init."

    agent_name_get = type(AGENT_INSTANCE).__name__
    logger.info(f"Agent ({agent_name_get}) processing. Task: {task_id or 'N/A'}. Thread: {thread_id_to_use}. Prompt: {prompt[:100]}...")

    # Treat any non-ReAct runnable that looks like a compiled graph as the
    # LangGraph agent. Several attribute names are accepted because the
    # handle to the source graph is assumed to differ between LangGraph
    # releases (.graph vs .builder) — NOTE(review): confirm for the pinned
    # version.
    is_react_agent_get = isinstance(AGENT_INSTANCE, AgentExecutor)
    is_langgraph_agent_get = bool(
        LANGGRAPH_FLAVOR_AVAILABLE
        and not is_react_agent_get
        and hasattr(AGENT_INSTANCE, 'invoke')
        and any(hasattr(AGENT_INSTANCE, attr) for attr in ('graph', 'builder', 'get_graph'))
    )

    try:
        if is_langgraph_agent_get:
            logger.debug(f"Using LangGraph agent for thread: {thread_id_to_use}")
            input_for_lg_get = {"messages": [HumanMessage(content=prompt)]}
            logger.debug(f"Invoking LangGraph with input: {input_for_lg_get}")
            final_state_lg_get = AGENT_INSTANCE.invoke(input_for_lg_get, {"configurable": {"thread_id": thread_id_to_use}})
            logger.debug(f"LangGraph final_state_lg_get: {final_state_lg_get}")
            if not final_state_lg_get or 'messages' not in final_state_lg_get or not final_state_lg_get['messages']:
                logger.error("LangGraph: No final state or messages found in the output.")
                return "[ERROR] LangGraph: No final state or messages."

            # The answer is the most recent AI message that requests no tool calls.
            for message_item_lg_get in reversed(final_state_lg_get['messages']):
                if not isinstance(message_item_lg_get, AIMessage) or message_item_lg_get.tool_calls:
                    continue
                content = message_item_lg_get.content
                if isinstance(content, str):
                    return content
                if isinstance(content, list) and content:
                    # Keep text parts and bare strings; drop everything else.
                    final_text_parts = [
                        part.get("text", "") if isinstance(part, dict) and part.get("type") == "text"
                        else (str(part) if isinstance(part, str) else "")
                        for part in content
                    ]
                    return " ".join(filter(None, final_text_parts))
                logger.warning(f"LangGraph: Final AIMessage content not string or parsable list: {type(content)}")
                return f"[WARN] AIMessage content type not string: {str(content)[:100]}"

            logger.warning("LangGraph: No AIMessage without tool_calls found as final answer. Examining last message.")
            last_message_in_history = final_state_lg_get['messages'][-1]
            if hasattr(last_message_in_history, 'content') and isinstance(last_message_in_history.content, str):
                return f"[INFO] Fallback to last message content: {str(last_message_in_history.content)}"
            elif hasattr(last_message_in_history, 'content') and isinstance(last_message_in_history.content, list):
                return f"[INFO] Fallback to last message (list content): {str(last_message_in_history.content)[:150]}"
            elif isinstance(last_message_in_history, ToolMessage):
                return f"[INFO] Fallback to ToolMessage content: {str(last_message_in_history.content)[:150]}"
            else:
                logger.error(f"LangGraph: Could not extract string content from the very last message: {last_message_in_history}")
                return "[ERROR] LangGraph: Could not extract final answer from messages."
        elif is_react_agent_get:
            logger.debug("Using ReAct agent for get_agent_response.")
            react_input = {"input": prompt}
            logger.debug(f"ReAct input: {react_input}")
            response_react_get = AGENT_INSTANCE.invoke(react_input)
            logger.debug(f"ReAct response: {response_react_get}")
            return str(response_react_get.get("output", "[ERROR] ReAct: No 'output' key in response."))
        else:
            logger.error(f"Unknown agent type: {agent_name_get}")
            return f"[ERROR] Unknown agent type: {agent_name_get}"
    except Exception as e_agent_run_get:
        logger.error(f"Error during agent execution ({agent_name_get}): {e_agent_run_get}", exc_info=True)
        return f"[ERROR] Agent execution failed: {str(e_agent_run_get)[:150]}"
def construct_prompt_for_agent(q: Dict[str,Any]) -> str:
    """Render one question record as the text prompt given to the agent.

    Args:
        q: Question record. The keys read are ``task_id`` (defaults to
            ``"N/A"``), ``question`` (defaults to ``""``), ``files`` (an
            iterable of file identifiers, optional) and ``level`` (optional).

    Returns:
        A prompt of the form ``Task ID:<id>[\\nLevel:<level>][\\nFiles:...]``
        followed by a blank line and ``Question:<question>``. The level and
        file sections are omitted when their values are missing or falsy.
    """
    task_id = q.get("task_id", "N/A")
    question_text = q.get("question", "")
    attachments = q.get("files", [])
    difficulty = q.get("level")

    header = f"Task ID:{task_id}"
    if difficulty:
        header += f"\nLevel:{difficulty}"
    if attachments:
        # Every attachment line repeats the task id alongside the file entry.
        listing = "\n".join(f"- {name} (task_id:{task_id})" for name in attachments)
        header += "\nFiles:\n" + listing
    return f"{header}\n\nQuestion:{question_text}"
def run_and_submit_all(profile: Optional[gr.OAuthProfile] = None):
    """Run the agent on every fetched question and submit the answers.

    Steps: require a logged-in user, make sure the agent is initialized,
    fetch the question list, run the agent per question, then POST all
    answers in a single submission.

    Args:
        profile: OAuth profile of the logged-in user. Its ``username`` is sent
            with the submission; the run is refused when it is missing.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a
        ``pandas.DataFrame`` with one row per processed question, or ``None``
        when the run stops before any question is processed (no login, agent
        initialization failure, or question fetch failure).
    """
    global AGENT_INSTANCE
    # One column layout for every returned table, including an empty log.
    log_columns = ["Task ID", "Question", "Full Agent Prompt", "Raw Agent Output", "Submitted Answer"]
    space_id = os.getenv("SPACE_ID")

    username_for_submission = None
    if profile and hasattr(profile, 'username') and profile.username:
        username_for_submission = profile.username
        logger.info(f"Username from OAuth profile: {username_for_submission}")
    else:
        logger.warning("OAuth profile not available or username missing.")
        return "Hugging Face login required. Please use the login button and try again.", None

    if AGENT_INSTANCE is None:
        try:
            logger.info("Agent not pre-initialized. Initializing for run...")
            initialize_agent_and_tools()
        except Exception as e:
            return f"Agent on-demand initialization failed: {e}", None
    if AGENT_INSTANCE is None:
        return "Agent is still None after on-demand init.", None

    agent_code_url = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local_dev_run"
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"
    auth_headers = {"Authorization": f"Bearer {HUGGINGFACE_TOKEN}"} if HUGGINGFACE_TOKEN else {}

    try:
        logger.info(f"Fetching questions from {questions_url}")
        questions_response = requests.get(questions_url, headers=auth_headers, timeout=30)
        questions_response.raise_for_status()
        questions_data = questions_response.json()
        if not questions_data or not isinstance(questions_data, list):
            logger.error(f"Invalid questions data: {questions_data}")
            return "Fetched questions_data invalid.", None
        logger.info(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        logger.error(f"Fetch questions error: {e}", exc_info=True)
        return f"Fetch questions error:{e}", None

    results_log, answers_payload = [], []
    logger.info(f"Running agent on {len(questions_data)} questions for user '{username_for_submission}'...")
    for position, item in enumerate(questions_data, start=1):
        # A malformed (non-dict) entry must not abort the whole batch; skip it
        # the same way an entry with missing fields is skipped.
        if not isinstance(item, dict):
            logger.warning(f"Skipping item: {item}")
            continue
        task_id, question_text = item.get("task_id"), item.get("question")
        if not task_id or question_text is None:
            logger.warning(f"Skipping item: {item}")
            continue

        prompt = construct_prompt_for_agent(item)
        thread_id = f"gaia_batch_task_{task_id}"
        logger.info(f"Processing Q {position}/{len(questions_data)} - Task: {task_id}")
        try:
            raw_answer = get_agent_response(prompt, task_id=task_id, thread_id=thread_id)
            submitted_answer = _strip_exact_match_answer(raw_answer)
        except Exception as e:
            # Record a placeholder so the task still appears in the submission.
            logger.error(f"Agent error task {task_id}:{e}", exc_info=True)
            raw_answer = f"AGENT ERROR:{str(e)[:100]}"
            submitted_answer = "N/A [AGENT_ERROR]"
        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Full Agent Prompt": prompt,
            "Raw Agent Output": raw_answer,
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        return "Agent produced no answers.", pd.DataFrame(results_log, columns=log_columns)

    submission_payload = {
        "username": username_for_submission.strip(),
        "agent_code": agent_code_url,
        "answers": answers_payload,
    }
    logger.info(f"Submitting {len(answers_payload)} answers to {submit_url} for user '{username_for_submission}'...")
    submission_headers = {"Content-Type": "application/json", **auth_headers}
    try:
        submit_response = requests.post(submit_url, json=submission_payload, headers=submission_headers, timeout=120)
        submit_response.raise_for_status()
        submission_result = submit_response.json()
        result_message = (
            f"User:{submission_result.get('username',username_for_submission)}\n"
            f"Score:{submission_result.get('score','N/A')}% "
            f"({submission_result.get('correct_count','?')}/{submission_result.get('total_attempted','?')})\n"
            f"Msg:{submission_result.get('message','N/A')}"
        )
        logger.info(f"Submission OK! {result_message}")
        return f"Submission OK!\n{result_message}", pd.DataFrame(results_log, columns=log_columns)
    except requests.exceptions.HTTPError as e:
        error_http = f"HTTP {e.response.status_code}. Detail:{e.response.text[:200]}"
        logger.error(f"Submit Fail:{error_http}", exc_info=True)
        return f"Submit Fail:{error_http}", pd.DataFrame(results_log, columns=log_columns)
    except Exception as e:
        logger.error(f"Submit Fail unexpected:{e}", exc_info=True)
        return f"Submit Fail:{str(e)[:100]}", pd.DataFrame(results_log, columns=log_columns)
# --- Build Gradio Interface --- | |
with gr.Blocks(css=".gradio-container {max-width:1280px !important;margin:auto !important;}", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# GAIA Agent Challenge Runner v7 (OAuth for Username)")
    gr.Markdown(f"""**Instructions:**
1. **Login with Hugging Face** using the button below. Your HF username will be used for submission.
2. Click 'Run Evaluation & Submit' to process GAIA questions (typically 20).
3. **Goal: 30%+ (6/20).** Agent uses Gemini Pro ({GEMINI_MODEL_NAME}) as planner. Tools include Web Search, Python, PDF, OCR, Audio/YouTube, and a new Direct Multimodal tool using Gemini Flash ({GEMINI_FLASH_MULTIMODAL_MODEL_NAME}).
4. Ensure `GOOGLE_API_KEY`, `HUGGINGFACE_TOKEN`, and `TAVILY_API_KEY` are Space secrets.
5. Check Space logs for details. LangGraph is attempted (ReAct fallback).""")
    agent_status_display = gr.Markdown("**Agent Status:** Initializing...")
    missing_secrets_display = gr.Markdown("")
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=7, interactive=False)
    results_table = gr.DataFrame(label="Q&A Log", headers=["Task ID","Question","Prompt","Raw","Submitted"], wrap=True)
    run_button.click(fn=run_and_submit_all, outputs=[status_output,results_table], api_name="run_evaluation")

    def update_ui_on_load_fn_within_context():
        """Build the Markdown for the status and secrets panels on page load.

        Reads the module-level startup status (missing secrets list and agent
        pre-init message) and checks that the ``yt-dlp`` and ``ffmpeg``
        executables can be run.

        Returns:
            A dict mapping ``agent_status_display`` and
            ``missing_secrets_display`` to their updated ``gr.Markdown`` values.
        """
        secrets_msg_md = ""
        if missing_vars_startup_list_global:
            secrets_msg_md = f"<font color='red'>**⚠️ Secrets Missing:** {', '.join(missing_vars_startup_list_global)}.</font>"

        env_issues = []
        for tool_name, version_cmd in (
            ("yt-dlp", ['yt-dlp', '--version']),
            ("ffmpeg", ['ffmpeg', '-version']),
        ):
            try:
                subprocess.run(version_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            except (OSError, subprocess.SubprocessError):
                # OSError: executable missing or not runnable; SubprocessError:
                # non-zero exit status. KeyboardInterrupt/SystemExit now propagate.
                env_issues.append(tool_name)
                logger.warning(f"{tool_name} check failed (UI load).")
        if env_issues:
            secrets_msg_md += f"<br/><font color='orange'>**Tool Deps Missing:** {', '.join(env_issues)}.</font>"

        current_status_md = agent_pre_init_status_msg_global
        # Append LangGraph availability only if the status does not mention it yet.
        # NOTE(review): if LG_ToolExecutor_Class is a class (as its name suggests),
        # type(...).__name__ yields the metaclass name rather than the executor's
        # name -- confirm against where LG_ToolExecutor_Class is assigned.
        if not LANGGRAPH_FLAVOR_AVAILABLE and "LangGraph" not in current_status_md:
            current_status_md += f" (LangGraph core components not fully loaded: LG_ToolExecutor_Class is {type(LG_ToolExecutor_Class).__name__ if LG_ToolExecutor_Class else 'None'}, ReAct fallback.)"
        elif LANGGRAPH_FLAVOR_AVAILABLE and "LangGraph" not in current_status_md:
            current_status_md += f" (LangGraph ready with {type(LG_ToolExecutor_Class).__name__ if LG_ToolExecutor_Class else 'UnknownExecutor'}.)"
        return { agent_status_display: gr.Markdown(value=current_status_md),
                 missing_secrets_display: gr.Markdown(value=secrets_msg_md) }

    demo.load(update_ui_on_load_fn_within_context, [], [agent_status_display, missing_secrets_display])
if __name__ == "__main__":
    # Script entry point: report optional-dependency status, record missing
    # secrets, pre-initialize the agent, then launch the Gradio app.
    logger.info("Application starting up (v7.2 - Agent Node Message & LLM Safety Fix)...")

    # One warning per optional file-processing library that failed to import.
    for _is_available, _unavailable_msg in (
        (PYPDF2_AVAILABLE, "PyPDF2 (PDF tool) NOT AVAILABLE."),
        (PIL_TESSERACT_AVAILABLE, "Pillow/Pytesseract (OCR tool) NOT AVAILABLE."),
        (WHISPER_AVAILABLE, "Whisper (Audio tool) NOT AVAILABLE."),
    ):
        if not _is_available:
            logger.warning(_unavailable_msg)

    if not LANGGRAPH_FLAVOR_AVAILABLE:
        logger.warning("Core LangGraph FAILED import or essential component (ToolExecutor/Node/Invocation) missing. ReAct fallback. Check requirements & Space build logs.")
    else:
        logger.info(f"Core LangGraph components (StateGraph, END, {type(LG_ToolExecutor_Class).__name__ if LG_ToolExecutor_Class else 'FailedExecutor'}) loaded.")

    # Rebuild the list of unset secrets that the UI reports on page load.
    missing_vars_startup_list_global.clear()
    missing_vars_startup_list_global.extend(
        _label
        for _secret_value, _label in (
            (GOOGLE_API_KEY, "GOOGLE_API_KEY"),
            (HUGGINGFACE_TOKEN, "HUGGINGFACE_TOKEN (for GAIA API)"),
            (TAVILY_API_KEY, "TAVILY_API_KEY (for Tavily Search)"),
        )
        if not _secret_value
    )

    try:
        logger.info("Pre-initializing agent...")
        initialize_agent_and_tools()
        if not AGENT_INSTANCE:
            agent_pre_init_status_msg_global = "Agent pre-init FAILED (AGENT_INSTANCE is None)."
        else:
            agent_type_name = type(AGENT_INSTANCE).__name__
            agent_pre_init_status_msg_global = f"Agent Pre-initialized: **{agent_type_name}**."
            # A graph-like type name overrides the generic message with a
            # LangGraph-specific one.
            if LANGGRAPH_FLAVOR_AVAILABLE and any(
                _marker in agent_type_name for _marker in ("StateGraph", "CompiledGraph")
            ):
                lg_executor_display_name = type(LG_ToolExecutor_Class).__name__ if LG_ToolExecutor_Class else "UnknownExecutor"
                agent_pre_init_status_msg_global = f"Agent Pre-initialized: **LangGraph** (Executor: {lg_executor_display_name}, Memory: {LANGGRAPH_MEMORY_SAVER is not None})."
        # Strip the Markdown bold markers for the plain-text log line.
        logger.info(agent_pre_init_status_msg_global.replace("**",""))
    except Exception as e:
        agent_pre_init_status_msg_global = f"Agent pre-init CRASHED: {str(e)}"
        logger.critical(f"Agent pre-init CRASHED: {e}", exc_info=True)

    logger.info(f"Space ID: {os.getenv('SPACE_ID', 'Not Set')}")
    logger.info("Gradio Interface launching...")
    gradio_debug_enabled = os.getenv("GRADIO_DEBUG","false").lower()=="true"
    demo.queue().launch(debug=gradio_debug_enabled, share=False, max_threads=20)