|
import gradio as gr |
|
import time |
|
import threading |
|
import os |
|
from agent import run_agent, disconnect, initialize_session, is_session_initialized |
|
from workflow_vizualizer import ( |
|
track_workflow_step, track_communication, complete_workflow_step, |
|
get_workflow_visualization, get_workflow_summary, |
|
reset_workflow |
|
) |
|
|
|
|
|
# --- API key state (all reads/writes of these flags go through _api_key_lock) ---
_api_key_set = False  # True once some key (default or personal) validated successfully
_api_key_lock = threading.Lock()
_using_default_key = False  # True when the active key came from the environment
_default_key_available = False  # set at startup by initialize_default_api_if_available()

# --- Request-throttling state (guarded by _request_lock) ---
_last_request_time = 0  # time.time() of the most recent accepted chat request
_request_lock = threading.Lock()
_processing = False  # True while a chat request is being handled
|
|
|
def check_default_api_key():
    """Look for a default Gemini API key in the process environment.

    Returns:
        tuple: ``(available, key)`` where ``available`` is True when a
        non-empty key was found and ``key`` is the key string (or None).
    """
    gemini_key = os.environ.get("GEMINI_API_KEY")
    google_key = os.environ.get("GOOGLE_API_KEY")
    # GEMINI_API_KEY wins; an empty string falls through to GOOGLE_API_KEY.
    default_key = gemini_key if gemini_key else google_key
    return default_key is not None, default_key
|
|
|
def validate_and_set_api_key(api_key, is_user_provided=True):
    """Validate and set the API key by actually testing model initialization.

    Args:
        api_key: Candidate Gemini API key string.
        is_user_provided: True when the key was typed by the user, False when
            it came from the environment (shared default key).

    Returns:
        tuple: ``(status_message, is_valid)``.
    """
    global _api_key_set, _using_default_key

    if not api_key or not api_key.strip():
        return "❌ Please enter your Gemini API key", False

    api_key = api_key.strip()

    # Snapshot the current environment BEFORE mutating it so the except
    # branch can always restore it. (Previously the snapshot lived inside
    # the try block, so a failure before those assignments would have made
    # the restore code raise NameError.)
    original_gemini_key = os.environ.get("GEMINI_API_KEY")
    original_google_key = os.environ.get("GOOGLE_API_KEY")

    try:
        os.environ["GEMINI_API_KEY"] = api_key

        from inference import initialize, generate_content

        # Drop any cached client so initialize() picks up the new key.
        import inference
        inference.client = None

        initialize()

        # Smoke-test the key with a minimal generation request; any failure
        # raises and is mapped to a user-facing message below.
        generate_content("Hello", model_name="gemini-1.5-flash")

        with _api_key_lock:
            _api_key_set = True
            _using_default_key = not is_user_provided

        if is_user_provided:
            return "✅ Your personal API key validated successfully! You now have full access.", True
        else:
            return "✅ Default API key is active. You can start using the chat with limited usage.", True

    except Exception as e:
        # Restore the environment exactly as it was before validation.
        if original_gemini_key:
            os.environ["GEMINI_API_KEY"] = original_gemini_key
        elif "GEMINI_API_KEY" in os.environ:
            del os.environ["GEMINI_API_KEY"]

        if original_google_key:
            os.environ["GOOGLE_API_KEY"] = original_google_key

        # Reset the cached client so a later attempt re-initializes cleanly.
        # Guarded: the inference import itself may be what failed above, and
        # an unhandled ImportError here would escape the error path.
        try:
            import inference
            inference.client = None
        except Exception:
            pass

        error_msg = str(e).lower()

        # Map common failure modes to friendly, actionable messages.
        if "api" in error_msg and ("key" in error_msg or "auth" in error_msg):
            return "❌ Invalid API key. Please check your key and try again.", False
        elif "quota" in error_msg or "limit" in error_msg:
            if not is_user_provided:
                return "⚠️ Default API key has reached its limit. Please provide your own API key to continue.", False
            else:
                return "❌ API quota exceeded. Please check your API usage limits.", False
        elif "permission" in error_msg or "access" in error_msg:
            return "❌ API access denied. Please verify your API key has proper permissions.", False
        elif "network" in error_msg or "connection" in error_msg:
            return "❌ Network error. Please check your internet connection and try again.", False
        else:
            return f"❌ API key validation failed: {str(e)[:100]}", False
