import time from typing import Dict, List, Any, Optional from dataclasses import dataclass, asdict import re # Import visualization dependencies with fallbacks try: import networkx as nx import matplotlib.pyplot as plt plt.switch_backend('Agg') import matplotlib matplotlib.use('Agg') import warnings warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib') except ImportError: print("Warning: Visualization deps missing. Install with: pip install networkx matplotlib") nx = None plt = None @dataclass class WorkflowStep: step_id: str step_type: str timestamp: float content: str metadata: Dict[str, Any] duration: Optional[float] = None status: str = 'pending' parent_step: Optional[str] = None details: Optional[Dict[str, Any]] = None mcp_server: Optional[str] = None # Added to track MCP server tool_name: Optional[str] = None # Added to track specific tool class EnhancedWorkflowVisualizer: def __init__(self): self.steps: List[WorkflowStep] = [] self.current_step: Optional[WorkflowStep] = None self.start_time = time.time() self.step_counter = 0 # MCP server mapping for better display names self.server_display_names = { "7860": "Semantic Server", "7861": "Token Counter", "7862": "Sentiment Analysis", "7863": "Health Monitor" } def _extract_mcp_server_from_url(self, url_or_content: str) -> Optional[str]: """Extract MCP server name from URL or content.""" if not url_or_content: return None # Extract port from URL port_match = re.search(r':(\d{4})', url_or_content) if port_match: port = port_match.group(1) return self.server_display_names.get(port, f"Port {port}") # Check for server keywords in content if "semantic" in url_or_content.lower(): return "Semantic Server" elif "token" in url_or_content.lower(): return "Token Counter" elif "sentiment" in url_or_content.lower(): return "Sentiment Analysis" elif "health" in url_or_content.lower(): return "Health Monitor" return None def _extract_tool_name(self, content: str) -> Optional[str]: """Extract tool name from content.""" # Enhanced tool patterns - prioritize actual function names function_patterns = [ # Specific MCP tool functions (high priority) r'\b(sentiment_analysis)\s*\(', r'\b(count_tokens_openai_gpt4)\s*\(', r'\b(count_tokens_openai_gpt3)\s*\(', r'\b(count_tokens_openai_davinci)\s*\(', r'\b(count_tokens_bert_family)\s*\(', r'\b(count_tokens_roberta_family)\s*\(', r'\b(count_tokens_gpt2_family)\s*\(', r'\b(count_tokens_t5_family)\s*\(', r'\b(count_tokens_distilbert)\s*\(', r'\b(semantic_similarity)\s*\(', r'\b(find_similar_sentences)\s*\(', r'\b(extract_semantic_keywords)\s*\(', r'\b(semantic_search_in_text)\s*\(', r'\b(health_check)\s*\(', r'\b(server_status)\s*\(', r'\b(get_server_info)\s*\(', # Generic patterns (lower priority) r'(\w*sentiment_analysis\w*)', r'(\w*semantic_similarity\w*)', r'(\w*find_similar_sentences\w*)', r'(\w*extract_semantic_keywords\w*)', r'(\w*semantic_search_in_text\w*)', r'(\w*count_tokens_\w+)', r'(\w*health_check\w*)', r'(\w*server_status\w*)' ] # Try high-priority function call patterns first for pattern in function_patterns: match = re.search(pattern, content, re.IGNORECASE) if match: tool_name = match.group(1) # Skip common non-tool functions if tool_name not in ['print', 'len', 'str', 'int', 'float', 'final_answer', 'sse', 'model']: return tool_name # Check for execution logs that contain actual function names if "count_tokens_openai_gpt4" in content: return "count_tokens_openai_gpt4" elif "sentiment_analysis" in content: return "sentiment_analysis" elif "extract_semantic_keywords" in content: return "extract_semantic_keywords" elif "semantic_similarity" in content: return "semantic_similarity" elif "find_similar_sentences" in content: return "find_similar_sentences" elif "semantic_search_in_text" in content: return "semantic_search_in_text" return None def add_step(self, step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None, parent_step: Optional[str] = None, details: Optional[Dict[str, Any]] = None, mcp_server: Optional[str] = None, tool_name: Optional[str] = None) -> str: step_id = f"{step_type}_{self.step_counter}" self.step_counter += 1 # Auto-extract MCP server and tool if not provided if not mcp_server: mcp_server = self._extract_mcp_server_from_url(content) if not tool_name: tool_name = self._extract_tool_name(content) step = WorkflowStep( step_id=step_id, step_type=step_type, timestamp=time.time(), content=content, metadata=metadata or {}, status='running', parent_step=parent_step, details=details or {}, mcp_server=mcp_server, tool_name=tool_name ) self.steps.append(step) self.current_step = step return step_id def complete_step(self, step_id: str, status: str = 'completed', additional_metadata: Optional[Dict[str, Any]] = None, details: Optional[Dict[str, Any]] = None): for step in self.steps: if step.step_id == step_id: step.status = status step.duration = time.time() - step.timestamp if additional_metadata and step.metadata is not None: step.metadata.update(additional_metadata) if details and step.details is not None: step.details.update(details) break def add_communication_step(self, from_component: str, to_component: str, message_type: str, content: str, parent_step: Optional[str] = None) -> str: """Add a communication step between components.""" step_type = f"comm_{from_component}_to_{to_component}" # Extract server info for communication steps mcp_server = self._extract_mcp_server_from_url(content) tool_name = self._extract_tool_name(content) details = { "from": from_component, "to": to_component, "message_type": message_type, "content_preview": content[:100] + "..." if len(content) > 100 else content } return self.add_step(step_type, f"{message_type}: {from_component} → {to_component}", parent_step=parent_step, details=details, mcp_server=mcp_server, tool_name=tool_name) def add_tool_execution_step(self, tool_name: str, mcp_server: str, input_data: str, parent_step: Optional[str] = None) -> str: """Specialized method for tool execution steps.""" content = f"Executing {tool_name} on {mcp_server}" return self.add_step("tool_execution", content, parent_step=parent_step, mcp_server=mcp_server, tool_name=tool_name, details={"input_preview": input_data[:50] + "..." if len(input_data) > 50 else input_data}) def generate_graph(self) -> Any: if nx is None: return None G = nx.DiGraph() # Enhanced color mapping with server-specific colors color_map = { 'input': '#e3f2fd', # Light blue 'agent_init': '#f3e5f5', # Light purple 'agent_process': '#e8f5e8', # Light green 'comm_agent_to_mcp': '#fff3e0', # Light orange 'comm_mcp_to_server': '#ffebee', # Light red 'comm_server_to_mcp': '#e0f2f1', # Light teal 'comm_mcp_to_agent': '#f9fbe7', # Light lime 'llm_call': '#fce4ec', # Light pink 'tool_execution': '#e1f5fe', # Light cyan 'response': '#f1f8e9', # Light green 'error': '#ffcdd2' # Light red } # Add nodes with enhanced labeling for step in self.steps: color = color_map.get(step.step_type, '#f5f5f5') # Create enhanced label with MCP server and tool info duration_str = f" ({step.duration:.2f}s)" if step.duration else "" # Build comprehensive label label_parts = [] # Add step type step_display = step.step_type.replace('_', ' ').title() label_parts.append(step_display) # Add MCP server info if step.mcp_server: label_parts.append(f"📡 {step.mcp_server}") # Add tool name prominently if step.tool_name: label_parts.append(f"🔧 {step.tool_name}") # Add content preview (shortened to make room for server/tool info) content_preview = step.content[:20] + "..." if len(step.content) > 20 else step.content if not step.tool_name or step.tool_name.lower() not in content_preview.lower(): label_parts.append(content_preview) # Add duration if duration_str: label_parts.append(duration_str) label = "\n".join(label_parts) G.add_node(step.step_id, label=label, color=color, step_type=step.step_type, status=step.status, mcp_server=step.mcp_server, tool_name=step.tool_name) # Add edges based on parent relationships and chronological order for i, step in enumerate(self.steps): if step.parent_step: # Add edge from parent step G.add_edge(step.parent_step, step.step_id, edge_type='parent') elif i > 0: # Add chronological edge to previous step G.add_edge(self.steps[i-1].step_id, step.step_id, edge_type='sequence') return G def create_matplotlib_visualization(self) -> str: if nx is None or plt is None: return "" G = self.generate_graph() if not G or len(G.nodes()) == 0: return "" # Create larger figure to accommodate enhanced labels fig, ax = plt.subplots(figsize=(20, 12)) # Use hierarchical layout if possible try: pos = nx.spring_layout(G, k=3, iterations=150, seed=42) except: pos = nx.circular_layout(G) # Prepare node visualization with server-aware coloring node_colors = [] node_labels = {} node_sizes = [] for node_id in G.nodes(): step = next(s for s in self.steps if s.step_id == node_id) # Enhanced color coding based on status and server if step.status == 'error': color = '#ff5252' elif step.status == 'completed': # Server-specific color coding if step.mcp_server == "Semantic Server": base_color = '#4caf50' # Green for semantic elif step.mcp_server == "Token Counter": base_color = '#2196f3' # Blue for token counting elif step.mcp_server == "Sentiment Analysis": base_color = '#ff9800' # Orange for sentiment elif step.mcp_server == "Health Monitor": base_color = '#9c27b0' # Purple for health else: # Default colors by step type base_colors = { 'input': '#4caf50', 'agent_init': '#9c27b0', 'agent_process': '#2e7d32', 'comm_agent_to_mcp': '#ff9800', 'comm_mcp_to_server': '#f44336', 'comm_server_to_mcp': '#009688', 'comm_mcp_to_agent': '#8bc34a', 'llm_call': '#e91e63', 'tool_execution': '#03a9f4', 'response': '#4caf50' } base_color = base_colors.get(step.step_type, '#607d8b') color = base_color else: color = '#bdbdbd' node_colors.append(color) # Create enhanced node labels label_parts = [] # Step type step_display = step.step_type.replace('_', ' ').title() label_parts.append(f"**{step_display}**") # MCP Server (prominent) if step.mcp_server: label_parts.append(f"📡 {step.mcp_server}") # Tool name (most prominent) if step.tool_name: label_parts.append(f"🔧 **{step.tool_name}**") # Duration if step.duration: label_parts.append(f"⏱️ {step.duration:.2f}s") node_labels[node_id] = "\n".join(label_parts) # Size based on importance - larger for tool executions if step.step_type == 'tool_execution': node_sizes.append(5000) elif step.step_type in ['input', 'response']: node_sizes.append(4000) elif 'comm_' in step.step_type: node_sizes.append(2500) else: node_sizes.append(3000) # Draw the graph nx.draw(G, pos, node_color=node_colors, node_size=node_sizes, font_size=9, font_weight='bold', arrows=True, arrowsize=20, edge_color='#666666', alpha=0.9, ax=ax, arrowstyle='->') # Draw enhanced labels nx.draw_networkx_labels(G, pos, node_labels, font_size=8, ax=ax) # Add title and formatting ax.set_title("MCP Agent Workflow: Server & Tool Execution Flow", fontsize=20, pad=25, fontweight='bold') ax.axis('off') # Enhanced legend with server info legend_elements = [ plt.Rectangle((0,0),1,1, facecolor='#4caf50', label='Semantic Server'), plt.Rectangle((0,0),1,1, facecolor='#2196f3', label='Token Counter Server'), plt.Rectangle((0,0),1,1, facecolor='#ff9800', label='Sentiment Analysis Server'), plt.Rectangle((0,0),1,1, facecolor='#9c27b0', label='Health Monitor Server'), plt.Rectangle((0,0),1,1, facecolor='#e91e63', label='LLM Calls'), plt.Rectangle((0,0),1,1, facecolor='#607d8b', label='Agent Processing'), ] ax.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1)) fig.set_constrained_layout(True) # Save to temporary file import tempfile temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png') plt.savefig(temp_file.name, format='png', dpi=300, bbox_inches='tight') plt.close(fig) return temp_file.name def get_workflow_summary(self) -> Dict[str, Any]: total_duration = time.time() - self.start_time # Count steps by type and server step_counts = {} server_usage = {} tool_usage = {} communication_steps = [] processing_steps = [] for step in self.steps: step_counts[step.step_type] = step_counts.get(step.step_type, 0) + 1 # Track server usage if step.mcp_server: server_usage[step.mcp_server] = server_usage.get(step.mcp_server, 0) + 1 # Track tool usage if step.tool_name: tool_usage[step.tool_name] = tool_usage.get(step.tool_name, 0) + 1 if 'comm_' in step.step_type: communication_steps.append({ 'step_id': step.step_id, 'from': step.details.get('from', 'unknown') if step.details else 'unknown', 'to': step.details.get('to', 'unknown') if step.details else 'unknown', 'message_type': step.details.get('message_type', 'unknown') if step.details else 'unknown', 'mcp_server': step.mcp_server, 'tool_name': step.tool_name, 'duration': step.duration, 'status': step.status }) else: processing_steps.append({ 'step_id': step.step_id, 'type': step.step_type, 'content': step.content[:50] + "..." if len(step.content) > 50 else step.content, 'mcp_server': step.mcp_server, 'tool_name': step.tool_name, 'duration': step.duration, 'status': step.status }) # Calculate timing statistics completed_steps = [s for s in self.steps if s.duration is not None] avg_duration = (sum(s.duration or 0 for s in completed_steps) / len(completed_steps)) if completed_steps else 0 return { 'total_steps': len(self.steps), 'total_duration': round(total_duration, 3), 'average_step_duration': round(avg_duration, 3), 'step_counts': step_counts, 'server_usage': server_usage, # New: server usage stats 'tool_usage': tool_usage, # New: tool usage stats 'communication_flow': communication_steps, 'processing_steps': processing_steps, 'status': 'completed' if all(s.status in ['completed', 'error'] for s in self.steps) else 'running', 'error_count': sum(1 for s in self.steps if s.status == 'error'), 'success_rate': round((sum(1 for s in self.steps if s.status == 'completed') / len(self.steps)) * 100, 1) if self.steps else 0, 'detailed_steps': [asdict(s) for s in self.steps] } # Global instance workflow_visualizer = EnhancedWorkflowVisualizer() # Enhanced helper functions def track_workflow_step(step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None, parent_step: Optional[str] = None, mcp_server: Optional[str] = None, tool_name: Optional[str] = None) -> str: return workflow_visualizer.add_step(step_type, content, metadata, parent_step, mcp_server=mcp_server, tool_name=tool_name) def track_communication(from_component: str, to_component: str, message_type: str, content: str, parent_step: Optional[str] = None) -> str: return workflow_visualizer.add_communication_step(from_component, to_component, message_type, content, parent_step) def track_tool_execution(tool_name: str, mcp_server: str, input_data: str, parent_step: Optional[str] = None) -> str: """New helper for tracking tool executions with clear server/tool info.""" return workflow_visualizer.add_tool_execution_step(tool_name, mcp_server, input_data, parent_step) def complete_workflow_step(step_id: str, status: str = 'completed', metadata: Optional[Dict[str, Any]] = None, details: Optional[Dict[str, Any]] = None): workflow_visualizer.complete_step(step_id, status, metadata, details) def get_workflow_visualization() -> str: return workflow_visualizer.create_matplotlib_visualization() def get_workflow_summary() -> Dict[str, Any]: return workflow_visualizer.get_workflow_summary() def reset_workflow(): global workflow_visualizer workflow_visualizer = EnhancedWorkflowVisualizer()