|
import gradio as gr |
|
import asyncio |
|
import json |
|
import logging |
|
from typing import List, Dict, Any, Optional, Tuple |
|
from dataclasses import dataclass, field |
|
import anthropic |
|
import aiohttp |
|
from mcp import ClientSession |
|
from mcp.client.sse import sse_client |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
@dataclass |
|
class MCPSSEServerConfig: |
|
name: str |
|
url: str |
|
headers: Dict[str, str] = field(default_factory=dict) |
|
timeout: int = 50 |
|
sse_read_timeout: int = 50 |
|
|
|
class FastAgentMCPClient: |
|
def __init__(self, config: MCPSSEServerConfig): |
|
self.config = config |
|
self.session = None |
|
self.tools = [] |
|
self.connected = False |
|
self.client_session = None |
|
|
|
async def connect(self): |
|
"""Establish connection using fast-agent-mcp""" |
|
try: |
|
logger.info(f"Connecting to {self.config.name} using fast-agent-mcp") |
|
|
|
|
|
self.client_session = await sse_client( |
|
self.config.url, |
|
headers=self.config.headers, |
|
timeout=self.config.timeout |
|
) |
|
|
|
|
|
await self.client_session.initialize() |
|
|
|
|
|
tools_result = await self.client_session.list_tools() |
|
self.tools = tools_result.tools if hasattr(tools_result, 'tools') else [] |
|
|
|
self.connected = True |
|
logger.info(f"Successfully connected to {self.config.name} with {len(self.tools)} tools") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to connect to {self.config.name} using fast-agent-mcp: {e}") |
|
|
|
await self._fallback_connect() |
|
|
|
async def _fallback_connect(self): |
|
"""Fallback connection method using manual SSE handling""" |
|
try: |
|
logger.info(f"Attempting fallback connection for {self.config.name}") |
|
|
|
|
|
connector = aiohttp.TCPConnector(limit=100, limit_per_host=30) |
|
self.session = aiohttp.ClientSession( |
|
timeout=aiohttp.ClientTimeout(total=self.config.timeout), |
|
connector=connector, |
|
connector_owner=True |
|
) |
|
|
|
|
|
await self._test_sse_endpoint() |
|
|
|
|
|
await self._probe_server_capabilities() |
|
|
|
self.connected = True |
|
logger.info(f"Fallback connection successful for {self.config.name}") |
|
|
|
except Exception as e: |
|
logger.warning(f"Fallback connection failed for {self.config.name}: {e}") |
|
|
|
self.connected = True |
|
self.tools = [] |
|
logger.info(f"Graceful connection established for {self.config.name} (no tools)") |
|
|
|
async def _test_sse_endpoint(self): |
|
"""Test SSE endpoint connectivity""" |
|
try: |
|
async with self.session.get( |
|
self.config.url, |
|
headers={ |
|
"Accept": "text/event-stream", |
|
"Cache-Control": "no-cache", |
|
"Connection": "keep-alive", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=10), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
|
|
if response.history: |
|
redirect_chain = " -> ".join([str(h.url) for h in response.history]) |
|
logger.info(f"SSE endpoint redirected: {redirect_chain} -> {response.url}") |
|
|
|
if response.status in [200, 204]: |
|
logger.info(f"SSE endpoint accessible for {self.config.name}") |
|
elif response.status == 405: |
|
|
|
logger.warning(f"SSE endpoint returned 405 for {self.config.name} - continuing gracefully") |
|
else: |
|
logger.warning(f"SSE endpoint returned {response.status} for {self.config.name}") |
|
|
|
except Exception as e: |
|
logger.warning(f"SSE endpoint test failed for {self.config.name}: {e}") |
|
|
|
async def _probe_server_capabilities(self): |
|
"""Probe server for MCP capabilities""" |
|
probe_methods = [ |
|
self._probe_jsonrpc_post, |
|
self._probe_jsonrpc_get, |
|
self._probe_rest_api, |
|
self._probe_websocket |
|
] |
|
|
|
for method in probe_methods: |
|
try: |
|
tools = await method() |
|
if tools: |
|
self.tools = tools |
|
logger.info(f"Successfully probed {self.config.name} and found {len(tools)} tools") |
|
return |
|
except Exception as e: |
|
logger.debug(f"Probe method failed for {self.config.name}: {e}") |
|
continue |
|
|
|
|
|
self.tools = [] |
|
logger.info(f"All probe methods failed for {self.config.name}, setting empty tools") |
|
|
|
async def _probe_jsonrpc_post(self): |
|
"""Probe using JSON-RPC POST method""" |
|
|
|
init_message = { |
|
"jsonrpc": "2.0", |
|
"id": 1, |
|
"method": "initialize", |
|
"params": { |
|
"protocolVersion": "2024-11-05", |
|
"capabilities": {"tools": {}}, |
|
"clientInfo": {"name": "klaide-fast-agent", "version": "1.0.0"} |
|
} |
|
} |
|
|
|
async with self.session.post( |
|
self.config.url, |
|
json=init_message, |
|
headers={ |
|
"Content-Type": "application/json", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=15), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status != 200: |
|
raise Exception(f"Init failed with status {response.status}") |
|
|
|
result = await response.json() |
|
if "error" in result: |
|
raise Exception(f"Init error: {result['error']}") |
|
|
|
|
|
tools_message = { |
|
"jsonrpc": "2.0", |
|
"id": 2, |
|
"method": "tools/list", |
|
"params": {} |
|
} |
|
|
|
async with self.session.post( |
|
self.config.url, |
|
json=tools_message, |
|
headers={ |
|
"Content-Type": "application/json", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=15), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status != 200: |
|
raise Exception(f"Tools list failed with status {response.status}") |
|
|
|
tools_response = await response.json() |
|
if "result" in tools_response and "tools" in tools_response["result"]: |
|
return tools_response["result"]["tools"] |
|
|
|
return [] |
|
|
|
async def _probe_jsonrpc_get(self): |
|
"""Probe using JSON-RPC GET method with query parameters""" |
|
try: |
|
|
|
async with self.session.get( |
|
f"{self.config.url}?method=tools/list", |
|
headers={ |
|
"Accept": "application/json", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=10), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status == 200: |
|
result = await response.json() |
|
if "tools" in result: |
|
return result["tools"] |
|
elif "result" in result and "tools" in result["result"]: |
|
return result["result"]["tools"] |
|
|
|
except Exception: |
|
pass |
|
|
|
return [] |
|
|
|
async def _probe_rest_api(self): |
|
"""Probe using REST API endpoints""" |
|
rest_endpoints = [ |
|
f"{self.config.url.rstrip('/sse')}/tools", |
|
f"{self.config.url.rstrip('/sse')}/api/tools", |
|
f"{self.config.url.rstrip('/sse')}/mcp/tools", |
|
f"{self.config.url}/tools" |
|
] |
|
|
|
for endpoint in rest_endpoints: |
|
try: |
|
async with self.session.get( |
|
endpoint, |
|
headers={ |
|
"Accept": "application/json", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=10), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status == 200: |
|
result = await response.json() |
|
if isinstance(result, list): |
|
return result |
|
elif "tools" in result: |
|
return result["tools"] |
|
|
|
except Exception: |
|
continue |
|
|
|
return [] |
|
|
|
async def _probe_websocket(self): |
|
"""Probe using WebSocket connection""" |
|
try: |
|
|
|
ws_url = self.config.url.replace('https://', 'wss://').replace('http://', 'ws://') |
|
|
|
|
|
|
|
logger.debug(f"WebSocket probe not implemented for {ws_url}") |
|
return [] |
|
|
|
except Exception: |
|
return [] |
|
|
|
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any: |
|
"""Call a tool using the best available method""" |
|
if self.client_session: |
|
|
|
try: |
|
result = await self.client_session.call_tool(tool_name, arguments) |
|
return result.content if hasattr(result, 'content') else result |
|
except Exception as e: |
|
logger.error(f"Fast-agent tool call failed: {e}") |
|
|
|
|
|
|
|
if self.session: |
|
message = { |
|
"jsonrpc": "2.0", |
|
"id": int(asyncio.get_event_loop().time() * 1000), |
|
"method": "tools/call", |
|
"params": { |
|
"name": tool_name, |
|
"arguments": arguments |
|
} |
|
} |
|
|
|
try: |
|
async with self.session.post( |
|
self.config.url, |
|
json=message, |
|
headers={ |
|
"Content-Type": "application/json", |
|
"User-Agent": "Klaide-FastAgent-MCP/1.0.0", |
|
**self.config.headers |
|
}, |
|
timeout=aiohttp.ClientTimeout(total=self.config.sse_read_timeout), |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status == 200: |
|
result = await response.json() |
|
if "result" in result: |
|
return result["result"].get("content", []) |
|
|
|
raise Exception(f"Tool call failed with status {response.status}") |
|
|
|
except Exception as e: |
|
logger.error(f"Manual tool call failed: {e}") |
|
return [{"type": "text", "text": f"Error: {str(e)}"}] |
|
|
|
return [{"type": "text", "text": "Error: No connection available"}] |
|
|
|
async def close(self): |
|
"""Close all connections""" |
|
if self.client_session: |
|
try: |
|
await self.client_session.close() |
|
except Exception: |
|
pass |
|
|
|
if self.session: |
|
try: |
|
await self.session.close() |
|
except Exception: |
|
pass |
|
|
|
self.connected = False |
|
|
|
class MCPChatbot: |
|
def __init__(self, anthropic_api_key: str, mcp_servers: Dict[str, MCPSSEServerConfig]): |
|
self.anthropic_client = anthropic.Anthropic(api_key=anthropic_api_key) |
|
self.mcp_servers = mcp_servers |
|
self.clients = {} |
|
self.available_tools = {} |
|
|
|
async def initialize_mcp_servers(self): |
|
"""Initialize connections to all MCP SSE servers using fast-agent-mcp""" |
|
for server_name, server_config in self.mcp_servers.items(): |
|
try: |
|
logger.info(f"Connecting to MCP SSE server: {server_name}") |
|
|
|
client = FastAgentMCPClient(server_config) |
|
await client.connect() |
|
|
|
self.clients[server_name] = client |
|
self.available_tools[server_name] = client.tools |
|
|
|
if client.connected: |
|
logger.info(f"Successfully connected to {server_name} with {len(client.tools)} tools") |
|
else: |
|
logger.warning(f"Partial connection to {server_name}") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to connect to {server_name}: {e}") |
|
continue |
|
|
|
def format_tools_for_claude(self) -> List[Dict]: |
|
"""Format tools from MCP for Claude API with enhanced context""" |
|
claude_tools = [] |
|
|
|
for server_name, tools in self.available_tools.items(): |
|
for tool in tools: |
|
|
|
server_context = "" |
|
if server_name == "burp_mcp": |
|
server_context = " (Burp Suite - Web Security Testing)" |
|
elif server_name == "viper_mcp": |
|
server_context = " (Metasploit - Penetration Testing)" |
|
|
|
tool_description = tool.get('description', f"Tool from {server_name}") |
|
enhanced_description = f"{tool_description}{server_context}" |
|
|
|
claude_tool = { |
|
"name": f"{server_name}_{tool.get('name', 'unknown')}", |
|
"description": enhanced_description, |
|
"input_schema": tool.get('inputSchema', { |
|
"type": "object", |
|
"properties": {}, |
|
"required": [] |
|
}), |
|
"server_context": { |
|
"server_name": server_name, |
|
"server_url": self.clients[server_name].config.url if server_name in self.clients else "", |
|
"capabilities": self._get_server_capabilities(server_name) |
|
} |
|
} |
|
claude_tools.append(claude_tool) |
|
|
|
return claude_tools |
|
|
|
def _get_server_capabilities(self, server_name: str) -> List[str]: |
|
"""Get capabilities for a specific server""" |
|
capabilities_map = { |
|
"burp_mcp": [ |
|
"Web application security testing", |
|
"Vulnerability scanning", |
|
"HTTP request/response analysis", |
|
"Spider/crawling functionality", |
|
"Intruder attacks", |
|
"Repeater functionality" |
|
], |
|
"viper_mcp": [ |
|
"Penetration testing", |
|
"Exploit development", |
|
"Payload generation", |
|
"Network reconnaissance", |
|
"Post-exploitation", |
|
"Metasploit module execution" |
|
] |
|
} |
|
return capabilities_map.get(server_name, []) |
|
|
|
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any: |
|
"""Execute tool through appropriate MCP server""" |
|
parts = tool_name.split('_', 1) |
|
if len(parts) < 2: |
|
raise ValueError(f"Invalid tool name format: {tool_name}") |
|
|
|
server_name = parts[0] |
|
actual_tool_name = parts[1] |
|
|
|
if server_name not in self.clients: |
|
raise ValueError(f"Server {server_name} not available") |
|
|
|
client = self.clients[server_name] |
|
|
|
if not client.connected: |
|
raise ValueError(f"Server {server_name} not connected") |
|
|
|
try: |
|
result = await client.call_tool(actual_tool_name, arguments) |
|
return result |
|
except Exception as e: |
|
logger.error(f"Error executing tool {tool_name}: {e}") |
|
return [{"type": "text", "text": f"Error: {str(e)}"}] |
|
|
|
async def chat(self, message: str, history: List[List[str]]) -> tuple: |
|
"""Main chat function using messages API with tools context""" |
|
try: |
|
|
|
tools = self.format_tools_for_claude() |
|
tools_context = await self._get_tools_context() |
|
|
|
|
|
system_content, messages = await self._build_messages_with_context(message, history, tools_context) |
|
|
|
|
|
response = self.anthropic_client.messages.create( |
|
model="claude-3-5-sonnet-20241022", |
|
max_tokens=4000, |
|
messages=messages, |
|
tools=tools, |
|
temperature=0.1, |
|
system=system_content |
|
) |
|
|
|
|
|
assistant_message = "" |
|
|
|
for content in response.content: |
|
if content.type == "text": |
|
assistant_message += content.text |
|
elif content.type == "tool_use": |
|
|
|
try: |
|
tool_result = await self.execute_tool( |
|
content.name, |
|
content.input |
|
) |
|
|
|
|
|
assistant_message += f"\n\n🔧 **Tool Executed**: {content.name}\n" |
|
|
|
if isinstance(tool_result, list): |
|
for item in tool_result: |
|
if isinstance(item, dict) and item.get("type") == "text": |
|
assistant_message += f"📊 **Result**: {item.get('text', '')}\n" |
|
else: |
|
assistant_message += f"📊 **Result**: {json.dumps(item, indent=2, ensure_ascii=False)}\n" |
|
else: |
|
assistant_message += f"📊 **Result**: {json.dumps(tool_result, indent=2, ensure_ascii=False)}\n" |
|
|
|
except Exception as e: |
|
assistant_message += f"\n\n❌ **Tool execution failed**: {content.name}\n" |
|
assistant_message += f"🚫 **Error**: {str(e)}\n" |
|
|
|
|
|
history.append([message, assistant_message]) |
|
|
|
return history, "" |
|
|
|
except Exception as e: |
|
error_msg = f"❌ **Error**: {str(e)}" |
|
history.append([message, error_msg]) |
|
return history, "" |
|
|
|
async def _build_messages_with_context(self, message: str, history: List[List[str]], tools_context: str) -> Tuple[str, List[Dict]]: |
|
"""Build messages array with tools context integration""" |
|
|
|
system_content = f"""You are Klaide, a Kali Linux AI Desktop assistant that controls cybersecurity tools through MCP servers. |
|
You help users perform penetration testing, vulnerability assessment, and security analysis. |
|
|
|
{tools_context} |
|
|
|
Instructions: |
|
1. Analyze the user's request in the context of available MCP tools |
|
2. Use the appropriate tools for cybersecurity tasks |
|
3. Provide helpful guidance and explanations |
|
4. Be specific about which Kali Linux tools or techniques to use |
|
5. Always prioritize security and ethical hacking practices |
|
6. When using tools, explain what you're doing and why |
|
|
|
Available tool format: Use the tools provided in the tools list for executing commands.""" |
|
messages = [] |
|
|
|
if history: |
|
for user_msg, assistant_msg in history[-5:]: |
|
messages.append({"role": "user", "content": user_msg}) |
|
if assistant_msg: |
|
messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
return system_content, messages |
|
|
|
async def _get_tools_context(self) -> str: |
|
"""Get context information from all connected MCP servers""" |
|
context_parts = [] |
|
|
|
context_parts.append("=== MCP SERVERS CONTEXT ===") |
|
|
|
for server_name, client in self.clients.items(): |
|
if not client.connected: |
|
continue |
|
|
|
context_parts.append(f"\n[{server_name.upper()} SERVER]") |
|
context_parts.append(f"URL: {client.config.url}") |
|
context_parts.append(f"Status: Connected") |
|
|
|
|
|
tools = self.available_tools.get(server_name, []) |
|
context_parts.append(f"Available Tools: {len(tools)}") |
|
|
|
if tools: |
|
context_parts.append("Tools List:") |
|
for tool in tools: |
|
tool_name = tool.get('name', 'unknown') |
|
tool_desc = tool.get('description', 'No description') |
|
context_parts.append(f" - {tool_name}: {tool_desc}") |
|
|
|
|
|
schema = tool.get('inputSchema', {}) |
|
if schema.get('properties'): |
|
props = list(schema['properties'].keys()) |
|
context_parts.append(f" Parameters: {', '.join(props)}") |
|
|
|
|
|
if server_name == "burp_mcp": |
|
context_parts.append("Capabilities:") |
|
context_parts.append(" - Web application security testing") |
|
context_parts.append(" - Vulnerability scanning") |
|
context_parts.append(" - HTTP request/response analysis") |
|
context_parts.append(" - Burp Suite integration") |
|
|
|
elif server_name == "viper_mcp": |
|
context_parts.append("Capabilities:") |
|
context_parts.append(" - Penetration testing") |
|
context_parts.append(" - Exploit development") |
|
context_parts.append(" - Metasploit Framework integration") |
|
context_parts.append(" - Payload generation") |
|
|
|
|
|
connected_count = sum(1 for client in self.clients.values() if client.connected) |
|
total_tools = sum(len(tools) for tools in self.available_tools.values()) |
|
|
|
context_parts.append(f"\n[SYSTEM STATUS]") |
|
context_parts.append(f"Connected Servers: {connected_count}/{len(self.clients)}") |
|
context_parts.append(f"Total Available Tools: {total_tools}") |
|
context_parts.append(f"MCP Client: Fast-Agent-MCP") |
|
|
|
return "\n".join(context_parts) |
|
|
|
async def _build_completion_prompt(self, message: str, history: List[List[str]], tools_context: str) -> str: |
|
"""Build completion prompt with conversation history and tools context (deprecated - now using messages)""" |
|
|
|
|
|
prompt_parts = [] |
|
|
|
|
|
prompt_parts.append("You are Klaide, a Kali Linux AI Desktop assistant that controls cybersecurity tools through MCP servers.") |
|
prompt_parts.append("You help users perform penetration testing, vulnerability assessment, and security analysis.") |
|
prompt_parts.append("") |
|
|
|
|
|
prompt_parts.append(tools_context) |
|
prompt_parts.append("") |
|
|
|
|
|
if history: |
|
prompt_parts.append("=== CONVERSATION HISTORY ===") |
|
for user_msg, assistant_msg in history[-5:]: |
|
prompt_parts.append(f"Human: {user_msg}") |
|
prompt_parts.append(f"Assistant: {assistant_msg}") |
|
prompt_parts.append("") |
|
|
|
|
|
prompt_parts.append("=== CURRENT REQUEST ===") |
|
prompt_parts.append(f"Human: {message}") |
|
prompt_parts.append("") |
|
|
|
|
|
prompt_parts.append("=== INSTRUCTIONS ===") |
|
prompt_parts.append("1. Analyze the user's request in the context of available MCP tools") |
|
prompt_parts.append("2. Use the appropriate tools for cybersecurity tasks") |
|
prompt_parts.append("3. Provide helpful cybersecurity guidance and explanations") |
|
prompt_parts.append("4. Be specific about which Kali Linux tools or techniques to use") |
|
prompt_parts.append("5. Always prioritize security and ethical hacking practices") |
|
prompt_parts.append("") |
|
prompt_parts.append("Assistant: ") |
|
|
|
return "\n".join(prompt_parts) |
|
|
|
async def _extract_tool_calls_from_completion(self, completion_text: str) -> List[Dict[str, Any]]: |
|
"""Extract tool calls from completion text (deprecated - now using native tool calling)""" |
|
|
|
|
|
import re |
|
|
|
tool_calls = [] |
|
|
|
|
|
pattern = r'TOOL_CALL\[([^\]]+)\]\(([^)]*)\)' |
|
matches = re.findall(pattern, completion_text) |
|
|
|
for match in matches: |
|
tool_name = match[0].strip() |
|
args_str = match[1].strip() |
|
|
|
|
|
try: |
|
if args_str: |
|
|
|
arguments = {} |
|
if '=' in args_str: |
|
pairs = args_str.split(',') |
|
for pair in pairs: |
|
if '=' in pair: |
|
key, value = pair.split('=', 1) |
|
key = key.strip().strip('"\'') |
|
value = value.strip().strip('"\'') |
|
arguments[key] = value |
|
else: |
|
|
|
arguments = json.loads(args_str) if args_str else {} |
|
else: |
|
arguments = {} |
|
|
|
tool_calls.append({ |
|
'name': tool_name, |
|
'arguments': arguments |
|
}) |
|
|
|
except Exception as e: |
|
logger.warning(f"Failed to parse tool arguments: {args_str}, error: {e}") |
|
|
|
tool_calls.append({ |
|
'name': tool_name, |
|
'arguments': {} |
|
}) |
|
|
|
return tool_calls |
|
|
|
async def get_server_status(self) -> str: |
|
"""Get comprehensive status of all MCP servers with enhanced context""" |
|
status = "📡 **Kali Linux MCP Servers Status (Fast-Agent-MCP + Enhanced Context):**\n\n" |
|
|
|
if not self.clients: |
|
status += "❌ **No servers configured**\n" |
|
status += "Please configure at least one MCP server URL in Settings.\n" |
|
return status |
|
|
|
for server_name, client in self.clients.items(): |
|
status += f"## 🖥️ **{server_name.upper()}**\n" |
|
status += f"**URL**: `{client.config.url}`\n" |
|
|
|
try: |
|
if not client.connected: |
|
status += "**Status**: ❌ **DISCONNECTED**\n" |
|
status += "**Connection Method**: Fast-Agent-MCP + Fallback\n" |
|
status += "**Tools**: ⚠️ No tools available\n" |
|
status += "**Context**: Not available for enhanced context\n\n" |
|
continue |
|
|
|
tools = self.available_tools.get(server_name, []) |
|
|
|
if not tools: |
|
status += "**Status**: ⚠️ **CONNECTED BUT NO TOOLS**\n" |
|
status += "**Connection Method**: Fast-Agent-MCP (Graceful mode)\n" |
|
status += "**Tools**: 📭 Empty tools list\n" |
|
status += "**Context**: Limited context for AI\n" |
|
else: |
|
status += f"**Status**: ✅ **CONNECTED & OPERATIONAL**\n" |
|
|
|
|
|
if client.client_session: |
|
status += "**Connection Method**: 🚀 Fast-Agent-MCP (Native)\n" |
|
else: |
|
status += "**Connection Method**: 🔄 Fast-Agent-MCP (Fallback)\n" |
|
|
|
status += f"**Tools Available**: 🛠️ **{len(tools)} tools**\n" |
|
status += f"**AI Context**: ✅ **Full context available**\n" |
|
|
|
|
|
status += "**Tools List** (Available with enhanced context):\n" |
|
for i, tool in enumerate(tools, 1): |
|
tool_name = tool.get('name', 'Unknown') |
|
tool_desc = tool.get('description', 'No description available') |
|
|
|
if len(tool_desc) > 60: |
|
tool_desc = tool_desc[:57] + "..." |
|
|
|
status += f" `{i:2d}.` **{tool_name}** - {tool_desc}\n" |
|
|
|
|
|
capabilities = self._get_server_capabilities(server_name) |
|
status += "**Enhanced Context Capabilities**:\n" |
|
for cap in capabilities: |
|
status += f" • {cap}\n" |
|
|
|
|
|
status += f"**Timeout**: {client.config.timeout}s\n" |
|
status += f"**SSE Timeout**: {client.config.sse_read_timeout}s\n" |
|
|
|
except Exception as e: |
|
status += "**Status**: ❌ **ERROR**\n" |
|
status += f"**Error Details**: {str(e)}\n" |
|
status += "**Tools**: ⚠️ Unable to retrieve tools\n" |
|
status += "**Context**: Error in enhanced context\n" |
|
|
|
status += "\n" + "─" * 50 + "\n\n" |
|
|
|
|
|
connected_count = sum(1 for client in self.clients.values() if client.connected) |
|
total_tools = sum(len(tools) for tools in self.available_tools.values()) |
|
|
|
status += "## 📊 **Enhanced Context Summary**\n" |
|
status += f"**Connected Servers**: {connected_count}/{len(self.clients)}\n" |
|
status += f"**Total Available Tools**: {total_tools}\n" |
|
status += f"**MCP Client**: Fast-Agent-MCP with enhanced context\n" |
|
status += f"**AI Model**: Claude-3.5-Sonnet (Messages API with Context)\n" |
|
|
|
|
|
if total_tools > 0: |
|
status += f"**Context Integration**: ✅ **Full MCP context available**\n" |
|
status += f"**AI Features**:\n" |
|
status += f" • Rich server context in system messages\n" |
|
status += f" • Native tool calling with Claude Messages API\n" |
|
status += f" • Enhanced cybersecurity guidance\n" |
|
status += f" • Conversation history integration\n" |
|
status += f" • Server capability awareness\n" |
|
else: |
|
status += f"**Context Integration**: ⚠️ **Limited context**\n" |
|
|
|
if connected_count == 0: |
|
status += "**Overall Status**: ❌ **NO SERVERS OPERATIONAL**\n" |
|
status += "**Action Required**: Check server URLs and network connectivity\n" |
|
elif connected_count == len(self.clients): |
|
status += "**Overall Status**: ✅ **ALL SYSTEMS OPERATIONAL**\n" |
|
status += "**Ready**: Klaide enhanced context mode ready for cybersecurity tasks\n" |
|
else: |
|
status += "**Overall Status**: ⚠️ **PARTIAL CONNECTIVITY**\n" |
|
status += "**Action**: Some servers need attention\n" |
|
|
|
return status |
|
|
|
async def close_all_connections(self): |
|
"""Close all MCP connections""" |
|
for client in self.clients.values(): |
|
await client.close() |
|
|
|
|
|
chatbot = None |
|
|
|
def create_mcp_servers_config(burp_url: str, viper_url: str) -> Dict[str, MCPSSEServerConfig]: |
|
"""Create MCP servers configuration from URLs""" |
|
servers = {} |
|
|
|
if burp_url.strip(): |
|
servers["burp_mcp"] = MCPSSEServerConfig( |
|
name="burp_mcp", |
|
url=burp_url.strip(), |
|
headers={}, |
|
timeout=50, |
|
sse_read_timeout=50 |
|
) |
|
|
|
if viper_url.strip(): |
|
servers["viper_mcp"] = MCPSSEServerConfig( |
|
name="viper_mcp", |
|
url=viper_url.strip(), |
|
headers={}, |
|
timeout=50, |
|
sse_read_timeout=50 |
|
) |
|
|
|
return servers |
|
|
|
async def initialize_chatbot(api_key: str, burp_url: str, viper_url: str): |
|
"""Initialize Klaide with API key and server URLs using fast-agent-mcp""" |
|
global chatbot |
|
|
|
if not api_key: |
|
return "❌ Please enter Anthropic API Key" |
|
|
|
if not burp_url.strip() and not viper_url.strip(): |
|
return "❌ Please enter at least one MCP server URL" |
|
|
|
try: |
|
mcp_servers = create_mcp_servers_config(burp_url, viper_url) |
|
chatbot = MCPChatbot(api_key, mcp_servers) |
|
await chatbot.initialize_mcp_servers() |
|
|
|
connected_servers = [name for name, client in chatbot.clients.items() if client.connected] |
|
total_tools = sum(len(tools) for tools in chatbot.available_tools.values()) |
|
|
|
if connected_servers: |
|
status_msg = f"✅ Klaide successfully initialized with Fast-Agent-MCP!\n" |
|
status_msg += f"🔗 Connected servers: {', '.join(connected_servers)}\n" |
|
status_msg += f"🛠️ Total tools available: {total_tools}\n" |
|
status_msg += f"🚀 MCP Client: Fast-Agent-MCP with fallback support\n" |
|
|
|
|
|
for server_name, client in chatbot.clients.items(): |
|
if client.connected: |
|
method = "Native" if client.client_session else "Fallback" |
|
status_msg += f"📡 {server_name}: {method} connection\n" |
|
|
|
return status_msg |
|
else: |
|
return "⚠️ Klaide initialized but no servers connected. Please check your URLs." |
|
|
|
except Exception as e: |
|
return f"❌ Initialization error: {str(e)}" |
|
|
|
async def chat_wrapper(message, history): |
|
"""Wrapper for chat function""" |
|
if not chatbot: |
|
history.append([message, "❌ Klaide not initialized. Please enter API Key first."]) |
|
return history, "" |
|
|
|
return await chatbot.chat(message, history) |
|
|
|
async def get_status(): |
|
"""Get server status""" |
|
if not chatbot: |
|
return "❌ Klaide not initialized" |
|
|
|
return await chatbot.get_server_status() |
|
|
|
async def cleanup(): |
|
"""Cleanup function""" |
|
global chatbot |
|
if chatbot: |
|
await chatbot.close_all_connections() |
|
|
|
|
|
def create_interface(): |
|
with gr.Blocks(title="Klaide (Kali Linux AI Desktop)", theme=gr.themes.Soft()) as demo: |
|
gr.Markdown("# 🐉 Klaide (**Kali Linux AI Desktop**)") |
|
gr.Markdown("Controlling Kali Linux Desktop with AI using MCP Server.") |
|
|
|
with gr.Tab("💬 Console"): |
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
chatbot_ui = gr.Chatbot( |
|
label="Klaide Console", |
|
height=500, |
|
show_copy_button=True, |
|
avatar_images=("assets/user.png", "assets/csalab.png") |
|
) |
|
|
|
with gr.Row(): |
|
msg = gr.Textbox( |
|
placeholder="Ask Klaide to control your Kali Linux tools...", |
|
label="Command Prompt", |
|
scale=4 |
|
) |
|
send_btn = gr.Button("Send", scale=1, variant="primary") |
|
|
|
with gr.Column(scale=1): |
|
gr.Markdown("### 🔧 Controls") |
|
clear_btn = gr.Button("Clear Chat", variant="secondary") |
|
|
|
gr.Markdown("### 📊 Server Status") |
|
status_btn = gr.Button("Refresh Status") |
|
status_display = gr.Markdown("Status will be displayed here...") |
|
|
|
with gr.Tab("⚙️ Settings"): |
|
gr.Markdown("## Setup Configuration") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
api_key_input = gr.Textbox( |
|
label="Anthropic API Key", |
|
type="password", |
|
placeholder="sk-ant-...", |
|
info="Required: Your Anthropic Claude API key" |
|
) |
|
|
|
burp_url_input = gr.Textbox( |
|
label="Burp MCP Server URL", |
|
placeholder="https://burp.csalab.app/sse", |
|
value="https://burp.csalab.app/sse", |
|
info="Optional: URL for Burp Suite MCP server" |
|
) |
|
|
|
viper_url_input = gr.Textbox( |
|
label="Viper MCP Server URL", |
|
placeholder="https://msf.csalab.app/your-id/sse", |
|
value="https://msf.csalab.app/02b77a05348211f0/sse", |
|
info="Optional: URL for Metasploit MCP server" |
|
) |
|
|
|
with gr.Row(): |
|
init_btn = gr.Button("Initialize Klaide", variant="primary", scale=2) |
|
test_urls_btn = gr.Button("Test URLs", variant="secondary", scale=1) |
|
|
|
init_status = gr.Textbox( |
|
label="Klaide Status", |
|
interactive=False, |
|
lines=4 |
|
) |
|
|
|
gr.Markdown("## Klaide (Kali Linux AI Desktop)") |
|
|
|
with gr.Accordion("Advanced Settings", open=False): |
|
gr.Markdown("### Timeout Configuration") |
|
timeout_slider = gr.Slider( |
|
minimum=10, |
|
maximum=120, |
|
value=50, |
|
step=5, |
|
label="Connection Timeout (seconds)", |
|
info="Timeout for server connections and requests" |
|
) |
|
|
|
gr.Markdown("### Custom Headers") |
|
custom_headers = gr.Textbox( |
|
label="Custom Headers (JSON format)", |
|
placeholder='{"Authorization": "Bearer token", "X-API-Key": "key"}', |
|
info="Optional: Custom headers for server requests" |
|
) |
|
|
|
|
|
def chat_fn(message, history): |
|
return asyncio.run(chat_wrapper(message, history)) |
|
|
|
def init_fn(api_key, burp_url, viper_url): |
|
return asyncio.run(initialize_chatbot(api_key, burp_url, viper_url)) |
|
|
|
def status_fn(): |
|
return asyncio.run(get_status()) |
|
|
|
async def test_urls_async(burp_url, viper_url): |
|
"""Test server URLs connectivity with fast-agent-mcp methods""" |
|
results = [] |
|
|
|
|
|
if burp_url.strip(): |
|
burp_result = await test_single_url_fast_agent("Burp", burp_url.strip()) |
|
results.append(burp_result) |
|
else: |
|
results.append("⏭️ Burp Server: URL not provided") |
|
|
|
|
|
if viper_url.strip(): |
|
viper_result = await test_single_url_fast_agent("Viper", viper_url.strip()) |
|
results.append(viper_result) |
|
else: |
|
results.append("⏭️ Viper Server: URL not provided") |
|
|
|
return "\n".join(results) |
|
|
|
async def test_single_url_fast_agent(server_name, url): |
|
"""Test a single URL using fast-agent-mcp approach""" |
|
test_results = [] |
|
|
|
|
|
try: |
|
config = MCPSSEServerConfig(name=f"test_{server_name.lower()}", url=url) |
|
test_client = FastAgentMCPClient(config) |
|
|
|
|
|
config.timeout = 10 |
|
await test_client.connect() |
|
|
|
if test_client.connected: |
|
tool_count = len(test_client.tools) |
|
if test_client.client_session: |
|
test_results.append(f"✅ {server_name} Server: Fast-Agent-MCP native ({tool_count} tools)") |
|
else: |
|
test_results.append(f"✅ {server_name} Server: Fast-Agent-MCP fallback ({tool_count} tools)") |
|
|
|
await test_client.close() |
|
return "\n".join(test_results) |
|
else: |
|
test_results.append(f"⚠️ {server_name} Server: Fast-Agent-MCP failed") |
|
|
|
await test_client.close() |
|
|
|
except Exception as e: |
|
test_results.append(f"❌ {server_name} Server: Fast-Agent-MCP error - {str(e)[:50]}...") |
|
|
|
|
|
try: |
|
connector = aiohttp.TCPConnector(limit=10, limit_per_host=5) |
|
async with aiohttp.ClientSession( |
|
timeout=aiohttp.ClientTimeout(total=10), |
|
connector=connector |
|
) as session: |
|
|
|
|
|
async with session.get( |
|
url, |
|
headers={ |
|
"Accept": "text/event-stream", |
|
"User-Agent": "Klaide-FastAgent-Test/1.0.0" |
|
}, |
|
allow_redirects=True, |
|
max_redirects=10 |
|
) as response: |
|
|
|
if response.status in [200, 204]: |
|
test_results.append(f"✅ {server_name} Server: SSE endpoint accessible") |
|
elif response.status == 405: |
|
test_results.append(f"⚠️ {server_name} Server: SSE returns 405 (will use graceful handling)") |
|
else: |
|
test_results.append(f"⚠️ {server_name} Server: SSE returns HTTP {response.status}") |
|
|
|
except Exception as e: |
|
test_results.append(f"❌ {server_name} Server: SSE test failed - {str(e)[:50]}...") |
|
|
|
return "\n".join(test_results) |
|
|
|
def test_urls_fn(burp_url, viper_url): |
|
return asyncio.run(test_urls_async(burp_url, viper_url)) |
|
|
|
|
|
send_btn.click( |
|
chat_fn, |
|
inputs=[msg, chatbot_ui], |
|
outputs=[chatbot_ui, msg] |
|
) |
|
|
|
msg.submit( |
|
chat_fn, |
|
inputs=[msg, chatbot_ui], |
|
outputs=[chatbot_ui, msg] |
|
) |
|
|
|
clear_btn.click( |
|
lambda: ([], ""), |
|
outputs=[chatbot_ui, msg] |
|
) |
|
|
|
init_btn.click( |
|
init_fn, |
|
inputs=[api_key_input, burp_url_input, viper_url_input], |
|
outputs=[init_status] |
|
) |
|
|
|
test_urls_btn.click( |
|
test_urls_fn, |
|
inputs=[burp_url_input, viper_url_input], |
|
outputs=[init_status] |
|
) |
|
|
|
status_btn.click( |
|
status_fn, |
|
outputs=[status_display] |
|
) |
|
|
|
|
|
demo.load(None, None, None) |
|
|
|
return demo |
|
|
|
if __name__ == "__main__": |
|
try: |
|
|
|
demo = create_interface() |
|
demo.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=True, |
|
debug=True |
|
) |
|
finally: |
|
|
|
asyncio.run(cleanup()) |