|
|
|
def initialize_default_api_if_available():
    """Attempt startup initialization using an environment-provided API key.

    Returns:
        tuple: ``(initialized, status_message)``.
    """
    global _default_key_available, _api_key_set, _using_default_key

    has_default, default_key = check_default_api_key()
    _default_key_available = has_default

    if not has_default:
        return False, "No default API key available"

    try:
        status_msg, is_valid = validate_and_set_api_key(default_key, is_user_provided=False)
    except Exception as e:
        # Best-effort: a failed default key just means the user must supply one.
        print(f"Failed to initialize default API key: {e}")
        return False, "No default API key available"

    if is_valid:
        with _api_key_lock:
            _api_key_set = True
            _using_default_key = True
        return True, status_msg

    return False, "No default API key available"
|
|
|
def check_api_key_status():
    """Return True when a validated API key is currently active."""
    with _api_key_lock:
        is_active = _api_key_set
    return is_active
|
|
|
def get_api_key_status_info():
    """Describe the current API-key state for display in the UI banner."""
    with _api_key_lock:
        if not _api_key_set:
            return "❌ No API key active"
        if _using_default_key:
            return "🔑 Using default API key (limited usage)"
        return "🔑 Using your personal API key (full access)"
|
|
|
def chat_fn(message, history):
    """Process one chat message through the agent and build the workflow view.

    Args:
        message: User's chat input text.
        history: Chat history in Gradio "messages" format
            (list of ``{"role", "content"}`` dicts).

    Returns:
        tuple: (updated_history, visualization_image, workflow_summary,
        cleared_input_text).
    """
    global _last_request_time, _processing

    # Refuse to run until an API key has been validated; tailor the hint to
    # whether a default key could have been picked up at startup.
    if not check_api_key_status():
        if _default_key_available:
            return history + [{"role": "assistant", "content": "⚠️ Please set up an API key first using the section above."}], None, {"status": "no_api_key", "message": "API key required"}, ""
        else:
            return history + [{"role": "assistant", "content": "⚠️ Please provide your Gemini API key first using the field above."}], None, {"status": "no_api_key", "message": "API key required"}, ""

    if not message.strip():
        return history, None, {"status": "empty_message", "message": "Please enter a message"}, ""

    # NOTE(review): _processing is read here without holding _request_lock, so
    # two near-simultaneous requests could both pass this check — confirm if
    # that race matters for this deployment.
    if _processing:
        return history, None, {"status": "busy", "message": "Please wait for the current request to complete"}, ""

    # Simple rate limit: at most one accepted request every 2 seconds.
    with _request_lock:
        current_time = time.time()
        if current_time - _last_request_time < 2.0:
            return history, None, {"status": "rate_limited", "message": "Please wait 2 seconds between requests"}, ""
        _last_request_time = current_time
        _processing = True

    # Pre-declare so the except branch can tell whether tracking had started.
    input_step = None

    try:
        # Start a fresh workflow trace for this request.
        reset_workflow()

        input_step = track_workflow_step("input", message)

        ui_to_agent_step = track_communication("ui", "agent", "chat_request", message, parent_step=input_step)

        # Reuse the persistent agent session when one already exists.
        if not is_session_initialized():
            session_init_step = track_workflow_step("session_init", "Initializing persistent session", parent_step=ui_to_agent_step)
            initialize_session()
            complete_workflow_step(session_init_step, "completed")
        else:
            reuse_step = track_workflow_step("session_reuse", "Using existing persistent session", parent_step=ui_to_agent_step)
            complete_workflow_step(reuse_step, "completed")

        response = run_agent(message)

        # Record the response flowing back to the UI (truncated for the trace).
        agent_to_ui_step = track_communication("agent", "ui", "chat_response", response[:100], parent_step=ui_to_agent_step)

        complete_workflow_step(ui_to_agent_step, "completed")
        complete_workflow_step(agent_to_ui_step, "completed")
        if input_step is not None:
            complete_workflow_step(input_step, "completed")

    except Exception as e:
        error_str = str(e).lower()

        # Surface quota/limit errors on the shared default key with an
        # upgrade hint; everything else becomes a generic error reply.
        if ("quota" in error_str or "limit" in error_str or "rate" in error_str) and _using_default_key:
            response = "⚠️ Default API key has reached its usage limit. Please provide your personal API key above to continue with unlimited access."
        else:
            response = f"I encountered an error while processing your request: {str(e)}"

        if input_step is not None:
            complete_workflow_step(input_step, "error")
        print(f"Agent error: {e}")
    finally:
        # Always release the busy flag so later requests are not blocked.
        _processing = False

    # Build the workflow visualization; failures degrade to an error summary
    # rather than failing the whole chat turn.
    viz_step = track_workflow_step("visualization", "Generating workflow visualization")
    try:
        img_b64 = get_workflow_visualization()
        summary = get_workflow_summary()
        complete_workflow_step(viz_step, "completed", details={"summary_steps": summary.get("total_steps", 0)})
    except Exception as e:
        print(f"Visualization error: {e}")
        img_b64 = None
        summary = {"error": f"Visualization failed: {str(e)}", "status": "visualization_error"}
        complete_workflow_step(viz_step, "error", details={"error": str(e)})

    response_step = track_workflow_step("response", f"Final response: {response[:50]}...")
    complete_workflow_step(response_step, "completed")

    # Append the exchange in Gradio "messages" format (new list, no mutation).
    history = history + [{"role": "user", "content": message}, {"role": "assistant", "content": response}]

    return history, img_b64, summary, ""
