""" Gradio Interface for LangGraph ReAct Agent A production-ready web interface with real-time streaming output Styled similar to gradio_ui.py with sidebar layout """ import os import re import sys import time from typing import Generator, Optional from dataclasses import dataclass import gradio as gr from gradio.themes.utils import fonts from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, AIMessage from langchain_anthropic import ChatAnthropic # Import the agent from agent_v3 sys.path.append(os.path.dirname(__file__)) from agent_v3 import CodeAgent from core.types import AgentConfig # Import support modules from managers import PythonExecutor # Load environment variables load_dotenv("./.env") class GradioAgentUI: """ Gradio interface for interacting with the LangGraph ReAct Agent. Styled similar to the smolagents GradioUI with sidebar layout. """ def __init__(self, model=None, config=None): """Initialize the Gradio Agent UI.""" # Default model if model is None: api_key = os.environ["OPENROUTER_API_KEY"] if not api_key: raise ValueError( "OPENROUTER_API_KEY environment variable is not set. " "Please set it or provide a model instance." ) # model = ChatOpenAI( # model="google/gemini-2.5-flash", # base_url="https://openrouter.ai/api/v1", # temperature=0.7, # api_key=api_key, # ) api_key_anthropic = os.environ["Anthropic_API_KEY"] model = ChatAnthropic( model='claude-sonnet-4-5-20250929', temperature=0.7, api_key=api_key_anthropic ) # Default config if config is None: config = AgentConfig( max_steps=15, max_conversation_length=30, retry_attempts=3, timeout_seconds=1200, verbose=True ) self.model = model self.config = config self.name = "Chat with AlphaGenome Agent" self.description = "AlphaGenome Agent is automatically created by [Paper2Agent](https://github.com/jmiao24/Paper2Agent) to use [AlphaGenome](https://github.com/google-deepmind/alphagenome) and interpret DNA variants." def get_step_footnote(self, step_num: int, duration: float) -> str: """Create a footnote for a step with timing information.""" return f'Step {step_num} | Duration: {duration:.2f}s' def format_code_block(self, code: str) -> str: """Format code as a Python code block.""" code = code.strip() if not code.startswith("```"): code = f"```python\n{code}\n```" return code def extract_content_parts(self, message_content: str) -> dict: """Extract different parts from the message content.""" parts = { 'thinking': '', 'plan': None, 'code': None, 'solution': None, 'error': None, 'observation': None } # Extract thinking (content before any tags) thinking_pattern = r'^(.*?)(?=<(?:execute|solution|error|observation)|$)' thinking_match = re.match(thinking_pattern, message_content, re.DOTALL) if thinking_match: thinking = thinking_match.group(1).strip() # Remove plan from thinking plan_pattern = r'\d+\.\s*\[[^\]]*\]\s*[^\n]+(?:\n\d+\.\s*\[[^\]]*\]\s*[^\n]+)*' thinking = re.sub(plan_pattern, '', thinking).strip() # Remove any existing "Thinking:" or "Plan:" labels (including duplicates) thinking = re.sub(r'(^|\n)(Thinking:|Plan:)\s*', '\n', thinking).strip() # Remove any duplicate Thinking: that might remain thinking = re.sub(r'Thinking:\s*', '', thinking).strip() if thinking: parts['thinking'] = thinking # Extract plan plan_pattern = r'\d+\.\s*\[[^\]]*\]\s*[^\n]+(?:\n\d+\.\s*\[[^\]]*\]\s*[^\n]+)*' plan_match = re.search(plan_pattern, message_content) if plan_match: parts['plan'] = plan_match.group(0) # Extract code code_match = re.search(r'(.*?)', message_content, re.DOTALL) if code_match: parts['code'] = code_match.group(1).strip() # Extract solution solution_match = re.search(r'(.*?)', message_content, re.DOTALL) if solution_match: parts['solution'] = solution_match.group(1).strip() # Extract error error_match = re.search(r'(.*?)', message_content, re.DOTALL) if error_match: parts['error'] = error_match.group(1).strip() # Extract observation obs_match = re.search(r'(.*?)', message_content, re.DOTALL) if obs_match: parts['observation'] = obs_match.group(1).strip() return parts def truncate_output(self, text: str, max_lines: int = 20) -> str: """Truncate output to specified number of lines.""" lines = text.split('\n') if len(lines) > max_lines: truncated = '\n'.join(lines[:max_lines]) truncated += f"\n... (truncated {len(lines) - max_lines} lines)" return truncated return text def stream_agent_response(self, agent: CodeAgent, query: str) -> Generator: """Stream agent responses as Gradio ChatMessages with structured blocks.""" # Get all tool functions using the agent's method tools_dict = agent.get_all_tool_functions() agent.python_executor.send_functions(tools_dict) agent.python_executor.send_variables({}) # Create initial state input_state = { "messages": [HumanMessage(content=query)], "step_count": 0, "error_count": 0, "start_time": time.time(), "current_plan": None } # Track state displayed_reasoning = set() previous_plan = None step_timings = {} try: # Stream through the workflow execution for state in agent.workflow_engine.graph.stream(input_state, stream_mode="values"): step_count = state.get("step_count", 0) error_count = state.get("error_count", 0) current_plan = state.get("current_plan") # Track timing if step_count not in step_timings: step_timings[step_count] = time.time() message = state["messages"][-1] if isinstance(message, AIMessage): content = message.content parts = self.extract_content_parts(content) # First yield step header only if step_count > 0: step_header = f"## Step {step_count}\n" yield gr.ChatMessage( role="assistant", content=step_header, metadata={"status": "done"} ) # Display Thinking block (styled like Current Plan) if parts['thinking'] and len(parts['thinking']) > 20: thinking_text = parts['thinking'] # Clean up any remaining labels thinking_text = re.sub(r'(Thinking:|Plan:)\s*', '', thinking_text).strip() # Clean up excessive whitespace - replace multiple newlines with max 2 thinking_text = re.sub(r'\n\s*\n\s*\n+', '\n\n', thinking_text).strip() # Also clean up any leading/trailing whitespace on each line thinking_text = '\n'.join(line.strip() for line in thinking_text.split('\n') if line.strip()) content_hash = hash(thinking_text) if content_hash not in displayed_reasoning and thinking_text: thinking_block = f"""
šŸ¤” Thinking
{thinking_text}
""" yield gr.ChatMessage( role="assistant", content=thinking_block, metadata={"status": "done"} ) displayed_reasoning.add(content_hash) # Then display Current Plan after thinking if current_plan and current_plan != previous_plan: formatted_plan = current_plan.replace('[ ]', '☐').replace('[āœ“]', 'āœ…').replace('[āœ—]', 'āŒ') plan_block = f"""
šŸ“‹ Current Plan
{formatted_plan}
""" yield gr.ChatMessage( role="assistant", content=plan_block, metadata={"status": "done"} ) previous_plan = current_plan # Display Executing Code as independent block with proper syntax highlighting if parts['code']: # Format as markdown code block for proper rendering code_block = f"""
⚔ Executing Code
```python {parts['code']} ```""" yield gr.ChatMessage( role="assistant", content=code_block, metadata={"status": "done"} ) # Display Execution Result as independent block if parts['observation']: truncated = self.truncate_output(parts['observation']) result_block = f"""
šŸ“Š Execution Result
``` {truncated} ```""" yield gr.ChatMessage( role="assistant", content=result_block, metadata={"status": "done"} ) # Display solution with special formatting as independent block if parts['solution']: solution_block = f"""
āœ… Final Solution
{parts['solution']}
""" yield gr.ChatMessage( role="assistant", content=solution_block, metadata={"status": "done"} ) # Display error with special formatting if parts['error']: error_block = f"""
āš ļø Error
{parts['error']}
""" yield gr.ChatMessage( role="assistant", content=error_block, metadata={"status": "done"} ) # Add step footnote with timing ONLY after Execution Result if parts['observation'] and step_count in step_timings: duration = time.time() - step_timings[step_count] footnote = f'
Step {step_count} | Duration: {duration:.2f}s
' yield gr.ChatMessage( role="assistant", content=footnote, metadata={"status": "done"} ) # Add separator between steps if step_count > 0: yield gr.ChatMessage( role="assistant", content='
', metadata={"status": "done"} ) except Exception as e: error_block = f"""
šŸ’„ Critical Error
Error during agent execution: {str(e)}
""" yield gr.ChatMessage( role="assistant", content=error_block, metadata={"status": "done"} ) def interact_with_agent(self, prompt: str, api_key: str, messages: list, session_state: dict) -> Generator: """Handle interaction with the agent.""" # Store original user input for display original_prompt = prompt # Add API key to the prompt if provided (for agent processing) if api_key and api_key.strip(): prompt = f"{prompt} My API key is: {api_key.strip()}." # Initialize agent in session if needed if "agent" not in session_state: session_state["agent"] = CodeAgent( model=self.model, config=self.config, use_tool_manager=True, use_tool_selection=False # Disable for Gradio UI by default ) # Try to load MCP config if available mcp_config_path = "./mcp_config.yaml" if os.path.exists(mcp_config_path): try: session_state["agent"].add_mcp(mcp_config_path) print(f"āœ… Loaded MCP tools from {mcp_config_path}") except Exception as e: print(f"āš ļø Could not load MCP tools: {e}") agent = session_state["agent"] try: # Add user message immediately (use original prompt for display) messages.append(gr.ChatMessage(role="user", content=original_prompt, metadata={"status": "done"})) yield messages # Add a "thinking" indicator immediately messages.append(gr.ChatMessage(role="assistant", content="šŸ¤” Processing...", metadata={"status": "pending"})) yield messages # Remove the thinking indicator before adding real content messages.pop() # Stream agent responses with real-time updates for msg in self.stream_agent_response(agent, prompt): messages.append(msg) yield messages except Exception as e: messages.append( gr.ChatMessage( role="assistant", content=f"Error: {str(e)}", metadata={"title": "šŸ’„ Error", "status": "done"} ) ) yield messages def create_app(self): """Create the Gradio app with sidebar layout.""" # Create custom theme with modern fonts modern_theme = gr.themes.Monochrome( font=fonts.GoogleFont("Inter"), font_mono=fonts.GoogleFont("JetBrains Mono") ) with gr.Blocks(theme=modern_theme, fill_height=True, title=self.name, css=""" /* Hide Gradio footer */ .footer {display: none !important;} footer {display: none !important;} .gradio-footer {display: none !important;} #footer {display: none !important;} [class*="footer"] {display: none !important;} [id*="footer"] {display: none !important;} .block.svelte-1scc9gv {display: none !important;} .built-with-gradio {display: none !important;} .gradio-container footer {display: none !important;} """) as demo: # Force light theme and hide footer demo.load(js=""" () => { document.body.classList.remove('dark'); document.querySelector('gradio-app').classList.remove('dark'); const url = new URL(window.location); url.searchParams.set('__theme', 'light'); window.history.replaceState({}, '', url); // Hide footer elements setTimeout(() => { const footers = document.querySelectorAll('footer, .footer, .gradio-footer, #footer, [class*="footer"], [id*="footer"], .built-with-gradio'); footers.forEach(footer => footer.style.display = 'none'); // Also hide any Gradio logo/branding const brandingElements = document.querySelectorAll('a[href*="gradio"], .gradio-logo, [alt*="gradio"]'); brandingElements.forEach(el => el.style.display = 'none'); }, 100); } """) # Session state session_state = gr.State({}) stored_messages = gr.State([]) with gr.Row(): # Sidebar with gr.Column(scale=1): gr.Markdown(f"# {self.name}") gr.Markdown(f"> {self.description}") gr.Markdown("---") # API Key section with gr.Group(): gr.Markdown("**AlphaGenome API Key**") api_key_input = gr.Textbox( label="API Key", type="password", placeholder="Enter your AlphaGenome API key", value="AIzaSyD1USDNy9WqfIROICB3FWI1wJHmkO2z21U" ) # Input section with gr.Group(): gr.Markdown("**Your Request**") text_input = gr.Textbox( lines=4, label="Query", placeholder="Enter your query here and press Enter or click Submit", value="""Use AlphaGenome MCP to analyze heart gene expression data to identify the causal gene for the variant chr11:116837649:T>G, associated with Hypoalphalipoproteinemia.""" ) submit_btn = gr.Button("šŸš€ Submit", variant="primary", size="lg") # Configuration section with gr.Accordion("āš™ļø Configuration", open=False): max_steps_input = gr.Slider( label="Max Steps", minimum=5, maximum=50, value=self.config.max_steps, step=1 ) temperature_input = gr.Slider( label="Temperature", minimum=0.0, maximum=1.0, value=0.7, step=0.1 ) apply_config_btn = gr.Button("Apply Configuration", size="sm") # Example queries with gr.Accordion("šŸ“š Example Queries", open=False): gr.Examples( examples=[ ["What's the quantile score of chr3:197081044:TACTC>T on splice junction in Artery (tibial)?"], ["What are the raw and quantile scores of the variant chr3:120280774:G>T for chromatin accessibility in the GM12878 cell line?"], ], inputs=text_input, ) gr.Markdown("---") # gr.HTML( # "
Powered by LangGraph & OpenRouter
" # ) # Main chat area with gr.Column(scale=3): chatbot = gr.Chatbot( label="Agent Conversation", type="messages", height=700, show_copy_button=True, avatar_images=( None, # Default user avatar "images.png" # Assistant avatar ), latex_delimiters=[ {"left": r"$$", "right": r"$$", "display": True}, {"left": r"$", "right": r"$", "display": False}, ], render_markdown=True, sanitize_html=False, # Allow custom HTML for better formatting ) with gr.Row(): clear_btn = gr.Button("šŸ—‘ļø Clear", size="sm") # Note: stop_btn is created but not wired up yet # This could be used for future cancellation functionality stop_btn = gr.Button("ā¹ļø Stop", size="sm", variant="stop") # Event handlers def update_config(max_steps, temperature, session_state): """Update agent configuration.""" if "agent" in session_state: agent = session_state["agent"] agent.config.max_steps = max_steps if hasattr(agent.model, 'temperature'): agent.model.temperature = temperature return "Configuration updated!" def clear_chat(): """Clear the chat.""" return [], [] # Wire up events text_input.submit( lambda x: ("", gr.Button(interactive=False)), [text_input], [text_input, submit_btn] ).then( self.interact_with_agent, [stored_messages, api_key_input, chatbot, session_state], [chatbot] ).then( lambda: gr.Button(interactive=True), None, [submit_btn] ) submit_btn.click( lambda x: (x, "", gr.Button(interactive=False)), [text_input], [stored_messages, text_input, submit_btn] ).then( self.interact_with_agent, [stored_messages, api_key_input, chatbot, session_state], [chatbot] ).then( lambda: gr.Button(interactive=True), None, [submit_btn] ) apply_config_btn.click( update_config, [max_steps_input, temperature_input, session_state], None ) clear_btn.click( clear_chat, None, [chatbot, stored_messages] ) return demo def launch(self, share: bool = False, **kwargs): """Launch the Gradio app.""" app = self.create_app() # Set defaults if not provided in kwargs kwargs.setdefault('server_name', '0.0.0.0') kwargs.setdefault('server_port', 7860) kwargs.setdefault('show_error', True) app.queue(max_size=10).launch( share=share, show_api=False, favicon_path=None, **kwargs ) if __name__ == "__main__": # Check for API key before starting if not os.environ.get("OPENROUTER_API_KEY"): print("\nāš ļø Error: OPENROUTER_API_KEY environment variable is not set!") print("\nPlease set it using one of these methods:") print("1. Export it in your shell:") print(" export OPENROUTER_API_KEY='your-api-key-here'") print("\n2. Create a .env file in the current directory with:") print(" OPENROUTER_API_KEY=your-api-key-here") print("\n3. Or provide your own model instance when initializing GradioAgentUI") sys.exit(1) try: # Create and launch the interface with optional MCP config ui = GradioAgentUI() # Optional: Load MCP tools if config exists mcp_config_path = "./mcp_config.yaml" if os.path.exists(mcp_config_path): print(f"Found MCP config at {mcp_config_path}") ui.launch(share=False, quiet=False) except Exception as e: print(f"\nāŒ Error starting Gradio interface: {e}") sys.exit(1)