|
import asyncio
|
|
import pdb
|
|
import sys
|
|
import time
|
|
|
|
sys.path.append(".")
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
|
|
async def test_mcp_client():
|
|
from src.utils.mcp_client import setup_mcp_client_and_tools, create_tool_param_model
|
|
|
|
test_server_config = {
|
|
"mcpServers": {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"desktop-commander": {
|
|
"command": "npx",
|
|
"args": [
|
|
"-y",
|
|
"@wonderwhy-er/desktop-commander"
|
|
]
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
mcp_tools, mcp_client = await setup_mcp_client_and_tools(test_server_config)
|
|
|
|
for tool in mcp_tools:
|
|
tool_param_model = create_tool_param_model(tool)
|
|
print(tool.name)
|
|
print(tool.description)
|
|
print(tool_param_model.model_json_schema())
|
|
pdb.set_trace()
|
|
|
|
|
|
async def test_controller_with_mcp():
|
|
import os
|
|
from src.controller.custom_controller import CustomController
|
|
from browser_use.controller.registry.views import ActionModel
|
|
|
|
mcp_server_config = {
|
|
"mcpServers": {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"desktop-commander": {
|
|
"command": "npx",
|
|
"args": [
|
|
"-y",
|
|
"@wonderwhy-er/desktop-commander"
|
|
]
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
controller = CustomController()
|
|
await controller.setup_mcp_client(mcp_server_config)
|
|
action_name = "mcp.desktop-commander.execute_command"
|
|
action_info = controller.registry.registry.actions[action_name]
|
|
param_model = action_info.param_model
|
|
print(param_model.model_json_schema())
|
|
params = {"command": f"python ./tmp/test.py"
|
|
}
|
|
validated_params = param_model(**params)
|
|
ActionModel_ = controller.registry.create_action_model()
|
|
|
|
action_model = ActionModel_(**{action_name: validated_params})
|
|
result = await controller.act(action_model)
|
|
result = result.extracted_content
|
|
print(result)
|
|
if result and "Command is still running. Use read_output to get more output." in result and "PID" in \
|
|
result.split("\n")[0]:
|
|
pid = int(result.split("\n")[0].split("PID")[-1].strip())
|
|
action_name = "mcp.desktop-commander.read_output"
|
|
action_info = controller.registry.registry.actions[action_name]
|
|
param_model = action_info.param_model
|
|
print(param_model.model_json_schema())
|
|
params = {"pid": pid}
|
|
validated_params = param_model(**params)
|
|
action_model = ActionModel_(**{action_name: validated_params})
|
|
output_result = ""
|
|
while True:
|
|
time.sleep(1)
|
|
result = await controller.act(action_model)
|
|
result = result.extracted_content
|
|
if result:
|
|
pdb.set_trace()
|
|
output_result = result
|
|
break
|
|
print(output_result)
|
|
pdb.set_trace()
|
|
await controller.close_mcp_client()
|
|
pdb.set_trace()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
asyncio.run(test_controller_with_mcp())
|
|
|