Spaces:
Sleeping
Sleeping

Create functional MCP Playground app with working calculator, text processor, and echo server demos
29c4a42
# app.py - Gradio MCP Playground for HF Spaces | |
import gradio as gr | |
import json | |
import os | |
from datetime import datetime | |
from typing import Dict, List, Optional, Tuple, Any | |
import uuid | |
import asyncio | |
import time | |
import subprocess | |
import sys | |
from pathlib import Path | |
import tempfile | |
import shutil | |
# Configuration | |
HF_SPACE_MODE = True | |
HF_SPACE_ID = os.getenv("SPACE_ID", "seanpoyner/gradio-mcp-playground") | |
class MCPPlaygroundApp: | |
"""Gradio MCP Playground - Functional demo for HF Spaces""" | |
def __init__(self): | |
self.sessions = {} | |
self.templates = self.load_templates() | |
self.active_servers = {} | |
def load_templates(self) -> List[Dict]: | |
"""Load available MCP server templates""" | |
return [ | |
{ | |
"name": "Calculator", | |
"id": "calculator", | |
"description": "Basic arithmetic operations server", | |
"code": '''import json | |
import sys | |
class CalculatorServer: | |
def handle_add(self, a, b): | |
return {"result": a + b} | |
def handle_subtract(self, a, b): | |
return {"result": a - b} | |
def handle_multiply(self, a, b): | |
return {"result": a * b} | |
def handle_divide(self, a, b): | |
if b == 0: | |
return {"error": "Division by zero"} | |
return {"result": a / b} | |
# Simple stdin/stdout handler | |
server = CalculatorServer() | |
while True: | |
try: | |
line = sys.stdin.readline() | |
if not line: | |
break | |
request = json.loads(line) | |
method = request.get("method", "") | |
params = request.get("params", {}) | |
if method == "add": | |
result = server.handle_add(params.get("a", 0), params.get("b", 0)) | |
elif method == "subtract": | |
result = server.handle_subtract(params.get("a", 0), params.get("b", 0)) | |
elif method == "multiply": | |
result = server.handle_multiply(params.get("a", 0), params.get("b", 0)) | |
elif method == "divide": | |
result = server.handle_divide(params.get("a", 0), params.get("b", 1)) | |
else: | |
result = {"error": f"Unknown method: {method}"} | |
response = {"id": request.get("id"), "result": result} | |
print(json.dumps(response)) | |
sys.stdout.flush() | |
except Exception as e: | |
print(json.dumps({"error": str(e)})) | |
sys.stdout.flush() | |
''' | |
}, | |
{ | |
"name": "Text Processor", | |
"id": "text_processor", | |
"description": "Text manipulation and analysis", | |
"code": '''import json | |
import sys | |
class TextProcessor: | |
def count_words(self, text): | |
return {"count": len(text.split())} | |
def reverse_text(self, text): | |
return {"reversed": text[::-1]} | |
def to_uppercase(self, text): | |
return {"uppercase": text.upper()} | |
def to_lowercase(self, text): | |
return {"lowercase": text.lower()} | |
# Simple stdin/stdout handler | |
processor = TextProcessor() | |
while True: | |
try: | |
line = sys.stdin.readline() | |
if not line: | |
break | |
request = json.loads(line) | |
method = request.get("method", "") | |
params = request.get("params", {}) | |
text = params.get("text", "") | |
if method == "count_words": | |
result = processor.count_words(text) | |
elif method == "reverse": | |
result = processor.reverse_text(text) | |
elif method == "uppercase": | |
result = processor.to_uppercase(text) | |
elif method == "lowercase": | |
result = processor.to_lowercase(text) | |
else: | |
result = {"error": f"Unknown method: {method}"} | |
response = {"id": request.get("id"), "result": result} | |
print(json.dumps(response)) | |
sys.stdout.flush() | |
except Exception as e: | |
print(json.dumps({"error": str(e)})) | |
sys.stdout.flush() | |
''' | |
}, | |
{ | |
"name": "Echo Server", | |
"id": "echo", | |
"description": "Simple echo server for testing", | |
"code": '''import json | |
import sys | |
import datetime | |
# Simple echo server | |
while True: | |
try: | |
line = sys.stdin.readline() | |
if not line: | |
break | |
request = json.loads(line) | |
response = { | |
"id": request.get("id"), | |
"result": { | |
"echo": request, | |
"timestamp": datetime.datetime.now().isoformat(), | |
"server": "echo-server-v1" | |
} | |
} | |
print(json.dumps(response)) | |
sys.stdout.flush() | |
except Exception as e: | |
print(json.dumps({"error": str(e)})) | |
sys.stdout.flush() | |
''' | |
} | |
] | |
def create_interface(self) -> gr.Blocks: | |
"""Create the Gradio interface""" | |
with gr.Blocks( | |
title="π Gradio MCP Playground", | |
theme=gr.themes.Soft( | |
primary_hue="blue", | |
secondary_hue="gray", | |
font=["Inter", "system-ui", "sans-serif"] | |
), | |
css=self.get_custom_css() | |
) as demo: | |
# Header | |
gr.Markdown(""" | |
# π Gradio MCP Playground | |
Build, test, and deploy Model Context Protocol (MCP) servers directly in your browser. | |
This demo showcases core MCP capabilities with functional examples. | |
""") | |
# Session state | |
session_id = gr.State(value=lambda: str(uuid.uuid4())) | |
with gr.Tabs() as tabs: | |
# Dashboard Tab | |
with gr.Tab("π Dashboard"): | |
gr.Markdown("## Active MCP Servers") | |
with gr.Row(): | |
server_status = gr.JSON( | |
value={"message": "No active servers", "count": 0}, | |
label="Server Status" | |
) | |
refresh_btn = gr.Button("π Refresh", scale=0) | |
gr.Markdown("## Quick Actions") | |
with gr.Row(): | |
with gr.Column(): | |
quick_template = gr.Dropdown( | |
choices=[t["name"] for t in self.templates], | |
label="Select Template", | |
value="Calculator" | |
) | |
deploy_quick = gr.Button("π Quick Deploy", variant="primary") | |
deployment_result = gr.JSON(label="Deployment Result") | |
# Server Builder Tab | |
with gr.Tab("π§ Server Builder"): | |
gr.Markdown("## Create MCP Server from Template") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
template_gallery = gr.Radio( | |
choices=[t["name"] for t in self.templates], | |
label="Available Templates", | |
value="Calculator" | |
) | |
template_info = gr.JSON( | |
value=self.templates[0], | |
label="Template Details" | |
) | |
with gr.Column(scale=2): | |
server_name = gr.Textbox( | |
label="Server Name", | |
placeholder="my-calculator-server" | |
) | |
server_code = gr.Code( | |
value=self.templates[0]["code"], | |
language="python", | |
label="Server Code", | |
lines=20 | |
) | |
with gr.Row(): | |
create_btn = gr.Button("β¨ Create Server", variant="primary") | |
test_btn = gr.Button("π§ͺ Test Server") | |
creation_output = gr.JSON(label="Server Status") | |
# Tools Tab | |
with gr.Tab("π οΈ Tools"): | |
gr.Markdown("## Test MCP Server Tools") | |
with gr.Row(): | |
with gr.Column(): | |
active_server = gr.Dropdown( | |
choices=["calculator", "text_processor", "echo"], | |
label="Select Server", | |
value="calculator" | |
) | |
tool_method = gr.Dropdown( | |
choices=["add", "subtract", "multiply", "divide"], | |
label="Select Method" | |
) | |
tool_params = gr.JSON( | |
value={"a": 10, "b": 5}, | |
label="Parameters" | |
) | |
execute_btn = gr.Button("βΆοΈ Execute", variant="primary") | |
with gr.Column(): | |
execution_result = gr.JSON( | |
label="Execution Result", | |
value={"status": "ready"} | |
) | |
execution_log = gr.Textbox( | |
label="Execution Log", | |
lines=10, | |
max_lines=20 | |
) | |
# Settings Tab | |
with gr.Tab("βοΈ Settings"): | |
gr.Markdown("## Playground Settings") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Display Settings") | |
theme_select = gr.Radio( | |
choices=["Light", "Dark", "System"], | |
label="Theme", | |
value="System" | |
) | |
gr.Markdown("### About") | |
gr.Markdown(f""" | |
- **Version**: 1.0.0 | |
- **Space ID**: {HF_SPACE_ID} | |
- **Mode**: {'HF Space' if HF_SPACE_MODE else 'Local'} | |
- **Created by**: Gradio MCP Team | |
""") | |
# Event handlers | |
refresh_btn.click( | |
self.refresh_status, | |
inputs=[session_id], | |
outputs=[server_status] | |
) | |
deploy_quick.click( | |
self.quick_deploy, | |
inputs=[quick_template, session_id], | |
outputs=[deployment_result, server_status] | |
) | |
template_gallery.change( | |
self.update_template_view, | |
inputs=[template_gallery], | |
outputs=[template_info, server_code] | |
) | |
create_btn.click( | |
self.create_server, | |
inputs=[server_name, server_code, session_id], | |
outputs=[creation_output] | |
) | |
test_btn.click( | |
self.test_server, | |
inputs=[server_code], | |
outputs=[creation_output] | |
) | |
active_server.change( | |
self.update_tool_methods, | |
inputs=[active_server], | |
outputs=[tool_method, tool_params] | |
) | |
execute_btn.click( | |
self.execute_tool, | |
inputs=[active_server, tool_method, tool_params, session_id], | |
outputs=[execution_result, execution_log] | |
) | |
return demo | |
def get_custom_css(self) -> str: | |
"""Custom CSS for the interface""" | |
return """ | |
.gradio-container { | |
max-width: 1200px !important; | |
} | |
/* Dark mode friendly styles */ | |
.template-card { | |
background: #374151; | |
border: 2px solid #4b5563; | |
border-radius: 8px; | |
padding: 16px; | |
margin: 8px 0; | |
transition: all 0.3s ease; | |
} | |
.template-card:hover { | |
border-color: #60a5fa; | |
transform: translateY(-2px); | |
} | |
.server-status { | |
padding: 12px; | |
border-radius: 8px; | |
margin: 8px 0; | |
} | |
.status-active { | |
background: #10b981; | |
color: white; | |
} | |
.status-inactive { | |
background: #ef4444; | |
color: white; | |
} | |
""" | |
def refresh_status(self, session_id: str) -> Dict: | |
"""Refresh server status""" | |
session = self.sessions.get(session_id, {}) | |
servers = session.get("servers", []) | |
return { | |
"message": f"Active servers: {len(servers)}", | |
"count": len(servers), | |
"servers": servers, | |
"timestamp": datetime.now().isoformat() | |
} | |
def quick_deploy(self, template_name: str, session_id: str) -> Tuple[Dict, Dict]: | |
"""Quick deploy a template""" | |
template = next((t for t in self.templates if t["name"] == template_name), None) | |
if not template: | |
return {"error": "Template not found"}, self.refresh_status(session_id) | |
# Initialize session | |
if session_id not in self.sessions: | |
self.sessions[session_id] = {"servers": []} | |
# Add server to session | |
server_id = f"{template['id']}-{int(time.time())}" | |
server_info = { | |
"id": server_id, | |
"name": template_name, | |
"status": "active", | |
"created": datetime.now().isoformat() | |
} | |
self.sessions[session_id]["servers"].append(server_info) | |
return { | |
"success": True, | |
"message": f"Deployed {template_name}", | |
"server_id": server_id | |
}, self.refresh_status(session_id) | |
def update_template_view(self, template_name: str) -> Tuple[Dict, str]: | |
"""Update template view when selection changes""" | |
template = next((t for t in self.templates if t["name"] == template_name), None) | |
if template: | |
return template, template["code"] | |
return {"error": "Template not found"}, "" | |
def create_server(self, name: str, code: str, session_id: str) -> Dict: | |
"""Create a new server""" | |
if not name: | |
return {"error": "Server name is required"} | |
# Save server code temporarily | |
temp_dir = tempfile.mkdtemp() | |
server_file = Path(temp_dir) / f"{name}.py" | |
try: | |
server_file.write_text(code) | |
# Test if code is valid Python | |
result = subprocess.run( | |
[sys.executable, "-m", "py_compile", str(server_file)], | |
capture_output=True, | |
text=True | |
) | |
if result.returncode != 0: | |
return {"error": f"Invalid Python code: {result.stderr}"} | |
return { | |
"success": True, | |
"message": f"Server '{name}' created successfully", | |
"path": str(server_file) | |
} | |
except Exception as e: | |
return {"error": f"Failed to create server: {str(e)}"} | |
finally: | |
# Cleanup | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
def test_server(self, code: str) -> Dict: | |
"""Test server code""" | |
try: | |
# Basic syntax check | |
compile(code, '<string>', 'exec') | |
return { | |
"success": True, | |
"message": "Code syntax is valid", | |
"timestamp": datetime.now().isoformat() | |
} | |
except SyntaxError as e: | |
return { | |
"error": f"Syntax error at line {e.lineno}: {e.msg}", | |
"line": e.lineno | |
} | |
except Exception as e: | |
return {"error": f"Validation error: {str(e)}"} | |
def update_tool_methods(self, server: str) -> Tuple[gr.Dropdown, Dict]: | |
"""Update available methods based on selected server""" | |
methods_map = { | |
"calculator": { | |
"methods": ["add", "subtract", "multiply", "divide"], | |
"default_params": {"a": 10, "b": 5} | |
}, | |
"text_processor": { | |
"methods": ["count_words", "reverse", "uppercase", "lowercase"], | |
"default_params": {"text": "Hello, World!"} | |
}, | |
"echo": { | |
"methods": ["echo"], | |
"default_params": {"message": "Test message"} | |
} | |
} | |
server_info = methods_map.get(server, {"methods": [], "default_params": {}}) | |
return gr.Dropdown( | |
choices=server_info["methods"], | |
value=server_info["methods"][0] if server_info["methods"] else None | |
), server_info["default_params"] | |
def execute_tool(self, server: str, method: str, params: Dict, session_id: str) -> Tuple[Dict, str]: | |
"""Execute a tool method""" | |
log_lines = [] | |
log_lines.append(f"[{datetime.now().strftime('%H:%M:%S')}] Executing {server}.{method}") | |
log_lines.append(f"Parameters: {json.dumps(params, indent=2)}") | |
try: | |
# Simulate execution based on server type | |
if server == "calculator": | |
if method == "add": | |
result = {"result": params.get("a", 0) + params.get("b", 0)} | |
elif method == "subtract": | |
result = {"result": params.get("a", 0) - params.get("b", 0)} | |
elif method == "multiply": | |
result = {"result": params.get("a", 0) * params.get("b", 0)} | |
elif method == "divide": | |
b = params.get("b", 1) | |
if b == 0: | |
result = {"error": "Division by zero"} | |
else: | |
result = {"result": params.get("a", 0) / b} | |
else: | |
result = {"error": f"Unknown method: {method}"} | |
elif server == "text_processor": | |
text = params.get("text", "") | |
if method == "count_words": | |
result = {"count": len(text.split())} | |
elif method == "reverse": | |
result = {"reversed": text[::-1]} | |
elif method == "uppercase": | |
result = {"uppercase": text.upper()} | |
elif method == "lowercase": | |
result = {"lowercase": text.lower()} | |
else: | |
result = {"error": f"Unknown method: {method}"} | |
elif server == "echo": | |
result = { | |
"echo": params, | |
"timestamp": datetime.now().isoformat(), | |
"server": server | |
} | |
else: | |
result = {"error": f"Unknown server: {server}"} | |
log_lines.append(f"Result: {json.dumps(result, indent=2)}") | |
log_lines.append(f"[{datetime.now().strftime('%H:%M:%S')}] Execution completed") | |
return result, "\n".join(log_lines) | |
except Exception as e: | |
error_result = {"error": str(e), "type": type(e).__name__} | |
log_lines.append(f"Error: {str(e)}") | |
return error_result, "\n".join(log_lines) | |
# Create and launch the app | |
app = MCPPlaygroundApp() | |
demo = app.create_interface() | |
if __name__ == "__main__": | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
favicon_path="π" | |
) |