import gradio as gr import time import threading import os from agent import run_agent, disconnect, initialize_session, is_session_initialized from workflow_vizualizer import ( track_workflow_step, track_communication, complete_workflow_step, get_workflow_visualization, get_workflow_summary, reset_workflow ) # Global state for API key management _api_key_set = False _api_key_lock = threading.Lock() _using_default_key = False _default_key_available = False # Debouncing to prevent rapid-fire requests _last_request_time = 0 _request_lock = threading.Lock() _processing = False def check_default_api_key(): """Check if there's a default API key available in environment.""" default_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY") return default_key is not None, default_key def validate_and_set_api_key(api_key, is_user_provided=True): """Validate and set the API key by actually testing model initialization.""" global _api_key_set, _using_default_key if not api_key or not api_key.strip(): return "❌ Please enter your Gemini API key", False api_key = api_key.strip() try: # Store original key for restoration if needed original_gemini_key = os.environ.get("GEMINI_API_KEY") original_google_key = os.environ.get("GOOGLE_API_KEY") # Set the environment variable for testing os.environ["GEMINI_API_KEY"] = api_key # Test the API key by actually initializing and making a test call from inference import initialize, generate_content # Clear any existing client to force re-initialization import inference inference.client = None # Initialize with the new API key initialize() # Make a simple test call to verify the API key works test_response = generate_content("Hello", model_name="gemini-1.5-flash") # If we get here, the API key works with _api_key_lock: _api_key_set = True _using_default_key = not is_user_provided if is_user_provided: return "✅ Your personal API key validated successfully! You now have full access.", True else: return "✅ Default API key is active. You can start using the chat with limited usage.", True except Exception as e: # Restore original keys if validation failed if original_gemini_key: os.environ["GEMINI_API_KEY"] = original_gemini_key elif "GEMINI_API_KEY" in os.environ: del os.environ["GEMINI_API_KEY"] if original_google_key: os.environ["GOOGLE_API_KEY"] = original_google_key # Reset client state import inference inference.client = None error_msg = str(e).lower() if "api" in error_msg and ("key" in error_msg or "auth" in error_msg): return "❌ Invalid API key. Please check your key and try again.", False elif "quota" in error_msg or "limit" in error_msg: if not is_user_provided: return "⚠️ Default API key has reached its limit. Please provide your own API key to continue.", False else: return "❌ API quota exceeded. Please check your API usage limits.", False elif "permission" in error_msg or "access" in error_msg: return "❌ API access denied. Please verify your API key has proper permissions.", False elif "network" in error_msg or "connection" in error_msg: return "❌ Network error. Please check your internet connection and try again.", False else: return f"❌ API key validation failed: {str(e)[:100]}", False def initialize_default_api_if_available(): """Try to initialize with default API key if available.""" global _default_key_available, _api_key_set, _using_default_key has_default, default_key = check_default_api_key() _default_key_available = has_default if has_default: try: status_msg, is_valid = validate_and_set_api_key(default_key, is_user_provided=False) if is_valid: with _api_key_lock: _api_key_set = True _using_default_key = True return True, status_msg except Exception as e: print(f"Failed to initialize default API key: {e}") return False, "No default API key available" def check_api_key_status(): """Check if API key is set and valid.""" with _api_key_lock: return _api_key_set def get_api_key_status_info(): """Get information about current API key status.""" with _api_key_lock: if _api_key_set: if _using_default_key: return "🔑 Using default API key (limited usage)" else: return "🔑 Using your personal API key (full access)" else: return "❌ No API key active" def chat_fn(message, history): global _last_request_time, _processing # Check API key first if not check_api_key_status(): if _default_key_available: return history + [{"role": "assistant", "content": "⚠️ Please set up an API key first using the section above."}], None, {"status": "no_api_key", "message": "API key required"}, "" else: return history + [{"role": "assistant", "content": "⚠️ Please provide your Gemini API key first using the field above."}], None, {"status": "no_api_key", "message": "API key required"}, "" if not message.strip(): return history, None, {"status": "empty_message", "message": "Please enter a message"}, "" # Check if already processing if _processing: return history, None, {"status": "busy", "message": "Please wait for the current request to complete"}, "" # Debounce requests with _request_lock: current_time = time.time() if current_time - _last_request_time < 2.0: return history, None, {"status": "rate_limited", "message": "Please wait 2 seconds between requests"}, "" _last_request_time = current_time _processing = True input_step = None try: # Start new workflow reset_workflow() # Track user input input_step = track_workflow_step("input", message) # Track UI to agent communication ui_to_agent_step = track_communication("ui", "agent", "chat_request", message, parent_step=input_step) # Initialize session if needed (SINGLE initialization per session) if not is_session_initialized(): session_init_step = track_workflow_step("session_init", "Initializing persistent session", parent_step=ui_to_agent_step) initialize_session() complete_workflow_step(session_init_step, "completed") else: # Track session reuse reuse_step = track_workflow_step("session_reuse", "Using existing persistent session", parent_step=ui_to_agent_step) complete_workflow_step(reuse_step, "completed") # Process the message (no additional initialization needed) response = run_agent(message) # Track agent to UI response agent_to_ui_step = track_communication("agent", "ui", "chat_response", response[:100], parent_step=ui_to_agent_step) # Complete steps complete_workflow_step(ui_to_agent_step, "completed") complete_workflow_step(agent_to_ui_step, "completed") if input_step is not None: complete_workflow_step(input_step, "completed") except Exception as e: error_str = str(e).lower() # Check if it's a quota/rate limit error if ("quota" in error_str or "limit" in error_str or "rate" in error_str) and _using_default_key: response = "⚠️ Default API key has reached its usage limit. Please provide your personal API key above to continue with unlimited access." else: response = f"I encountered an error while processing your request: {str(e)}" if input_step is not None: complete_workflow_step(input_step, "error") print(f"Agent error: {e}") finally: _processing = False # Track visualization generation viz_step = track_workflow_step("visualization", "Generating workflow visualization") try: img_b64 = get_workflow_visualization() summary = get_workflow_summary() complete_workflow_step(viz_step, "completed", details={"summary_steps": summary.get("total_steps", 0)}) except Exception as e: print(f"Visualization error: {e}") img_b64 = None summary = {"error": f"Visualization failed: {str(e)}", "status": "visualization_error"} complete_workflow_step(viz_step, "error", details={"error": str(e)}) # Track final response response_step = track_workflow_step("response", f"Final response: {response[:50]}...") complete_workflow_step(response_step, "completed") # Add to history history = history + [{"role": "user", "content": message}, {"role": "assistant", "content": response}] return history, img_b64, summary, "" # Test examples for each server SEMANTIC_TESTS = [ "Find semantic keywords in: Machine learning and artificial intelligence are transforming technology", "Find similar sentences to 'deep learning' in: AI uses neural networks. Machine learning algorithms. Statistics and data science.", "What's the semantic similarity between 'happy' and 'joyful'?" ] TOKEN_COUNTER_TESTS = [ "How many tokens are in: Hello world, how are you today?", "Count tokens using GPT-4 tokenizer: The quick brown fox jumps over the lazy dog", "Compare token counts for: Natural language processing is fascinating" ] SENTIMENT_TESTS = [ "What's the sentiment of: This is absolutely amazing and wonderful!", "Analyze sentiment: I hate this terrible horrible experience", "Sentiment analysis: The weather is okay, nothing special" ] def handle_api_key_submit(api_key): """Handle API key submission.""" status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True) if is_valid: return ( status_msg, gr.update(visible=False), # Hide API key section gr.update(visible=True), # Show chat interface get_api_key_status_info(), # Update status "" # Clear API key input for security ) else: return ( status_msg, gr.update(visible=True), # Keep API key section visible gr.update(visible=_api_key_set), # Show chat if default key works get_api_key_status_info(), # Update status api_key # Keep the input value for correction ) def handle_test_example(example_text, history): """Handle click on test example button.""" return chat_fn(example_text, history) # Initialize default API key if available default_initialized, default_status = initialize_default_api_if_available() # Gradio interface with API key input with gr.Blocks( title="MCP Agent Client", css=""" .gradio-container { max-width: 100% !important; } footer { display: none !important; } .gradio-footer { display: none !important; } .message-row { margin: 8px 0; } .warning, .error-display { display: none !important; } .test-button { margin: 2px !important; font-size: 12px !important; } .server-section { border: 1px solid #ddd; border-radius: 8px; padding: 10px; margin: 5px 0; } .api-key-section { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 10px; margin: 10px 0; color: white; } .api-key-section-optional { background: linear-gradient(135deg, #28a745 0%, #20c997 100%); padding: 15px; border-radius: 10px; margin: 10px 0; color: white; } .api-key-input { background: rgba(255,255,255,0.9) !important; border-radius: 5px !important; } .status-info { padding: 10px; border-radius: 5px; margin: 5px 0; background: rgba(0,0,0,0.1); } .accordion-header { cursor: pointer; padding: 15px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; margin: 10px 0; color: white; display: flex; justify-content: space-between; align-items: center; } .accordion-header-optional { background: linear-gradient(135deg, #28a745 0%, #20c997 100%); } .accordion-content { background: rgba(255,255,255,0.05); border-radius: 0 0 10px 10px; padding: 20px; margin-top: -10px; } """ ) as demo: gr.Markdown("# 🚀 Enhanced MCP Agent Client") # Status bar api_status_display = gr.Markdown( get_api_key_status_info(), elem_classes=["status-info"] ) # API Key Section - conditional visibility and messaging if default_initialized: # Default key available - show optional collapsible upgrade section with gr.Accordion("🔓 Upgrade to Personal API Key (Optional)", open=False) as api_accordion: gr.Markdown(""" **You're currently using a limited default API key. For unlimited access:** 1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey) 2. 🔑 Click "Create API Key" 3. 📋 Copy your API key 4. 📝 Paste it below and click "Upgrade to Personal Key" *Your personal API key will give you unlimited access and faster responses.* """) with gr.Row(): api_key_input = gr.Textbox( label="Enter your personal Gemini API Key (optional)", placeholder="Insert your API key here for unlimited access...", type="password", elem_classes=["api-key-input"], scale=4 ) api_key_submit = gr.Button("🚀 Upgrade to Personal Key", variant="primary", scale=1) api_key_status = gr.Markdown("", visible=True) else: # No default key - show required collapsible section (open by default) with gr.Accordion("🔐 Setup Required: Gemini API Key", open=True) as api_accordion: gr.Markdown(""" **To use this application, you need a free Gemini API key:** 1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey) 2. 🔑 Click "Create API Key" 3. 📋 Copy your API key 4. 📝 Paste it below and click "Validate & Start" Your API key is only stored locally in this session and is not saved anywhere. """) with gr.Row(): api_key_input = gr.Textbox( label="Enter your Gemini API Key", placeholder="Insert API key here...", type="password", elem_classes=["api-key-input"], scale=4 ) api_key_submit = gr.Button("🚀 Validate & Start", variant="primary", scale=1) api_key_status = gr.Markdown("", visible=True) # Main Chat Interface - visible if default key works, hidden otherwise with gr.Group(visible=default_initialized) as chat_interface: gr.Markdown("*✅ Connected! Optimized: Single initialization per session, global caching, connection pooling*") with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( label="Agent Chat", type="messages", height=400 ) with gr.Row(): txt = gr.Textbox( placeholder="Type your message or use test buttons below...", show_label=False, scale=4 ) submit_btn = gr.Button("Send", scale=1, variant="primary") with gr.Column(scale=2): viz_img = gr.Image( label="Complete Workflow Visualization", type="filepath", height=200 ) viz_json = gr.JSON( label="Detailed Workflow Summary", height=200 ) # Test Examples Section gr.Markdown("## 🧪 Quick Test Examples") gr.Markdown("*Click any button to test specific server capabilities:*") with gr.Row(): # Semantic Search Tests with gr.Column(): gr.Markdown("### 🔍 **Semantic Search Server**") gr.Markdown("*Tests: keywords, similarity, semantic search*") semantic_btn1 = gr.Button( "Extract Keywords Test", elem_classes=["test-button"], size="sm" ) semantic_btn2 = gr.Button( "Find Similar Sentences", elem_classes=["test-button"], size="sm" ) semantic_btn3 = gr.Button( "Semantic Similarity Test", elem_classes=["test-button"], size="sm" ) # Token Counter Tests with gr.Column(): gr.Markdown("### 🔢 **Token Counter Server**") gr.Markdown("*Tests: GPT-4, BERT, various tokenizers*") token_btn1 = gr.Button( "Basic Token Count", elem_classes=["test-button"], size="sm" ) token_btn2 = gr.Button( "GPT-4 Tokenizer Test", elem_classes=["test-button"], size="sm" ) token_btn3 = gr.Button( "Compare Tokenizers", elem_classes=["test-button"], size="sm" ) # Sentiment Analysis Tests with gr.Column(): gr.Markdown("### 😊 **Sentiment Server**") gr.Markdown("*Tests: positive, negative, neutral sentiment*") sentiment_btn1 = gr.Button( "Positive Sentiment", elem_classes=["test-button"], size="sm" ) sentiment_btn2 = gr.Button( "Negative Sentiment", elem_classes=["test-button"], size="sm" ) sentiment_btn3 = gr.Button( "Neutral Sentiment", elem_classes=["test-button"], size="sm" ) # Mixed/Complex Tests with gr.Row(): gr.Markdown("### 🔄 **Multi-Server Tests**") with gr.Row(): complex_btn1 = gr.Button( "Full Pipeline: 'Analyze sentiment and count tokens in: I love machine learning!'", elem_classes=["test-button"] ) complex_btn2 = gr.Button( "Semantic + Sentiment: 'Find keywords and sentiment in: This AI is terrible'", elem_classes=["test-button"] ) complex_btn3 = gr.Button( "All Servers: 'Count tokens, find sentiment, extract keywords from: Amazing breakthrough!'", elem_classes=["test-button"] ) # Event handlers def submit_and_clear(message, history): try: result = chat_fn(message, history) return result[0], result[1], result[2], "", get_api_key_status_info() except Exception as e: print(f"UI error: {e}") error_msg = [{"role": "assistant", "content": "Sorry, there was an interface error. Please try again."}] return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info() def test_example_handler(example_text, history): """Handler for test example buttons that includes clearing input.""" try: result = chat_fn(example_text, history) return result[0], result[1], result[2], "", get_api_key_status_info() except Exception as e: print(f"Test example error: {e}") error_msg = [{"role": "assistant", "content": f"Test failed: {str(e)}"}] return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info() def handle_api_key_submit(api_key): """Handle API key submission.""" status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True) if is_valid: return ( status_msg, gr.update(open=False), # Close accordion after successful setup gr.update(visible=True), # Show chat interface get_api_key_status_info(), # Update status "" # Clear API key input for security ) else: return ( status_msg, gr.update(open=True), # Keep accordion open on error gr.update(visible=_api_key_set), # Show chat if default key works get_api_key_status_info(), # Update status api_key # Keep the input value for correction ) # API Key submission api_key_submit.click( fn=handle_api_key_submit, inputs=[api_key_input], outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input] ) # Allow Enter key in API key input api_key_input.submit( fn=handle_api_key_submit, inputs=[api_key_input], outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input] ) # Main chat handlers (only work when API key is set) submit_btn.click( fn=submit_and_clear, inputs=[txt, chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display], api_name=False ) txt.submit( fn=submit_and_clear, inputs=[txt, chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display], api_name=False ) # Semantic Search Test Buttons semantic_btn1.click( fn=lambda history: test_example_handler(SEMANTIC_TESTS[0], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) semantic_btn2.click( fn=lambda history: test_example_handler(SEMANTIC_TESTS[1], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) semantic_btn3.click( fn=lambda history: test_example_handler(SEMANTIC_TESTS[2], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) # Token Counter Test Buttons token_btn1.click( fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[0], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) token_btn2.click( fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[1], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) token_btn3.click( fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[2], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) # Sentiment Analysis Test Buttons sentiment_btn1.click( fn=lambda history: test_example_handler(SENTIMENT_TESTS[0], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) sentiment_btn2.click( fn=lambda history: test_example_handler(SENTIMENT_TESTS[1], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) sentiment_btn3.click( fn=lambda history: test_example_handler(SENTIMENT_TESTS[2], history), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) # Complex Multi-Server Test Buttons complex_btn1.click( fn=lambda history: test_example_handler( "Analyze sentiment and count tokens in: I love machine learning!", history ), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) complex_btn2.click( fn=lambda history: test_example_handler( "Find keywords and sentiment in: This AI is terrible", history ), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) complex_btn3.click( fn=lambda history: test_example_handler( "Count tokens, find sentiment, extract keywords from: Amazing breakthrough!", history ), inputs=[chatbot], outputs=[chatbot, viz_img, viz_json, txt, api_status_display] ) # Startup message if default_initialized: print(f"✅ Application ready with default API key! {default_status}") print("💡 Users can optionally upgrade to their personal API key for unlimited access.") else: print("⚠️ Application ready. No default API key found - users must provide their own.") try: demo.launch(debug=True) finally: disconnect()