# Captured from a Hugging Face Space page (Space status at capture time: "Sleeping").
import httpx # An asynchronous HTTP client. | |
import os # To handle file paths and create directories. | |
import asyncio # To run synchronous libraries in an async environment. | |
from urllib.parse import unquote, urlparse # To get the filename from the URL. | |
import uuid # To generate unique filenames if needed. | |
from pydantic import HttpUrl | |
from langchain_pymupdf4llm import PyMuPDF4LLMLoader | |
import json | |
import re | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.schema import Document | |
import os | |
import time | |
import fitz # PyMuPDF | |
import httpx | |
import tempfile | |
import asyncio | |
import concurrent.futures | |
from typing import Optional | |
from pathlib import Path | |
import os | |
import argparse | |
from typing import Optional | |
from urllib.parse import urlparse, unquote | |
from llama_index.readers.file import PyMuPDFReader | |
from llama_index.core import Document # Make sure Document is imported if used elsewhere | |
from pydantic import HttpUrl # Assuming pydantic is installed for HttpUrl type hint | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Ensure required libraries are installed. | |
# You can install them using: | |
# pip install llama_cloud_services pydantic python-dotenv | |
from llama_cloud_services import LlamaExtract | |
from pydantic import BaseModel, Field | |
from dotenv import load_dotenv | |
import tempfile | |
from pathlib import Path | |
from llama_index.readers.file import PDFReader | |
from llama_index.readers.file import PyMuPDFReader | |
import PyPDF2 | |
# Module-level cache for the LlamaExtract extractor agent. It starts out as
# None and is filled in lazily by initialize_llama_extract_agent(); it is
# reset to None whenever that initialisation fails.
llama_extract_agent: Optional[object] = None
class Insurance(BaseModel):
    """
    A Pydantic model to define the data schema for extraction.
    The description helps guide the AI model.

    Defined exactly once: a second ``class Insurance`` further down the module
    silently replaced this one (and dropped this docstring).
    """

    # NOTE(review): the field description says "array" but the field is typed
    # as a single string. Confirm which shape the extraction actually produces
    # before changing the type, since that would change validation for callers.
    headings: str = Field(description="An array of headings")
def initialize_llama_extract_agent():
    """
    Populate the module-level ``llama_extract_agent`` on first use.

    The function does nothing if the agent is already set. Otherwise it builds a
    ``LlamaExtract`` client and fetches the agent named ``"insurance-parser"``.
    Any exception is printed, not raised, and the global is left as ``None``
    so callers can test it afterwards.
    """
    global llama_extract_agent

    # Guard clause: the agent is cached after the first successful call.
    if llama_extract_agent is not None:
        return

    print("Initializing LlamaExtract client and getting agent...")
    try:
        # NOTE(review): LlamaExtract() gets no explicit credentials here, so it
        # presumably reads them from the environment -- confirm.
        client = LlamaExtract()
        llama_extract_agent = client.get_agent(name="insurance-parser")
        print("LlamaExtract agent initialized.")
    except Exception as err:
        print(f"Error initializing LlamaExtract agent: {err}")
        # Defensive reset so a failed attempt always leaves the global as None.
        llama_extract_agent = None
def extract_schema_from_file(file_path: str) -> Optional[Insurance]:
    """
    Send a local file to the shared LlamaExtract agent and return its data.

    Args:
        file_path: Path of the local file to extract from.

    Returns:
        ``result.data`` from the agent's ``extract`` call when it is truthy. In
        every other case ``None`` is returned after a message is printed: the
        file is missing, the agent cannot be initialised, the result is empty,
        or the call raises.

    NOTE(review): the value returned is whatever ``result.data`` holds; it is
    not validated into an ``Insurance`` instance despite the annotation --
    confirm the actual type.
    """
    if not os.path.exists(file_path):
        print(f"โ Error: The file '{file_path}' was not found.")
        return None

    # Lazily create the agent. Bail out if initialisation fails.
    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None

    # NOTE(review): the status glyphs in these messages look mis-encoded
    # (UTF-8 emoji decoded with a Thai code page); restore the intended ones.
    print(f"๐ Sending '{file_path}' to LlamaCloud for schema extraction...")
    try:
        result = llama_extract_agent.extract(file_path)
        payload = result.data if result else None
        if not payload:
            print("โ ๏ธ Extraction did not return any data.")
            return None
        print("โ Extraction successful!")
        return payload
    except Exception as err:
        print(f"\nโ An error occurred during the API call: {err}")
        print("Please check your API key, network connection, and file format.")
        return None
