"""Workflow visualization utilities for MCP agents: records workflow steps, tool
executions, and inter-component communication, and renders them as a graph with
networkx/matplotlib when those dependencies are available."""

import re
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Any, Optional

try:
    import networkx as nx
    import matplotlib
    matplotlib.use('Agg')  # non-interactive backend; set before pyplot is imported
    import matplotlib.pyplot as plt
    import warnings
    warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib')
except ImportError:
    print("Warning: Visualization deps missing. Install with: pip install networkx matplotlib")
    nx = None
    plt = None

@dataclass
class WorkflowStep:
    """A single recorded step in the agent workflow."""
    step_id: str
    step_type: str
    timestamp: float
    content: str
    metadata: Dict[str, Any]
    duration: Optional[float] = None
    status: str = 'pending'
    parent_step: Optional[str] = None
    details: Optional[Dict[str, Any]] = None
    mcp_server: Optional[str] = None
    tool_name: Optional[str] = None

class EnhancedWorkflowVisualizer:
    """Tracks workflow steps and renders them as a graph of servers, tools, and messages."""

    def __init__(self):
        self.steps: List[WorkflowStep] = []
        self.current_step: Optional[WorkflowStep] = None
        self.start_time = time.time()
        self.step_counter = 0

        # Map MCP server ports to human-readable display names.
        self.server_display_names = {
            "7860": "Semantic Server",
            "7861": "Token Counter",
            "7862": "Sentiment Analysis",
            "7863": "Health Monitor"
        }

    def _extract_mcp_server_from_url(self, url_or_content: str) -> Optional[str]:
        """Extract MCP server name from URL or content."""
        if not url_or_content:
            return None

        port_match = re.search(r':(\d{4})', url_or_content)
        if port_match:
            port = port_match.group(1)
            return self.server_display_names.get(port, f"Port {port}")

        lowered = url_or_content.lower()
        if "semantic" in lowered:
            return "Semantic Server"
        elif "token" in lowered:
            return "Token Counter"
        elif "sentiment" in lowered:
            return "Sentiment Analysis"
        elif "health" in lowered:
            return "Health Monitor"

        return None

    def _extract_tool_name(self, content: str) -> Optional[str]:
        """Extract tool name from content."""
        function_patterns = [
            # Explicit tool calls, e.g. "sentiment_analysis(...)".
            r'\b(sentiment_analysis)\s*\(',
            r'\b(count_tokens_openai_gpt4)\s*\(',
            r'\b(count_tokens_openai_gpt3)\s*\(',
            r'\b(count_tokens_openai_davinci)\s*\(',
            r'\b(count_tokens_bert_family)\s*\(',
            r'\b(count_tokens_roberta_family)\s*\(',
            r'\b(count_tokens_gpt2_family)\s*\(',
            r'\b(count_tokens_t5_family)\s*\(',
            r'\b(count_tokens_distilbert)\s*\(',
            r'\b(semantic_similarity)\s*\(',
            r'\b(find_similar_sentences)\s*\(',
            r'\b(extract_semantic_keywords)\s*\(',
            r'\b(semantic_search_in_text)\s*\(',
            r'\b(health_check)\s*\(',
            r'\b(server_status)\s*\(',
            r'\b(get_server_info)\s*\(',

            # Looser matches for tool names mentioned without a call.
            r'(\w*sentiment_analysis\w*)',
            r'(\w*semantic_similarity\w*)',
            r'(\w*find_similar_sentences\w*)',
            r'(\w*extract_semantic_keywords\w*)',
            r'(\w*semantic_search_in_text\w*)',
            r'(\w*count_tokens_\w+)',
            r'(\w*health_check\w*)',
            r'(\w*server_status\w*)'
        ]

        for pattern in function_patterns:
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                tool_name = match.group(1)
                if tool_name not in ['print', 'len', 'str', 'int', 'float', 'final_answer', 'sse', 'model']:
                    return tool_name

        # Plain substring fallback when no pattern matched.
        if "count_tokens_openai_gpt4" in content:
            return "count_tokens_openai_gpt4"
        elif "sentiment_analysis" in content:
            return "sentiment_analysis"
        elif "extract_semantic_keywords" in content:
            return "extract_semantic_keywords"
        elif "semantic_similarity" in content:
            return "semantic_similarity"
        elif "find_similar_sentences" in content:
            return "find_similar_sentences"
        elif "semantic_search_in_text" in content:
            return "semantic_search_in_text"

        return None

    def add_step(self, step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None,
                 parent_step: Optional[str] = None, details: Optional[Dict[str, Any]] = None,
                 mcp_server: Optional[str] = None, tool_name: Optional[str] = None) -> str:
        """Record a new workflow step and return its step_id."""
        step_id = f"{step_type}_{self.step_counter}"
        self.step_counter += 1

        # Infer server/tool from the content when not provided explicitly.
        if not mcp_server:
            mcp_server = self._extract_mcp_server_from_url(content)
        if not tool_name:
            tool_name = self._extract_tool_name(content)

        step = WorkflowStep(
            step_id=step_id,
            step_type=step_type,
            timestamp=time.time(),
            content=content,
            metadata=metadata or {},
            status='running',
            parent_step=parent_step,
            details=details or {},
            mcp_server=mcp_server,
            tool_name=tool_name
        )
        self.steps.append(step)
        self.current_step = step
        return step_id

    def complete_step(self, step_id: str, status: str = 'completed',
                      additional_metadata: Optional[Dict[str, Any]] = None,
                      details: Optional[Dict[str, Any]] = None):
        """Mark a step as finished and record its duration."""
        for step in self.steps:
            if step.step_id == step_id:
                step.status = status
                step.duration = time.time() - step.timestamp
                if additional_metadata and step.metadata is not None:
                    step.metadata.update(additional_metadata)
                if details and step.details is not None:
                    step.details.update(details)
                break

    def add_communication_step(self, from_component: str, to_component: str,
                               message_type: str, content: str,
                               parent_step: Optional[str] = None) -> str:
        """Add a communication step between components."""
        step_type = f"comm_{from_component}_to_{to_component}"

        mcp_server = self._extract_mcp_server_from_url(content)
        tool_name = self._extract_tool_name(content)

        details = {
            "from": from_component,
            "to": to_component,
            "message_type": message_type,
            "content_preview": content[:100] + "..." if len(content) > 100 else content
        }
        return self.add_step(step_type, f"{message_type}: {from_component} → {to_component}",
                             parent_step=parent_step, details=details,
                             mcp_server=mcp_server, tool_name=tool_name)

    def add_tool_execution_step(self, tool_name: str, mcp_server: str,
                                input_data: str, parent_step: Optional[str] = None) -> str:
        """Specialized method for tool execution steps."""
        content = f"Executing {tool_name} on {mcp_server}"
        return self.add_step("tool_execution", content,
                             parent_step=parent_step,
                             mcp_server=mcp_server,
                             tool_name=tool_name,
                             details={"input_preview": input_data[:50] + "..." if len(input_data) > 50 else input_data})

    def generate_graph(self) -> Any:
        """Build a networkx DiGraph of the recorded steps (None if networkx is missing)."""
        if nx is None:
            return None

        G = nx.DiGraph()

        # Pastel fill colors per step type.
        color_map = {
            'input': '#e3f2fd',
            'agent_init': '#f3e5f5',
            'agent_process': '#e8f5e8',
            'comm_agent_to_mcp': '#fff3e0',
            'comm_mcp_to_server': '#ffebee',
            'comm_server_to_mcp': '#e0f2f1',
            'comm_mcp_to_agent': '#f9fbe7',
            'llm_call': '#fce4ec',
            'tool_execution': '#e1f5fe',
            'response': '#f1f8e9',
            'error': '#ffcdd2'
        }

        for step in self.steps:
            color = color_map.get(step.step_type, '#f5f5f5')
            duration_str = f" ({step.duration:.2f}s)" if step.duration is not None else ""

            # Assemble a multi-line node label: step type, server, tool, preview, duration.
            label_parts = []
            step_display = step.step_type.replace('_', ' ').title()
            label_parts.append(step_display)

            if step.mcp_server:
                label_parts.append(f"📡 {step.mcp_server}")
            if step.tool_name:
                label_parts.append(f"🔧 {step.tool_name}")

            content_preview = step.content[:20] + "..." if len(step.content) > 20 else step.content
            if not step.tool_name or step.tool_name.lower() not in content_preview.lower():
                label_parts.append(content_preview)

            if duration_str:
                label_parts.append(duration_str)

            label = "\n".join(label_parts)

            G.add_node(step.step_id,
                       label=label,
                       color=color,
                       step_type=step.step_type,
                       status=step.status,
                       mcp_server=step.mcp_server,
                       tool_name=step.tool_name)

        # Connect steps: explicit parent links first, otherwise sequential order.
        for i, step in enumerate(self.steps):
            if step.parent_step:
                G.add_edge(step.parent_step, step.step_id, edge_type='parent')
            elif i > 0:
                G.add_edge(self.steps[i - 1].step_id, step.step_id, edge_type='sequence')

        return G

    def create_matplotlib_visualization(self) -> str:
        """Render the workflow graph to a temporary PNG and return its file path."""
        if nx is None or plt is None:
            return ""

        G = self.generate_graph()
        if not G or len(G.nodes()) == 0:
            return ""

        fig, ax = plt.subplots(figsize=(20, 12))

        try:
            pos = nx.spring_layout(G, k=3, iterations=150, seed=42)
        except Exception:
            pos = nx.circular_layout(G)

        node_colors = []
        node_labels = {}
        node_sizes = []

        for node_id in G.nodes():
            step = next(s for s in self.steps if s.step_id == node_id)

            # Color by status first, then by server, then by step type.
            if step.status == 'error':
                color = '#ff5252'
            elif step.status == 'completed':
                if step.mcp_server == "Semantic Server":
                    base_color = '#4caf50'
                elif step.mcp_server == "Token Counter":
                    base_color = '#2196f3'
                elif step.mcp_server == "Sentiment Analysis":
                    base_color = '#ff9800'
                elif step.mcp_server == "Health Monitor":
                    base_color = '#9c27b0'
                else:
                    base_colors = {
                        'input': '#4caf50',
                        'agent_init': '#9c27b0',
                        'agent_process': '#2e7d32',
                        'comm_agent_to_mcp': '#ff9800',
                        'comm_mcp_to_server': '#f44336',
                        'comm_server_to_mcp': '#009688',
                        'comm_mcp_to_agent': '#8bc34a',
                        'llm_call': '#e91e63',
                        'tool_execution': '#03a9f4',
                        'response': '#4caf50'
                    }
                    base_color = base_colors.get(step.step_type, '#607d8b')
                color = base_color
            else:
                color = '#bdbdbd'

            node_colors.append(color)

            # Matplotlib labels are plain text, so no markdown-style emphasis here.
            label_parts = []
            step_display = step.step_type.replace('_', ' ').title()
            label_parts.append(step_display)

            if step.mcp_server:
                label_parts.append(f"📡 {step.mcp_server}")
            if step.tool_name:
                label_parts.append(f"🔧 {step.tool_name}")
            if step.duration is not None:
                label_parts.append(f"⏱️ {step.duration:.2f}s")

            node_labels[node_id] = "\n".join(label_parts)

            # Emphasize tool executions and workflow endpoints with larger nodes.
            if step.step_type == 'tool_execution':
                node_sizes.append(5000)
            elif step.step_type in ['input', 'response']:
                node_sizes.append(4000)
            elif 'comm_' in step.step_type:
                node_sizes.append(2500)
            else:
                node_sizes.append(3000)

        nx.draw(G, pos,
                node_color=node_colors,
                node_size=node_sizes,
                font_size=9,
                font_weight='bold',
                arrows=True,
                arrowsize=20,
                edge_color='#666666',
                alpha=0.9,
                ax=ax,
                arrowstyle='->')

        nx.draw_networkx_labels(G, pos, node_labels, font_size=8, ax=ax)

        ax.set_title("MCP Agent Workflow: Server & Tool Execution Flow",
                     fontsize=20, pad=25, fontweight='bold')
        ax.axis('off')

        legend_elements = [
            plt.Rectangle((0, 0), 1, 1, facecolor='#4caf50', label='Semantic Server'),
            plt.Rectangle((0, 0), 1, 1, facecolor='#2196f3', label='Token Counter Server'),
            plt.Rectangle((0, 0), 1, 1, facecolor='#ff9800', label='Sentiment Analysis Server'),
            plt.Rectangle((0, 0), 1, 1, facecolor='#9c27b0', label='Health Monitor Server'),
            plt.Rectangle((0, 0), 1, 1, facecolor='#e91e63', label='LLM Calls'),
            plt.Rectangle((0, 0), 1, 1, facecolor='#607d8b', label='Agent Processing'),
        ]
        ax.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1))

        fig.set_constrained_layout(True)

        # Write to a temporary PNG; the caller is responsible for cleaning up the file.
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png')
        temp_file.close()
        plt.savefig(temp_file.name, format='png', dpi=300, bbox_inches='tight')
        plt.close(fig)

        return temp_file.name

    def get_workflow_summary(self) -> Dict[str, Any]:
        """Aggregate recorded steps into a summary dict (counts, usage, timings, status)."""
        total_duration = time.time() - self.start_time

        step_counts = {}
        server_usage = {}
        tool_usage = {}
        communication_steps = []
        processing_steps = []

        for step in self.steps:
            step_counts[step.step_type] = step_counts.get(step.step_type, 0) + 1

            if step.mcp_server:
                server_usage[step.mcp_server] = server_usage.get(step.mcp_server, 0) + 1

            if step.tool_name:
                tool_usage[step.tool_name] = tool_usage.get(step.tool_name, 0) + 1

            if 'comm_' in step.step_type:
                communication_steps.append({
                    'step_id': step.step_id,
                    'from': step.details.get('from', 'unknown') if step.details else 'unknown',
                    'to': step.details.get('to', 'unknown') if step.details else 'unknown',
                    'message_type': step.details.get('message_type', 'unknown') if step.details else 'unknown',
                    'mcp_server': step.mcp_server,
                    'tool_name': step.tool_name,
                    'duration': step.duration,
                    'status': step.status
                })
            else:
                processing_steps.append({
                    'step_id': step.step_id,
                    'type': step.step_type,
                    'content': step.content[:50] + "..." if len(step.content) > 50 else step.content,
                    'mcp_server': step.mcp_server,
                    'tool_name': step.tool_name,
                    'duration': step.duration,
                    'status': step.status
                })

        completed_steps = [s for s in self.steps if s.duration is not None]
        avg_duration = (sum(s.duration for s in completed_steps) / len(completed_steps)) if completed_steps else 0

        return {
            'total_steps': len(self.steps),
            'total_duration': round(total_duration, 3),
            'average_step_duration': round(avg_duration, 3),
            'step_counts': step_counts,
            'server_usage': server_usage,
            'tool_usage': tool_usage,
            'communication_flow': communication_steps,
            'processing_steps': processing_steps,
            'status': 'completed' if all(s.status in ['completed', 'error'] for s in self.steps) else 'running',
            'error_count': sum(1 for s in self.steps if s.status == 'error'),
            'success_rate': round((sum(1 for s in self.steps if s.status == 'completed') / len(self.steps)) * 100, 1) if self.steps else 0,
            'detailed_steps': [asdict(s) for s in self.steps]
        }

# Module-level singleton used by the convenience functions below.
workflow_visualizer = EnhancedWorkflowVisualizer()

def track_workflow_step(step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None,
                        parent_step: Optional[str] = None, mcp_server: Optional[str] = None,
                        tool_name: Optional[str] = None) -> str:
    return workflow_visualizer.add_step(step_type, content, metadata, parent_step,
                                        mcp_server=mcp_server, tool_name=tool_name)


def track_communication(from_component: str, to_component: str, message_type: str,
                        content: str, parent_step: Optional[str] = None) -> str:
    return workflow_visualizer.add_communication_step(from_component, to_component,
                                                      message_type, content, parent_step)


def track_tool_execution(tool_name: str, mcp_server: str, input_data: str,
                         parent_step: Optional[str] = None) -> str:
    """Helper for tracking tool executions with clear server/tool info."""
    return workflow_visualizer.add_tool_execution_step(tool_name, mcp_server, input_data, parent_step)


def complete_workflow_step(step_id: str, status: str = 'completed',
                           metadata: Optional[Dict[str, Any]] = None,
                           details: Optional[Dict[str, Any]] = None):
    workflow_visualizer.complete_step(step_id, status, metadata, details)


def get_workflow_visualization() -> str:
    return workflow_visualizer.create_matplotlib_visualization()


def get_workflow_summary() -> Dict[str, Any]:
    return workflow_visualizer.get_workflow_summary()

def reset_workflow():
    global workflow_visualizer
    workflow_visualizer = EnhancedWorkflowVisualizer()
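

# A minimal usage sketch when the module is run directly. The step type "input",
# the tool name "sentiment_analysis", and the server name "Sentiment Analysis" below
# are illustrative assumptions, not required values.
if __name__ == "__main__":
    step = track_workflow_step("input", "User asked for the sentiment of a review")
    tool_step = track_tool_execution("sentiment_analysis", "Sentiment Analysis",
                                     "This product is great!", parent_step=step)
    complete_workflow_step(tool_step, status='completed')
    complete_workflow_step(step, status='completed')

    summary = get_workflow_summary()
    print(f"Recorded {summary['total_steps']} steps; success rate {summary['success_rate']}%")

    # Returns "" if networkx/matplotlib are not installed.
    image_path = get_workflow_visualization()
    if image_path:
        print(f"Workflow graph written to {image_path}")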