# Zulelee's picture
# Upload 62 files
# 57b8424
"""Text processing functions"""
import urllib
from typing import Dict, Generator, Optional
import string
from selenium.webdriver.remote.webdriver import WebDriver
from config import Config
from agent.llm_utils import create_chat_completion
import os
from md2pdf.core import md2pdf
# Module-wide configuration instance; summarize_text reads `fast_llm_model`
# and `summary_token_limit` from it when requesting completions.
CFG = Config()
def split_text(text: str, max_length: int = 8192) -> Generator[str, None, None]:
    """Split text into newline-aligned chunks of at most ``max_length`` characters.

    Lines are packed greedily into a chunk. Every line is budgeted as
    ``len(line) + 1`` (the extra character accounts for the joining newline),
    so a chunk is closed as soon as the next line would exceed the budget.
    A single line longer than ``max_length`` is hard-split into slices of
    ``max_length`` characters so that no chunk ever exceeds the limit.

    Args:
        text (str): The text to split
        max_length (int, optional): The maximum length of each chunk.
            Defaults to 8192.

    Yields:
        str: The next chunk of text

    Raises:
        ValueError: If ``max_length`` is not a positive integer. Because this
            is a generator, the error surfaces on the first iteration.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")

    current_chunk = []
    current_length = 0

    for paragraph in text.split("\n"):
        cost = len(paragraph) + 1
        if current_length + cost <= max_length:
            current_chunk.append(paragraph)
            current_length += cost
            continue

        # Flush only when something was collected; otherwise an empty chunk
        # would be emitted whenever the very first line does not fit.
        if current_chunk:
            yield "\n".join(current_chunk)

        if len(paragraph) > max_length:
            # The line alone is over the limit: emit full-size slices and keep
            # the remainder as the start of the next chunk.
            pieces = [
                paragraph[start:start + max_length]
                for start in range(0, len(paragraph), max_length)
            ]
            yield from pieces[:-1]
            paragraph = pieces[-1]

        current_chunk = [paragraph]
        current_length = len(paragraph) + 1

    if current_chunk:
        yield "\n".join(current_chunk)
def summarize_text(
    url: str, text: str, question: str, driver: Optional[WebDriver] = None
) -> str:
    """Answer a question about a page's text via chunk-wise LLM summaries

    The text is split into chunks, each chunk is summarized on its own, and
    the joined partial summaries are summarized once more to produce the
    returned result.

    Args:
        url (str): The url of the text (only used in the progress output)
        text (str): The text to summarize
        question (str): The question to ask the model
        driver (WebDriver): Optional webdriver; when given, the page is
            scrolled in step with the chunk being processed

    Returns:
        str: The final summary, or an error string when ``text`` is empty
    """
    if not text:
        return "Error: No text to summarize"

    def _ask_model(content: str) -> str:
        # One single-message completion request against the fast model.
        messages = [create_message(content, question)]
        return create_chat_completion(
            model=CFG.fast_llm_model,
            messages=messages,
            max_tokens=CFG.summary_token_limit
        )

    parts = list(split_text(text))
    total_parts = len(parts)
    step = 1 / total_parts
    print(f"Summarizing url: {url} with total chunks: {total_parts}")

    partial_summaries = []
    for index, part in enumerate(parts):
        if driver:
            # Move the browser view to roughly where this chunk sits.
            scroll_to_percentage(driver, step * index)
        partial_summaries.append(_ask_model(part))

    merged = "\n".join(partial_summaries)
    answer = _ask_model(merged)

    print("Final summary length: ", len(merged))
    print(answer)
    return answer
def scroll_to_percentage(driver: WebDriver, ratio: float) -> None:
    """Scroll the page to a vertical position given as a fraction of its height

    Args:
        driver (WebDriver): The webdriver whose page is scrolled
        ratio (float): Fraction of the page height to scroll to (0 = top,
            1 = bottom)

    Raises:
        ValueError: If the ratio is not between 0 and 1
    """
    out_of_range = ratio > 1 or ratio < 0
    if out_of_range:
        raise ValueError("Percentage should be between 0 and 1")

    script = f"window.scrollTo(0, document.body.scrollHeight * {ratio});"
    driver.execute_script(script)
def create_message(chunk: str, question: str) -> Dict[str, str]:
    """Build the user message that asks the model about one chunk of text

    The chunk is wrapped in triple quotes and followed by the instruction to
    answer the question briefly, or to summarize the text if the question
    cannot be answered from it.

    Args:
        chunk (str): The chunk of text to summarize
        question (str): The question to answer

    Returns:
        Dict[str, str]: A message with ``role`` and ``content`` keys
    """
    quoted_text = f'"""{chunk}"""'
    instruction = (
        f' Using the above text, answer in short the following question: "{question}"'
        " -- if the question cannot be answered using the text,"
        " simply summarize the text. "
        "Include all factual information, numbers, stats etc if available."
    )
    message = {"role": "user"}
    message["content"] = quoted_text + instruction
    return message
def write_to_file(filename: str, text: str) -> None:
    """Write text to a file, replacing any existing content

    The file is always encoded as UTF-8. Without an explicit encoding,
    ``open`` falls back to the platform locale, which can raise
    ``UnicodeEncodeError`` for non-ASCII text and produces files whose
    encoding differs from machine to machine.

    Args:
        filename (str): The filename to write to
        text (str): The text to write
    """
    with open(filename, "w", encoding="utf-8") as file:
        file.write(text)
async def write_md_to_pdf(task: str, path: str, text: str) -> str:
    """Save markdown text to disk, render it to a PDF, and return the PDF path

    Two files are produced next to each other: ``{path}/{task}.md`` holding
    the raw markdown and ``{path}/{task}.pdf`` holding the rendered document.
    Both steps are called synchronously even though this is a coroutine.

    Args:
        task (str): Base name (without extension) of the files to create
        path (str): Directory the files are written to
        text (str): The markdown content

    Returns:
        str: The PDF path, percent-encoded with ``urllib.parse.quote``
    """
    # Imported here because a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import quote

    file_path = f"{path}/{task}"
    md_path = f"{file_path}.md"
    pdf_path = f"{file_path}.pdf"

    write_to_file(md_path, text)
    md_to_pdf(md_path, pdf_path)
    print(f"{task} written to {file_path}.pdf")

    return quote(pdf_path)
def read_txt_files(directory: str) -> str:
    """Concatenate the contents of every ``.txt`` file in a directory

    Only direct entries of ``directory`` whose name ends in ``.txt`` are
    read. Files are processed in sorted name order so the result is
    deterministic (``os.listdir`` returns entries in arbitrary order), and
    they are decoded as UTF-8 rather than the platform default.

    Args:
        directory (str): The directory to scan

    Returns:
        str: Each file's content followed by a newline, joined together;
            an empty string when no ``.txt`` file is present
    """
    contents = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".txt"):
            continue
        with open(os.path.join(directory, filename), "r", encoding="utf-8") as file:
            contents.append(file.read())
    return "".join(f"{content}\n" for content in contents)
def md_to_pdf(input_file, output_file):
    """Render a markdown file to PDF with md2pdf

    Args:
        input_file: Path of the markdown file handed to md2pdf as
            ``md_file_path``
        output_file: Path of the PDF to produce (md2pdf's first argument)
    """
    # No inline markdown, stylesheet, or base URL is supplied.
    render_options = {
        "md_content": None,
        "md_file_path": input_file,
        "css_file_path": None,
        "base_url": None,
    }
    md2pdf(output_file, **render_options)