# file: document_processing.py
import os
import time
import httpx
from pathlib import Path
from urllib.parse import urlparse, unquote
from llama_index.readers.file import PyMuPDFReader
from llama_index.core import Document as LlamaDocument
from concurrent.futures import ThreadPoolExecutor, as_completed
from pydantic import HttpUrl
from typing import List

# Define the batch size for parallel processing
BATCH_SIZE = 25
def _process_page_batch(documents_batch: List[LlamaDocument]) -> str:
    """
    Concatenate the text of a batch of LlamaIndex Document objects.

    Each document's content is obtained via ``get_content()``; consecutive
    entries are separated by a blank line in the returned string.
    """
    page_texts = (page.get_content() for page in documents_batch)
    return "\n\n".join(page_texts)
async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """
    Asynchronously download a document, save it locally, and parse it to
    text using PyMuPDFReader with parallel batch extraction.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text, with
        page batches concatenated in their original document order.

    Raises:
        httpx.HTTPStatusError: If the download returns a non-2xx status.
        ValueError: If parsing yields no content.
    """
    print(f"Initiating download from: {doc_url}")
    LOCAL_STORAGE_DIR = "data/"
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)
    try:
        # Asynchronously download the document.
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()
            doc_bytes = response.content
        print("Download successful.")

        # Derive a local filename from the URL path; fall back to a default
        # when the path has no basename. basename() also strips any
        # directory components, guarding against path traversal via the URL.
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path)) or "downloaded_document.pdf"
        local_file_path = Path(LOCAL_STORAGE_DIR) / filename

        # Save the document locally. NOTE(review): this is a blocking write
        # inside an async function — acceptable for a one-shot ingest, but
        # consider asyncio.to_thread for very large files.
        local_file_path.write_bytes(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        # Parse the document using LlamaIndex's PyMuPDFReader.
        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs_from_loader = loader.load_data(file_path=local_file_path)

        # Parallelize text extraction across page batches.
        # BUGFIX: the previous implementation gathered results with
        # as_completed(), which yields futures in *completion* order and
        # could therefore join the page batches out of document order.
        # Iterating the futures list in submission order keeps the pages
        # in sequence while still running the extraction in parallel.
        start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = [
                executor.submit(_process_page_batch, docs_from_loader[i:i + BATCH_SIZE])
                for i in range(0, len(docs_from_loader), BATCH_SIZE)
            ]
            all_extracted_texts = [future.result() for future in futures]
        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time = time.perf_counter() - start_time
        print(f"Time taken for parallel text extraction: {elapsed_time:.4f} seconds.")

        if not doc_text:
            raise ValueError("Document parsing yielded no content.")

        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text
    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during document processing: {e}")
        raise