|
|
|
|
|
# Canned prompts exercising the semantic-search MCP server.
SEMANTIC_TESTS = [
    "Find semantic keywords in: Machine learning and artificial intelligence are transforming technology",
    "Find similar sentences to 'deep learning' in: AI uses neural networks. Machine learning algorithms. Statistics and data science.",
    "What's the semantic similarity between 'happy' and 'joyful'?"
]

# Canned prompts exercising the token-counter MCP server.
TOKEN_COUNTER_TESTS = [
    "How many tokens are in: Hello world, how are you today?",
    "Count tokens using GPT-4 tokenizer: The quick brown fox jumps over the lazy dog",
    "Compare token counts for: Natural language processing is fascinating"
]

# Canned prompts exercising the sentiment-analysis MCP server.
SENTIMENT_TESTS = [
    "What's the sentiment of: This is absolutely amazing and wonderful!",
    "Analyze sentiment: I hate this terrible horrible experience",
    "Sentiment analysis: The weather is okay, nothing special"
]
|
|
|
# NOTE(review): this module-level handler is shadowed by the identically named
# function defined inside the gr.Blocks context below, which is what the event
# wiring actually uses. This copy returns gr.update(visible=...) for the
# accordion output while the inner copy uses gr.update(open=...); consider
# deleting this dead definition.
def handle_api_key_submit(api_key):
    """Handle API key submission."""
    status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True)

    if is_valid:
        return (
            status_msg,
            gr.update(visible=False),  # hide the key panel
            gr.update(visible=True),   # reveal the chat interface
            get_api_key_status_info(),
            ""                         # clear the key input field
        )
    else:
        return (
            status_msg,
            gr.update(visible=True),   # keep the key panel shown
            gr.update(visible=_api_key_set),  # chat stays up only if a key was already active
            get_api_key_status_info(),
            api_key                    # keep the typed key so the user can correct it
        )
