Spaces:
Runtime error
Runtime error
File size: 7,039 Bytes
57b8424 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# Description: Research assistant class that handles the research process for a given question.
# libraries
import asyncio
import json
import hashlib
from actions.web_search import web_search
from actions.web_scrape import async_browse
from processing.text import \
write_to_file, \
create_message, \
create_chat_completion, \
read_txt_files, \
write_md_to_pdf
from config import Config
from agent import prompts
import os
import string
CFG = Config()
class ResearchAgent:
def __init__(self, question, agent, agent_role_prompt, websocket=None):
""" Initializes the research assistant with the given question.
Args: question (str): The question to research
Returns: None
"""
self.question = question
self.agent = agent
self.agent_role_prompt = agent_role_prompt if agent_role_prompt else prompts.generate_agent_role_prompt(agent)
self.visited_urls = set()
self.research_summary = ""
self.dir_path = f"./outputs/{hashlib.sha1(question.encode()).hexdigest()}"
self.websocket = websocket
async def stream_output(self, output):
if not self.websocket:
return print(output)
await self.websocket.send_json({"type": "logs", "output": output})
async def summarize(self, text, topic):
""" Summarizes the given text for the given topic.
Args: text (str): The text to summarize
topic (str): The topic to summarize the text for
Returns: str: The summarized text
"""
messages = [create_message(text, topic)]
await self.stream_output(f"π Summarizing text for query: {text}")
return create_chat_completion(
model=CFG.fast_llm_model,
messages=messages,
)
async def get_new_urls(self, url_set_input):
""" Gets the new urls from the given url set.
Args: url_set_input (set[str]): The url set to get the new urls from
Returns: list[str]: The new urls from the given url set
"""
new_urls = []
for url in url_set_input:
if url not in self.visited_urls:
await self.stream_output(f"β
Adding source url to research: {url}\n")
self.visited_urls.add(url)
new_urls.append(url)
return new_urls
async def call_agent(self, action, stream=False, websocket=None):
messages = [{
"role": "system",
"content": self.agent_role_prompt
}, {
"role": "user",
"content": action,
}]
answer = create_chat_completion(
model=CFG.smart_llm_model,
messages=messages,
stream=stream,
websocket=websocket,
)
return answer
async def create_search_queries(self):
""" Creates the search queries for the given question.
Args: None
Returns: list[str]: The search queries for the given question
"""
result = await self.call_agent(prompts.generate_search_queries_prompt(self.question))
await self.stream_output(f"π§ I will conduct my research based on the following queries: {result}...")
return json.loads(result)
async def async_search(self, query):
""" Runs the async search for the given query.
Args: query (str): The query to run the async search for
Returns: list[str]: The async search for the given query
"""
search_results = json.loads(web_search(query))
new_search_urls = self.get_new_urls([url.get("href") for url in search_results])
await self.stream_output(f"π Browsing the following sites for relevant information: {new_search_urls}...")
# Create a list to hold the coroutine objects
tasks = [async_browse(url, query, self.websocket) for url in await new_search_urls]
# Gather the results as they become available
responses = await asyncio.gather(*tasks, return_exceptions=True)
return responses
async def run_search_summary(self, query):
""" Runs the search summary for the given query.
Args: query (str): The query to run the search summary for
Returns: str: The search summary for the given query
"""
await self.stream_output(f"π Running research for '{query}'...")
responses = await self.async_search(query)
result = "\n".join(responses)
os.makedirs(os.path.dirname(f"{self.dir_path}/research-{query}.txt"), exist_ok=True)
write_to_file(f"{self.dir_path}/research-{query}.txt", result)
return result
async def conduct_research(self):
""" Conducts the research for the given question.
Args: None
Returns: str: The research for the given question
"""
self.research_summary = read_txt_files(self.dir_path) if os.path.isdir(self.dir_path) else ""
if not self.research_summary:
search_queries = await self.create_search_queries()
for query in search_queries:
research_result = await self.run_search_summary(query)
self.research_summary += f"{research_result}\n\n"
await self.stream_output(f"Total research words: {len(self.research_summary.split(' '))}")
return self.research_summary
async def create_concepts(self):
""" Creates the concepts for the given question.
Args: None
Returns: list[str]: The concepts for the given question
"""
result = self.call_agent(prompts.generate_concepts_prompt(self.question, self.research_summary))
await self.stream_output(f"I will research based on the following concepts: {result}\n")
return json.loads(result)
async def write_report(self, report_type, websocket=None):
""" Writes the report for the given question.
Args: None
Returns: str: The report for the given question
"""
report_type_func = prompts.get_report_by_type(report_type)
await self.stream_output(f"βοΈ Writing {report_type} for research task: {self.question}...")
answer = await self.call_agent(report_type_func(self.question, self.research_summary),
stream=websocket is not None, websocket=websocket)
# if websocket is True than we are streaming gpt response, so we need to wait for the final response
final_report = await answer if websocket else answer
path = await write_md_to_pdf(report_type, self.dir_path, final_report)
return answer, path
async def write_lessons(self):
""" Writes lessons on essential concepts of the research.
Args: None
Returns: None
"""
concepts = await self.create_concepts()
for concept in concepts:
answer = await self.call_agent(prompts.generate_lesson_prompt(concept), stream=True)
await write_md_to_pdf("Lesson", self.dir_path, answer)
|