Agent-client-multi-mcp-SKT / workflow_vizualizer.py
NLarchive's picture
Create workflow_vizualizer.py
3bbd581 verified
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
import re
# Import visualization dependencies with fallbacks
try:
import networkx as nx
import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import matplotlib
matplotlib.use('Agg')
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib')
except ImportError:
print("Warning: Visualization deps missing. Install with: pip install networkx matplotlib")
nx = None
plt = None
@dataclass
class WorkflowStep:
    """A single recorded step of an agent workflow.

    The first five fields are required; the remaining ones default to
    ``None`` (or ``'pending'`` for ``status``) and are filled in later.
    """

    # Identity and classification of the step.
    step_id: str
    step_type: str

    # Creation time of the step, as a ``time.time()`` style float.
    timestamp: float

    # Human-readable description and free-form extra data.
    content: str
    metadata: Dict[str, Any]

    # Elapsed seconds; stays ``None`` until a duration is assigned.
    duration: Optional[float] = None

    # Lifecycle state of the step.
    status: str = 'pending'

    # ``step_id`` of the step this one hangs off, if any.
    parent_step: Optional[str] = None

    # Additional structured details.
    details: Optional[Dict[str, Any]] = None

    # Display name of the MCP server involved, if any.
    mcp_server: Optional[str] = None

    # Name of the specific tool involved, if any.
    tool_name: Optional[str] = None