|
|
|
# NOTE(review): appears unused — the test buttons below are wired to
# test_example_handler (defined inside the Blocks context), not this function.
def handle_test_example(example_text, history):
    """Handle click on test example button."""
    return chat_fn(example_text, history)
|
|
|
|
|
# Try an environment-provided key before building the UI so the layout can
# adapt (optional "upgrade" panel vs. mandatory setup panel).
default_initialized, default_status = initialize_default_api_if_available()
|
|
|
|
|
# ---------------------------------------------------------------------------
# UI definition. The custom CSS styles the API-key panels and test buttons
# and hides Gradio's footer and inline warning/error chrome.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="MCP Agent Client",
    css="""
.gradio-container {
max-width: 100% !important;
}
footer {
display: none !important;
}
.gradio-footer {
display: none !important;
}
.message-row {
margin: 8px 0;
}
.warning, .error-display {
display: none !important;
}
.test-button {
margin: 2px !important;
font-size: 12px !important;
}
.server-section {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
margin: 5px 0;
}
.api-key-section {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-section-optional {
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
padding: 15px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-input {
background: rgba(255,255,255,0.9) !important;
border-radius: 5px !important;
}
.status-info {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
background: rgba(0,0,0,0.1);
}
.accordion-header {
cursor: pointer;
padding: 15px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 10px;
margin: 10px 0;
color: white;
display: flex;
justify-content: space-between;
align-items: center;
}
.accordion-header-optional {
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
}
.accordion-content {
background: rgba(255,255,255,0.05);
border-radius: 0 0 10px 10px;
padding: 20px;
margin-top: -10px;
}
"""
) as demo:
|
    gr.Markdown("# 🚀 Enhanced MCP Agent Client")

    # Live banner showing which key (default vs personal) is active.
    api_status_display = gr.Markdown(
        get_api_key_status_info(),
        elem_classes=["status-info"]
    )

    # Two variants of the API-key panel: an optional "upgrade" accordion when
    # a default key already works, or a mandatory setup accordion otherwise.
    # Both branches bind the same component names so the wiring below works
    # regardless of which branch ran.
    if default_initialized:
        with gr.Accordion("🔓 Upgrade to Personal API Key (Optional)", open=False) as api_accordion:
            gr.Markdown("""
**You're currently using a limited default API key. For unlimited access:**

1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. 🔑 Click "Create API Key"
3. 📋 Copy your API key
4. 📝 Paste it below and click "Upgrade to Personal Key"

*Your personal API key will give you unlimited access and faster responses.*
""")

            with gr.Row():
                api_key_input = gr.Textbox(
                    label="Enter your personal Gemini API Key (optional)",
                    placeholder="Insert your API key here for unlimited access...",
                    type="password",
                    elem_classes=["api-key-input"],
                    scale=4
                )
                api_key_submit = gr.Button("🚀 Upgrade to Personal Key", variant="primary", scale=1)

            api_key_status = gr.Markdown("", visible=True)
    else:
        with gr.Accordion("🔐 Setup Required: Gemini API Key", open=True) as api_accordion:
            gr.Markdown("""
**To use this application, you need a free Gemini API key:**

1. 🌐 Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. 🔑 Click "Create API Key"
3. 📋 Copy your API key
4. 📝 Paste it below and click "Validate & Start"

Your API key is only stored locally in this session and is not saved anywhere.
""")

            with gr.Row():
                api_key_input = gr.Textbox(
                    label="Enter your Gemini API Key",
                    placeholder="Insert API key here...",
                    type="password",
                    elem_classes=["api-key-input"],
                    scale=4
                )
                api_key_submit = gr.Button("🚀 Validate & Start", variant="primary", scale=1)

            api_key_status = gr.Markdown("", visible=True)
