Spaces:
Runtime error
Runtime error
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
@Time : 2023/5/11 14:42 | |
@Author : alexanderwu | |
@File : manager.py | |
""" | |
from metagpt.llm import LLM | |
from metagpt.logs import logger | |
from metagpt.schema import Message | |
class Manager: | |
def __init__(self, llm: LLM = LLM()): | |
self.llm = llm # Large Language Model | |
self.role_directions = { | |
"BOSS": "Product Manager", | |
"Product Manager": "Architect", | |
"Architect": "Engineer", | |
"Engineer": "QA Engineer", | |
"QA Engineer": "Product Manager" | |
} | |
self.prompt_template = """ | |
Given the following message: | |
{message} | |
And the current status of roles: | |
{roles} | |
Which role should handle this message? | |
""" | |
async def handle(self, message: Message, environment): | |
""" | |
管理员处理信息,现在简单的将信息递交给下一个人 | |
The administrator processes the information, now simply passes the information on to the next person | |
:param message: | |
:param environment: | |
:return: | |
""" | |
# Get all roles from the environment | |
roles = environment.get_roles() | |
# logger.debug(f"{roles=}, {message=}") | |
# Build a context for the LLM to understand the situation | |
# context = { | |
# "message": str(message), | |
# "roles": {role.name: role.get_info() for role in roles}, | |
# } | |
# Ask the LLM to decide which role should handle the message | |
# chosen_role_name = self.llm.ask(self.prompt_template.format(context)) | |
# FIXME: 现在通过简单的字典决定流向,但之后还是应该有思考过程 | |
#The direction of flow is now determined by a simple dictionary, but there should still be a thought process afterwards | |
next_role_profile = self.role_directions[message.role] | |
# logger.debug(f"{next_role_profile}") | |
for _, role in roles.items(): | |
if next_role_profile == role.profile: | |
next_role = role | |
break | |
else: | |
logger.error(f"No available role can handle message: {message}.") | |
return | |
# Find the chosen role and handle the message | |
return await next_role.handle(message) | |