class EnhancedWorkflowVisualizer:
    """Record workflow steps and render them as a graph, an image and a summary.

    Steps are stored in insertion order in ``self.steps``. Each step may be
    tagged with the MCP server and tool it involves; when the caller does not
    supply those tags they are inferred from the step's text content.
    """

    # A colon followed by exactly four digits. The negative lookahead keeps a
    # longer number (e.g. ":78610") from being read as port 7860.
    _PORT_PATTERN = re.compile(r':(\d{4})(?!\d)')

    # Keyword -> display name, checked in this order when no port is found.
    _SERVER_KEYWORDS = (
        ("semantic", "Semantic Server"),
        ("token", "Token Counter"),
        ("sentiment", "Sentiment Analysis"),
        ("health", "Health Monitor"),
    )

    # Tool-name patterns, compiled once. Order is the priority order: explicit
    # function-call forms first, looser "name appears anywhere" forms after.
    _TOOL_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            # Specific MCP tool functions (high priority)
            r'\b(sentiment_analysis)\s*\(',
            r'\b(count_tokens_openai_gpt4)\s*\(',
            r'\b(count_tokens_openai_gpt3)\s*\(',
            r'\b(count_tokens_openai_davinci)\s*\(',
            r'\b(count_tokens_bert_family)\s*\(',
            r'\b(count_tokens_roberta_family)\s*\(',
            r'\b(count_tokens_gpt2_family)\s*\(',
            r'\b(count_tokens_t5_family)\s*\(',
            r'\b(count_tokens_distilbert)\s*\(',
            r'\b(semantic_similarity)\s*\(',
            r'\b(find_similar_sentences)\s*\(',
            r'\b(extract_semantic_keywords)\s*\(',
            r'\b(semantic_search_in_text)\s*\(',
            r'\b(health_check)\s*\(',
            r'\b(server_status)\s*\(',
            r'\b(get_server_info)\s*\(',
            # Generic patterns (lower priority)
            r'(\w*sentiment_analysis\w*)',
            r'(\w*semantic_similarity\w*)',
            r'(\w*find_similar_sentences\w*)',
            r'(\w*extract_semantic_keywords\w*)',
            r'(\w*semantic_search_in_text\w*)',
            r'(\w*count_tokens_\w+)',
            r'(\w*health_check\w*)',
            r'(\w*server_status\w*)',
        )
    )

    # Pastel node colours stored on the graph nodes, keyed by step type.
    _GRAPH_NODE_COLORS = {
        'input': '#e3f2fd',               # Light blue
        'agent_init': '#f3e5f5',          # Light purple
        'agent_process': '#e8f5e8',       # Light green
        'comm_agent_to_mcp': '#fff3e0',   # Light orange
        'comm_mcp_to_server': '#ffebee',  # Light red
        'comm_server_to_mcp': '#e0f2f1',  # Light teal
        'comm_mcp_to_agent': '#f9fbe7',   # Light lime
        'llm_call': '#fce4ec',            # Light pink
        'tool_execution': '#e1f5fe',      # Light cyan
        'response': '#f1f8e9',            # Light green
        'error': '#ffcdd2',               # Light red
    }

    # Colours used in the rendered image for completed steps, by server.
    _SERVER_COLORS = {
        "Semantic Server": '#4caf50',     # Green for semantic
        "Token Counter": '#2196f3',       # Blue for token counting
        "Sentiment Analysis": '#ff9800',  # Orange for sentiment
        "Health Monitor": '#9c27b0',      # Purple for health
    }

    # Colours used in the rendered image for completed steps with no
    # recognised server, by step type.
    _STEP_TYPE_COLORS = {
        'input': '#4caf50',
        'agent_init': '#9c27b0',
        'agent_process': '#2e7d32',
        'comm_agent_to_mcp': '#ff9800',
        'comm_mcp_to_server': '#f44336',
        'comm_server_to_mcp': '#009688',
        'comm_mcp_to_agent': '#8bc34a',
        'llm_call': '#e91e63',
        'tool_execution': '#03a9f4',
        'response': '#4caf50',
    }

    def __init__(self):
        """Start with no recorded steps and the clock set to now."""
        self.steps: List[WorkflowStep] = []
        self.current_step: Optional[WorkflowStep] = None
        self.start_time = time.time()
        self.step_counter = 0
        # MCP server mapping for better display names
        self.server_display_names = {
            "7860": "Semantic Server",
            "7861": "Token Counter",
            "7862": "Sentiment Analysis",
            "7863": "Health Monitor"
        }

    def _extract_mcp_server_from_url(self, url_or_content: str) -> Optional[str]:
        """Extract MCP server name from URL or content.

        A four-digit port (``:NNNN``) wins: known ports map to their display
        name, unknown ones to ``"Port NNNN"``. Otherwise the text is searched
        case-insensitively for a server keyword. Returns ``None`` for empty
        input or when nothing matches.
        """
        if not url_or_content:
            return None

        port_match = self._PORT_PATTERN.search(url_or_content)
        if port_match:
            port = port_match.group(1)
            return self.server_display_names.get(port, f"Port {port}")

        lowered = url_or_content.lower()
        for keyword, display_name in self._SERVER_KEYWORDS:
            if keyword in lowered:
                return display_name
        return None

    def _extract_tool_name(self, content: str) -> Optional[str]:
        """Extract tool name from content.

        Patterns are tried in priority order and the first one that matches
        anywhere in ``content`` decides the result; the matched text is
        returned with the casing it has in ``content``. Returns ``None`` for
        empty/``None`` content or when no pattern matches.
        """
        if not content:
            return None
        for pattern in self._TOOL_PATTERNS:
            match = pattern.search(content)
            if match:
                return match.group(1)
        return None

    def add_step(self, step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None,
                 parent_step: Optional[str] = None, details: Optional[Dict[str, Any]] = None,
                 mcp_server: Optional[str] = None, tool_name: Optional[str] = None) -> str:
        """Record a new step in the ``'running'`` state and return its id.

        The id is ``"<step_type>_<counter>"``. When ``mcp_server`` or
        ``tool_name`` is not given it is inferred from ``content``. The new
        step also becomes ``self.current_step``.
        """
        step_id = f"{step_type}_{self.step_counter}"
        self.step_counter += 1

        # Auto-extract MCP server and tool if not provided
        if not mcp_server:
            mcp_server = self._extract_mcp_server_from_url(content)
        if not tool_name:
            tool_name = self._extract_tool_name(content)

        step = WorkflowStep(
            step_id=step_id,
            step_type=step_type,
            timestamp=time.time(),
            content=content,
            metadata=metadata or {},
            status='running',
            parent_step=parent_step,
            details=details or {},
            mcp_server=mcp_server,
            tool_name=tool_name,
        )
        self.steps.append(step)
        self.current_step = step
        return step_id

    def complete_step(self, step_id: str, status: str = 'completed',
                      additional_metadata: Optional[Dict[str, Any]] = None,
                      details: Optional[Dict[str, Any]] = None):
        """Set the final status and duration of the step with ``step_id``.

        Duration is measured from the step's creation timestamp to now.
        ``additional_metadata`` and ``details`` are merged into the step's
        existing dictionaries. An unknown ``step_id`` is ignored.
        """
        for step in self.steps:
            if step.step_id != step_id:
                continue
            step.status = status
            step.duration = time.time() - step.timestamp
            if additional_metadata and step.metadata is not None:
                step.metadata.update(additional_metadata)
            if details and step.details is not None:
                step.details.update(details)
            break

    def add_communication_step(self, from_component: str, to_component: str,
                               message_type: str, content: str,
                               parent_step: Optional[str] = None) -> str:
        """Add a communication step between components.

        The step type is ``"comm_<from>_to_<to>"``. Server and tool tags are
        inferred from the raw message ``content``; only a preview of that
        content (first 100 characters) is stored, under ``details``.
        """
        step_type = f"comm_{from_component}_to_{to_component}"

        # Extract server info for communication steps
        mcp_server = self._extract_mcp_server_from_url(content)
        tool_name = self._extract_tool_name(content)

        if len(content) > 100:
            content_preview = content[:100] + "..."
        else:
            content_preview = content
        details = {
            "from": from_component,
            "to": to_component,
            "message_type": message_type,
            "content_preview": content_preview,
        }

        # Separate the two component names so the step text stays readable
        # (they were previously run together, e.g. "agentmcp").
        summary = f"{message_type}: {from_component} -> {to_component}"
        return self.add_step(step_type, summary,
                             parent_step=parent_step, details=details,
                             mcp_server=mcp_server, tool_name=tool_name)

    def add_tool_execution_step(self, tool_name: str, mcp_server: str,
                                input_data: str, parent_step: Optional[str] = None) -> str:
        """Specialized method for tool execution steps.

        Records a ``"tool_execution"`` step tagged with the given tool and
        server, keeping the first 50 characters of ``input_data`` as a preview.
        """
        if len(input_data) > 50:
            input_preview = input_data[:50] + "..."
        else:
            input_preview = input_data
        content = f"Executing {tool_name} on {mcp_server}"
        return self.add_step("tool_execution", content,
                             parent_step=parent_step,
                             mcp_server=mcp_server,
                             tool_name=tool_name,
                             details={"input_preview": input_preview})

    @staticmethod
    def _graph_label(step: 'WorkflowStep') -> str:
        """Build the multi-line label stored on a graph node."""
        parts = [step.step_type.replace('_', ' ').title()]
        if step.mcp_server:
            parts.append(f"📡 {step.mcp_server}")
        if step.tool_name:
            parts.append(f"🔧 {step.tool_name}")

        # Content preview is kept short to leave room for server/tool info,
        # and skipped when it would only repeat the tool name.
        if len(step.content) > 20:
            preview = step.content[:20] + "..."
        else:
            preview = step.content
        if not step.tool_name or step.tool_name.lower() not in preview.lower():
            parts.append(preview)

        # ``is not None`` so that a measured duration of 0.0 is still shown.
        if step.duration is not None:
            parts.append(f" ({step.duration:.2f}s)")
        return "\n".join(parts)

    def generate_graph(self) -> Any:
        """Build a ``networkx.DiGraph`` of the recorded steps.

        Every step becomes a node carrying its label, colour, type, status,
        server and tool. A step is linked from its parent when that parent is
        a recorded step; otherwise it is linked from the step recorded just
        before it. Returns ``None`` when networkx is unavailable.
        """
        if nx is None:
            return None

        graph = nx.DiGraph()
        for step in self.steps:
            graph.add_node(step.step_id,
                           label=self._graph_label(step),
                           color=self._GRAPH_NODE_COLORS.get(step.step_type, '#f5f5f5'),
                           step_type=step.step_type,
                           status=step.status,
                           mcp_server=step.mcp_server,
                           tool_name=step.tool_name)

        previous_id = None
        for step in self.steps:
            # Only honour parents that are real nodes: add_edge() would
            # otherwise create an attribute-less phantom node that the image
            # renderer cannot map back to a step.
            if step.parent_step and step.parent_step in graph:
                graph.add_edge(step.parent_step, step.step_id, edge_type='parent')
            elif previous_id is not None:
                graph.add_edge(previous_id, step.step_id, edge_type='sequence')
            previous_id = step.step_id
        return graph

    def _node_color(self, step: 'WorkflowStep') -> str:
        """Pick the image colour for a step from its status, server and type."""
        if step.status == 'error':
            return '#ff5252'
        if step.status != 'completed':
            return '#bdbdbd'
        server_color = self._SERVER_COLORS.get(step.mcp_server)
        if server_color is not None:
            return server_color
        return self._STEP_TYPE_COLORS.get(step.step_type, '#607d8b')

    @staticmethod
    def _node_size(step: 'WorkflowStep') -> int:
        """Pick the image node size; tool executions are drawn largest."""
        if step.step_type == 'tool_execution':
            return 5000
        if step.step_type in ('input', 'response'):
            return 4000
        if 'comm_' in step.step_type:
            return 2500
        return 3000

    @staticmethod
    def _node_label(step: 'WorkflowStep') -> str:
        """Build the multi-line label drawn on a node in the image."""
        # NOTE(review): the ** markers are kept as-is; matplotlib draws them
        # literally rather than as bold text.
        parts = [f"**{step.step_type.replace('_', ' ').title()}**"]
        if step.mcp_server:
            parts.append(f"📡 {step.mcp_server}")
        if step.tool_name:
            parts.append(f"🔧 **{step.tool_name}**")
        # ``is not None`` so that a measured duration of 0.0 is still shown.
        if step.duration is not None:
            parts.append(f"⏱️ {step.duration:.2f}s")
        return "\n".join(parts)

    def create_matplotlib_visualization(self) -> str:
        """Render the workflow graph to a PNG file and return its path.

        Returns an empty string when networkx/matplotlib are unavailable or
        there are no steps. The PNG is written to a temporary file that is
        not deleted automatically; the caller owns it.
        """
        if nx is None or plt is None:
            return ""

        graph = self.generate_graph()
        if graph is None or graph.number_of_nodes() == 0:
            return ""

        import tempfile

        # One dictionary lookup per node instead of rescanning self.steps.
        steps_by_id = {step.step_id: step for step in self.steps}

        # Create larger figure to accommodate enhanced labels
        fig, ax = plt.subplots(figsize=(20, 12))
        try:
            try:
                pos = nx.spring_layout(graph, k=3, iterations=150, seed=42)
            except Exception:
                # Best-effort fallback to a layout with fewer requirements.
                pos = nx.circular_layout(graph)

            node_colors = []
            node_sizes = []
            node_labels = {}
            for node_id in graph.nodes():
                step = steps_by_id[node_id]
                node_colors.append(self._node_color(step))
                node_sizes.append(self._node_size(step))
                node_labels[node_id] = self._node_label(step)

            # Draw the graph
            nx.draw(graph, pos,
                    node_color=node_colors,
                    node_size=node_sizes,
                    font_size=9,
                    font_weight='bold',
                    arrows=True,
                    arrowsize=20,
                    edge_color='#666666',
                    alpha=0.9,
                    ax=ax,
                    arrowstyle='->')

            # Draw enhanced labels
            nx.draw_networkx_labels(graph, pos, node_labels, font_size=8, ax=ax)

            # Add title and formatting
            ax.set_title("MCP Agent Workflow: Server & Tool Execution Flow",
                         fontsize=20, pad=25, fontweight='bold')
            ax.axis('off')

            # Enhanced legend with server info
            legend_elements = [
                plt.Rectangle((0, 0), 1, 1, facecolor='#4caf50', label='Semantic Server'),
                plt.Rectangle((0, 0), 1, 1, facecolor='#2196f3', label='Token Counter Server'),
                plt.Rectangle((0, 0), 1, 1, facecolor='#ff9800', label='Sentiment Analysis Server'),
                plt.Rectangle((0, 0), 1, 1, facecolor='#9c27b0', label='Health Monitor Server'),
                plt.Rectangle((0, 0), 1, 1, facecolor='#e91e63', label='LLM Calls'),
                plt.Rectangle((0, 0), 1, 1, facecolor='#607d8b', label='Agent Processing'),
            ]
            ax.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1))

            # Prefer the current layout-engine API; fall back to the older
            # setter on matplotlib versions that do not have it.
            if hasattr(fig, "set_layout_engine"):
                fig.set_layout_engine("constrained")
            else:
                fig.set_constrained_layout(True)

            # Reserve a temp path and close the handle before matplotlib
            # writes to it, so no file descriptor is left open.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                image_path = temp_file.name
            fig.savefig(image_path, format='png', dpi=300, bbox_inches='tight')
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        return image_path

    def get_workflow_summary(self) -> Dict[str, Any]:
        """Return counts, timings and per-step details for the workflow.

        Steps whose type contains ``'comm_'`` are listed under
        ``'communication_flow'``; all others under ``'processing_steps'``.
        ``'status'`` is ``'completed'`` only when every step is completed or
        errored.
        """
        total_duration = time.time() - self.start_time

        step_counts = {}
        server_usage = {}
        tool_usage = {}
        communication_steps = []
        processing_steps = []

        for step in self.steps:
            step_counts[step.step_type] = step_counts.get(step.step_type, 0) + 1
            if step.mcp_server:
                server_usage[step.mcp_server] = server_usage.get(step.mcp_server, 0) + 1
            if step.tool_name:
                tool_usage[step.tool_name] = tool_usage.get(step.tool_name, 0) + 1

            if 'comm_' in step.step_type:
                step_details = step.details or {}
                communication_steps.append({
                    'step_id': step.step_id,
                    'from': step_details.get('from', 'unknown'),
                    'to': step_details.get('to', 'unknown'),
                    'message_type': step_details.get('message_type', 'unknown'),
                    'mcp_server': step.mcp_server,
                    'tool_name': step.tool_name,
                    'duration': step.duration,
                    'status': step.status,
                })
            else:
                if len(step.content) > 50:
                    content_preview = step.content[:50] + "..."
                else:
                    content_preview = step.content
                processing_steps.append({
                    'step_id': step.step_id,
                    'type': step.step_type,
                    'content': content_preview,
                    'mcp_server': step.mcp_server,
                    'tool_name': step.tool_name,
                    'duration': step.duration,
                    'status': step.status,
                })

        # Timing statistics only consider steps that have a measured duration.
        durations = [step.duration for step in self.steps if step.duration is not None]
        avg_duration = sum(durations) / len(durations) if durations else 0

        completed_count = sum(1 for step in self.steps if step.status == 'completed')
        error_count = sum(1 for step in self.steps if step.status == 'error')
        all_finished = all(step.status in ('completed', 'error') for step in self.steps)
        if self.steps:
            success_rate = round((completed_count / len(self.steps)) * 100, 1)
        else:
            success_rate = 0

        return {
            'total_steps': len(self.steps),
            'total_duration': round(total_duration, 3),
            'average_step_duration': round(avg_duration, 3),
            'step_counts': step_counts,
            'server_usage': server_usage,
            'tool_usage': tool_usage,
            'communication_flow': communication_steps,
            'processing_steps': processing_steps,
            'status': 'completed' if all_finished else 'running',
            'error_count': error_count,
            'success_rate': success_rate,
            'detailed_steps': [asdict(step) for step in self.steps],
        }