|
|
|
|
|
    # Main chat area — hidden until an API key is active (toggled by the
    # API-key submit handlers below via the chat_interface output).
    with gr.Group(visible=default_initialized) as chat_interface:
        gr.Markdown("*✅ Connected! Optimized: Single initialization per session, global caching, connection pooling*")

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="Agent Chat",
                    type="messages",
                    height=400
                )
                with gr.Row():
                    txt = gr.Textbox(
                        placeholder="Type your message or use test buttons below...",
                        show_label=False,
                        scale=4
                    )
                    submit_btn = gr.Button("Send", scale=1, variant="primary")

            with gr.Column(scale=2):
                # Per-request workflow trace: rendered image plus JSON summary.
                viz_img = gr.Image(
                    label="Complete Workflow Visualization",
                    type="filepath",
                    height=200
                )
                viz_json = gr.JSON(
                    label="Detailed Workflow Summary",
                    height=200
                )

        gr.Markdown("## 🧪 Quick Test Examples")
        gr.Markdown("*Click any button to test specific server capabilities:*")

        # One column of canned prompts per MCP server.
        with gr.Row():
            with gr.Column():
                gr.Markdown("### 🔍 **Semantic Search Server**")
                gr.Markdown("*Tests: keywords, similarity, semantic search*")

                semantic_btn1 = gr.Button(
                    "Extract Keywords Test",
                    elem_classes=["test-button"],
                    size="sm"
                )
                semantic_btn2 = gr.Button(
                    "Find Similar Sentences",
                    elem_classes=["test-button"],
                    size="sm"
                )
                semantic_btn3 = gr.Button(
                    "Semantic Similarity Test",
                    elem_classes=["test-button"],
                    size="sm"
                )

            with gr.Column():
                gr.Markdown("### 🔢 **Token Counter Server**")
                gr.Markdown("*Tests: GPT-4, BERT, various tokenizers*")

                token_btn1 = gr.Button(
                    "Basic Token Count",
                    elem_classes=["test-button"],
                    size="sm"
                )
                token_btn2 = gr.Button(
                    "GPT-4 Tokenizer Test",
                    elem_classes=["test-button"],
                    size="sm"
                )
                token_btn3 = gr.Button(
                    "Compare Tokenizers",
                    elem_classes=["test-button"],
                    size="sm"
                )

            with gr.Column():
                gr.Markdown("### 😊 **Sentiment Server**")
                gr.Markdown("*Tests: positive, negative, neutral sentiment*")

                sentiment_btn1 = gr.Button(
                    "Positive Sentiment",
                    elem_classes=["test-button"],
                    size="sm"
                )
                sentiment_btn2 = gr.Button(
                    "Negative Sentiment",
                    elem_classes=["test-button"],
                    size="sm"
                )
                sentiment_btn3 = gr.Button(
                    "Neutral Sentiment",
                    elem_classes=["test-button"],
                    size="sm"
                )

        # Prompts that exercise several servers in a single request.
        with gr.Row():
            gr.Markdown("### 🔄 **Multi-Server Tests**")

        with gr.Row():
            complex_btn1 = gr.Button(
                "Full Pipeline: 'Analyze sentiment and count tokens in: I love machine learning!'",
                elem_classes=["test-button"]
            )
            complex_btn2 = gr.Button(
                "Semantic + Sentiment: 'Find keywords and sentiment in: This AI is terrible'",
                elem_classes=["test-button"]
            )
            complex_btn3 = gr.Button(
                "All Servers: 'Count tokens, find sentiment, extract keywords from: Amazing breakthrough!'",
                elem_classes=["test-button"]
            )
