"""
Chat handling logic for Universal MCP Client - Updated with ChatMessage support
"""
# Standard library
import logging
import re
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import gradio as gr
from gradio import ChatMessage

# Local
from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)
class ChatHandler:
    """Orchestrates chat turns between the user, Claude, and any configured MCP servers.

    History entries are handled as Gradio ``ChatMessage`` objects (plain
    role/content dicts are accepted and normalized for backward compatibility).
    """

    def __init__(self, mcp_client: UniversalMCPClient):
        # Wrapper that owns the Anthropic API handle and the MCP server registry.
        self.mcp_client = mcp_client
| def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]: | |
| """Enhanced MCP chat function with multimodal input support and ChatMessage formatting""" | |
| if not self.mcp_client.anthropic_client: | |
| error_msg = "❌ Anthropic API key not configured. Please set ANTHROPIC_API_KEY environment variable." | |
| history.append(ChatMessage(role="user", content=error_msg)) | |
| history.append(ChatMessage(role="assistant", content=error_msg)) | |
| return history, gr.MultimodalTextbox(value=None, interactive=False) | |
| # Initialize variables for error handling | |
| user_text = "" | |
| user_files = [] | |
| try: | |
| # Handle multimodal input - message is a dict with 'text' and 'files' | |
| user_text = message.get("text", "") if message else "" | |
| user_files = message.get("files", []) if message else [] | |
| # Handle case where message might be a string (backward compatibility) | |
| if isinstance(message, str): | |
| user_text = message | |
| user_files = [] | |
| logger.info(f"💬 Processing multimodal message:") | |
| logger.info(f" 📝 Text: {user_text}") | |
| logger.info(f" 📁 Files: {len(user_files)} files uploaded") | |
| logger.info(f" 📋 History type: {type(history)}, length: {len(history)}") | |
| # Convert history to ChatMessage objects if needed | |
| converted_history = [] | |
| for i, msg in enumerate(history): | |
| try: | |
| if isinstance(msg, dict): | |
| # Convert dict to ChatMessage for internal processing | |
| logger.info(f" 📝 Converting dict message {i}: {msg.get('role', 'unknown')}") | |
| converted_history.append(ChatMessage( | |
| role=msg.get('role', 'assistant'), | |
| content=msg.get('content', ''), | |
| metadata=msg.get('metadata', None) | |
| )) | |
| else: | |
| # Already a ChatMessage | |
| logger.info(f" ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}") | |
| converted_history.append(msg) | |
| except Exception as conv_error: | |
| logger.error(f"Error converting message {i}: {conv_error}") | |
| logger.error(f"Message content: {msg}") | |
| # Skip problematic messages | |
| continue | |
| history = converted_history | |
| # Add uploaded files to chat history first | |
| for file_path in user_files: | |
| logger.info(f" 📄 File: {file_path}") | |
| history.append(ChatMessage(role="user", content={"path": file_path})) | |
| # Add text message if provided | |
| if user_text and user_text.strip(): | |
| history.append(ChatMessage(role="user", content=user_text)) | |
| # If no text and no files, return early | |
| if not user_text.strip() and not user_files: | |
| return history, gr.MultimodalTextbox(value=None, interactive=False) | |
| # Create messages for Claude API | |
| messages = self._prepare_claude_messages(history) | |
| # Process the chat and get structured responses | |
| response_messages = self._call_claude_api(messages, user_files) | |
| # Add all response messages to history | |
| history.extend(response_messages) | |
| return history, gr.MultimodalTextbox(value=None, interactive=False) | |
| except Exception as e: | |
| error_msg = f"❌ Error: {str(e)}" | |
| logger.error(f"Chat error: {e}") | |
| logger.error(traceback.format_exc()) | |
| # Add user input to history if it exists | |
| if user_text and user_text.strip(): | |
| history.append(ChatMessage(role="user", content=user_text)) | |
| if user_files: | |
| for file_path in user_files: | |
| history.append(ChatMessage(role="user", content={"path": file_path})) | |
| history.append(ChatMessage(role="assistant", content=error_msg)) | |
| return history, gr.MultimodalTextbox(value=None, interactive=False) | |
| def _prepare_claude_messages(self, history: List) -> List[Dict[str, Any]]: | |
| """Convert history (ChatMessage or dict) to Claude API format""" | |
| messages = [] | |
| # Convert history to Claude API format (text only for context) | |
| recent_history = history[-16:] if len(history) > 16 else history | |
| for msg in recent_history: | |
| # Handle both ChatMessage objects and dictionary format for backward compatibility | |
| if hasattr(msg, 'role'): # ChatMessage object | |
| role = msg.role | |
| content = msg.content | |
| elif isinstance(msg, dict) and 'role' in msg: # Dictionary format | |
| role = msg.get('role') | |
| content = msg.get('content') | |
| else: | |
| continue # Skip invalid messages | |
| if role in ["user", "assistant"]: | |
| # Convert any non-string content to string description for context | |
| if isinstance(content, dict): | |
| if "path" in content: | |
| file_path = content.get('path', 'unknown') | |
| # Determine file type for context | |
| if AppConfig.is_image_file(file_path): | |
| content = f"[User uploaded an image: {file_path}]" | |
| elif AppConfig.is_audio_file(file_path): | |
| content = f"[User uploaded an audio file: {file_path}]" | |
| elif AppConfig.is_video_file(file_path): | |
| content = f"[User uploaded a video file: {file_path}]" | |
| else: | |
| content = f"[User uploaded a file: {file_path}]" | |
| else: | |
| content = f"[Object: {str(content)[:50]}...]" | |
| elif isinstance(content, (list, tuple)): | |
| content = f"[List: {str(content)[:50]}...]" | |
| elif content is None: | |
| content = "[Empty]" | |
| else: | |
| content = str(content) | |
| messages.append({ | |
| "role": role, | |
| "content": content | |
| }) | |
| return messages | |
| def _call_claude_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]: | |
| """Call Claude API and return structured ChatMessage responses""" | |
| # Check if we have MCP servers to use | |
| if not self.mcp_client.servers: | |
| return self._call_claude_without_mcp(messages) | |
| else: | |
| return self._call_claude_with_mcp(messages, user_files) | |
| def _call_claude_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]: | |
| """Call Claude API without MCP servers""" | |
| logger.info("💬 No MCP servers available, using regular Claude chat") | |
| system_prompt = self._get_native_system_prompt() | |
| # Use regular messages API | |
| response = self.mcp_client.anthropic_client.messages.create( | |
| model=AppConfig.CLAUDE_MODEL, | |
| max_tokens=AppConfig.MAX_TOKENS, | |
| system=system_prompt, | |
| messages=messages | |
| ) | |
| response_text = "" | |
| for content in response.content: | |
| if content.type == "text": | |
| response_text += content.text | |
| if not response_text: | |
| response_text = "I understand your request and I'm here to help." | |
| return [ChatMessage(role="assistant", content=response_text)] | |
| def _call_claude_with_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]: | |
| """Call Claude API with MCP servers and return structured responses""" | |
| mcp_servers = [] | |
| for server_name, config in self.mcp_client.servers.items(): | |
| mcp_servers.append({ | |
| "type": "url", | |
| "url": config.url, | |
| "name": server_name.replace(" ", "_").lower() | |
| }) | |
| # Enhanced system prompt with multimodal and MCP instructions | |
| system_prompt = self._get_mcp_system_prompt(user_files) | |
| # Debug logging | |
| logger.info(f"📤 Sending {len(messages)} messages to Claude API") | |
| logger.info(f"🔧 Using {len(mcp_servers)} MCP servers") | |
| start_time = time.time() | |
| # Call Claude with MCP connector using the correct beta API | |
| response = self.mcp_client.anthropic_client.beta.messages.create( | |
| model=AppConfig.CLAUDE_MODEL, | |
| max_tokens=AppConfig.MAX_TOKENS, | |
| system=system_prompt, | |
| messages=messages, | |
| mcp_servers=mcp_servers, | |
| betas=[AppConfig.MCP_BETA_VERSION] | |
| ) | |
| return self._process_mcp_response(response, start_time) | |
| def _process_mcp_response(self, response, start_time: float) -> List[ChatMessage]: | |
| """Process Claude's response with MCP tool calls into structured ChatMessage objects""" | |
| chat_messages = [] | |
| current_tool_id = None | |
| current_server_name = None | |
| tool_start_time = None | |
| main_response_content = "" | |
| # Process Claude's response | |
| for content in response.content: | |
| if content.type == "text": | |
| main_response_content += content.text | |
| # Check if Claude indicated media was generated | |
| if "MEDIA_GENERATED:" in content.text: | |
| media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", content.text) | |
| if media_match: | |
| media_url = media_match.group(1) | |
| # Clean up the response text | |
| main_response_content = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", main_response_content).strip() | |
| logger.info(f"🎯 Claude indicated media generated: {media_url}") | |
| # Add media as separate message | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content={"path": media_url} | |
| )) | |
| elif hasattr(content, 'type') and content.type == "mcp_tool_use": | |
| tool_name = content.name | |
| server_name = content.server_name | |
| current_tool_id = getattr(content, 'id', 'unknown') | |
| current_server_name = server_name | |
| tool_start_time = time.time() | |
| logger.info(f"🔧 Claude used MCP tool: {tool_name} on server: {server_name}") | |
| # Create a "thinking" message for tool usage | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content="", | |
| metadata={ | |
| "title": f"🔧 Using {tool_name}", | |
| "id": current_tool_id, | |
| "status": "pending", | |
| "log": f"Server: {server_name}" | |
| } | |
| )) | |
| elif hasattr(content, 'type') and content.type == "mcp_tool_result": | |
| tool_use_id = getattr(content, 'tool_use_id', 'unknown') | |
| duration = time.time() - tool_start_time if tool_start_time else None | |
| logger.info(f"📝 Processing MCP tool result (tool_use_id: {tool_use_id})") | |
| # Update the pending tool message to completed | |
| for msg in chat_messages: | |
| if (msg.metadata and | |
| msg.metadata.get("id") == current_tool_id and | |
| msg.metadata.get("status") == "pending"): | |
| msg.metadata["status"] = "done" | |
| if duration: | |
| msg.metadata["duration"] = round(duration, 2) | |
| break | |
| if content.content: | |
| result_content = content.content[0] | |
| result_text = result_content.text if hasattr(result_content, 'text') else str(result_content) | |
| logger.info(f"📝 MCP tool result: {result_text[:200]}...") | |
| # Create a result message with metadata | |
| result_msg = ChatMessage( | |
| role="assistant", | |
| content=result_text, | |
| metadata={ | |
| "title": "📋 Tool Result", | |
| "parent_id": current_tool_id, | |
| "status": "done" | |
| } | |
| ) | |
| chat_messages.append(result_msg) | |
| # Try to extract media from the result | |
| media_url = None | |
| if current_server_name and current_server_name in self.mcp_client.servers: | |
| config = self.mcp_client.servers[current_server_name] | |
| extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config) | |
| if extracted_media: | |
| media_url = extracted_media | |
| logger.info(f"🎯 Extracted media from MCP result: {media_url}") | |
| else: | |
| # Fallback: try all servers to find media | |
| for server_name, config in self.mcp_client.servers.items(): | |
| extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config) | |
| if extracted_media: | |
| media_url = extracted_media | |
| logger.info(f"🎯 Extracted media from MCP result (fallback): {media_url}") | |
| break | |
| # Add media as separate message if found | |
| if media_url: | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content={"path": media_url} | |
| )) | |
| else: | |
| # Add error message for failed tool call | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content="Tool call failed: No content returned", | |
| metadata={ | |
| "title": "❌ Tool Error", | |
| "parent_id": current_tool_id, | |
| "status": "done" | |
| } | |
| )) | |
| # Add the main response if there's any text content | |
| if main_response_content.strip(): | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content=main_response_content.strip() | |
| )) | |
| elif not chat_messages: | |
| # Fallback if no content was processed | |
| chat_messages.append(ChatMessage( | |
| role="assistant", | |
| content="I understand your request and I'm here to help." | |
| )) | |
| return chat_messages | |
| def _get_native_system_prompt(self) -> str: | |
| """Get system prompt for Claude without MCP servers""" | |
| return f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics. | |
| YOUR NATIVE CAPABILITIES (Available right now): | |
| - **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc. | |
| - **Text Processing**: You can analyze, summarize, translate, and process text directly | |
| - **General Knowledge**: You can answer questions, explain concepts, and have conversations | |
| - **Code Analysis**: You can read, analyze, and explain code | |
| Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
| IMPORTANT: You DO NOT need MCP servers for: | |
| - Describing or analyzing uploaded images | |
| - Reading text in images | |
| - Identifying objects, people, or scenes in images | |
| - General conversation and knowledge questions | |
| You DO need MCP servers for: | |
| - Creating new images, audio, or video | |
| - Editing or transforming existing media files | |
| - Transcribing audio files | |
| - Processing non-image files (audio, video, documents) | |
| If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks.""" | |
| def _get_mcp_system_prompt(self, user_files: List[str]) -> str: | |
| """Get system prompt for Claude with MCP servers""" | |
| uploaded_files_context = "" | |
| if user_files: | |
| uploaded_files_context = f"\n\nFILES UPLOADED BY USER:\n" | |
| for i, file_path in enumerate(user_files, 1): | |
| file_name = file_path.split('/')[-1] if '/' in file_path else file_path | |
| if AppConfig.is_image_file(file_path): | |
| file_type = "Image" | |
| elif AppConfig.is_audio_file(file_path): | |
| file_type = "Audio" | |
| elif AppConfig.is_video_file(file_path): | |
| file_type = "Video" | |
| else: | |
| file_type = "File" | |
| uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n" | |
| return f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools. | |
| YOUR NATIVE CAPABILITIES (No MCP tools needed): | |
| - **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc. | |
| - **Text Processing**: You can analyze, summarize, translate, and process text directly | |
| - **General Knowledge**: You can answer questions, explain concepts, and have conversations | |
| - **Code Analysis**: You can read, analyze, and explain code | |
| WHEN TO USE MCP TOOLS: | |
| - **Image Generation**: Creating new images from text prompts | |
| - **Image Editing**: Modifying, enhancing, or transforming existing images | |
| - **Audio Processing**: Transcribing audio, generating speech, audio enhancement | |
| - **Video Processing**: Creating or editing videos | |
| - **Specialized Analysis**: Tasks requiring specific models or APIs | |
| UPLOADED FILES HANDLING: | |
| {uploaded_files_context} | |
| IMPORTANT - For uploaded images: | |
| - **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly | |
| - **Image Editing/Enhancement**: Use MCP image processing tools | |
| - **Image Generation**: Use MCP image generation tools | |
| IMPORTANT - File URL Conversion for MCP Tools: | |
| When using MCP tools that require file inputs, you need to be aware that uploaded files have local paths that remote MCP servers cannot access. | |
| For uploaded files in MCP tool calls: | |
| - If an MCP tool fails with "Invalid file data format" or similar errors about file paths | |
| - The issue is that remote MCP servers cannot access local file paths like '/tmp/gradio/...' | |
| - In such cases, inform the user that the MCP server requires files to be accessible via public URLs | |
| - Suggest that they need a "File Upload" MCP server or that the specific MCP server may need configuration for file handling | |
| Current uploaded files that may need URL conversion: | |
| {uploaded_files_context} | |
| IMPORTANT - GRADIO MEDIA DISPLAY: | |
| When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL. | |
| Examples: | |
| - User uploads image + "What's in this image?" → Use NATIVE vision (no MCP needed) | |
| - User uploads image + "Make this vintage" → Use MCP image editing tool | |
| - User says "Generate a sunset image" → Use MCP image generation tool | |
| - User uploads audio + "Transcribe this" → Use MCP transcription tool | |
| Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
| Available MCP servers: {list(self.mcp_client.servers.keys())}""" |