# Global instance shared by the module-level helper functions below;
# reset_workflow() rebinds it to a fresh visualizer.
workflow_visualizer = EnhancedWorkflowVisualizer()
# Enhanced helper functions
def track_workflow_step(step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None,
                        parent_step: Optional[str] = None, mcp_server: Optional[str] = None,
                        tool_name: Optional[str] = None) -> str:
    """Record a step on the module-level visualizer and return its step id."""
    step_id = workflow_visualizer.add_step(
        step_type,
        content,
        metadata=metadata,
        parent_step=parent_step,
        mcp_server=mcp_server,
        tool_name=tool_name,
    )
    return step_id
def track_communication(from_component: str, to_component: str, message_type: str,
                        content: str, parent_step: Optional[str] = None) -> str:
    """Record a component-to-component message on the module-level visualizer.

    Returns the id of the communication step that was added.
    """
    step_id = workflow_visualizer.add_communication_step(
        from_component,
        to_component,
        message_type,
        content,
        parent_step=parent_step,
    )
    return step_id
def track_tool_execution(tool_name: str, mcp_server: str, input_data: str,
                         parent_step: Optional[str] = None) -> str:
    """Record a tool execution, with explicit server and tool, on the module-level visualizer.

    Returns the id of the tool-execution step that was added.
    """
    step_id = workflow_visualizer.add_tool_execution_step(
        tool_name,
        mcp_server,
        input_data,
        parent_step=parent_step,
    )
    return step_id
def complete_workflow_step(step_id: str, status: str = 'completed',
                           metadata: Optional[Dict[str, Any]] = None,
                           details: Optional[Dict[str, Any]] = None):
    """Mark a step on the module-level visualizer as finished.

    ``metadata`` and ``details`` are forwarded so they can be merged into
    the step's existing dictionaries.
    """
    workflow_visualizer.complete_step(
        step_id,
        status=status,
        additional_metadata=metadata,
        details=details,
    )
def get_workflow_visualization() -> str:
    """Render the module-level visualizer's workflow and return the image path."""
    image_path = workflow_visualizer.create_matplotlib_visualization()
    return image_path
def get_workflow_summary() -> Dict[str, Any]:
    """Return the summary dictionary of the module-level visualizer."""
    summary = workflow_visualizer.get_workflow_summary()
    return summary
def reset_workflow():
    """Replace the module-level visualizer with a fresh, empty instance."""
    global workflow_visualizer
    fresh_visualizer = EnhancedWorkflowVisualizer()
    workflow_visualizer = fresh_visualizer