File size: 3,321 Bytes
a19a241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# file: document_processing.py

import os
import time
import httpx
from pathlib import Path
from urllib.parse import urlparse, unquote
from llama_index.readers.file import PyMuPDFReader
from llama_index.core import Document as LlamaDocument
from concurrent.futures import ThreadPoolExecutor, as_completed
from pydantic import HttpUrl
from typing import List

# Number of loaded documents (as returned by the reader) grouped into each
# batch that is submitted to the thread pool for text extraction.
BATCH_SIZE = 25

def _process_page_batch(documents_batch: List[LlamaDocument]) -> str:
    """
    Concatenate the content of every LlamaIndex Document in a batch.

    Each document's ``get_content()`` result is collected in batch order and
    the pieces are joined with a blank line ("\\n\\n") between them. An empty
    batch yields an empty string.
    """
    page_texts = []
    for page_doc in documents_batch:
        page_texts.append(page_doc.get_content())
    separator = "\n\n"
    return separator.join(page_texts)

def _local_filename_from_url(doc_url: str) -> str:
    """
    Derive a safe local filename from the path component of a URL.

    The path is percent-decoded *before* the final component is taken, so an
    encoded separator (e.g. ``%2F``) cannot smuggle directory parts such as
    ``../`` into the returned name.

    Args:
        doc_url: The document URL as a plain string.

    Returns:
        The last path component, or "downloaded_document.pdf" when the URL
        path has no usable final component (empty, "." or "..").
    """
    decoded_path = unquote(urlparse(doc_url).path)
    filename = os.path.basename(decoded_path)
    if filename in ("", ".", ".."):
        return "downloaded_document.pdf"
    return filename


async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it locally, and extracts its
    text using PyMuPDFReader, with the per-batch text joining done on a
    thread pool.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text, with the
        batches joined in their original (reader) order.

    Raises:
        httpx.HTTPStatusError: If the download returns an error status.
        ValueError: If parsing yields no non-whitespace content.
        Exception: Any other error raised while downloading, saving, or
            parsing is logged and re-raised unchanged.
    """
    print(f"Initiating download from: {doc_url}")
    LOCAL_STORAGE_DIR = "data/"
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

    try:
        # Asynchronously download the document
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()
            doc_bytes = response.content
        print("Download successful.")

        # Determine a valid local filename; the helper guarantees a bare name,
        # so the file is always written inside LOCAL_STORAGE_DIR.
        filename = _local_filename_from_url(str(doc_url))
        local_file_path = Path(os.path.join(LOCAL_STORAGE_DIR, filename))

        # Save the document locally
        # NOTE(review): this write and the parsing below are synchronous and
        # run on the event loop thread — confirm that is acceptable for callers.
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        # Parse the document using LlamaIndex's PyMuPDFReader
        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs_from_loader = loader.load_data(file_path=local_file_path)

        # Parallelize the extraction of text from loaded pages
        start_time = time.perf_counter()
        page_batches = [
            docs_from_loader[i:i + BATCH_SIZE]
            for i in range(0, len(docs_from_loader), BATCH_SIZE)
        ]
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            # executor.map yields results in submission order, so the batches
            # are reassembled in document order regardless of which worker
            # finishes first (as_completed would interleave them).
            all_extracted_texts = list(executor.map(_process_page_batch, page_batches))

        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time = time.perf_counter() - start_time
        print(f"Time taken for parallel text extraction: {elapsed_time:.4f} seconds.")

        # strip() so that a result made only of the "\n\n" separators between
        # empty batches is still treated as "no content".
        if not doc_text.strip():
            raise ValueError("Document parsing yielded no content.")

        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during document processing: {e}")
        raise