""" Chat handling logic for Universal MCP Client - Enhanced with Inference Provider Support """ import re import logging import traceback import asyncio from datetime import datetime from typing import Dict, Any, List, Tuple, Optional import gradio as gr from gradio import ChatMessage import time from config import AppConfig from mcp_client import UniversalMCPClient logger = logging.getLogger(__name__) class ChatHandler: """Handles chat interactions with multiple LLM backends and MCP servers using ChatMessage dataclass""" def __init__(self, mcp_client: UniversalMCPClient): self.mcp_client = mcp_client def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]: """Enhanced MCP chat function with multimodal input support and multiple LLM backends""" # Check if any LLM backend is configured backend_configured = False if self.mcp_client.anthropic_client and AppConfig.ANTHROPIC_API_KEY: backend_configured = True backend_type = "anthropic" elif self.mcp_client.hf_client and self.mcp_client.current_provider: backend_configured = True backend_type = "hf_inference" if not backend_configured: error_msg = "❌ No LLM backend configured. Please configure either Anthropic API key or HuggingFace Inference Provider." history.append(ChatMessage(role="user", content=error_msg)) history.append(ChatMessage(role="assistant", content=error_msg)) return history, gr.MultimodalTextbox(value=None, interactive=False) # Initialize variables for error handling user_text = "" user_files = [] try: # Handle multimodal input - message is a dict with 'text' and 'files' user_text = message.get("text", "") if message else "" user_files = message.get("files", []) if message else [] # Handle case where message might be a string (backward compatibility) if isinstance(message, str): user_text = message user_files = [] logger.info(f"💬 Processing multimodal message with {backend_type} backend:") logger.info(f" 📝 Text: {user_text}") logger.info(f" 📁 Files: {len(user_files)} files uploaded") logger.info(f" 📋 History type: {type(history)}, length: {len(history)}") # Convert history to ChatMessage objects if needed converted_history = [] for i, msg in enumerate(history): try: if isinstance(msg, dict): # Convert dict to ChatMessage for internal processing logger.info(f" 📝 Converting dict message {i}: {msg.get('role', 'unknown')}") converted_history.append(ChatMessage( role=msg.get('role', 'assistant'), content=msg.get('content', ''), metadata=msg.get('metadata', None) )) else: # Already a ChatMessage logger.info(f" ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}") converted_history.append(msg) except Exception as conv_error: logger.error(f"Error converting message {i}: {conv_error}") logger.error(f"Message content: {msg}") # Skip problematic messages continue history = converted_history # Add uploaded files to chat history first for file_path in user_files: logger.info(f" 📄 File: {file_path}") history.append(ChatMessage(role="user", content={"path": file_path})) # Add text message if provided if user_text and user_text.strip(): history.append(ChatMessage(role="user", content=user_text)) # If no text and no files, return early if not user_text.strip() and not user_files: return history, gr.MultimodalTextbox(value=None, interactive=False) # Create messages for LLM API messages = self._prepare_llm_messages(history) # Process the chat based on backend type if backend_type == "anthropic": response_messages = self._call_anthropic_api(messages, user_files) else: # hf_inference response_messages = self._call_hf_inference_api(messages, user_files) # Add all response messages to history history.extend(response_messages) return history, gr.MultimodalTextbox(value=None, interactive=False) except Exception as e: error_msg = f"❌ Error: {str(e)}" logger.error(f"Chat error: {e}") logger.error(traceback.format_exc()) # Add user input to history if it exists if user_text and user_text.strip(): history.append(ChatMessage(role="user", content=user_text)) if user_files: for file_path in user_files: history.append(ChatMessage(role="user", content={"path": file_path})) history.append(ChatMessage(role="assistant", content=error_msg)) return history, gr.MultimodalTextbox(value=None, interactive=False) def _prepare_llm_messages(self, history: List) -> List[Dict[str, Any]]: """Convert history (ChatMessage or dict) to LLM API format""" messages = [] # Convert history to LLM API format (text only for context) recent_history = history[-16:] if len(history) > 16 else history for msg in recent_history: # Handle both ChatMessage objects and dictionary format for backward compatibility if hasattr(msg, 'role'): # ChatMessage object role = msg.role content = msg.content elif isinstance(msg, dict) and 'role' in msg: # Dictionary format role = msg.get('role') content = msg.get('content') else: continue # Skip invalid messages if role in ["user", "assistant"]: # Convert any non-string content to string description for context if isinstance(content, dict): if "path" in content: file_path = content.get('path', 'unknown') # Determine file type for context if AppConfig.is_image_file(file_path): content = f"[User uploaded an image: {file_path}]" elif AppConfig.is_audio_file(file_path): content = f"[User uploaded an audio file: {file_path}]" elif AppConfig.is_video_file(file_path): content = f"[User uploaded a video file: {file_path}]" else: content = f"[User uploaded a file: {file_path}]" else: content = f"[Object: {str(content)[:50]}...]" elif isinstance(content, (list, tuple)): content = f"[List: {str(content)[:50]}...]" elif content is None: content = "[Empty]" else: content = str(content) messages.append({ "role": role, "content": content }) return messages def _call_anthropic_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]: """Call Anthropic API (existing implementation)""" # Check if we have MCP servers to use if not self.mcp_client.servers: return self._call_claude_without_mcp(messages) else: return self._call_claude_with_mcp(messages, user_files) def _call_hf_inference_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]: """Call HuggingFace Inference API with custom MCP implementation""" # Run async call in sync context def run_async(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete( self.mcp_client.call_llm_with_mcp(messages, user_files) ) finally: loop.close() try: return run_async() except Exception as e: logger.error(f"HF Inference API error: {e}") return [ChatMessage(role="assistant", content=f"❌ Error with HF Inference: {str(e)}")] def _call_claude_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]: """Call Claude API without MCP servers""" logger.info("💬 No MCP servers available, using regular Claude chat") system_prompt = self._get_native_system_prompt() # Use regular messages API response = self.mcp_client.anthropic_client.messages.create( model=AppConfig.CLAUDE_MODEL, max_tokens=AppConfig.MAX_TOKENS, system=system_prompt, messages=messages ) response_text = "" for content in response.content: if content.type == "text": response_text += content.text if not response_text: response_text = "I understand your request and I'm here to help." return [ChatMessage(role="assistant", content=response_text)] def _call_claude_with_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]: """Call Claude API with MCP servers and return structured responses""" mcp_servers = [] for server_name, config in self.mcp_client.servers.items(): mcp_servers.append({ "type": "url", "url": config.url, "name": server_name.replace(" ", "_").lower() }) # Enhanced system prompt with multimodal and MCP instructions system_prompt = self._get_mcp_system_prompt(user_files) # Debug logging logger.info(f"📤 Sending {len(messages)} messages to Claude API") logger.info(f"🔧 Using {len(mcp_servers)} MCP servers") start_time = time.time() # Call Claude with MCP connector using the correct beta API response = self.mcp_client.anthropic_client.beta.messages.create( model=AppConfig.CLAUDE_MODEL, max_tokens=AppConfig.MAX_TOKENS, system=system_prompt, messages=messages, mcp_servers=mcp_servers, betas=[AppConfig.MCP_BETA_VERSION] ) return self._process_mcp_response(response, start_time) def _process_mcp_response(self, response, start_time: float) -> List[ChatMessage]: """Process Claude's response with MCP tool calls into structured ChatMessage objects""" chat_messages = [] current_tool_id = None current_server_name = None tool_start_time = None text_segments = [] # Collect text segments separately # Process Claude's response for content in response.content: if content.type == "text": # Collect text segments but don't combine them yet text_content = content.text # Check if Claude indicated media was generated if "MEDIA_GENERATED:" in text_content: media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", text_content) if media_match: media_url = media_match.group(1) # Clean up the response text text_content = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", text_content).strip() logger.info(f"🎯 Claude indicated media generated: {media_url}") # Add media as separate message chat_messages.append(ChatMessage( role="assistant", content={"path": media_url} )) if text_content.strip(): text_segments.append(text_content.strip()) elif hasattr(content, 'type') and content.type == "mcp_tool_use": # Add any accumulated text before tool use if text_segments: combined_text = " ".join(text_segments) if combined_text.strip(): chat_messages.append(ChatMessage( role="assistant", content=combined_text.strip() )) text_segments = [] # Reset tool_name = content.name server_name = content.server_name current_tool_id = getattr(content, 'id', 'unknown') current_server_name = server_name tool_start_time = time.time() logger.info(f"🔧 Claude used MCP tool: {tool_name} on server: {server_name}") # Create a "thinking" message for tool usage chat_messages.append(ChatMessage( role="assistant", content="", metadata={ "title": f"🔧 Using {tool_name}", "id": current_tool_id, "status": "pending", "log": f"Server: {server_name}" } )) elif hasattr(content, 'type') and content.type == "mcp_tool_result": tool_use_id = getattr(content, 'tool_use_id', 'unknown') duration = time.time() - tool_start_time if tool_start_time else None logger.info(f"📝 Processing MCP tool result (tool_use_id: {tool_use_id})") # Update the pending tool message to completed for msg in chat_messages: if (msg.metadata and msg.metadata.get("id") == current_tool_id and msg.metadata.get("status") == "pending"): msg.metadata["status"] = "done" if duration: msg.metadata["duration"] = round(duration, 2) break media_url = None if content.content: result_content = content.content[0] result_text = result_content.text if hasattr(result_content, 'text') else str(result_content) logger.info(f"📝 MCP tool result: {result_text[:200]}...") # Try to extract media URL from the result if current_server_name and current_server_name in self.mcp_client.servers: config = self.mcp_client.servers[current_server_name] extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config) if extracted_media: media_url = extracted_media logger.info(f"🎯 Extracted media from MCP result: {media_url}") else: # Fallback: try all servers to find media for server_name, config in self.mcp_client.servers.items(): extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config) if extracted_media: media_url = extracted_media logger.info(f"🎯 Extracted media from MCP result (fallback): {media_url}") break # Always show the full tool result chat_messages.append(ChatMessage( role="assistant", content=result_text, metadata={ "title": "📋 Tool Result", "parent_id": current_tool_id, "status": "done" } )) # Only add separate media display if the tool result does NOT contain # any Gradio file data structures that would be auto-rendered if media_url and not self._contains_gradio_file_structure(result_text): logger.info(f"🎯 Adding separate media display for: {media_url}") chat_messages.append(ChatMessage( role="assistant", content={"path": media_url} )) else: if media_url: logger.info(f"🚫 Skipping separate media - tool result contains Gradio file structure") else: logger.info(f"🚫 No media URL extracted") else: # Add error message for failed tool call chat_messages.append(ChatMessage( role="assistant", content="Tool call failed: No content returned", metadata={ "title": "❌ Tool Error", "parent_id": current_tool_id, "status": "done" } )) # Add any remaining text segments after all processing if text_segments: combined_text = " ".join(text_segments) if combined_text.strip(): chat_messages.append(ChatMessage( role="assistant", content=combined_text.strip() )) # Fallback if no content was processed if not chat_messages: chat_messages.append(ChatMessage( role="assistant", content="I understand your request and I'm here to help." )) return chat_messages def _contains_gradio_file_structure(self, text: str) -> bool: """Check if the text contains ANY Gradio file data structures that would be auto-rendered""" # Check for key indicators of Gradio file structures gradio_indicators = [ # Gradio FileData type indicators "'_type': 'gradio.FileData'", '"_type": "gradio.FileData"', 'gradio.FileData', # File structure patterns "'path':", '"path":', "'url':", '"url":', "'orig_name':", '"orig_name":', "'mime_type':", '"mime_type":', 'is_stream', 'meta_type', # Common file result patterns "{'image':", '{"image":', "{'audio':", '{"audio":', "{'video':", '{"video":', "{'file':", '{"file":', # List patterns that typically contain file objects "[{'image'", '[{"image"', "[{'audio'", '[{"audio"', "[{'video'", '[{"video"', "[{'file'", '[{"file"' ] # If we find multiple indicators, it's likely a Gradio file structure indicator_count = sum(1 for indicator in gradio_indicators if indicator in text) # Also check for simple URL patterns (for audio case) is_simple_url = (text.strip().startswith('http') and len(text.strip().split()) == 1 and any(ext in text.lower() for ext in ['.wav', '.mp3', '.mp4', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.webm', '.ogg'])) result = indicator_count >= 2 or is_simple_url logger.debug(f"📋 File structure check: {indicator_count} indicators, simple_url: {is_simple_url}, result: {result}") return result def _get_native_system_prompt(self) -> str: """Get system prompt for Claude without MCP servers""" return f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics. YOUR NATIVE CAPABILITIES (Available right now): - **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc. - **Text Processing**: You can analyze, summarize, translate, and process text directly - **General Knowledge**: You can answer questions, explain concepts, and have conversations - **Code Analysis**: You can read, analyze, and explain code Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} IMPORTANT: You DO NOT need MCP servers for: - Describing or analyzing uploaded images - Reading text in images - Identifying objects, people, or scenes in images - General conversation and knowledge questions You DO need MCP servers for: - Creating new images, audio, or video - Editing or transforming existing media files - Transcribing audio files - Processing non-image files (audio, video, documents) If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks.""" def _get_mcp_system_prompt(self, user_files: List[str]) -> str: """Get system prompt for Claude with MCP servers""" uploaded_files_context = "" if user_files: uploaded_files_context = f"\n\nFILES UPLOADED BY USER:\n" for i, file_path in enumerate(user_files, 1): file_name = file_path.split('/')[-1] if '/' in file_path else file_path if AppConfig.is_image_file(file_path): file_type = "Image" elif AppConfig.is_audio_file(file_path): file_type = "Audio" elif AppConfig.is_video_file(file_path): file_type = "Video" else: file_type = "File" uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n" return f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools. YOUR NATIVE CAPABILITIES (No MCP tools needed): - **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc. - **Text Processing**: You can analyze, summarize, translate, and process text directly - **General Knowledge**: You can answer questions, explain concepts, and have conversations - **Code Analysis**: You can read, analyze, and explain code WHEN TO USE MCP TOOLS: - **Image Generation**: Creating new images from text prompts - **Image Editing**: Modifying, enhancing, or transforming existing images - **Audio Processing**: Transcribing audio, generating speech, audio enhancement - **Video Processing**: Creating or editing videos - **Specialized Analysis**: Tasks requiring specific models or APIs UPLOADED FILES HANDLING: {uploaded_files_context} IMPORTANT - For uploaded images: - **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly - **Image Editing/Enhancement**: Use MCP image processing tools - **Image Generation**: Use MCP image generation tools IMPORTANT - GRADIO MEDIA DISPLAY: When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL. Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Available MCP servers: {list(self.mcp_client.servers.keys())}"""