File size: 21,401 Bytes
ca9d99e
358b475
ca9d99e
 
 
 
 
 
 
358b475
 
ca9d99e
 
 
 
 
 
 
358b475
ca9d99e
 
 
 
ed8fa70
358b475
ca9d99e
 
 
358b475
 
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8fa70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca9d99e
 
 
 
358b475
ca9d99e
 
 
358b475
ca9d99e
 
 
 
 
 
 
 
358b475
 
ca9d99e
358b475
 
ca9d99e
 
 
 
 
 
 
 
 
 
358b475
ca9d99e
 
358b475
ca9d99e
358b475
ca9d99e
 
ed8fa70
 
ca9d99e
 
 
 
 
ed8fa70
 
 
358b475
ed8fa70
 
 
 
 
 
 
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed8fa70
ca9d99e
 
 
 
 
358b475
 
ca9d99e
 
 
 
 
 
 
358b475
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
358b475
 
 
 
ca9d99e
358b475
 
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
358b475
 
ca9d99e
 
 
 
 
 
 
 
 
 
358b475
ca9d99e
358b475
 
 
 
 
 
 
ca9d99e
 
 
 
358b475
ca9d99e
 
 
 
 
 
358b475
ca9d99e
358b475
 
 
 
 
ca9d99e
 
 
 
358b475
 
 
ca9d99e
 
 
358b475
 
 
 
 
 
 
 
 
 
 
ca9d99e
 
 
358b475
ca9d99e
 
 
358b475
 
 
 
 
 
 
 
 
 
ca9d99e
 
 
 
 
 
358b475
 
 
 
 
 
 
 
 
 
 
 
ca9d99e
358b475
 
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
358b475
 
 
 
 
 
 
 
ca9d99e
358b475
 
 
 
 
 
 
 
 
 
ca9d99e
358b475
 
 
 
 
 
 
 
 
 
 
 
ca9d99e
358b475
ca9d99e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
"""
Chat handling logic for Universal MCP Client - Updated with ChatMessage support
"""
import re
import logging
import traceback
from datetime import datetime
from typing import Dict, Any, List, Tuple, Optional
import gradio as gr
from gradio import ChatMessage
import time

from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)

