NLarchive's picture
Update app.py
cbdad3b verified
import gradio as gr
import time
import threading
import os
from agent import run_agent, disconnect, initialize_session, is_session_initialized
from workflow_vizualizer import (
track_workflow_step, track_communication, complete_workflow_step,
get_workflow_visualization, get_workflow_summary,
reset_workflow
)
# Global state for API key management
_api_key_set = False
_api_key_lock = threading.Lock()
_using_default_key = False
_default_key_available = False
# Debouncing to prevent rapid-fire requests
_last_request_time = 0
_request_lock = threading.Lock()
_processing = False
def check_default_api_key():
"""Check if there's a default API key available in environment."""
default_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
return default_key is not None, default_key
def validate_and_set_api_key(api_key, is_user_provided=True):
"""Validate and set the API key by actually testing model initialization."""
global _api_key_set, _using_default_key
if not api_key or not api_key.strip():
return "❌ Please enter your Gemini API key", False
api_key = api_key.strip()
try:
# Store original key for restoration if needed
original_gemini_key = os.environ.get("GEMINI_API_KEY")
original_google_key = os.environ.get("GOOGLE_API_KEY")
# Set the environment variable for testing
os.environ["GEMINI_API_KEY"] = api_key
# Test the API key by actually initializing and making a test call
from inference import initialize, generate_content
# Clear any existing client to force re-initialization
import inference
inference.client = None
# Initialize with the new API key
initialize()
# Make a simple test call to verify the API key works
test_response = generate_content("Hello", model_name="gemini-1.5-flash")
# If we get here, the API key works
with _api_key_lock:
_api_key_set = True
_using_default_key = not is_user_provided
if is_user_provided:
return "✅ Your personal API key validated successfully! You now have full access.", True
else:
return "✅ Default API key is active. You can start using the chat with limited usage.", True
except Exception as e:
# Restore original keys if validation failed
if original_gemini_key:
os.environ["GEMINI_API_KEY"] = original_gemini_key
elif "GEMINI_API_KEY" in os.environ:
del os.environ["GEMINI_API_KEY"]
if original_google_key:
os.environ["GOOGLE_API_KEY"] = original_google_key
# Reset client state
import inference
inference.client = None
error_msg = str(e).lower()
if "api" in error_msg and ("key" in error_msg or "auth" in error_msg):
return "❌ Invalid API key. Please check your key and try again.", False
elif "quota" in error_msg or "limit" in error_msg:
if not is_user_provided:
return "⚠️ Default API key has reached its limit. Please provide your own API key to continue.", False
else:
return "❌ API quota exceeded. Please check your API usage limits.", False
elif "permission" in error_msg or "access" in error_msg:
return "❌ API access denied. Please verify your API key has proper permissions.", False
elif "network" in error_msg or "connection" in error_msg:
return "❌ Network error. Please check your internet connection and try again.", False
else:
return f"❌ API key validation failed: {str(e)[:100]}", False
def initialize_default_api_if_available():
"""Try to initialize with default API key if available."""
global _default_key_available, _api_key_set, _using_default_key
has_default, default_key = check_default_api_key()
_default_key_available = has_default
if has_default:
try:
status_msg, is_valid = validate_and_set_api_key(default_key, is_user_provided=False)
if is_valid:
with _api_key_lock:
_api_key_set = True
_using_default_key = True
return True, status_msg
except Exception as e:
print(f"Failed to initialize default API key: {e}")
return False, "No default API key available"
def check_api_key_status():
"""Check if API key is set and valid."""
with _api_key_lock:
return _api_key_set
def get_api_key_status_info():
"""Get information about current API key status."""
with _api_key_lock:
if _api_key_set:
if _using_default_key:
return "🔑 Using default API key (limited usage)"
else:
return "🔑 Using your personal API key (full access)"
else:
return "❌ No API key active"
def chat_fn(message, history):
global _last_request_time, _processing
# Check API key first
if not check_api_key_status():
if _default_key_available:
return history + [{"role": "assistant", "content": "⚠️ Please set up an API key first using the section above."}], None, {"status": "no_api_key", "message": "API key required"}, ""
else:
return history + [{"role": "assistant", "content": "⚠️ Please provide your Gemini API key first using the field above."}], None, {"status": "no_api_key", "message": "API key required"}, ""
if not message.strip():
return history, None, {"status": "empty_message", "message": "Please enter a message"}, ""
# Check if already processing
if _processing:
return history, None, {"status": "busy", "message": "Please wait for the current request to complete"}, ""
# Debounce requests
with _request_lock:
current_time = time.time()
if current_time - _last_request_time < 2.0:
return history, None, {"status": "rate_limited", "message": "Please wait 2 seconds between requests"}, ""
_last_request_time = current_time
_processing = True
input_step = None
try:
# Start new workflow
reset_workflow()
# Track user input
input_step = track_workflow_step("input", message)
# Track UI to agent communication
ui_to_agent_step = track_communication("ui", "agent", "chat_request", message, parent_step=input_step)
# Initialize session if needed (SINGLE initialization per session)
if not is_session_initialized():
session_init_step = track_workflow_step("session_init", "Initializing persistent session", parent_step=ui_to_agent_step)
initialize_session()
complete_workflow_step(session_init_step, "completed")
else:
# Track session reuse
reuse_step = track_workflow_step("session_reuse", "Using existing persistent session", parent_step=ui_to_agent_step)
complete_workflow_step(reuse_step, "completed")
# Process the message (no additional initialization needed)
response = run_agent(message)
# Track agent to UI response
agent_to_ui_step = track_communication("agent", "ui", "chat_response", response[:100], parent_step=ui_to_agent_step)
# Complete steps
complete_workflow_step(ui_to_agent_step, "completed")
complete_workflow_step(agent_to_ui_step, "completed")
if input_step is not None:
complete_workflow_step(input_step, "completed")
except Exception as e:
error_str = str(e).lower()
# Check if it's a quota/rate limit error
if ("quota" in error_str or "limit" in error_str or "rate" in error_str) and _using_default_key:
response = "⚠️ Default API key has reached its usage limit. Please provide your personal API key above to continue with unlimited access."
else:
response = f"I encountered an error while processing your request: {str(e)}"
if input_step is not None:
complete_workflow_step(input_step, "error")
print(f"Agent error: {e}")
finally:
_processing = False
# Track visualization generation
viz_step = track_workflow_step("visualization", "Generating workflow visualization")
try:
img_b64 = get_workflow_visualization()
summary = get_workflow_summary()
complete_workflow_step(viz_step, "completed", details={"summary_steps": summary.get("total_steps", 0)})
except Exception as e:
print(f"Visualization error: {e}")
img_b64 = None
summary = {"error": f"Visualization failed: {str(e)}", "status": "visualization_error"}
complete_workflow_step(viz_step, "error", details={"error": str(e)})
# Track final response
response_step = track_workflow_step("response", f"Final response: {response[:50]}...")
complete_workflow_step(response_step, "completed")
# Add to history
history = history + [{"role": "user", "content": message}, {"role": "assistant", "content": response}]
return history, img_b64, summary, ""
# Test examples for each server
SEMANTIC_TESTS = [
"Find semantic keywords in: Machine learning and artificial intelligence are transforming technology",
"Find similar sentences to 'deep learning' in: AI uses neural networks. Machine learning algorithms. Statistics and data science.",
"What's the semantic similarity between 'happy' and 'joyful'?"
]
TOKEN_COUNTER_TESTS = [
"How many tokens are in: Hello world, how are you today?",
"Count tokens using GPT-4 tokenizer: The quick brown fox jumps over the lazy dog",
"Compare token counts for: Natural language processing is fascinating"
]
SENTIMENT_TESTS = [
"What's the sentiment of: This is absolutely amazing and wonderful!",
"Analyze sentiment: I hate this terrible horrible experience",
"Sentiment analysis: The weather is okay, nothing special"
]
def handle_api_key_submit(api_key):
"""Handle API key submission."""
status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True)
if is_valid:
return (
status_msg,
gr.update(visible=False), # Hide API key section
gr.update(visible=True), # Show chat interface
get_api_key_status_info(), # Update status
"" # Clear API key input for security
)
else:
return (
status_msg,
gr.update(visible=True), # Keep API key section visible
gr.update(visible=_api_key_set), # Show chat if default key works
get_api_key_status_info(), # Update status
api_key # Keep the input value for correction
)
def handle_test_example(example_text, history):
"""Handle click on test example button."""
return chat_fn(example_text, history)
# Initialize default API key if available
default_initialized, default_status = initialize_default_api_if_available()
# Gradio interface with API key input
with gr.Blocks(
title="MCP Agent Client",
css="""
.gradio-container {
max-width: 100% !important;
}
footer {
display: none !important;
}
.gradio-footer {
display: none !important;
}
.message-row {
margin: 8px 0;
}
.warning, .error-display {
display: none !important;
}
.test-button {
margin: 2px !important;
font-size: 12px !important;
}
.server-section {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
margin: 5px 0;
}
.api-key-section {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-section-optional {
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
padding: 15px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-input {
background: rgba(255,255,255,0.9) !important;
border-radius: 5px !important;
}
.status-info {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
background: rgba(0,0,0,0.1);
}
.accordion-header {
cursor: pointer;
padding: 15px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 10px;
margin: 10px 0;
color: white;
display: flex;
justify-content: space-between;
align-items: center;
}
.accordion-header-optional {
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
}
.accordion-content {
background: rgba(255,255,255,0.05);
border-radius: 0 0 10px 10px;
padding: 20px;
margin-top: -10px;
}
"""
) as demo:
gr.Markdown("# 🚀 Enhanced MCP Agent Client")
# Status bar
api_status_display = gr.Markdown(
get_api_key_status_info(),
elem_classes=["status-info"]
)
# API Key Section - conditional visibility and messaging
if default_initialized:
# Default key available - show optional collapsible upgrade section
with gr.Accordion("🔓 Upgrade to Personal API Key (Optional)", open=False) as api_accordion:
gr.Markdown("""
**You're currently using a limited default API key. For unlimited access:**
1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. 🔑 Click "Create API Key"
3. 📋 Copy your API key
4. 📝 Paste it below and click "Upgrade to Personal Key"
*Your personal API key will give you unlimited access and faster responses.*
""")
with gr.Row():
api_key_input = gr.Textbox(
label="Enter your personal Gemini API Key (optional)",
placeholder="Insert your API key here for unlimited access...",
type="password",
elem_classes=["api-key-input"],
scale=4
)
api_key_submit = gr.Button("🚀 Upgrade to Personal Key", variant="primary", scale=1)
api_key_status = gr.Markdown("", visible=True)
else:
# No default key - show required collapsible section (open by default)
with gr.Accordion("🔐 Setup Required: Gemini API Key", open=True) as api_accordion:
gr.Markdown("""
**To use this application, you need a free Gemini API key:**
1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. 🔑 Click "Create API Key"
3. 📋 Copy your API key
4. 📝 Paste it below and click "Validate & Start"
Your API key is only stored locally in this session and is not saved anywhere.
""")
with gr.Row():
api_key_input = gr.Textbox(
label="Enter your Gemini API Key",
placeholder="Insert API key here...",
type="password",
elem_classes=["api-key-input"],
scale=4
)
api_key_submit = gr.Button("🚀 Validate & Start", variant="primary", scale=1)
api_key_status = gr.Markdown("", visible=True)
# Main Chat Interface - visible if default key works, hidden otherwise
with gr.Group(visible=default_initialized) as chat_interface:
gr.Markdown("*✅ Connected! Optimized: Single initialization per session, global caching, connection pooling*")
with gr.Row():
with gr.Column(scale=3):
chatbot = gr.Chatbot(
label="Agent Chat",
type="messages",
height=400
)
with gr.Row():
txt = gr.Textbox(
placeholder="Type your message or use test buttons below...",
show_label=False,
scale=4
)
submit_btn = gr.Button("Send", scale=1, variant="primary")
with gr.Column(scale=2):
viz_img = gr.Image(
label="Complete Workflow Visualization",
type="filepath",
height=200
)
viz_json = gr.JSON(
label="Detailed Workflow Summary",
height=200
)
# Test Examples Section
gr.Markdown("## 🧪 Quick Test Examples")
gr.Markdown("*Click any button to test specific server capabilities:*")
with gr.Row():
# Semantic Search Tests
with gr.Column():
gr.Markdown("### 🔍 **Semantic Search Server**")
gr.Markdown("*Tests: keywords, similarity, semantic search*")
semantic_btn1 = gr.Button(
"Extract Keywords Test",
elem_classes=["test-button"],
size="sm"
)
semantic_btn2 = gr.Button(
"Find Similar Sentences",
elem_classes=["test-button"],
size="sm"
)
semantic_btn3 = gr.Button(
"Semantic Similarity Test",
elem_classes=["test-button"],
size="sm"
)
# Token Counter Tests
with gr.Column():
gr.Markdown("### 🔢 **Token Counter Server**")
gr.Markdown("*Tests: GPT-4, BERT, various tokenizers*")
token_btn1 = gr.Button(
"Basic Token Count",
elem_classes=["test-button"],
size="sm"
)
token_btn2 = gr.Button(
"GPT-4 Tokenizer Test",
elem_classes=["test-button"],
size="sm"
)
token_btn3 = gr.Button(
"Compare Tokenizers",
elem_classes=["test-button"],
size="sm"
)
# Sentiment Analysis Tests
with gr.Column():
gr.Markdown("### 😊 **Sentiment Server**")
gr.Markdown("*Tests: positive, negative, neutral sentiment*")
sentiment_btn1 = gr.Button(
"Positive Sentiment",
elem_classes=["test-button"],
size="sm"
)
sentiment_btn2 = gr.Button(
"Negative Sentiment",
elem_classes=["test-button"],
size="sm"
)
sentiment_btn3 = gr.Button(
"Neutral Sentiment",
elem_classes=["test-button"],
size="sm"
)
# Mixed/Complex Tests
with gr.Row():
gr.Markdown("### 🔄 **Multi-Server Tests**")
with gr.Row():
complex_btn1 = gr.Button(
"Full Pipeline: 'Analyze sentiment and count tokens in: I love machine learning!'",
elem_classes=["test-button"]
)
complex_btn2 = gr.Button(
"Semantic + Sentiment: 'Find keywords and sentiment in: This AI is terrible'",
elem_classes=["test-button"]
)
complex_btn3 = gr.Button(
"All Servers: 'Count tokens, find sentiment, extract keywords from: Amazing breakthrough!'",
elem_classes=["test-button"]
)
# Event handlers
def submit_and_clear(message, history):
try:
result = chat_fn(message, history)
return result[0], result[1], result[2], "", get_api_key_status_info()
except Exception as e:
print(f"UI error: {e}")
error_msg = [{"role": "assistant", "content": "Sorry, there was an interface error. Please try again."}]
return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info()
def test_example_handler(example_text, history):
"""Handler for test example buttons that includes clearing input."""
try:
result = chat_fn(example_text, history)
return result[0], result[1], result[2], "", get_api_key_status_info()
except Exception as e:
print(f"Test example error: {e}")
error_msg = [{"role": "assistant", "content": f"Test failed: {str(e)}"}]
return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info()
def handle_api_key_submit(api_key):
"""Handle API key submission."""
status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True)
if is_valid:
return (
status_msg,
gr.update(open=False), # Close accordion after successful setup
gr.update(visible=True), # Show chat interface
get_api_key_status_info(), # Update status
"" # Clear API key input for security
)
else:
return (
status_msg,
gr.update(open=True), # Keep accordion open on error
gr.update(visible=_api_key_set), # Show chat if default key works
get_api_key_status_info(), # Update status
api_key # Keep the input value for correction
)
# API Key submission
api_key_submit.click(
fn=handle_api_key_submit,
inputs=[api_key_input],
outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input]
)
# Allow Enter key in API key input
api_key_input.submit(
fn=handle_api_key_submit,
inputs=[api_key_input],
outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input]
)
# Main chat handlers (only work when API key is set)
submit_btn.click(
fn=submit_and_clear,
inputs=[txt, chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display],
api_name=False
)
txt.submit(
fn=submit_and_clear,
inputs=[txt, chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display],
api_name=False
)
# Semantic Search Test Buttons
semantic_btn1.click(
fn=lambda history: test_example_handler(SEMANTIC_TESTS[0], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
semantic_btn2.click(
fn=lambda history: test_example_handler(SEMANTIC_TESTS[1], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
semantic_btn3.click(
fn=lambda history: test_example_handler(SEMANTIC_TESTS[2], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
# Token Counter Test Buttons
token_btn1.click(
fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[0], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
token_btn2.click(
fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[1], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
token_btn3.click(
fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[2], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
# Sentiment Analysis Test Buttons
sentiment_btn1.click(
fn=lambda history: test_example_handler(SENTIMENT_TESTS[0], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
sentiment_btn2.click(
fn=lambda history: test_example_handler(SENTIMENT_TESTS[1], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
sentiment_btn3.click(
fn=lambda history: test_example_handler(SENTIMENT_TESTS[2], history),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
# Complex Multi-Server Test Buttons
complex_btn1.click(
fn=lambda history: test_example_handler(
"Analyze sentiment and count tokens in: I love machine learning!",
history
),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
complex_btn2.click(
fn=lambda history: test_example_handler(
"Find keywords and sentiment in: This AI is terrible",
history
),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
complex_btn3.click(
fn=lambda history: test_example_handler(
"Count tokens, find sentiment, extract keywords from: Amazing breakthrough!",
history
),
inputs=[chatbot],
outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
)
# Startup message
if default_initialized:
print(f"✅ Application ready with default API key! {default_status}")
print("💡 Users can optionally upgrade to their personal API key for unlimited access.")
else:
print("⚠️ Application ready. No default API key found - users must provide their own.")
try:
demo.launch(debug=True)
finally:
disconnect()