def process_pdf_chunk(chunk_path: str) -> str:
    """
    Convert one temporary PDF chunk to Markdown (ProcessPoolExecutor worker).

    The chunk is loaded with PyMuPDF4LLMLoader. Each resulting page is emitted
    as a newline, a ``## Page N`` heading, a newline, and the page's stripped
    content. N is rebuilt from the chunk file name ``chunk_START-END.pdf``
    (START is the 1-based number of the chunk's first page) plus the loader's
    ``page`` metadata value.

    Args:
        chunk_path: File path of the temporary PDF chunk.

    Returns:
        The concatenated Markdown of all pages in the chunk. If anything fails,
        the string ``"Error processing chunk <name>: <error>"`` is returned
        instead of raising, so one bad chunk does not abort the whole batch.
    """
    def first_page_number() -> int:
        # "chunk_11-20.pdf" -> "11-20.pdf" -> "11"
        tail = os.path.basename(chunk_path).split('_')[1]
        return int(tail.split('-')[0])

    try:
        rendered = []
        for page_doc in PyMuPDF4LLMLoader(chunk_path).load():
            # NOTE(review): assumes 'page' is the 0-based index within the
            # chunk. A missing key falls back to -1, so every page would be
            # labelled START - 1.
            offset = int(page_doc.metadata.get('page', -1))
            page_number = first_page_number() + offset
            rendered.append(f"\n## Page {page_number}\n{page_doc.page_content.strip()}")
        return "".join(rendered)
    except Exception as exc:
        return f"Error processing chunk {os.path.basename(chunk_path)}: {exc}"
def _split_pdf_into_chunks(pdf_path: str, chunk_size: int, out_dir: str) -> list[str]:
    """Write consecutive ``chunk_size``-page slices of *pdf_path* into *out_dir*; return their paths in page order."""
    chunk_paths = []
    # The context manager closes the source document even if a chunk fails to save.
    with fitz.open(pdf_path) as source_doc:
        num_pages = len(source_doc)
        for chunk_start in range(0, num_pages, chunk_size):
            chunk_end = min(chunk_start + chunk_size, num_pages)
            with fitz.open() as chunk_doc:
                chunk_doc.insert_pdf(source_doc, from_page=chunk_start, to_page=chunk_end - 1)
                # 1-based, inclusive page range in the name; process_pdf_chunk
                # parses START back out of "chunk_START-END.pdf".
                chunk_path = os.path.join(out_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
                chunk_doc.save(chunk_path)
            chunk_paths.append(chunk_path)
    return chunk_paths


def _process_chunks_in_parallel(chunk_paths: list[str]) -> list[str]:
    """Run process_pdf_chunk on every path in a process pool; results are returned in input order."""
    if not chunk_paths:
        return []
    # No point spawning more worker processes than there are chunks.
    max_workers = min(len(chunk_paths), os.cpu_count() or 1)
    results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_pdf_chunk, path) for path in chunk_paths]
        # Waiting on the futures in submission order keeps the chunks in page order.
        for path, future in zip(chunk_paths, futures):
            try:
                results.append(future.result())
            except Exception as e:
                # e.g. a crashed worker process; record the failure inline, not fatally.
                results.append(f"Failed to process chunk {os.path.basename(path)}: {e}")
    return results


def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Splits a local PDF into chunks, processes them in parallel using multiple
    CPU cores, and then combines the results.
    This method is ideal for very large documents and runs entirely locally.

    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk
            (must be >= 1).

    Returns:
        A string containing the full markdown content of the PDF. Chunks that
        fail to convert contribute an error message in place of their text.

    Raises:
        FileNotFoundError: if ``pdf_path`` does not exist.
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    start_time = time.monotonic()
    # NOTE(review): the status glyphs in these messages look mis-encoded
    # (UTF-8 emoji decoded with a Thai code page); restore the intended ones.
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"โ The file '{pdf_path}' was not found.")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Each call gets its own private directory. A fixed "temp_pdf_chunks"
    # folder in the CWD would be shared by concurrent calls, whose identically
    # named chunk files (chunk_1-10.pdf, ...) would overwrite and delete each
    # other. The directory and everything in it is removed on exit.
    with tempfile.TemporaryDirectory(prefix="pdf_chunks_") as temp_dir:
        print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
        chunk_paths = _split_pdf_into_chunks(pdf_path, chunk_size, temp_dir)
        print(f"โ Successfully created {len(chunk_paths)} PDF chunks.")

        print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
        chunk_markdown = _process_chunks_in_parallel(chunk_paths)

    print("Combining processed chunks...")
    markdown_text = "\n\n".join(chunk_markdown)

    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(markdown_text)
        print(f"โ Markdown saved to: {output_path}")

    elapsed = time.monotonic() - start_time
    print(f"โฑ๏ธ Total processing time: {elapsed:.2f} seconds")
    return markdown_text