class ChatHandler:
    """Handles chat interactions with Claude and MCP servers using ChatMessage dataclass"""
    
    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client
    
    def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]:
        """Enhanced MCP chat function with multimodal input support and ChatMessage formatting"""
        
        if not self.mcp_client.anthropic_client:
            error_msg = "❌ Anthropic API key not configured. Please set ANTHROPIC_API_KEY environment variable."
            history.append(ChatMessage(role="user", content=error_msg))
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)
        
        # Initialize variables for error handling
        user_text = ""
        user_files = []
        
        try:
            # Handle multimodal input - message is a dict with 'text' and 'files'
            user_text = message.get("text", "") if message else ""
            user_files = message.get("files", []) if message else []
            
            # Handle case where message might be a string (backward compatibility)
            if isinstance(message, str):
                user_text = message
                user_files = []
            
            logger.info(f"πŸ’¬ Processing multimodal message:")
            logger.info(f"  πŸ“ Text: {user_text}")
            logger.info(f"  πŸ“ Files: {len(user_files)} files uploaded")
            logger.info(f"  πŸ“‹ History type: {type(history)}, length: {len(history)}")
            
            # Convert history to ChatMessage objects if needed
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        # Convert dict to ChatMessage for internal processing
                        logger.info(f"  πŸ“ Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        # Already a ChatMessage
                        logger.info(f"  βœ… ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    # Skip problematic messages
                    continue
            
            history = converted_history
            
            # Add uploaded files to chat history first
            for file_path in user_files:
                logger.info(f"  πŸ“„ File: {file_path}")
                history.append(ChatMessage(role="user", content={"path": file_path}))
            
            # Add text message if provided
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            
            # If no text and no files, return early
            if not user_text.strip() and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)
            
            # Create messages for Claude API 
            messages = self._prepare_claude_messages(history)
            
            # Process the chat and get structured responses
            response_messages = self._call_claude_api(messages, user_files)
            
            # Add all response messages to history
            history.extend(response_messages)
            
            return history, gr.MultimodalTextbox(value=None, interactive=False)
            
        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())
            
            # Add user input to history if it exists
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            if user_files:
                for file_path in user_files:
                    history.append(ChatMessage(role="user", content={"path": file_path}))
                    
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)
    
    def _prepare_claude_messages(self, history: List) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to Claude API format"""
        messages = []
        
        # Convert history to Claude API format (text only for context)
        recent_history = history[-16:] if len(history) > 16 else history
        for msg in recent_history:
            # Handle both ChatMessage objects and dictionary format for backward compatibility
            if hasattr(msg, 'role'):  # ChatMessage object
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:  # Dictionary format
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue  # Skip invalid messages
                
            if role in ["user", "assistant"]:
                
                # Convert any non-string content to string description for context
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')
                        # Determine file type for context
                        if AppConfig.is_image_file(file_path):
                            content = f"[User uploaded an image: {file_path}]"
                        elif AppConfig.is_audio_file(file_path):
                            content = f"[User uploaded an audio file: {file_path}]"
                        elif AppConfig.is_video_file(file_path):
                            content = f"[User uploaded a video file: {file_path}]"
                        else:
                            content = f"[User uploaded a file: {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)
                
                messages.append({
                    "role": role, 
                    "content": content
                })
        
        return messages
    
    def _call_claude_api(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]:
        """Call Claude API and return structured ChatMessage responses"""
        
        # Check if we have MCP servers to use
        if not self.mcp_client.servers:
            return self._call_claude_without_mcp(messages)
        else:
            return self._call_claude_with_mcp(messages, user_files)
    
    def _call_claude_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call Claude API without MCP servers"""
        logger.info("πŸ’¬ No MCP servers available, using regular Claude chat")
        
        system_prompt = self._get_native_system_prompt()
        
        # Use regular messages API
        response = self.mcp_client.anthropic_client.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages
        )
        
        response_text = ""
        for content in response.content:
            if content.type == "text":
                response_text += content.text
        
        if not response_text:
            response_text = "I understand your request and I'm here to help."
        
        return [ChatMessage(role="assistant", content=response_text)]
    
    def _call_claude_with_mcp(self, messages: List[Dict[str, Any]], user_files: List[str]) -> List[ChatMessage]:
        """Call Claude API with MCP servers and return structured responses"""
        mcp_servers = []
        for server_name, config in self.mcp_client.servers.items():
            mcp_servers.append({
                "type": "url",
                "url": config.url,
                "name": server_name.replace(" ", "_").lower()
            })
        
        # Enhanced system prompt with multimodal and MCP instructions
        system_prompt = self._get_mcp_system_prompt(user_files)
        
        # Debug logging
        logger.info(f"πŸ“€ Sending {len(messages)} messages to Claude API")
        logger.info(f"πŸ”§ Using {len(mcp_servers)} MCP servers")
        
        start_time = time.time()
        
        # Call Claude with MCP connector using the correct beta API
        response = self.mcp_client.anthropic_client.beta.messages.create(
            model=AppConfig.CLAUDE_MODEL,
            max_tokens=AppConfig.MAX_TOKENS,
            system=system_prompt,
            messages=messages,
            mcp_servers=mcp_servers,
            betas=[AppConfig.MCP_BETA_VERSION]
        )
        
        return self._process_mcp_response(response, start_time)
    
    def _process_mcp_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process Claude's response with MCP tool calls into structured ChatMessage objects"""
        chat_messages = []
        current_tool_id = None
        current_server_name = None
        tool_start_time = None
        main_response_content = ""
        
        # Process Claude's response
        for content in response.content:
            if content.type == "text":
                main_response_content += content.text
                # Check if Claude indicated media was generated
                if "MEDIA_GENERATED:" in content.text:
                    media_match = re.search(r"MEDIA_GENERATED:\s*([^\s]+)", content.text)
                    if media_match:
                        media_url = media_match.group(1)
                        # Clean up the response text
                        main_response_content = re.sub(r"MEDIA_GENERATED:\s*[^\s]+", "", main_response_content).strip()
                        logger.info(f"🎯 Claude indicated media generated: {media_url}")
                        # Add media as separate message
                        chat_messages.append(ChatMessage(
                            role="assistant", 
                            content={"path": media_url}
                        ))
            
            elif hasattr(content, 'type') and content.type == "mcp_tool_use":
                tool_name = content.name
                server_name = content.server_name
                current_tool_id = getattr(content, 'id', 'unknown')
                current_server_name = server_name
                tool_start_time = time.time()
                
                logger.info(f"πŸ”§ Claude used MCP tool: {tool_name} on server: {server_name}")
                
                # Create a "thinking" message for tool usage
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content="",
                    metadata={
                        "title": f"πŸ”§ Using {tool_name}",
                        "id": current_tool_id,
                        "status": "pending",
                        "log": f"Server: {server_name}"
                    }
                ))
                
            elif hasattr(content, 'type') and content.type == "mcp_tool_result":
                tool_use_id = getattr(content, 'tool_use_id', 'unknown')
                duration = time.time() - tool_start_time if tool_start_time else None
                
                logger.info(f"πŸ“ Processing MCP tool result (tool_use_id: {tool_use_id})")
                
                # Update the pending tool message to completed
                for msg in chat_messages:
                    if (msg.metadata and 
                        msg.metadata.get("id") == current_tool_id and 
                        msg.metadata.get("status") == "pending"):
                        msg.metadata["status"] = "done"
                        if duration:
                            msg.metadata["duration"] = round(duration, 2)
                        break
                
                if content.content:
                    result_content = content.content[0]
                    result_text = result_content.text if hasattr(result_content, 'text') else str(result_content)
                    
                    logger.info(f"πŸ“ MCP tool result: {result_text[:200]}...")
                    
                    # Create a result message with metadata
                    result_msg = ChatMessage(
                        role="assistant",
                        content=result_text,
                        metadata={
                            "title": "πŸ“‹ Tool Result",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    )
                    
                    chat_messages.append(result_msg)
                    
                    # Try to extract media from the result
                    media_url = None
                    if current_server_name and current_server_name in self.mcp_client.servers:
                        config = self.mcp_client.servers[current_server_name]
                        extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config)
                        if extracted_media:
                            media_url = extracted_media
                            logger.info(f"🎯 Extracted media from MCP result: {media_url}")
                    else:
                        # Fallback: try all servers to find media
                        for server_name, config in self.mcp_client.servers.items():
                            extracted_media = self.mcp_client._extract_media_from_mcp_response(result_text, config)
                            if extracted_media:
                                media_url = extracted_media
                                logger.info(f"🎯 Extracted media from MCP result (fallback): {media_url}")
                                break
                    
                    # Add media as separate message if found
                    if media_url:
                        chat_messages.append(ChatMessage(
                            role="assistant", 
                            content={"path": media_url}
                        ))
                        
                else:
                    # Add error message for failed tool call
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="Tool call failed: No content returned",
                        metadata={
                            "title": "❌ Tool Error",
                            "parent_id": current_tool_id,
                            "status": "done"
                        }
                    ))
        
        # Add the main response if there's any text content
        if main_response_content.strip():
            chat_messages.append(ChatMessage(
                role="assistant",
                content=main_response_content.strip()
            ))
        elif not chat_messages:
            # Fallback if no content was processed
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))
        
        return chat_messages
    
    def _get_native_system_prompt(self) -> str:
        """Get system prompt for Claude without MCP servers"""
        return f"""You are Claude Sonnet 4, a helpful AI assistant with native multimodal capabilities. You can have conversations, answer questions, help with various tasks, and provide information on a wide range of topics.

YOUR NATIVE CAPABILITIES (Available right now):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, identify objects, people, scenes, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

IMPORTANT: You DO NOT need MCP servers for:
- Describing or analyzing uploaded images
- Reading text in images
- Identifying objects, people, or scenes in images
- General conversation and knowledge questions

You DO need MCP servers for:
- Creating new images, audio, or video
- Editing or transforming existing media files
- Transcribing audio files
- Processing non-image files (audio, video, documents)

If users upload images and ask you to describe or analyze them, use your native vision capabilities immediately. Only mention MCP servers if they ask for creation or editing tasks."""
    
    def _get_mcp_system_prompt(self, user_files: List[str]) -> str:
        """Get system prompt for Claude with MCP servers"""
        uploaded_files_context = ""
        if user_files:
            uploaded_files_context = f"\n\nFILES UPLOADED BY USER:\n"
            for i, file_path in enumerate(user_files, 1):
                file_name = file_path.split('/')[-1] if '/' in file_path else file_path
                if AppConfig.is_image_file(file_path):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_path):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_path):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name} (path: {file_path})\n"
        
        return f"""You are Claude Sonnet 4, a helpful AI assistant with both native multimodal capabilities and access to various MCP tools.

YOUR NATIVE CAPABILITIES (No MCP tools needed):
- **Image Understanding**: You can directly see and describe images, analyze their content, read text in images, etc.
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code

WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images  
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Specialized Analysis**: Tasks requiring specific models or APIs

UPLOADED FILES HANDLING:
{uploaded_files_context}

IMPORTANT - For uploaded images:
- **Image Description/Analysis**: Use your NATIVE vision capabilities - you can see and describe images directly
- **Image Editing/Enhancement**: Use MCP image processing tools
- **Image Generation**: Use MCP image generation tools

IMPORTANT - File URL Conversion for MCP Tools:
When using MCP tools that require file inputs, you need to be aware that uploaded files have local paths that remote MCP servers cannot access. 

For uploaded files in MCP tool calls:
- If an MCP tool fails with "Invalid file data format" or similar errors about file paths
- The issue is that remote MCP servers cannot access local file paths like '/tmp/gradio/...'
- In such cases, inform the user that the MCP server requires files to be accessible via public URLs
- Suggest that they need a "File Upload" MCP server or that the specific MCP server may need configuration for file handling

Current uploaded files that may need URL conversion:
{uploaded_files_context}

IMPORTANT - GRADIO MEDIA DISPLAY:
When MCP tools return media, end your response with "MEDIA_GENERATED: [URL]" where [URL] is the actual media URL.

Examples:
- User uploads image + "What's in this image?" β†’ Use NATIVE vision (no MCP needed)
- User uploads image + "Make this vintage" β†’ Use MCP image editing tool  
- User says "Generate a sunset image" β†’ Use MCP image generation tool
- User uploads audio + "Transcribe this" β†’ Use MCP transcription tool

Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Available MCP servers: {list(self.mcp_client.servers.keys())}"""