# file: document_processor.py
import os
import time
import httpx
from pathlib import Path
from urllib.parse import urlparse, unquote
from llama_index.readers.file import PyMuPDFReader
from llama_index.core import Document as LlamaDocument
from concurrent.futures import ThreadPoolExecutor
from pydantic import HttpUrl
from typing import List

# Define the batch size (in pages) for parallel text extraction
BATCH_SIZE = 25


def _process_page_batch(documents_batch: List[LlamaDocument]) -> str:
    """
    Helper function to extract content from a batch of LlamaIndex Document
    objects and join it into a single string.
    """
    return "\n\n".join([d.get_content() for d in documents_batch])


async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it locally, and parses it to
    plain text using PyMuPDFReader with parallel batch processing.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.
    """
    print(f"Initiating download from: {doc_url}")

    LOCAL_STORAGE_DIR = "data/"
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

    try:
        # Asynchronously download the document
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()
            doc_bytes = response.content
        print("Download successful.")

        # Determine a valid local filename
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path)) or "downloaded_document.pdf"
        local_file_path = Path(os.path.join(LOCAL_STORAGE_DIR, filename))

        # Save the document locally
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        # Parse the document using LlamaIndex's PyMuPDFReader
        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs_from_loader = loader.load_data(file_path=local_file_path)

        # Parallelize the extraction of text from the loaded pages
        start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = [
                executor.submit(_process_page_batch, docs_from_loader[i:i + BATCH_SIZE])
                for i in range(0, len(docs_from_loader), BATCH_SIZE)
            ]
            # Collect results in submission order so the pages stay in document order
            all_extracted_texts = [future.result() for future in futures]
        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time = time.perf_counter() - start_time
        print(f"Time taken for parallel text extraction: {elapsed_time:.4f} seconds.")

        if not doc_text:
            raise ValueError("Document parsing yielded no content.")

        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during document processing: {e}")
        raise
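

# Minimal usage sketch (not part of the original module): runs the async
# pipeline end to end with asyncio. The URL below is a hypothetical placeholder;
# substitute any publicly reachable PDF. Assumes Pydantic v2, where
# HttpUrl("https://...") validates the string directly.
if __name__ == "__main__":
    import asyncio

    sample_url = HttpUrl("https://example.com/sample.pdf")  # hypothetical URL
    extracted_text = asyncio.run(ingest_and_parse_document(sample_url))
    print(f"First 500 characters:\n{extracted_text[:500]}")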