""" | |
MCP Client implementation for Universal MCP Client | |
Enhanced with HuggingFace Inference Provider support | |
""" | |
import asyncio | |
import json | |
import re | |
import base64 | |
from typing import Dict, Optional, Tuple, List, Any | |
import anthropic | |
import logging | |
import traceback | |
# Import the proper MCP client components | |
from mcp import ClientSession | |
from mcp.client.sse import sse_client | |
from config import MCPServerConfig, AppConfig, HTTPX_AVAILABLE, HF_INFERENCE_AVAILABLE | |
logger = logging.getLogger(__name__) | |
if HF_INFERENCE_AVAILABLE: | |
from huggingface_hub import InferenceClient | |


class UniversalMCPClient:
    """Universal MCP client for connecting to various MCP servers with multiple LLM backends."""

    def __init__(self):
        self.servers: Dict[str, MCPServerConfig] = {}
        self.anthropic_client = None
        self.hf_client = None
        self.current_provider = None
        self.current_model = None

        # Initialize the Anthropic client if an API key is available
        if AppConfig.ANTHROPIC_API_KEY:
            self.anthropic_client = anthropic.Anthropic(
                api_key=AppConfig.ANTHROPIC_API_KEY
            )
            logger.info("✅ Anthropic client initialized")
        else:
            logger.warning("⚠️ ANTHROPIC_API_KEY not found")

        # Check HuggingFace availability; the HF client itself is created
        # on demand in configure_inference_provider()
        if HF_INFERENCE_AVAILABLE and AppConfig.HF_TOKEN:
            logger.info("✅ HuggingFace Hub available")
        else:
            logger.warning("⚠️ HF_TOKEN not found or huggingface_hub not available")

    def configure_inference_provider(self, provider: str, model: str) -> bool:
        """Configure the HuggingFace inference provider and model."""
        try:
            if not HF_INFERENCE_AVAILABLE:
                logger.error("HuggingFace Hub not available")
                return False
            if not AppConfig.HF_TOKEN:
                logger.error("HF_TOKEN not configured")
                return False

            self.hf_client = InferenceClient(
                provider=provider,
                api_key=AppConfig.HF_TOKEN
            )
            self.current_provider = provider
            self.current_model = model
            logger.info(f"✅ Configured inference provider: {provider} with model: {model}")
            return True
        except Exception as e:
            logger.error(f"Failed to configure inference provider: {e}")
            return False
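
    # Illustrative usage (a sketch; the provider and model IDs below are
    # placeholder values, not defaults shipped with this client):
    #
    #     client = UniversalMCPClient()
    #     if client.configure_inference_provider("fireworks-ai",
    #                                            "meta-llama/Llama-3.1-8B-Instruct"):
    #         print(client.get_current_llm_backend())
    #         # -> "HF Inference Provider: fireworks-ai"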

    def get_current_llm_backend(self) -> str:
        """Get the currently configured LLM backend."""
        if self.current_provider and self.hf_client:
            return f"HF Inference Provider: {self.current_provider}"
        elif self.anthropic_client:
            return "Anthropic Claude Sonnet 4"
        else:
            return "No LLM backend configured"

    async def add_server_async(self, config: MCPServerConfig) -> Tuple[bool, str]:
        """Add an MCP server using the pure MCP protocol."""
        try:
            logger.info(f"🔧 Adding MCP server: {config.name} at {config.url}")

            # Clean and validate the URL, handling various input formats
            original_url = config.url.strip()

            # Remove common MCP endpoint variations
            base_url = original_url
            for endpoint in ["/gradio_api/mcp/sse", "/gradio_api/mcp/", "/gradio_api/mcp"]:
                if base_url.endswith(endpoint):
                    base_url = base_url[:-len(endpoint)]
                    break

            # Remove trailing slashes
            base_url = base_url.rstrip("/")

            # Construct the proper MCP URL
            mcp_url = f"{base_url}/gradio_api/mcp/sse"

            logger.info(f"🔧 Original URL: {original_url}")
            logger.info(f"🔧 Base URL: {base_url}")
            logger.info(f"🔧 MCP URL: {mcp_url}")

            # Extract the space ID if it's a HuggingFace Space
            if "hf.space" in base_url:
                space_id = base_url.split("/")[-1].replace('.hf.space', '').replace('https://', '').replace('http://', '')
                if '-' in space_id:
                    # Subdomain format: username-spacename.hf.space
                    config.space_id = space_id.replace('-', '/', 1)
                else:
                    config.space_id = space_id
                logger.info(f"📍 Detected HF Space ID: {config.space_id}")

            # Update the config with the proper MCP URL
            config.url = mcp_url

            # Test the MCP connection
            success, message = await self._test_mcp_connection(config)

            if success:
                self.servers[config.name] = config
                logger.info(f"✅ MCP server {config.name} added successfully")
                return True, f"✅ Successfully added MCP server: {config.name}\n{message}"
            else:
                logger.error(f"❌ Failed to connect to MCP server {config.name}: {message}")
                return False, f"❌ Failed to add server: {config.name}\n{message}"

        except Exception as e:
            error_msg = f"Failed to add server {config.name}: {str(e)}"
            logger.error(error_msg)
            logger.error(traceback.format_exc())
            return False, f"❌ {error_msg}"

    async def _test_mcp_connection(self, config: MCPServerConfig) -> Tuple[bool, str]:
        """Test an MCP server connection with detailed debugging."""
        try:
            logger.info(f"🔍 Testing MCP connection to {config.url}")

            async with sse_client(config.url, timeout=AppConfig.MCP_TIMEOUT_SECONDS) as (read_stream, write_stream):
                async with ClientSession(read_stream, write_stream) as session:
                    # Initialize the MCP session
                    logger.info("🔧 Initializing MCP session...")
                    await session.initialize()

                    # List available tools
                    logger.info("📋 Listing available tools...")
                    tools = await session.list_tools()

                    tool_info = []
                    for tool in tools.tools:
                        tool_info.append(f"  - {tool.name}: {tool.description}")
                        logger.info(f"📋 Tool: {tool.name}")
                        logger.info(f"   Description: {tool.description}")
                        if hasattr(tool, 'inputSchema') and tool.inputSchema:
                            logger.info(f"   Input Schema: {tool.inputSchema}")

                    if len(tools.tools) == 0:
                        return False, "No tools found on MCP server"

                    message = f"Connected successfully!\nFound {len(tools.tools)} tools:\n" + "\n".join(tool_info)
                    return True, message

        except asyncio.TimeoutError:
            return False, "Connection timeout - the server may be sleeping or unreachable"
        except Exception as e:
            logger.error(f"MCP connection failed: {e}")
            logger.error(traceback.format_exc())
            return False, f"Connection failed: {str(e)}"

    async def call_llm_with_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[Any]:
        """Call the LLM with MCP servers, dispatching to the Anthropic or HF backend."""
        if self.current_provider and self.hf_client:
            # Use a HuggingFace Inference Provider with the custom MCP implementation
            return await self._call_hf_with_custom_mcp(messages, user_files)
        elif self.anthropic_client:
            # Use Anthropic's native MCP support
            return self._call_anthropic_with_native_mcp(messages, user_files)
        else:
            raise ValueError("No LLM backend configured")

    def _call_anthropic_with_native_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[Any]:
        """Call the Anthropic API with native MCP support."""
        if not self.servers:
            return self._call_claude_without_mcp(messages)

        mcp_servers = []
        for server_name, config in self.servers.items():
            mcp_servers.append({
                "type": "url",
                "url": config.url,
                "name": server_name.replace(" ", "_").lower()
            })

        # Enhanced system prompt with multimodal and MCP instructions
        system_prompt = self._get_anthropic_mcp_system_prompt(user_files)

        # Debug logging
        logger.info(f"📤 Sending {len(messages)} messages to the Claude API")
        logger.info(f"🔧 Using {len(mcp_servers)} MCP servers")

        start_time = time.time()

        # Call Claude with the MCP connector via the beta API
        response = self.anthropic_client.beta.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages,
            mcp_servers=mcp_servers,
            betas=[AppConfig.MCP_BETA_VERSION]
        )

        return self._process_mcp_response(response, start_time)

    async def _call_hf_with_custom_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[Any]:
        """Call a HuggingFace Inference Provider with the custom MCP implementation."""
        from gradio import ChatMessage

        # Get available tools from the MCP servers
        available_tools = await self._get_mcp_tools()

        if not available_tools:
            # No MCP tools available, use regular chat completion
            return await self._call_hf_without_mcp(messages)

        # Enhanced system prompt for HF providers with MCP
        system_prompt = self._get_hf_mcp_system_prompt(user_files, available_tools)

        # Add a system message if one is not present
        if not messages or messages[0].get("role") != "system":
            messages.insert(0, {"role": "system", "content": system_prompt})
        else:
            messages[0]["content"] = system_prompt

        chat_messages = []
        max_iterations = 5  # Prevent infinite tool-calling loops
        iteration = 0

        while iteration < max_iterations:
            iteration += 1
            logger.info(f"🔄 HF+MCP iteration {iteration}")

            # Call the HuggingFace model
            try:
                completion = self.hf_client.chat.completions.create(
                    model=self.current_model,
                    messages=messages,
                    max_tokens=AppConfig.MAX_TOKENS,
                    temperature=0.7
                )

                response_content = completion.choices[0].message.content
                logger.info(f"📝 HF response: {response_content[:200]}...")

                # Check whether the model wants to use tools
                tool_calls = self._extract_tool_calls_from_response(response_content)

                if not tool_calls:
                    # No tool calls, return the final response
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=response_content
                    ))
                    break

                # Execute the tool calls
                for tool_call in tool_calls:
                    tool_name = tool_call.get("name")
                    tool_args = tool_call.get("arguments", {})

                    # Add a "thinking" message for the tool usage
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"🔧 Using {tool_name}",
                            "id": f"tool_{iteration}",
                            "status": "pending",
                            "log": f"Calling MCP tool: {tool_name}"
                        }
                    ))

                    # Execute the tool via MCP
                    tool_result = await self._execute_mcp_tool(tool_name, tool_args)

                    # Update the tool status
                    for msg in chat_messages:
                        if (msg.metadata and
                                msg.metadata.get("id") == f"tool_{iteration}" and
                                msg.metadata.get("status") == "pending"):
                            msg.metadata["status"] = "done"
                            break

                    # Add the tool result
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=tool_result,
                        metadata={
                            "title": "📊 Tool Result",
                            "parent_id": f"tool_{iteration}",
                            "status": "done"
                        }
                    ))

                    # Add the tool result to the conversation context
                    messages.append({
                        "role": "assistant",
                        "content": f"I used the tool {tool_name} and got this result: {tool_result}"
                    })

                    # Check for media in the tool result
                    media_url = self._extract_media_from_tool_result(tool_result)
                    if media_url:
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))

                # Continue the conversation with the tool results
                messages.append({
                    "role": "user",
                    "content": "Please provide a summary of the results and help with the user's original request."
                })

            except Exception as e:
                logger.error(f"Error in HF+MCP iteration {iteration}: {e}")
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=f"❌ Error during tool execution: {str(e)}"
                ))
                break

        if not chat_messages:
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages

    async def _get_mcp_tools(self) -> List[Dict[str, Any]]:
        """Get the available tools from all MCP servers."""
        tools = []

        for server_name, config in self.servers.items():
            try:
                async with sse_client(config.url, timeout=AppConfig.MCP_TIMEOUT_SECONDS) as (read_stream, write_stream):
                    async with ClientSession(read_stream, write_stream) as session:
                        await session.initialize()
                        server_tools = await session.list_tools()

                        for tool in server_tools.tools:
                            tools.append({
                                "name": tool.name,
                                "description": tool.description,
                                "server": server_name,
                                "schema": tool.inputSchema if hasattr(tool, 'inputSchema') else {}
                            })
            except Exception as e:
                logger.error(f"Failed to get tools from {server_name}: {e}")

        return tools

    def _extract_tool_calls_from_response(self, response: str) -> List[Dict[str, Any]]:
        """Extract tool calls from the LLM response text.

        This is a deliberately simple pattern match; a production implementation
        would want a more robust parser.
        """
        tool_calls = []

        # Match tool calls of the form: CALL_TOOL: tool_name(arg1="value1", arg2="value2")
        pattern = r'CALL_TOOL:\s*(\w+)\((.*?)\)'
        matches = re.findall(pattern, response)

        for match in matches:
            tool_name = match[0]
            args_str = match[1]

            # Naive argument parsing: split on commas, then on the first "="
            args = {}
            if args_str:
                arg_pairs = args_str.split(',')
                for pair in arg_pairs:
                    if '=' in pair:
                        key, value = pair.split('=', 1)
                        key = key.strip().strip('"').strip("'")
                        value = value.strip().strip('"').strip("'")
                        args[key] = value

            tool_calls.append({
                "name": tool_name,
                "arguments": args
            })

        return tool_calls
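
    # Illustrative parse of the CALL_TOOL wire format (tool name hypothetical):
    #   'CALL_TOOL: generate_image(prompt="a red fox", steps="20")'
    #   -> [{"name": "generate_image",
    #        "arguments": {"prompt": "a red fox", "steps": "20"}}]
    # Note: the comma split above breaks on commas inside quoted values.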

    async def _execute_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Execute a tool via the MCP servers."""
        for server_name, config in self.servers.items():
            try:
                async with sse_client(config.url, timeout=AppConfig.MCP_TIMEOUT_SECONDS) as (read_stream, write_stream):
                    async with ClientSession(read_stream, write_stream) as session:
                        await session.initialize()

                        # Check whether this server has the tool
                        tools = await session.list_tools()
                        tool_found = False
                        for tool in tools.tools:
                            if tool.name == tool_name:
                                tool_found = True
                                break

                        if not tool_found:
                            continue

                        # Call the tool
                        result = await session.call_tool(tool_name, arguments)

                        if result.content:
                            return result.content[0].text if hasattr(result.content[0], 'text') else str(result.content[0])
                        else:
                            return "Tool executed successfully but returned no content"

            except Exception as e:
                logger.error(f"Failed to execute tool {tool_name} on {server_name}: {e}")

        return f"❌ Failed to execute tool: {tool_name}"

    def _extract_media_from_tool_result(self, result: str) -> Optional[str]:
        """Extract a media URL from a tool result."""
        if not self.servers:
            return None

        # Use the first server's config for media extraction
        config = next(iter(self.servers.values()))
        return self._extract_media_from_mcp_response(result, config)

    async def _call_hf_without_mcp(self, messages: List[Dict[str, Any]]) -> List[Any]:
        """Call the HuggingFace provider without MCP."""
        from gradio import ChatMessage

        try:
            completion = self.hf_client.chat.completions.create(
                model=self.current_model,
                messages=messages,
                max_tokens=AppConfig.MAX_TOKENS,
                temperature=0.7
            )

            response_content = completion.choices[0].message.content
            return [ChatMessage(role="assistant", content=response_content)]

        except Exception as e:
            logger.error(f"HF inference error: {e}")
            return [ChatMessage(role="assistant", content=f"❌ Error: {str(e)}")]

    def _call_claude_without_mcp(self, messages: List[Dict[str, Any]]) -> List[Any]:
        """Call the Claude API without MCP servers."""
        from gradio import ChatMessage

        logger.info("💬 No MCP servers available, using regular Claude chat")

        system_prompt = self._get_native_system_prompt()

        # Use the regular messages API
        response = self.anthropic_client.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages
        )

        response_text = ""
        for content in response.content:
            if content.type == "text":
                response_text += content.text

        if not response_text:
            response_text = "I understand your request and I'm here to help."

        return [ChatMessage(role="assistant", content=response_text)]

    def _get_native_system_prompt(self) -> str:
        """Get the system prompt for Claude without MCP servers."""
        from datetime import datetime

        return f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics.

YOUR NATIVE CAPABILITIES (available right now):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

IMPORTANT: You DO NOT need MCP servers for:
- Describing or analyzing uploaded images
- Reading text in images
- Identifying objects, people, or scenes in images
- General conversation and knowledge questions

You DO need MCP servers for:
- Creating new images, audio, or video
- Editing or transforming existing media files
- Transcribing audio files
- Processing non-image files (audio, video, documents)

If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks."""

    def _get_anthropic_mcp_system_prompt(self, user_files: List[str]) -> str:
        """Get the system prompt for Claude with MCP servers."""
        from datetime import datetime

        uploaded_files_context = ""
        if user_files:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER:\n"
            for i, file_path in enumerate(user_files, 1):
                file_name = file_path.split('/')[-1] if '/' in file_path else file_path
                if AppConfig.is_image_file(file_path):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_path):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_path):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n"

        return f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools.

YOUR NATIVE CAPABILITIES (no MCP tools needed):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Specialized Analysis**: Tasks requiring specific models or APIs

UPLOADED FILES HANDLING:
{uploaded_files_context}

IMPORTANT - For uploaded images:
- **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly
- **Image Editing/Enhancement**: Use MCP image processing tools
- **Image Generation**: Use MCP image generation tools

IMPORTANT - GRADIO MEDIA DISPLAY:
When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL.

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Available MCP servers: {list(self.servers.keys())}"""
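
    # The MEDIA_GENERATED sentinel above is parsed back out in
    # _process_mcp_response. For example, a response ending in (URL hypothetical):
    #   "Here is your image. MEDIA_GENERATED: https://user-app.hf.space/file=out.png"
    # is split into a plain text message plus a separate media message.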

    def _get_hf_mcp_system_prompt(self, user_files: List[str], available_tools: List[Dict[str, Any]]) -> str:
        """Get the system prompt for HuggingFace providers with MCP."""
        from datetime import datetime

        uploaded_files_context = ""
        if user_files:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER:\n"
            for i, file_path in enumerate(user_files, 1):
                file_name = file_path.split('/')[-1] if '/' in file_path else file_path
                if AppConfig.is_image_file(file_path):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_path):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_path):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n"

        tools_context = ""
        if available_tools:
            tools_context = "\n\nAVAILABLE MCP TOOLS:\n"
            for tool in available_tools:
                tools_context += f"- {tool['name']}: {tool['description']} (server: {tool['server']})\n"
            tools_context += "\nTo use a tool, respond with: CALL_TOOL: tool_name(arg1=\"value1\", arg2=\"value2\")\n"

        return f"""You are an AI assistant using {self.current_provider} inference with {self.current_model}. You have access to external tools via MCP (Model Context Protocol).

YOUR CAPABILITIES:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Tool Usage**: You can call external tools to extend your capabilities

UPLOADED FILES HANDLING:
{uploaded_files_context}
{tools_context}

IMPORTANT INSTRUCTIONS:
- For complex tasks requiring specialized capabilities, use the available MCP tools
- When you need to use a tool, clearly indicate it with the CALL_TOOL format
- After using tools, provide a clear summary of the results to the user
- If a tool returns media (images, audio, video), describe what was generated/processed

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Current LLM: {self.current_provider}/{self.current_model}
Available MCP servers: {list(self.servers.keys())}"""

    # --- Media extraction helpers ---

    def _extract_media_from_mcp_response(self, result_text: str, config: MCPServerConfig) -> Optional[str]:
        """Extract media URLs from MCP responses."""
        if not isinstance(result_text, str):
            logger.info(f"🔍 Non-string result: {type(result_text)}")
            return None

        base_url = config.url.replace("/gradio_api/mcp/sse", "")
        logger.info(f"🔍 Processing MCP result for media: {result_text[:300]}...")
        logger.info(f"🔍 Base URL: {base_url}")

        # 1. Try to parse as JSON (most Gradio MCP servers return structured data)
        try:
            if result_text.strip().startswith('[') or result_text.strip().startswith('{'):
                logger.info("🔍 Attempting JSON parse...")
                data = json.loads(result_text.strip())
                logger.info(f"🔍 Parsed JSON structure: {data}")

                # Handle array format: [{'image': {'url': '...'}}] or [{'url': '...'}]
                if isinstance(data, list) and len(data) > 0:
                    item = data[0]
                    logger.info(f"🔍 First array item: {item}")
                    if isinstance(item, dict):
                        # Check for a nested media structure
                        for media_type in ['image', 'audio', 'video']:
                            if media_type in item and isinstance(item[media_type], dict):
                                media_data = item[media_type]
                                if 'url' in media_data:
                                    url = media_data['url']
                                    logger.info(f"🎯 Found {media_type} URL: {url}")
                                    return self._resolve_media_url(url, base_url)
                        # Check for a direct URL
                        if 'url' in item:
                            url = item['url']
                            logger.info(f"🎯 Found direct URL: {url}")
                            return self._resolve_media_url(url, base_url)

                # Handle object format: {'image': {'url': '...'}} or {'url': '...'}
                elif isinstance(data, dict):
                    logger.info(f"🔍 Processing dict: {data}")
                    # Check for a nested media structure
                    for media_type in ['image', 'audio', 'video']:
                        if media_type in data and isinstance(data[media_type], dict):
                            media_data = data[media_type]
                            if 'url' in media_data:
                                url = media_data['url']
                                logger.info(f"🎯 Found {media_type} URL: {url}")
                                return self._resolve_media_url(url, base_url)
                    # Check for a direct URL
                    if 'url' in data:
                        url = data['url']
                        logger.info(f"🎯 Found direct URL: {url}")
                        return self._resolve_media_url(url, base_url)

        except json.JSONDecodeError:
            logger.info("🔍 Not valid JSON, trying other formats...")
        except Exception as e:
            logger.warning(f"🔍 JSON parsing error: {e}")

        # 2. Check for data URLs (base64-encoded media)
        if result_text.startswith('data:'):
            logger.info("🎯 Found data URL")
            return result_text

        # 3. Check for raw base64 image payloads (PNG, JPEG, and RIFF magic bytes)
        if any(result_text.startswith(pattern) for pattern in ['iVBORw0KGgoAAAANSUhEU', '/9j/', 'UklGR']):
            logger.info("🎯 Found base64 image data")
            return f"data:image/png;base64,{result_text}"

        # 4. Check for file paths and convert them to URLs
        if AppConfig.is_media_file(result_text):
            # Extract just the filename if it's a path
            if '/' in result_text:
                filename = result_text.split('/')[-1]
            else:
                filename = result_text.strip()

            # Create a Gradio file URL
            if filename.startswith('http'):
                media_url = filename
            else:
                media_url = f"{base_url}/file={filename}"
            logger.info(f"🎯 Found media file: {media_url}")
            return media_url

        # 5. Check for HTTP URLs that look like media
        if result_text.startswith('http') and AppConfig.is_media_file(result_text):
            logger.info(f"🎯 Found HTTP media URL: {result_text}")
            return result_text

        logger.info("❌ No media detected in result")
        return None

    def _resolve_media_url(self, url: str, base_url: str) -> str:
        """Resolve relative URLs to absolute URLs."""
        if url.startswith('http') or url.startswith('data:'):
            return url
        # Both absolute paths ("/out.png") and bare filenames are served
        # through the Gradio file endpoint
        return f"{base_url}/file={url}"

    def _process_mcp_response(self, response, start_time: float) -> List[Any]:
        """Process Claude's response with MCP tool calls into structured ChatMessage objects."""
        from gradio import ChatMessage

        chat_messages = []
        current_tool_id = None
        current_server_name = None
        tool_start_time = None
        text_segments = []  # Collect text segments separately

        # Process Claude's response
        for content in response.content:
            if content.type == "text":
                # Collect text segments but don't combine them yet
                text_content = content.text

                # Check whether Claude indicated that media was generated
                if "MEDIA_GENERATED:" in text_content:
                    media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", text_content)
                    if media_match:
                        media_url = media_match.group(1)
                        # Clean up the response text
                        text_content = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", text_content).strip()
                        logger.info(f"🎯 Claude indicated media generated: {media_url}")

                        # Add the media as a separate message
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))

                if text_content.strip():
                    text_segments.append(text_content.strip())

            elif hasattr(content, 'type') and content.type == "mcp_tool_use":
                # Flush any accumulated text before the tool use
                if text_segments:
                    combined_text = " ".join(text_segments)
                    if combined_text.strip():
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content=combined_text.strip()
                        ))
                    text_segments = []  # Reset

                tool_name = content.name
                server_name = content.server_name
                current_tool_id = getattr(content, 'id', 'unknown')
                current_server_name = server_name
                tool_start_time = time.time()

                logger.info(f"🔧 Claude used MCP tool: {tool_name} on server: {server_name}")

                # Create a "thinking" message for the tool usage
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content="",
                    metadata={
                        "title": f"🔧 Using {tool_name}",
                        "id": current_tool_id,
                        "status": "pending",
                        "log": f"Server: {server_name}"
                    }
                ))

            elif hasattr(content, 'type') and content.type == "mcp_tool_result":
                tool_use_id = getattr(content, 'tool_use_id', 'unknown')
                duration = time.time() - tool_start_time if tool_start_time else None

                logger.info(f"📊 Processing MCP tool result (tool_use_id: {tool_use_id})")

                # Update the pending tool message to completed
                for msg in chat_messages:
                    if (msg.metadata and
                            msg.metadata.get("id") == current_tool_id and
                            msg.metadata.get("status") == "pending"):
                        msg.metadata["status"] = "done"
                        if duration:
                            msg.metadata["duration"] = round(duration, 2)
                        break

                media_url = None
                if content.content:
                    result_content = content.content[0]
                    result_text = result_content.text if hasattr(result_content, 'text') else str(result_content)
                    logger.info(f"📊 MCP tool result: {result_text[:200]}...")

                    # Try to extract a media URL from the result
                    if current_server_name and current_server_name in self.servers:
                        config = self.servers[current_server_name]
                        extracted_media = self._extract_media_from_mcp_response(result_text, config)
                        if extracted_media:
                            media_url = extracted_media
                            logger.info(f"🎯 Extracted media from MCP result: {media_url}")
                    else:
                        # Fallback: try all servers to find media
                        for server_name, config in self.servers.items():
                            extracted_media = self._extract_media_from_mcp_response(result_text, config)
                            if extracted_media:
                                media_url = extracted_media
                                logger.info(f"🎯 Extracted media from MCP result (fallback): {media_url}")
                                break

                    # Always show the full tool result
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_text,
                        metadata={
                            "title": "📊 Tool Result",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    ))

                    # Only add a separate media display if the tool result does NOT
                    # contain any Gradio file data structures that would be auto-rendered
                    if media_url and not self._contains_gradio_file_structure(result_text):
                        logger.info(f"🎯 Adding separate media display for: {media_url}")
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    elif media_url:
                        logger.info("🚫 Skipping separate media - tool result contains a Gradio file structure")
                    else:
                        logger.info("🚫 No media URL extracted")
                else:
                    # Add an error message for the failed tool call
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="Tool call failed: No content returned",
                        metadata={
                            "title": "❌ Tool Error",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    ))

        # Add any remaining text segments after all processing
        if text_segments:
            combined_text = " ".join(text_segments)
            if combined_text.strip():
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=combined_text.strip()
                ))

        # Fallback if no content was processed
        if not chat_messages:
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages

    def _contains_gradio_file_structure(self, text: str) -> bool:
        """Check whether the text contains any Gradio file data structures that would be auto-rendered."""
        # Key indicators of Gradio file structures
        gradio_indicators = [
            # Gradio FileData type indicators
            "'_type': 'gradio.FileData'",
            '"_type": "gradio.FileData"',
            'gradio.FileData',
            # File structure patterns
            "'path':",
            '"path":',
            "'url':",
            '"url":',
            "'orig_name':",
            '"orig_name":',
            "'mime_type':",
            '"mime_type":',
            'is_stream',
            'meta_type',
            # Common file result patterns
            "{'image':",
            '{"image":',
            "{'audio':",
            '{"audio":',
            "{'video':",
            '{"video":',
            "{'file':",
            '{"file":',
            # List patterns that typically contain file objects
            "[{'image'",
            '[{"image"',
            "[{'audio'",
            '[{"audio"',
            "[{'video'",
            '[{"video"',
            "[{'file'",
            '[{"file"'
        ]

        # If we find multiple indicators, it's likely a Gradio file structure
        indicator_count = sum(1 for indicator in gradio_indicators if indicator in text)

        # Also treat a bare single-URL media result (e.g. an audio file URL) as auto-rendered
        is_simple_url = (text.strip().startswith('http') and
                         len(text.strip().split()) == 1 and
                         any(ext in text.lower() for ext in ['.wav', '.mp3', '.mp4', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.webm', '.ogg']))

        result = indicator_count >= 2 or is_simple_url
        logger.debug(f"🔍 File structure check: {indicator_count} indicators, simple_url: {is_simple_url}, result: {result}")
        return result
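
    # Heuristic examples (illustrative strings):
    #   "[{'image': {'url': '...', 'path': '...'}}]" -> True  (multiple indicators)
    #   "https://host/file=clip.wav"                 -> True  (bare media URL)
    #   "The fox jumped over the dog."               -> False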

    def get_server_status(self) -> Dict[str, str]:
        """Get the status of all configured servers."""
        status = {}
        for name in self.servers:
            compatibility = self._check_file_upload_compatibility(self.servers[name])
            status[name] = f"✅ Connected (MCP Protocol) - {compatibility}"
        return status

    def _check_file_upload_compatibility(self, config: MCPServerConfig) -> str:
        """Estimate whether a server is likely to support file uploads."""
        if "hf.space" in config.url:
            return "🟡 Hugging Face Space (usually compatible)"
        elif "gradio" in config.url.lower():
            return "🟢 Gradio server (likely compatible)"
        elif "localhost" in config.url or "127.0.0.1" in config.url:
            return "🟢 Local server (file access available)"
        else:
            return "🔴 Remote server (may need public URLs)"