#!/usr/bin/env python """ @Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue. @Modified By: mashenquan, 2023-11-1. According to Chapter 2.2.1 and 2.2.2 of RFC 116, change the data type of the `cause_by` value in the `Message` to a string to support the new message distribution feature. """ import asyncio import re from pydantic import BaseModel from metagpt.actions import Action, CollectLinks, ConductResearch, WebBrowseAndSummarize from metagpt.actions.research import get_research_system_text from metagpt.const import RESEARCH_PATH from metagpt.logs import logger from metagpt.roles.role import Role, RoleReactMode from metagpt.schema import Message class Report(BaseModel): topic: str links: dict[str, list[str]] = None summaries: list[tuple[str, str]] = None content: str = "" class Researcher(Role): name: str = "David" profile: str = "Researcher" goal: str = "Gather information and conduct research" constraints: str = "Ensure accuracy and relevance of information" language: str = "en-us" enable_concurrency: bool = True def __init__(self, **kwargs): super().__init__(**kwargs) self.set_actions([CollectLinks, WebBrowseAndSummarize, ConductResearch]) self._set_react_mode(RoleReactMode.BY_ORDER.value, len(self.actions)) if self.language not in ("en-us", "zh-cn"): logger.warning(f"The language `{self.language}` has not been tested, it may not work.") async def _act(self) -> Message: logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})") todo = self.rc.todo msg = self.rc.memory.get(k=1)[0] if isinstance(msg.instruct_content, Report): instruct_content = msg.instruct_content topic = instruct_content.topic else: topic = msg.content research_system_text = self.research_system_text(topic, todo) if isinstance(todo, CollectLinks): links = await todo.run(topic, 4, 4) ret = Message( content="", instruct_content=Report(topic=topic, links=links), role=self.profile, cause_by=todo ) elif isinstance(todo, WebBrowseAndSummarize): links = instruct_content.links todos = ( todo.run(*url, query=query, system_text=research_system_text) for (query, url) in links.items() if url ) if self.enable_concurrency: summaries = await asyncio.gather(*todos) else: summaries = [await i for i in todos] summaries = list((url, summary) for i in summaries for (url, summary) in i.items() if summary) ret = Message( content="", instruct_content=Report(topic=topic, summaries=summaries), role=self.profile, cause_by=todo ) else: summaries = instruct_content.summaries summary_text = "\n---\n".join(f"url: {url}\nsummary: {summary}" for (url, summary) in summaries) content = await self.rc.todo.run(topic, summary_text, system_text=research_system_text) ret = Message( content="", instruct_content=Report(topic=topic, content=content), role=self.profile, cause_by=self.rc.todo, ) self.rc.memory.add(ret) return ret def research_system_text(self, topic, current_task: Action) -> str: """BACKWARD compatible This allows sub-class able to define its own system prompt based on topic. return the previous implementation to have backward compatible Args: topic: language: Returns: str """ return get_research_system_text(topic, self.language) async def react(self) -> Message: msg = await super().react() report = msg.instruct_content self.write_report(report.topic, report.content) return msg def write_report(self, topic: str, content: str): filename = re.sub(r'[\\/:"*?<>|]+', " ", topic) filename = filename.replace("\n", "") if not RESEARCH_PATH.exists(): RESEARCH_PATH.mkdir(parents=True) filepath = RESEARCH_PATH / f"{filename}.md" filepath.write_text(content) if __name__ == "__main__": import fire async def main(topic: str, language: str = "en-us", enable_concurrency: bool = True): role = Researcher(language=language, enable_concurrency=enable_concurrency) await role.run(topic) fire.Fire(main)