|
|
|
|
|
def submit_and_clear(message, history): |
|
try: |
|
result = chat_fn(message, history) |
|
return result[0], result[1], result[2], "", get_api_key_status_info() |
|
except Exception as e: |
|
print(f"UI error: {e}") |
|
error_msg = [{"role": "assistant", "content": "Sorry, there was an interface error. Please try again."}] |
|
return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info() |
|
|
|
def test_example_handler(example_text, history): |
|
"""Handler for test example buttons that includes clearing input.""" |
|
try: |
|
result = chat_fn(example_text, history) |
|
return result[0], result[1], result[2], "", get_api_key_status_info() |
|
except Exception as e: |
|
print(f"Test example error: {e}") |
|
error_msg = [{"role": "assistant", "content": f"Test failed: {str(e)}"}] |
|
return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info() |
|
|
|
def handle_api_key_submit(api_key): |
|
"""Handle API key submission.""" |
|
status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True) |
|
|
|
if is_valid: |
|
return ( |
|
status_msg, |
|
gr.update(open=False), |
|
gr.update(visible=True), |
|
get_api_key_status_info(), |
|
"" |
|
) |
|
else: |
|
return ( |
|
status_msg, |
|
gr.update(open=True), |
|
gr.update(visible=_api_key_set), |
|
get_api_key_status_info(), |
|
api_key |
|
) |
|
|
|
|
|
    # --- API-key form wiring: both the button and Enter in the field submit ---
    api_key_submit.click(
        fn=handle_api_key_submit,
        inputs=[api_key_input],
        outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input]
    )

    api_key_input.submit(
        fn=handle_api_key_submit,
        inputs=[api_key_input],
        outputs=[api_key_status, api_accordion, chat_interface, api_status_display, api_key_input]
    )

    # --- Chat wiring: Send button and Enter in the message box ---
    submit_btn.click(
        fn=submit_and_clear,
        inputs=[txt, chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display],
        api_name=False
    )

    txt.submit(
        fn=submit_and_clear,
        inputs=[txt, chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display],
        api_name=False
    )

    # --- Semantic-search test buttons ---
    semantic_btn1.click(
        fn=lambda history: test_example_handler(SEMANTIC_TESTS[0], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    semantic_btn2.click(
        fn=lambda history: test_example_handler(SEMANTIC_TESTS[1], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    semantic_btn3.click(
        fn=lambda history: test_example_handler(SEMANTIC_TESTS[2], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )

    # --- Token-counter test buttons ---
    token_btn1.click(
        fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[0], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    token_btn2.click(
        fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[1], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    token_btn3.click(
        fn=lambda history: test_example_handler(TOKEN_COUNTER_TESTS[2], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )

    # --- Sentiment test buttons ---
    sentiment_btn1.click(
        fn=lambda history: test_example_handler(SENTIMENT_TESTS[0], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    sentiment_btn2.click(
        fn=lambda history: test_example_handler(SENTIMENT_TESTS[1], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    sentiment_btn3.click(
        fn=lambda history: test_example_handler(SENTIMENT_TESTS[2], history),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )

    # --- Multi-server test buttons (prompts inlined rather than from lists) ---
    complex_btn1.click(
        fn=lambda history: test_example_handler(
            "Analyze sentiment and count tokens in: I love machine learning!",
            history
        ),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    complex_btn2.click(
        fn=lambda history: test_example_handler(
            "Find keywords and sentiment in: This AI is terrible",
            history
        ),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
    complex_btn3.click(
        fn=lambda history: test_example_handler(
            "Count tokens, find sentiment, extract keywords from: Amazing breakthrough!",
            history
        ),
        inputs=[chatbot],
        outputs=[chatbot, viz_img, viz_json, txt, api_status_display]
    )
|
|
|
|
|
# Startup status for the operator console.
if default_initialized:
    print(f"✅ Application ready with default API key! {default_status}")
    print("💡 Users can optionally upgrade to their personal API key for unlimited access.")
else:
    print("⚠️ Application ready. No default API key found - users must provide their own.")

# Launch the UI; always tear down the agent connection on exit.
try:
    demo.launch(debug=True)
finally:
    disconnect()