File size: 18,777 Bytes
8b54db1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
"""
Tool Manager for CodeAct Agent.
Unified management system for all types of tools: local functions, decorated tools, and MCP tools.
"""

from typing import Dict, List, Optional, Callable, Any, Union
from enum import Enum
from dataclasses import dataclass
import re

# Import components
from .tool_registry import ToolRegistry, create_module2api_from_functions
from .mcp_manager import MCPManager


class ToolSource(Enum):
    """Enumeration of tool sources."""
    LOCAL = "local"
    DECORATED = "decorated"
    MCP = "mcp"
    ALL = "all"


@dataclass
class ToolInfo:
    """Comprehensive tool information."""
    name: str
    description: str
    source: ToolSource
    function: Optional[Callable] = None
    schema: Optional[Dict] = None
    server: Optional[str] = None  # For MCP tools
    module: Optional[str] = None
    required_parameters: List[Dict] = None
    optional_parameters: List[Dict] = None

    def __post_init__(self):
        if self.required_parameters is None:
            self.required_parameters = []
        if self.optional_parameters is None:
            self.optional_parameters = []


class ToolManager:
    """
    Unified tool management system for CodeAct Agent.

    Manages all types of tools:
    - Local functions (legacy function registry)
    - Decorated tools (@tool decorator)
    - MCP tools (Model Context Protocol)

    Provides a single, consistent interface for tool operations.
    """

    def __init__(self, console_display=None):
        """
        Initialize the ToolManager.

        Args:
            console_display: Optional console display for MCP status output
        """
        # Core components
        self.tool_registry = ToolRegistry()
        self.mcp_manager = MCPManager(console_display)

        # Unified tool catalog
        self._tool_catalog: Dict[str, ToolInfo] = {}

        # Legacy function registry (for backward compatibility)
        self._legacy_functions: Dict[str, Callable] = {}

        # Initialize with decorated tools from function_tools.py
        self._discover_decorated_tools()

    # ====================
    # CORE TOOL MANAGEMENT
    # ====================

    def add_tool(self, tool: Union[Callable, Dict], name: str = None,
                 description: str = None, source: ToolSource = ToolSource.LOCAL) -> bool:
        """
        Add a tool to the manager.

        Args:
            tool: Either a callable function or a tool schema dict
            name: Optional custom name (defaults to function.__name__)
            description: Optional description (defaults to function.__doc__)
            source: Source type of the tool

        Returns:
            True if successfully added
        """
        try:
            if callable(tool):
                # Handle callable functions
                tool_name = name or tool.__name__
                tool_desc = description or tool.__doc__ or f"Function {tool.__name__}"

                # Add to tool registry
                success = self.tool_registry.add_function_directly(tool_name, tool, tool_desc)

                if success:
                    # Create ToolInfo and add to catalog
                    tool_info = ToolInfo(
                        name=tool_name,
                        description=tool_desc,
                        source=source,
                        function=tool,
                        schema=self._create_schema_from_function(tool_name, tool, tool_desc)
                    )
                    self._tool_catalog[tool_name] = tool_info

                    if source == ToolSource.LOCAL:
                        self._legacy_functions[tool_name] = tool

                    return True

            elif isinstance(tool, dict):
                # Handle tool schema dictionaries
                tool_name = tool.get("name") or name
                tool_desc = tool.get("description") or description

                if not tool_name:
                    print("Warning: Tool schema must have a name")
                    return False

                # Register with tool registry
                success = self.tool_registry.register_tool(tool)

                if success:
                    tool_info = ToolInfo(
                        name=tool_name,
                        description=tool_desc,
                        source=source,
                        schema=tool,
                        required_parameters=tool.get("required_parameters", []),
                        optional_parameters=tool.get("optional_parameters", [])
                    )
                    self._tool_catalog[tool_name] = tool_info
                    return True

            return False

        except Exception as e:
            print(f"Error adding tool {name}: {e}")
            return False

    def remove_tool(self, name: str) -> bool:
        """Remove a tool by name from all registries."""
        try:
            success = False

            # Remove from tool registry
            if self.tool_registry.remove_tool_by_name(name):
                success = True

            # Remove from MCP if it's an MCP tool
            if name in self.mcp_manager.mcp_functions:
                if self.mcp_manager.remove_mcp_tool(name, self.tool_registry):
                    success = True

            # Remove from legacy functions
            if name in self._legacy_functions:
                del self._legacy_functions[name]
                success = True

            # Remove from catalog
            if name in self._tool_catalog:
                del self._tool_catalog[name]
                success = True

            return success

        except Exception as e:
            print(f"Error removing tool {name}: {e}")
            return False

    def get_tool(self, name: str) -> Optional[ToolInfo]:
        """Get comprehensive tool information by name."""
        return self._tool_catalog.get(name)

    def get_tool_function(self, name: str) -> Optional[Callable]:
        """Get the actual function object by name."""
        tool_info = self.get_tool(name)
        if tool_info and tool_info.function:
            return tool_info.function

        # Check tool registry
        return self.tool_registry.get_function_by_name(name)

    # ====================
    # TOOL DISCOVERY AND LISTING
    # ====================

    def list_tools(self, source: ToolSource = ToolSource.ALL,
                   include_details: bool = False) -> List[Dict]:
        """
        List tools with optional filtering by source.

        Args:
            source: Filter by tool source (LOCAL, DECORATED, MCP, ALL)
            include_details: Whether to include detailed information

        Returns:
            List of tool dictionaries
        """
        tools = []

        for tool_name, tool_info in self._tool_catalog.items():
            if source == ToolSource.ALL or tool_info.source == source:
                if include_details:
                    tools.append({
                        "name": tool_info.name,
                        "description": tool_info.description,
                        "source": tool_info.source.value,
                        "server": tool_info.server,
                        "module": tool_info.module,
                        "has_function": tool_info.function is not None,
                        "required_params": len(tool_info.required_parameters),
                        "optional_params": len(tool_info.optional_parameters)
                    })
                else:
                    tools.append({
                        "name": tool_info.name,
                        "description": tool_info.description,
                        "source": tool_info.source.value
                    })

        return sorted(tools, key=lambda x: x["name"])

    def search_tools(self, query: str, search_descriptions: bool = True) -> List[Dict]:
        """
        Search tools by name and optionally description.

        Args:
            query: Search query (supports regex)
            search_descriptions: Whether to also search in descriptions

        Returns:
            List of matching tools
        """
        pattern = re.compile(query, re.IGNORECASE)
        matching_tools = []

        for tool_name, tool_info in self._tool_catalog.items():
            match = False

            # Search in name
            if pattern.search(tool_name):
                match = True

            # Search in description if enabled
            elif search_descriptions and pattern.search(tool_info.description or ""):
                match = True

            if match:
                matching_tools.append({
                    "name": tool_info.name,
                    "description": tool_info.description,
                    "source": tool_info.source.value,
                    "server": tool_info.server
                })

        return sorted(matching_tools, key=lambda x: x["name"])

    def get_tools_by_source(self, source: ToolSource) -> Dict[str, ToolInfo]:
        """Get all tools from a specific source."""
        return {
            name: tool_info
            for name, tool_info in self._tool_catalog.items()
            if tool_info.source == source
        }

    # ====================
    # MCP INTEGRATION
    # ====================

    def add_mcp_server(self, config_path: str = "./mcp_config.yaml") -> None:
        """Add MCP tools from configuration file."""
        try:
            # Use MCP manager to load tools
            self.mcp_manager.add_mcp(config_path, self.tool_registry)

            # Update our catalog with MCP tools
            mcp_tools = self.mcp_manager.list_mcp_tools()
            for tool_name, tool_data in mcp_tools.items():
                tool_info = ToolInfo(
                    name=tool_name,
                    description=tool_data.get("description", "MCP tool"),
                    source=ToolSource.MCP,
                    function=tool_data.get("function"),
                    server=tool_data.get("server"),
                    module=tool_data.get("module"),
                    required_parameters=tool_data.get("required_parameters", []),
                    optional_parameters=tool_data.get("optional_parameters", [])
                )
                self._tool_catalog[tool_name] = tool_info

        except Exception as e:
            print(f"Error adding MCP server: {e}")

    def list_mcp_servers(self) -> Dict[str, List[str]]:
        """List all MCP servers and their tools."""
        mcp_tools = self.get_tools_by_source(ToolSource.MCP)
        servers = {}

        for tool_name, tool_info in mcp_tools.items():
            server_name = tool_info.server or "unknown"
            if server_name not in servers:
                servers[server_name] = []
            servers[server_name].append(tool_name)

        return servers

    def show_mcp_status(self) -> None:
        """Display detailed MCP status."""
        self.mcp_manager.show_mcp_status()

    def get_mcp_summary(self) -> Dict[str, Any]:
        """Get MCP tools summary."""
        return self.mcp_manager.get_mcp_summary()

    # ====================
    # TOOL EXECUTION SUPPORT
    # ====================

    def get_all_functions(self) -> Dict[str, Callable]:
        """Get all available functions as a dictionary."""
        functions = {}

        # Add from tool registry
        functions.update(self.tool_registry.get_all_functions())

        # Add from legacy functions
        functions.update(self._legacy_functions)

        # Add MCP functions
        mcp_tools = self.mcp_manager.list_mcp_tools()
        for tool_name, tool_data in mcp_tools.items():
            if tool_data.get("function"):
                functions[tool_name] = tool_data["function"]

        return functions

    def get_tool_schemas(self, openai_format: bool = True) -> List[Dict]:
        """
        Get tool schemas for all tools.

        Args:
            openai_format: Whether to format as OpenAI function schemas

        Returns:
            List of tool schemas
        """
        schemas = []

        for tool_name, tool_info in self._tool_catalog.items():
            if openai_format:
                # Convert to OpenAI function schema format
                schema = {
                    "type": "function",
                    "function": {
                        "name": tool_info.name,
                        "description": tool_info.description,
                        "parameters": {
                            "type": "object",
                            "properties": {},
                            "required": []
                        }
                    }
                }

                # Add required parameters
                for param in tool_info.required_parameters:
                    param_schema = {
                        "type": param.get("type", "string"),
                        "description": param.get("description", "")
                    }
                    # Add enum values if present
                    if "enum" in param:
                        param_schema["enum"] = param["enum"]
                    schema["function"]["parameters"]["properties"][param["name"]] = param_schema
                    schema["function"]["parameters"]["required"].append(param["name"])

                # Add optional parameters
                for param in tool_info.optional_parameters:
                    param_schema = {
                        "type": param.get("type", "string"),
                        "description": param.get("description", "")
                    }
                    # Add enum values if present
                    if "enum" in param:
                        param_schema["enum"] = param["enum"]
                    if "default" in param:
                        param_schema["default"] = param["default"]
                    schema["function"]["parameters"]["properties"][param["name"]] = param_schema

                schemas.append(schema)
            else:
                # Return raw schema
                if tool_info.schema:
                    schemas.append(tool_info.schema)

        return schemas

    # ====================
    # STATISTICS AND REPORTING
    # ====================

    def get_tool_statistics(self) -> Dict[str, Any]:
        """Get comprehensive tool statistics."""
        stats = {
            "total_tools": len(self._tool_catalog),
            "by_source": {source.value: 0 for source in ToolSource if source != ToolSource.ALL},
            "with_functions": 0,
            "mcp_servers": len(self.list_mcp_servers()),
            "tool_registry_size": len(self.tool_registry.tools),
            "legacy_functions": len(self._legacy_functions)
        }

        for tool_info in self._tool_catalog.values():
            stats["by_source"][tool_info.source.value] += 1
            if tool_info.function:
                stats["with_functions"] += 1

        return stats

    def validate_tools(self) -> Dict[str, List[str]]:
        """Validate all tools and return any issues found."""
        issues = {
            "missing_functions": [],
            "missing_descriptions": [],
            "duplicate_names": [],
            "invalid_schemas": []
        }

        seen_names = set()
        for tool_name, tool_info in self._tool_catalog.items():
            # Check for duplicates
            if tool_name in seen_names:
                issues["duplicate_names"].append(tool_name)
            seen_names.add(tool_name)

            # Check for missing functions (except MCP tools which may not have direct functions)
            if not tool_info.function and tool_info.source != ToolSource.MCP:
                issues["missing_functions"].append(tool_name)

            # Check for missing descriptions
            if not tool_info.description or tool_info.description.strip() == "":
                issues["missing_descriptions"].append(tool_name)

        return issues

    # ====================
    # PRIVATE METHODS
    # ====================

    def _discover_decorated_tools(self) -> None:
        """Discover and register tools marked with @tool decorator."""
        try:
            from .builtin_tools import get_all_tool_functions

            tool_functions = get_all_tool_functions()
            for func in tool_functions:
                name = getattr(func, '_tool_name', func.__name__)
                description = getattr(func, '_tool_description', func.__doc__ or f"Function {func.__name__}")

                tool_info = ToolInfo(
                    name=name,
                    description=description,
                    source=ToolSource.DECORATED,
                    function=func,
                    schema=self._create_schema_from_function(name, func, description)
                )
                self._tool_catalog[name] = tool_info

                # Also add to tool registry for consistency
                self.tool_registry.add_function_directly(name, func, description)

        except ImportError:
            print("Warning: Could not import builtin_tools module for decorated tool discovery")

    def _create_schema_from_function(self, name: str, function: Callable, description: str) -> Dict:
        """Create a tool schema from a function object."""
        return self.tool_registry._create_schema_from_function(name, function, description)

    def _refresh_catalog(self) -> None:
        """Refresh the tool catalog from all sources."""
        # Clear current catalog
        self._tool_catalog.clear()

        # Re-discover decorated tools
        self._discover_decorated_tools()

        # Re-add MCP tools
        mcp_tools = self.mcp_manager.list_mcp_tools()
        for tool_name, tool_data in mcp_tools.items():
            tool_info = ToolInfo(
                name=tool_name,
                description=tool_data.get("description", "MCP tool"),
                source=ToolSource.MCP,
                function=tool_data.get("function"),
                server=tool_data.get("server"),
                module=tool_data.get("module"),
                required_parameters=tool_data.get("required_parameters", []),
                optional_parameters=tool_data.get("optional_parameters", [])
            )
            self._tool_catalog[tool_name] = tool_info

    # ====================
    # LEGACY COMPATIBILITY
    # ====================

    def add_legacy_functions(self, functions: Dict[str, Callable]) -> int:
        """Add legacy functions for backward compatibility."""
        added_count = 0
        for name, func in functions.items():
            if self.add_tool(func, name, source=ToolSource.LOCAL):
                added_count += 1
        return added_count