Spaces:
Running
Running
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
@Time : 2023/5/11 14:43 | |
@Author : alexanderwu | |
@File : engineer.py | |
@Modified By: mashenquan, 2023-11-1. In accordance with Chapter 2.2.1 and 2.2.2 of RFC 116: | |
1. Modify the data type of the `cause_by` value in the `Message` to a string, and utilize the new message | |
distribution feature for message filtering. | |
2. Consolidate message reception and processing logic within `_observe`. | |
3. Fix bug: Add logic for handling asynchronous message processing when messages are not ready. | |
4. Supplemented the external transmission of internal messages. | |
@Modified By: mashenquan, 2023-11-27. | |
1. According to Section 2.2.3.1 of RFC 135, replace file data in the message with the file name. | |
2. According to the design in Section 2.2.3.5.5 of RFC 135, add incremental iteration functionality. | |
@Modified By: mashenquan, 2023-12-5. Enhance the workflow to navigate to WriteCode or QaEngineer based on the results | |
of SummarizeCode. | |
""" | |
from __future__ import annotations | |
import json | |
from collections import defaultdict | |
from pathlib import Path | |
from typing import Optional, Set | |
from metagpt.actions import Action, WriteCode, WriteCodeReview, WriteTasks | |
from metagpt.actions.fix_bug import FixBug | |
from metagpt.actions.project_management_an import REFINED_TASK_LIST, TASK_LIST | |
from metagpt.actions.summarize_code import SummarizeCode | |
from metagpt.actions.write_code_plan_and_change_an import WriteCodePlanAndChange | |
from metagpt.const import ( | |
BUGFIX_FILENAME, | |
CODE_PLAN_AND_CHANGE_FILE_REPO, | |
REQUIREMENT_FILENAME, | |
SYSTEM_DESIGN_FILE_REPO, | |
TASK_FILE_REPO, | |
) | |
from metagpt.logs import logger | |
from metagpt.roles import Role | |
from metagpt.schema import ( | |
CodePlanAndChangeContext, | |
CodeSummarizeContext, | |
CodingContext, | |
Document, | |
Documents, | |
Message, | |
) | |
from metagpt.utils.common import any_to_name, any_to_str, any_to_str_set | |
IS_PASS_PROMPT = """ | |
{context} | |
---- | |
Does the above log indicate anything that needs to be done? | |
If there are any tasks to be completed, please answer 'NO' along with the to-do list in JSON format; | |
otherwise, answer 'YES' in JSON format. | |
""" | |
class Engineer(Role): | |
""" | |
Represents an Engineer role responsible for writing and possibly reviewing code. | |
Attributes: | |
name (str): Name of the engineer. | |
profile (str): Role profile, default is 'Engineer'. | |
goal (str): Goal of the engineer. | |
constraints (str): Constraints for the engineer. | |
n_borg (int): Number of borgs. | |
use_code_review (bool): Whether to use code review. | |
""" | |
name: str = "Alex" | |
profile: str = "Engineer" | |
goal: str = "write elegant, readable, extensible, efficient code" | |
constraints: str = ( | |
"the code should conform to standards like google-style and be modular and maintainable. " | |
"Use same language as user requirement" | |
) | |
n_borg: int = 1 | |
use_code_review: bool = False | |
code_todos: list = [] | |
summarize_todos: list = [] | |
next_todo_action: str = "" | |
n_summarize: int = 0 | |
def __init__(self, **kwargs) -> None: | |
super().__init__(**kwargs) | |
self.set_actions([WriteCode]) | |
self._watch([WriteTasks, SummarizeCode, WriteCode, WriteCodeReview, FixBug, WriteCodePlanAndChange]) | |
self.code_todos = [] | |
self.summarize_todos = [] | |
self.next_todo_action = any_to_name(WriteCode) | |
def _parse_tasks(task_msg: Document) -> list[str]: | |
m = json.loads(task_msg.content) | |
return m.get(TASK_LIST.key) or m.get(REFINED_TASK_LIST.key) | |
async def _act_sp_with_cr(self, review=False) -> Set[str]: | |
changed_files = set() | |
for todo in self.code_todos: | |
""" | |
# Select essential information from the historical data to reduce the length of the prompt (summarized from human experience): | |
1. All from Architect | |
2. All from ProjectManager | |
3. Do we need other codes (currently needed)? | |
TODO: The goal is not to need it. After clear task decomposition, based on the design idea, you should be able to write a single file without needing other codes. If you can't, it means you need a clearer definition. This is the key to writing longer code. | |
""" | |
coding_context = await todo.run() | |
# Code review | |
if review: | |
action = WriteCodeReview(i_context=coding_context, context=self.context, llm=self.llm) | |
self._init_action(action) | |
coding_context = await action.run() | |
dependencies = {coding_context.design_doc.root_relative_path, coding_context.task_doc.root_relative_path} | |
if self.config.inc: | |
dependencies.add(coding_context.code_plan_and_change_doc.root_relative_path) | |
await self.project_repo.srcs.save( | |
filename=coding_context.filename, | |
dependencies=list(dependencies), | |
content=coding_context.code_doc.content, | |
) | |
msg = Message( | |
content=coding_context.model_dump_json(), | |
instruct_content=coding_context, | |
role=self.profile, | |
cause_by=WriteCode, | |
) | |
self.rc.memory.add(msg) | |
changed_files.add(coding_context.code_doc.filename) | |
if not changed_files: | |
logger.info("Nothing has changed.") | |
return changed_files | |
async def _act(self) -> Message | None: | |
"""Determines the mode of action based on whether code review is used.""" | |
if self.rc.todo is None: | |
return None | |
if isinstance(self.rc.todo, WriteCodePlanAndChange): | |
self.next_todo_action = any_to_name(WriteCode) | |
return await self._act_code_plan_and_change() | |
if isinstance(self.rc.todo, WriteCode): | |
self.next_todo_action = any_to_name(SummarizeCode) | |
return await self._act_write_code() | |
if isinstance(self.rc.todo, SummarizeCode): | |
self.next_todo_action = any_to_name(WriteCode) | |
return await self._act_summarize() | |
return None | |
async def _act_write_code(self): | |
changed_files = await self._act_sp_with_cr(review=self.use_code_review) | |
return Message( | |
content="\n".join(changed_files), | |
role=self.profile, | |
cause_by=WriteCodeReview if self.use_code_review else WriteCode, | |
send_to=self, | |
sent_from=self, | |
) | |
async def _act_summarize(self): | |
tasks = [] | |
for todo in self.summarize_todos: | |
summary = await todo.run() | |
summary_filename = Path(todo.i_context.design_filename).with_suffix(".md").name | |
dependencies = {todo.i_context.design_filename, todo.i_context.task_filename} | |
for filename in todo.i_context.codes_filenames: | |
rpath = self.project_repo.src_relative_path / filename | |
dependencies.add(str(rpath)) | |
await self.project_repo.resources.code_summary.save( | |
filename=summary_filename, content=summary, dependencies=dependencies | |
) | |
is_pass, reason = await self._is_pass(summary) | |
if not is_pass: | |
todo.i_context.reason = reason | |
tasks.append(todo.i_context.model_dump()) | |
await self.project_repo.docs.code_summary.save( | |
filename=Path(todo.i_context.design_filename).name, | |
content=todo.i_context.model_dump_json(), | |
dependencies=dependencies, | |
) | |
else: | |
await self.project_repo.docs.code_summary.delete(filename=Path(todo.i_context.design_filename).name) | |
logger.info(f"--max-auto-summarize-code={self.config.max_auto_summarize_code}") | |
if not tasks or self.config.max_auto_summarize_code == 0: | |
return Message( | |
content="", | |
role=self.profile, | |
cause_by=SummarizeCode, | |
sent_from=self, | |
send_to="Edward", # The name of QaEngineer | |
) | |
# The maximum number of times the 'SummarizeCode' action is automatically invoked, with -1 indicating unlimited. | |
# This parameter is used for debugging the workflow. | |
self.n_summarize += 1 if self.config.max_auto_summarize_code > self.n_summarize else 0 | |
return Message( | |
content=json.dumps(tasks), role=self.profile, cause_by=SummarizeCode, send_to=self, sent_from=self | |
) | |
async def _act_code_plan_and_change(self): | |
"""Write code plan and change that guides subsequent WriteCode and WriteCodeReview""" | |
node = await self.rc.todo.run() | |
code_plan_and_change = node.instruct_content.model_dump_json() | |
dependencies = { | |
REQUIREMENT_FILENAME, | |
str(self.project_repo.docs.prd.root_path / self.rc.todo.i_context.prd_filename), | |
str(self.project_repo.docs.system_design.root_path / self.rc.todo.i_context.design_filename), | |
str(self.project_repo.docs.task.root_path / self.rc.todo.i_context.task_filename), | |
} | |
code_plan_and_change_filepath = Path(self.rc.todo.i_context.design_filename) | |
await self.project_repo.docs.code_plan_and_change.save( | |
filename=code_plan_and_change_filepath.name, content=code_plan_and_change, dependencies=dependencies | |
) | |
await self.project_repo.resources.code_plan_and_change.save( | |
filename=code_plan_and_change_filepath.with_suffix(".md").name, | |
content=node.content, | |
dependencies=dependencies, | |
) | |
return Message( | |
content=code_plan_and_change, | |
role=self.profile, | |
cause_by=WriteCodePlanAndChange, | |
send_to=self, | |
sent_from=self, | |
) | |
async def _is_pass(self, summary) -> (str, str): | |
rsp = await self.llm.aask(msg=IS_PASS_PROMPT.format(context=summary), stream=False) | |
logger.info(rsp) | |
if "YES" in rsp: | |
return True, rsp | |
return False, rsp | |
async def _think(self) -> Action | None: | |
if not self.src_workspace: | |
self.src_workspace = self.git_repo.workdir / self.git_repo.workdir.name | |
write_plan_and_change_filters = any_to_str_set([WriteTasks, FixBug]) | |
write_code_filters = any_to_str_set([WriteTasks, WriteCodePlanAndChange, SummarizeCode]) | |
summarize_code_filters = any_to_str_set([WriteCode, WriteCodeReview]) | |
if not self.rc.news: | |
return None | |
msg = self.rc.news[0] | |
if self.config.inc and msg.cause_by in write_plan_and_change_filters: | |
logger.debug(f"TODO WriteCodePlanAndChange:{msg.model_dump_json()}") | |
await self._new_code_plan_and_change_action(cause_by=msg.cause_by) | |
return self.rc.todo | |
if msg.cause_by in write_code_filters: | |
logger.debug(f"TODO WriteCode:{msg.model_dump_json()}") | |
await self._new_code_actions() | |
return self.rc.todo | |
if msg.cause_by in summarize_code_filters and msg.sent_from == any_to_str(self): | |
logger.debug(f"TODO SummarizeCode:{msg.model_dump_json()}") | |
await self._new_summarize_actions() | |
return self.rc.todo | |
return None | |
async def _new_coding_context(self, filename, dependency) -> CodingContext: | |
old_code_doc = await self.project_repo.srcs.get(filename) | |
if not old_code_doc: | |
old_code_doc = Document(root_path=str(self.project_repo.src_relative_path), filename=filename, content="") | |
dependencies = {Path(i) for i in await dependency.get(old_code_doc.root_relative_path)} | |
task_doc = None | |
design_doc = None | |
code_plan_and_change_doc = await self._get_any_code_plan_and_change() if await self._is_fixbug() else None | |
for i in dependencies: | |
if str(i.parent.as_posix()) == TASK_FILE_REPO: | |
task_doc = await self.project_repo.docs.task.get(i.name) | |
elif str(i.parent.as_posix()) == SYSTEM_DESIGN_FILE_REPO: | |
design_doc = await self.project_repo.docs.system_design.get(i.name) | |
elif str(i.parent.as_posix()) == CODE_PLAN_AND_CHANGE_FILE_REPO: | |
code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(i.name) | |
if not task_doc or not design_doc: | |
logger.error(f'Detected source code "{filename}" from an unknown origin.') | |
raise ValueError(f'Detected source code "{filename}" from an unknown origin.') | |
context = CodingContext( | |
filename=filename, | |
design_doc=design_doc, | |
task_doc=task_doc, | |
code_doc=old_code_doc, | |
code_plan_and_change_doc=code_plan_and_change_doc, | |
) | |
return context | |
async def _new_coding_doc(self, filename, dependency): | |
context = await self._new_coding_context(filename, dependency) | |
coding_doc = Document( | |
root_path=str(self.project_repo.src_relative_path), filename=filename, content=context.model_dump_json() | |
) | |
return coding_doc | |
async def _new_code_actions(self): | |
bug_fix = await self._is_fixbug() | |
# Prepare file repos | |
changed_src_files = self.project_repo.srcs.all_files if bug_fix else self.project_repo.srcs.changed_files | |
changed_task_files = self.project_repo.docs.task.changed_files | |
changed_files = Documents() | |
# Recode caused by upstream changes. | |
for filename in changed_task_files: | |
design_doc = await self.project_repo.docs.system_design.get(filename) | |
task_doc = await self.project_repo.docs.task.get(filename) | |
code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(filename) | |
task_list = self._parse_tasks(task_doc) | |
for task_filename in task_list: | |
old_code_doc = await self.project_repo.srcs.get(task_filename) | |
if not old_code_doc: | |
old_code_doc = Document( | |
root_path=str(self.project_repo.src_relative_path), filename=task_filename, content="" | |
) | |
if not code_plan_and_change_doc: | |
context = CodingContext( | |
filename=task_filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc | |
) | |
else: | |
context = CodingContext( | |
filename=task_filename, | |
design_doc=design_doc, | |
task_doc=task_doc, | |
code_doc=old_code_doc, | |
code_plan_and_change_doc=code_plan_and_change_doc, | |
) | |
coding_doc = Document( | |
root_path=str(self.project_repo.src_relative_path), | |
filename=task_filename, | |
content=context.model_dump_json(), | |
) | |
if task_filename in changed_files.docs: | |
logger.warning( | |
f"Log to expose potential conflicts: {coding_doc.model_dump_json()} & " | |
f"{changed_files.docs[task_filename].model_dump_json()}" | |
) | |
changed_files.docs[task_filename] = coding_doc | |
self.code_todos = [ | |
WriteCode(i_context=i, context=self.context, llm=self.llm) for i in changed_files.docs.values() | |
] | |
# Code directly modified by the user. | |
dependency = await self.git_repo.get_dependency() | |
for filename in changed_src_files: | |
if filename in changed_files.docs: | |
continue | |
coding_doc = await self._new_coding_doc(filename=filename, dependency=dependency) | |
changed_files.docs[filename] = coding_doc | |
self.code_todos.append(WriteCode(i_context=coding_doc, context=self.context, llm=self.llm)) | |
if self.code_todos: | |
self.set_todo(self.code_todos[0]) | |
async def _new_summarize_actions(self): | |
src_files = self.project_repo.srcs.all_files | |
# Generate a SummarizeCode action for each pair of (system_design_doc, task_doc). | |
summarizations = defaultdict(list) | |
for filename in src_files: | |
dependencies = await self.project_repo.srcs.get_dependency(filename=filename) | |
ctx = CodeSummarizeContext.loads(filenames=list(dependencies)) | |
summarizations[ctx].append(filename) | |
for ctx, filenames in summarizations.items(): | |
ctx.codes_filenames = filenames | |
new_summarize = SummarizeCode(i_context=ctx, context=self.context, llm=self.llm) | |
for i, act in enumerate(self.summarize_todos): | |
if act.i_context.task_filename == new_summarize.i_context.task_filename: | |
self.summarize_todos[i] = new_summarize | |
new_summarize = None | |
break | |
if new_summarize: | |
self.summarize_todos.append(new_summarize) | |
if self.summarize_todos: | |
self.set_todo(self.summarize_todos[0]) | |
self.summarize_todos.pop(0) | |
async def _new_code_plan_and_change_action(self, cause_by: str): | |
"""Create a WriteCodePlanAndChange action for subsequent to-do actions.""" | |
files = self.project_repo.all_files | |
options = {} | |
if cause_by != any_to_str(FixBug): | |
requirement_doc = await self.project_repo.docs.get(REQUIREMENT_FILENAME) | |
options["requirement"] = requirement_doc.content | |
else: | |
fixbug_doc = await self.project_repo.docs.get(BUGFIX_FILENAME) | |
options["issue"] = fixbug_doc.content | |
code_plan_and_change_ctx = CodePlanAndChangeContext.loads(files, **options) | |
self.rc.todo = WriteCodePlanAndChange(i_context=code_plan_and_change_ctx, context=self.context, llm=self.llm) | |
def action_description(self) -> str: | |
"""AgentStore uses this attribute to display to the user what actions the current role should take.""" | |
return self.next_todo_action | |
async def _is_fixbug(self) -> bool: | |
fixbug_doc = await self.project_repo.docs.get(BUGFIX_FILENAME) | |
return bool(fixbug_doc and fixbug_doc.content) | |
async def _get_any_code_plan_and_change(self) -> Optional[Document]: | |
changed_files = self.project_repo.docs.code_plan_and_change.changed_files | |
for filename in changed_files.keys(): | |
doc = await self.project_repo.docs.code_plan_and_change.get(filename) | |
if doc and doc.content: | |
return doc | |
return None | |