import math
import re
from statistics import median

from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader
import tiktoken


def deep_strip(text):
    """Collapse runs of whitespace into single spaces and trim the ends."""
    return re.sub(r"\s+", " ", text or "").strip()

def process_documents(urls):
    """Load every URL (PDF or web page) and return (snippets, documents):
    the flat list of chunk Documents plus a chunk_id -> Document lookup."""
    snippets = []
    documents = {}
    for source_id, url in enumerate(urls):
        snippet = (
            process_pdf(url, source_id)
            if url.endswith(".pdf")
            else process_web(url, source_id)
        )
        snippets.extend(snippet)
        # Keep the full concatenated document under the stringified source id.
        documents[str(source_id)] = Document(
            page_content="\n".join(snip.page_content for snip in snippet),
            metadata={
                "source_url": url,
                "source_type": "pdf" if url.endswith(".pdf") else "web",
                "source_id": source_id,
                "chunk_id": source_id,
            },
        )
        # Also index each individual chunk by its own chunk_id.
        for snip in snippet:
            documents[snip.metadata["chunk_id"]] = snip
    return snippets, documents
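
# A minimal usage sketch (not part of the original pipeline): the URLs below
# are placeholders, and running this would trigger real network fetches
# through the LangChain loaders, so it is left commented out.
#
#   snippets, documents = process_documents([
#       "https://example.com/post",       # routed to process_web
#       "https://example.com/paper.pdf",  # routed to process_pdf
#   ])
#   for snip in snippets:
#       print(snip.metadata["chunk_id"], snip.metadata["header"])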

def process_web(url, source_id):
    """Load a web page as a single snippet Document."""
    data = WebBaseLoader(url).load()[0]
    document_snippets = [
        Document(
            page_content=deep_strip(data.page_content),
            metadata={
                "header": data.metadata["title"],
                "source_url": url,
                "source_type": "web",
                "chunk_id": source_id,
                "source_id": source_id,
            },
        )
    ]
    return document_snippets

def process_pdf(url, source_id):
    """Load a PDF as HTML, split it into font-size runs, and group the runs
    into header/content sections."""
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    content = BeautifulSoup(data.page_content, "html.parser").find_all("div")
    snippets = get_pdf_snippets(content)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)
    document_snippets = [
        Document(
            page_content=deep_strip(snip[1]["header_text"]) + " " + deep_strip(snip[0]),
            metadata={
                "header": " ".join(snip[1]["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": f"{source_id}_{i:02d}",
                "source_id": source_id,
            },
        )
        for i, snip in enumerate(semantic_snippets)
    ]
    return document_snippets

def get_pdf_snippets(content):
    """Merge consecutive divs that share a font size into (text, font_size) runs."""
    current_font_size = None
    current_text = ""
    snippets = []
    for div in content:
        span = div.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        font_size = re.findall(r"font-size:(\d+)px", style)
        if not font_size:
            continue
        font_size = int(font_size[0])
        if current_font_size is None:
            current_font_size = font_size
        if font_size == current_font_size:
            current_text += div.text
        else:
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = div.text
    snippets.append((current_text, current_font_size))
    return snippets
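
# For reference, PDFMiner's HTML output wraps each text run in a styled span;
# the sample below is illustrative, not taken from a real document:
#
#   <div><span style="font-family: XYZ; font-size:12px">Body text</span></div>
#
# so re.findall(r"font-size:(\d+)px", style) yields ["12"].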

def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Drop snippets that are mostly newlines (layout filler, sparse tables)."""
    filtered_list = []
    for content, font_size in content_list:
        newline_count = content.count("\n")
        total_chars = len(content)
        # Skip empty snippets outright; they would otherwise divide by zero.
        if total_chars == 0:
            continue
        ratio = newline_count / total_chars
        if ratio <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list
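
# For example, with new_line_threshold_ratio=0.4, a run like "\n\nFig. 3\n"
# (3 newlines out of 9 characters, ratio ~0.33) is kept, while a run that is
# mostly blank lines is dropped.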

def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Group runs into (content, metadata) sections: any run set larger than
    the median font size starts a new header; smaller runs become its content."""
    semantic_snippets = []
    current_header = None
    current_content = ""
    header_font_size = None
    content_font_sizes = []
    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # A larger-than-median run is a header; flush the previous section.
            if current_header is not None:
                metadata = {
                    "header_font_size": header_font_size,
                    "content_font_size": (
                        median(content_font_sizes) if content_font_sizes else None
                    ),
                    "header_text": current_header,
                }
                semantic_snippets.append((current_content, metadata))
                current_content = ""
                content_font_sizes = []
            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            if current_content:
                current_content += " " + content
            else:
                current_content = content
    # Flush the final section.
    if current_header is not None:
        metadata = {
            "header_font_size": header_font_size,
            "content_font_size": (
                median(content_font_sizes) if content_font_sizes else None
            ),
            "header_text": current_header,
        }
        semantic_snippets.append((current_content, metadata))
    return semantic_snippets
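
# A sketch of the grouping on synthetic runs (font sizes are made up for
# illustration): with a median size of 10, the size-14 runs become headers
# and the size-10 runs are folded into the section under them.
#
#   runs = [("Intro", 14), ("First paragraph.", 10), ("Second.", 10),
#           ("Methods", 14), ("Details here.", 10)]
#   get_pdf_semantic_snippets(runs, median_font_size=10)
#   # -> [("First paragraph. Second.", {..., "header_text": "Intro"}),
#   #     ("Details here.", {..., "header_text": "Methods"})]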

def num_tokens(string):
    """Count tokens under the cl100k_base encoding used by OpenAI chat models."""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(string, disallowed_special=()))
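
if __name__ == "__main__":
    # Offline sanity check of the token counter (the sample string is
    # illustrative); the full pipeline above needs network access to run.
    sample = "Semantic chunking keeps headers attached to their section text."
    print(num_tokens(sample), "tokens")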