Spaces:
Runtime error
Runtime error
File size: 5,065 Bytes
57b8424 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
"""Text processing functions"""
import urllib
from typing import Dict, Generator, Optional
import string
from selenium.webdriver.remote.webdriver import WebDriver
from config import Config
from agent.llm_utils import create_chat_completion
import os
from md2pdf.core import md2pdf
# Module-wide configuration instance; summarize_text reads its
# fast_llm_model and summary_token_limit attributes.
CFG = Config()
def split_text(text: str, max_length: int = 8192) -> Generator[str, None, None]:
    """Split text into newline-joined chunks of a bounded length

    The text is split on newlines and the resulting paragraphs are packed
    greedily into chunks. A paragraph that is itself longer than
    ``max_length`` is cut into ``max_length``-sized pieces so that no chunk
    ever exceeds the limit.

    Args:
        text (str): The text to split
        max_length (int, optional): The maximum length of each chunk. Defaults to 8192.

    Yields:
        str: The next chunk of text

    Raises:
        ValueError: If max_length is less than 1
    """
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    current_length = 0
    current_chunk = []
    for paragraph in text.split("\n"):
        # An oversized paragraph can never fit in a chunk, so hard-split it.
        # Everything else (including empty paragraphs, which preserve blank
        # lines) passes through as a single piece.
        if len(paragraph) > max_length:
            pieces = [
                paragraph[start:start + max_length]
                for start in range(0, len(paragraph), max_length)
            ]
        else:
            pieces = [paragraph]

        for piece in pieces:
            # The +1 accounts for the newline that joins pieces in a chunk.
            if current_length + len(piece) + 1 <= max_length:
                current_chunk.append(piece)
                current_length += len(piece) + 1
            else:
                # Only flush when something was collected; otherwise a
                # too-long first piece would produce a spurious "" chunk.
                if current_chunk:
                    yield "\n".join(current_chunk)
                current_chunk = [piece]
                current_length = len(piece) + 1

    if current_chunk:
        yield "\n".join(current_chunk)
def summarize_text(
    url: str, text: str, question: str, driver: Optional[WebDriver] = None
) -> str:
    """Summarize text chunk by chunk with the configured fast LLM model

    The text is split into chunks, each chunk is summarized with respect to
    the question, and the joined per-chunk summaries are then summarized
    once more to produce the final answer.

    Args:
        url (str): The url of the text
        text (str): The text to summarize
        question (str): The question to ask the model
        driver (WebDriver): The webdriver to use to scroll the page

    Returns:
        str: The summary of the text, or an error message when there is
            nothing to summarize
    """
    if not text:
        return "Error: No text to summarize"

    # split_text can emit blank chunks (e.g. for runs of blank lines);
    # sending those to the model only wastes a completion call.
    chunks = [chunk for chunk in split_text(text) if chunk.strip()]
    if not chunks:
        return "Error: No text to summarize"

    scroll_ratio = 1 / len(chunks)
    print(f"Summarizing url: {url} with total chunks: {len(chunks)}")

    summaries = []
    for i, chunk in enumerate(chunks):
        if driver:
            # Move the browser view along with the chunk being processed.
            scroll_to_percentage(driver, scroll_ratio * i)

        summary = create_chat_completion(
            model=CFG.fast_llm_model,
            messages=[create_message(chunk, question)],
            max_tokens=CFG.summary_token_limit,
        )
        summaries.append(summary)

    # Second pass: condense the per-chunk summaries into one answer.
    combined_summary = "\n".join(summaries)
    final_summary = create_chat_completion(
        model=CFG.fast_llm_model,
        messages=[create_message(combined_summary, question)],
        max_tokens=CFG.summary_token_limit,
    )
    # NOTE: the value printed here is the length of the combined per-chunk
    # summaries, not of final_summary.
    print("Final summary length: ", len(combined_summary))
    print(final_summary)
    return final_summary
def scroll_to_percentage(driver: WebDriver, ratio: float) -> None:
    """Scroll the page to a vertical position given as a fraction of its height

    Args:
        driver (WebDriver): The webdriver to use
        ratio (float): The fraction of the page height to scroll to

    Raises:
        ValueError: If the ratio is not between 0 and 1
    """
    below_range = ratio < 0
    if below_range or ratio > 1:
        raise ValueError("Percentage should be between 0 and 1")

    # The ratio is interpolated into the script text and multiplied by the
    # document body's scroll height inside the browser.
    scroll_script = f"window.scrollTo(0, document.body.scrollHeight * {ratio});"
    driver.execute_script(scroll_script)
def create_message(chunk: str, question: str) -> Dict[str, str]:
    """Build the user message that asks the model about one chunk of text

    Args:
        chunk (str): The chunk of text to summarize
        question (str): The question to answer

    Returns:
        Dict[str, str]: A message with role "user" whose content quotes the
            chunk and asks for a short answer (or a summary as fallback)
    """
    prompt = (
        f'"""{chunk}""" Using the above text, answer in short the following'
        f' question: "{question}" -- if the question cannot be answered using the text,'
        " simply summarize the text. "
        "Include all factual information, numbers, stats etc if available."
    )
    message = {"role": "user"}
    message["content"] = prompt
    return message
def write_to_file(filename: str, text: str) -> None:
    """Write text to a file, replacing any existing content

    Args:
        filename (str): The filename to write to
        text (str): The text to write
    """
    # Pin the encoding: the platform default is locale-dependent and can
    # fail or mis-encode non-ASCII text on non-UTF-8 systems.
    with open(filename, "w", encoding="utf-8") as file:
        file.write(text)
async def write_md_to_pdf(task: str, path: str, text: str) -> str:
    """Write markdown text to ``{path}/{task}.md`` and convert it to a PDF

    Args:
        task (str): Base name (without extension) of the output files
        path (str): Directory the files are written into
        text (str): The markdown content to write

    Returns:
        str: The URL-quoted path of the generated ``.pdf`` file
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` is loaded as an attribute.
    from urllib.parse import quote

    file_path = f"{path}/{task}"
    markdown_path = f"{file_path}.md"
    pdf_path = f"{file_path}.pdf"

    write_to_file(markdown_path, text)
    md_to_pdf(markdown_path, pdf_path)
    print(f"{task} written to {file_path}.pdf")

    return quote(pdf_path)
def read_txt_files(directory: str) -> str:
    """Concatenate the contents of every ``.txt`` file in a directory

    Files are read in sorted name order so the result is deterministic, and
    a newline is appended after each file's content. Only the directory
    itself is scanned; subdirectories are not visited.

    Args:
        directory (str): The directory to scan for ``.txt`` files

    Returns:
        str: The combined text, or an empty string if no file matches
    """
    contents = []
    # os.listdir returns names in arbitrary order; sort for stable output.
    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        # Skip other extensions and directories that happen to end in .txt.
        if not filename.endswith('.txt') or not os.path.isfile(file_path):
            continue
        # Explicit encoding avoids locale-dependent decoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            contents.append(file.read() + '\n')
    return ''.join(contents)
def md_to_pdf(input_file, output_file):
    """Hand a markdown file to md2pdf, passing output_file as the target

    Args:
        input_file: Path passed to md2pdf as ``md_file_path``
        output_file: Path passed to md2pdf as its first positional argument
            (presumably the PDF destination; confirm against md2pdf)
    """
    # No inline markdown content, stylesheet or base URL is supplied.
    conversion_options = {
        "md_content": None,
        "md_file_path": input_file,
        "css_file_path": None,
        "base_url": None,
    }
    md2pdf(output_file, **conversion_options)
|