async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Convert a PDF, given as a local path or an http(s) URL, to Markdown.

    If ``source`` starts with ``http://`` or ``https://`` (case-insensitive),
    it is downloaded into a temporary ``.pdf`` file first. That file is deleted
    once conversion finishes, whether or not it succeeded. Conversion itself
    is delegated to ``pdf_to_markdown_parallel`` on the event loop's default
    executor, so the synchronous work does not block the loop.

    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.

    Returns:
        A string containing the full markdown content of the PDF.
    """
    lowered = source.lower()
    downloaded_path = None  # set only when we created a temp file to clean up
    try:
        if lowered.startswith(("http://", "https://")):
            print(f"โฌ๏ธ Downloading from URL: {source}")
            async with httpx.AsyncClient() as client:
                response = await client.get(source, timeout=60.0, follow_redirects=True)
                response.raise_for_status()
            # delete=False: the file must outlive this `with` so the converter can read it.
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as tmp:
                tmp.write(response.content)
                downloaded_path = tmp.name
            print(f"โ Download complete. Saved to temporary file: {downloaded_path}")
            pdf_path = downloaded_path
        else:
            print(f"โ๏ธ Processing local file: {source}")
            pdf_path = source

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,  # default thread-pool executor
            pdf_to_markdown_parallel,
            pdf_path,
            output_path,
            chunk_size,
        )
    finally:
        if downloaded_path and os.path.exists(downloaded_path):
            os.remove(downloaded_path)
            print(f"๐๏ธ Cleaned up temporary file: {downloaded_path}")
# Number of page Documents handed to each worker task when
# download_and_parse_document_using_llama_index joins the page text.
BATCH_SIZE = 25


def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Join the text of a batch of LlamaIndex Document objects.

    Each document's ``get_content()`` result is separated from the next by a
    blank line. An empty batch yields an empty string.
    """
    separator = "\n\n"
    return separator.join(doc.get_content() for doc in documents_batch)
def _local_filename_from_url(url: str, default: str = "downloaded_document.pdf") -> str:
    """Return a bare, traversal-safe file name taken from the last segment of *url*'s path."""
    # Percent-decode BEFORE taking the basename, so encoded separators such as
    # "%2F..%2F" cannot smuggle directory components out of the storage dir.
    name = os.path.basename(unquote(urlparse(url).path))
    if name in ("", ".", ".."):
        return default
    return name


def _parse_pdf_text(local_file_path: Path) -> str:
    """Load *local_file_path* with PyMuPDFReader and join its page texts, in page order."""
    # PyMuPDFReader returns one Document per page.
    pages = PyMuPDFReader().load_data(file_path=local_file_path)

    start_time_conversion = time.perf_counter()
    batches = [pages[i:i + BATCH_SIZE] for i in range(0, len(pages), BATCH_SIZE)]
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        # executor.map yields results in submission order. as_completed would
        # yield them in completion order, which can scramble the pages.
        batch_texts = list(executor.map(process_page_batch, batches))
    doc_text = "\n\n".join(batch_texts)
    elapsed_time_conversion = time.perf_counter() - start_time_conversion
    print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")
    return doc_text


async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it to a local directory,
    and then parses it using PyMuPDFReader from LlamaIndex.

    The file is stored under ``data/`` using a name derived from the URL path.
    Names are made traversal-safe; ``downloaded_document.pdf`` is used when the
    URL has none. The file is NOT deleted afterwards. Parsing runs in the
    event loop's default executor so it does not block the loop. Page texts
    are joined in batches of ``BATCH_SIZE`` on a thread pool, preserving page
    order.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.

    Raises:
        httpx.HTTPStatusError: if the download returns a 4xx/5xx status.
        ValueError: if no text could be extracted.
        Exception: any other failure is printed and re-raised unchanged.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise for 4xx/5xx status codes.
            doc_bytes = response.content
        print("Download successful.")

        local_file_path = Path(LOCAL_STORAGE_DIR) / _local_filename_from_url(str(doc_url))
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        print("Parsing document with PyMuPDFReader...")
        # PDF parsing is synchronous and CPU-heavy; run it off the event loop,
        # as process_document does for its converter.
        loop = asyncio.get_running_loop()
        doc_text = await loop.run_in_executor(None, _parse_pdf_text, local_file_path)

        if not doc_text:
            raise ValueError("PyMuPDFReader did not extract any content.")
        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        # The local file is intentionally kept on disk.
        return doc_text
    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise