""" | |
Chat handling logic for Universal MCP Client - Enhanced with Inference Provider Support | |
""" | |
import re
import logging
import traceback
import asyncio
import time
from datetime import datetime
from typing import Dict, Any, List, Tuple, Optional

import gradio as gr
from gradio import ChatMessage

from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)


class ChatHandler:
    """Handles chat interactions with multiple LLM backends and MCP servers using ChatMessage dataclass"""

    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client

    def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]:
        """Enhanced MCP chat function with multimodal input support and multiple LLM backends"""
        # Check if any LLM backend is configured
        backend_configured = False
        backend_type = None
        if self.mcp_client.anthropic_client and AppConfig.ANTHROPIC_API_KEY:
            backend_configured = True
            backend_type = "anthropic"
        elif self.mcp_client.hf_client and self.mcp_client.current_provider:
            backend_configured = True
            backend_type = "hf_inference"

        if not backend_configured:
            error_msg = "❌ No LLM backend configured. Please configure either an Anthropic API key or a HuggingFace Inference Provider."
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        # Initialize variables for error handling
        user_text = ""
        user_files = []

        try:
            # Handle multimodal input. The message is normally a dict with
            # 'text' and 'files'; accept a bare string for backward
            # compatibility (the isinstance check must come first, since a
            # string has no .get()).
            if isinstance(message, str):
                user_text = message
                user_files = []
            elif message:
                user_text = message.get("text", "")
                user_files = message.get("files", [])

            logger.info(f"💬 Processing multimodal message with {backend_type} backend:")
            logger.info(f"  📝 Text: {user_text}")
            logger.info(f"  📁 Files: {len(user_files)} files uploaded")
            logger.info(f"  📋 History type: {type(history)}, length: {len(history)}")

            # Convert history entries to ChatMessage objects if needed
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        # Convert dict to ChatMessage for internal processing
                        logger.info(f"  📝 Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        # Already a ChatMessage
                        logger.info(f"  ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    # Skip problematic messages
                    continue
            history = converted_history

            # Add uploaded files to chat history first
            for file_path in user_files:
                logger.info(f"  📄 File: {file_path}")
                history.append(ChatMessage(role="user", content={"path": file_path}))

            # Add the text message if provided
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))

            # If there is neither text nor files, return early
            if not user_text.strip() and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)

            # Create messages for the LLM API
            messages = self._prepare_llm_messages(history)

            # Process the chat based on backend type
            if backend_type == "anthropic":
                response_messages = self._call_anthropic_api(messages, user_files)
            else:  # hf_inference
                response_messages = self._call_hf_inference_api(messages, user_files)

            # Add all response messages to history
            history.extend(response_messages)
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())

            # Preserve the user's input in history if it exists
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            for file_path in user_files:
                history.append(ChatMessage(role="user", content={"path": file_path}))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)
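
    # How this method is typically wired into a Gradio UI (a minimal sketch,
    # assuming a Blocks app with a gr.Chatbot and gr.MultimodalTextbox; the
    # component names here are illustrative, not defined in this module):
    #
    #   with gr.Blocks() as demo:
    #       chatbot = gr.Chatbot(type="messages")
    #       textbox = gr.MultimodalTextbox(file_types=["image", "audio", "video"])
    #       textbox.submit(
    #           handler.process_multimodal_message,
    #           inputs=[textbox, chatbot],
    #           outputs=[chatbot, textbox],
    #       )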

    def _prepare_llm_messages(self, history: List) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to LLM API format"""
        messages = []

        # Limit context to the most recent messages (text only)
        recent_history = history[-16:] if len(history) > 16 else history

        for msg in recent_history:
            # Handle both ChatMessage objects and dicts for backward compatibility
            if hasattr(msg, 'role'):  # ChatMessage object
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:  # Dictionary format
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue  # Skip invalid messages

            if role in ["user", "assistant"]:
                # Convert any non-string content to a string description for context
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')
                        # Determine file type for context
                        if AppConfig.is_image_file(file_path):
                            content = f"[User uploaded an image: {file_path}]"
                        elif AppConfig.is_audio_file(file_path):
                            content = f"[User uploaded an audio file: {file_path}]"
                        elif AppConfig.is_video_file(file_path):
                            content = f"[User uploaded a video file: {file_path}]"
                        else:
                            content = f"[User uploaded a file: {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)

                messages.append({
                    "role": role,
                    "content": content
                })

        return messages
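
    # Illustrative output of _prepare_llm_messages (made-up values): file
    # uploads are flattened into text placeholders so either backend can
    # consume the history as plain chat messages:
    #
    #   [{"role": "user", "content": "[User uploaded an image: /tmp/cat.png]"},
    #    {"role": "user", "content": "What is in this picture?"},
    #    {"role": "assistant", "content": "A cat sitting on a windowsill."}]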

    def _call_anthropic_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]:
        """Call Anthropic API (existing implementation)"""
        # Route based on whether any MCP servers are registered
        if not self.mcp_client.servers:
            return self._call_claude_without_mcp(messages)
        else:
            return self._call_claude_with_mcp(messages, user_files)

    def _call_hf_inference_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]:
        """Call HuggingFace Inference API with custom MCP implementation"""
        # Run the async call from this synchronous context on a fresh event loop
        def run_async():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(
                    self.mcp_client.call_llm_with_mcp(messages, user_files)
                )
            finally:
                loop.close()

        try:
            return run_async()
        except Exception as e:
            logger.error(f"HF Inference API error: {e}")
            return [ChatMessage(role="assistant", content=f"❌ Error with HF Inference: {str(e)}")]
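
    # Design note: a fresh event loop is created and installed above so the
    # coroutine can run from a synchronous Gradio callback thread even when a
    # stale or closed loop is already set on that thread. When no loop is
    # installed at all, the sketch below would be a close equivalent:
    #
    #   return asyncio.run(self.mcp_client.call_llm_with_mcp(messages, user_files))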

    def _call_claude_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call Claude API without MCP servers"""
        logger.info("💬 No MCP servers available, using regular Claude chat")
        system_prompt = self._get_native_system_prompt()

        # Use the regular messages API
        response = self.mcp_client.anthropic_client.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages
        )

        response_text = ""
        for content in response.content:
            if content.type == "text":
                response_text += content.text

        if not response_text:
            response_text = "I understand your request and I'm here to help."

        return [ChatMessage(role="assistant", content=response_text)]

    def _call_claude_with_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]:
        """Call Claude API with MCP servers and return structured responses"""
        mcp_servers = []
        for server_name, config in self.mcp_client.servers.items():
            mcp_servers.append({
                "type": "url",
                "url": config.url,
                "name": server_name.replace(" ", "_").lower()
            })

        # Enhanced system prompt with multimodal and MCP instructions
        system_prompt = self._get_mcp_system_prompt(user_files)

        # Debug logging
        logger.info(f"📤 Sending {len(messages)} messages to Claude API")
        logger.info(f"🔧 Using {len(mcp_servers)} MCP servers")

        start_time = time.time()

        # Call Claude with the MCP connector via the beta API
        response = self.mcp_client.anthropic_client.beta.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages,
            mcp_servers=mcp_servers,
            betas=[AppConfig.MCP_BETA_VERSION]
        )

        return self._process_mcp_response(response, start_time)
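
    # Shape of each entry passed via mcp_servers above (illustrative values;
    # the real URL and name come from the registered server config):
    #
    #   {"type": "url",
    #    "url": "https://example-space.hf.space/gradio_api/mcp/sse",
    #    "name": "image_generator"}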

    def _process_mcp_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process Claude's response with MCP tool calls into structured ChatMessage objects"""
        chat_messages = []
        current_tool_id = None
        current_server_name = None
        tool_start_time = None
        text_segments = []  # Collect text segments separately

        # Process Claude's response
        for content in response.content:
            if content.type == "text":
                # Collect text segments but don't combine them yet
                text_content = content.text

                # Check if Claude indicated media was generated
                if "MEDIA_GENERATED:" in text_content:
                    media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", text_content)
                    if media_match:
                        media_url = media_match.group(1)
                        # Clean up the response text
                        text_content = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", text_content).strip()
                        logger.info(f"🎯 Claude indicated media generated: {media_url}")

                        # Add media as a separate message
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))

                if text_content.strip():
                    text_segments.append(text_content.strip())

            elif hasattr(content, 'type') and content.type == "mcp_tool_use":
                # Flush any accumulated text before the tool use
                if text_segments:
                    combined_text = " ".join(text_segments)
                    if combined_text.strip():
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content=combined_text.strip()
                        ))
                    text_segments = []  # Reset

                tool_name = content.name
                server_name = content.server_name
                current_tool_id = getattr(content, 'id', 'unknown')
                current_server_name = server_name
                tool_start_time = time.time()

                logger.info(f"🔧 Claude used MCP tool: {tool_name} on server: {server_name}")

                # Create a "thinking" message for the tool usage
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content="",
                    metadata={
                        "title": f"🔧 Using {tool_name}",
                        "id": current_tool_id,
                        "status": "pending",
                        "log": f"Server: {server_name}"
                    }
                ))

            elif hasattr(content, 'type') and content.type == "mcp_tool_result":
                tool_use_id = getattr(content, 'tool_use_id', 'unknown')
                duration = time.time() - tool_start_time if tool_start_time else None
                logger.info(f"📝 Processing MCP tool result (tool_use_id: {tool_use_id})")

                # Update the pending tool message to completed
                for msg in chat_messages:
                    if (msg.metadata and
                            msg.metadata.get("id") == current_tool_id and
                            msg.metadata.get("status") == "pending"):
                        msg.metadata["status"] = "done"
                        if duration:
                            msg.metadata["duration"] = round(duration, 2)
                        break

                media_url = None
                if content.content:
                    result_content = content.content[0]
                    result_text = result_content.text if hasattr(result_content, 'text') else str(result_content)
                    logger.info(f"📝 MCP tool result: {result_text[:200]}...")

                    # Try to extract a media URL from the result
                    if current_server_name and current_server_name in self.mcp_client.servers:
                        config = self.mcp_client.servers[current_server_name]
                        extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config)
                        if extracted_media:
                            media_url = extracted_media
                            logger.info(f"🎯 Extracted media from MCP result: {media_url}")
                    else:
                        # Fallback: try all servers to find media
                        for server_name, config in self.mcp_client.servers.items():
                            extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config)
                            if extracted_media:
                                media_url = extracted_media
                                logger.info(f"🎯 Extracted media from MCP result (fallback): {media_url}")
                                break

                    # Always show the full tool result
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_text,
                        metadata={
                            "title": "📋 Tool Result",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    ))

                    # Only add a separate media display if the tool result does NOT
                    # contain any Gradio file data structures that would be auto-rendered
                    if media_url and not self._contains_gradio_file_structure(result_text):
                        logger.info(f"🎯 Adding separate media display for: {media_url}")
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    elif media_url:
                        logger.info("🚫 Skipping separate media - tool result contains Gradio file structure")
                    else:
                        logger.info("🚫 No media URL extracted")
                else:
                    # Add an error message for the failed tool call
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="Tool call failed: No content returned",
                        metadata={
                            "title": "❌ Tool Error",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    ))

        # Flush any remaining text segments after all processing
        if text_segments:
            combined_text = " ".join(text_segments)
            if combined_text.strip():
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=combined_text.strip()
                ))

        # Fallback if no content was processed
        if not chat_messages:
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages
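
    # The metadata-bearing messages built above render as collapsible tool
    # panels in gr.Chatbot (type="messages"). An illustrative completed entry
    # (values are made up):
    #
    #   ChatMessage(role="assistant", content="",
    #               metadata={"title": "🔧 Using generate_image",
    #                         "id": "toolu_0123", "status": "done",
    #                         "duration": 4.2, "log": "Server: flux"})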

    def _contains_gradio_file_structure(self, text: str) -> bool:
        """Check if the text contains ANY Gradio file data structures that would be auto-rendered"""
        # Key indicators of Gradio file structures
        gradio_indicators = [
            # Gradio FileData type indicators
            "'_type': 'gradio.FileData'",
            '"_type": "gradio.FileData"',
            'gradio.FileData',
            # File structure patterns
            "'path':",
            '"path":',
            "'url':",
            '"url":',
            "'orig_name':",
            '"orig_name":',
            "'mime_type':",
            '"mime_type":',
            'is_stream',
            'meta_type',
            # Common file result patterns
            "{'image':",
            '{"image":',
            "{'audio':",
            '{"audio":',
            "{'video':",
            '{"video":',
            "{'file':",
            '{"file":',
            # List patterns that typically contain file objects
            "[{'image'",
            '[{"image"',
            "[{'audio'",
            '[{"audio"',
            "[{'video'",
            '[{"video"',
            "[{'file'",
            '[{"file"',
        ]

        # Multiple indicators strongly suggest a Gradio file structure
        indicator_count = sum(1 for indicator in gradio_indicators if indicator in text)

        # Also treat a bare media URL as auto-renderable (covers the audio case)
        is_simple_url = (text.strip().startswith('http') and
                         len(text.strip().split()) == 1 and
                         any(ext in text.lower() for ext in ['.wav', '.mp3', '.mp4', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.webm', '.ogg']))

        result = indicator_count >= 2 or is_simple_url
        logger.debug(f"📋 File structure check: {indicator_count} indicators, simple_url: {is_simple_url}, result: {result}")
        return result
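
    # Illustrative probes for _contains_gradio_file_structure (made-up inputs):
    #
    #   "{'image': {'path': '/tmp/out.png', 'url': 'http://...'}}"  -> True  (3 indicators)
    #   "https://host.example/speech.wav"                           -> True  (bare media URL)
    #   "The forecast for tomorrow is sunny."                       -> False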

    def _get_native_system_prompt(self) -> str:
        """Get system prompt for Claude without MCP servers"""
        return f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics.

YOUR NATIVE CAPABILITIES (Available right now):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

IMPORTANT: You DO NOT need MCP servers for:
- Describing or analyzing uploaded images
- Reading text in images
- Identifying objects, people, or scenes in images
- General conversation and knowledge questions

You DO need MCP servers for:
- Creating new images, audio, or video
- Editing or transforming existing media files
- Transcribing audio files
- Processing non-image files (audio, video, documents)

If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks."""

    def _get_mcp_system_prompt(self, user_files: List[str]) -> str:
        """Get system prompt for Claude with MCP servers"""
        uploaded_files_context = ""
        if user_files:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER:\n"
            for i, file_path in enumerate(user_files, 1):
                file_name = file_path.split('/')[-1] if '/' in file_path else file_path
                if AppConfig.is_image_file(file_path):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_path):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_path):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n"

        return f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools.

YOUR NATIVE CAPABILITIES (No MCP tools needed):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Specialized Analysis**: Tasks requiring specific models or APIs

UPLOADED FILES HANDLING:
{uploaded_files_context}

IMPORTANT - For uploaded images:
- **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly
- **Image Editing/Enhancement**: Use MCP image processing tools
- **Image Generation**: Use MCP image generation tools

IMPORTANT - GRADIO MEDIA DISPLAY:
When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL.

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Available MCP servers: {list(self.mcp_client.servers.keys())}"""
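

# Minimal end-to-end wiring (a sketch, not the app's real entry point; it
# assumes UniversalMCPClient() is constructible with no arguments and that at
# least one LLM backend has been configured elsewhere):
if __name__ == "__main__":
    client = UniversalMCPClient()
    handler = ChatHandler(client)
    history, _ = handler.process_multimodal_message(
        {"text": "Hello! What can you do?", "files": []}, []
    )
    for msg in history:
        print(f"{msg.role}: {msg.content}")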