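# Shreyansh-HackRx / processing_utility.py
# NOTE: the lines below are web file-viewer residue captured with this file;
# they are kept as comments so the module remains valid Python.
# PercivalFletcher's picture
# Create processing_utility.py
# 0b314ed verified
# raw
# history blame
# 15.6 kB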
import httpx # An asynchronous HTTP client.
import os # To handle file paths and create directories.
import asyncio # To run synchronous libraries in an async environment.
from urllib.parse import unquote, urlparse # To get the filename from the URL.
import uuid # To generate unique filenames if needed.
from pydantic import HttpUrl
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
import json
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
import os
import time
import fitz # PyMuPDF
import httpx
import tempfile
import asyncio
import concurrent.futures
from typing import Optional
from pathlib import Path
import os
import argparse
from typing import Optional
from urllib.parse import urlparse, unquote
from llama_index.readers.file import PyMuPDFReader
from llama_index.core import Document # Make sure Document is imported if used elsewhere
from pydantic import HttpUrl # Assuming pydantic is installed for HttpUrl type hint
from concurrent.futures import ThreadPoolExecutor, as_completed
# Ensure required libraries are installed.
# You can install them using:
# pip install llama_cloud_services pydantic python-dotenv
from llama_cloud_services import LlamaExtract
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import tempfile
from pathlib import Path
from llama_index.readers.file import PDFReader
from llama_index.readers.file import PyMuPDFReader
import PyPDF2
# Module-level cache for the LlamaExtract agent. It starts as None, is
# assigned by initialize_llama_extract_agent(), and is reset to None there
# if initialization fails.
llama_extract_agent = None
class Insurance(BaseModel):
    """
    A Pydantic model to define the data schema for extraction.

    The field description helps guide the AI model.
    """

    # NOTE(review): annotated as ``str`` although the description says
    # "array" -- confirm the intended schema before changing the type.
    headings: str = Field(description="An array of headings")
def initialize_llama_extract_agent():
    """
    Populate the module-level ``llama_extract_agent`` if it is still unset.

    Does nothing when an agent is already cached. Otherwise it creates a
    ``LlamaExtract`` client and looks up the agent named "insurance-parser".
    Any exception is reported on stdout and leaves the global set to None;
    nothing is raised to the caller.
    """
    global llama_extract_agent

    # Already initialized: keep the cached agent.
    if llama_extract_agent is not None:
        return

    print("Initializing LlamaExtract client and getting agent...")
    try:
        client = LlamaExtract()
        llama_extract_agent = client.get_agent(name="insurance-parser")
        print("LlamaExtract agent initialized.")
    except Exception as init_error:
        print(f"Error initializing LlamaExtract agent: {init_error}")
        # Make sure a failed attempt never leaves a stale value behind.
        llama_extract_agent = None
def extract_schema_from_file(file_path: str) -> Optional["Insurance"]:
    """
    Send a local file to the LlamaExtract agent and return the extracted data.

    The module-level agent is initialized lazily on first use.

    Args:
        file_path: Path to the local file to extract from.

    Returns:
        ``result.data`` from the agent when the extraction returns data,
        otherwise None. None is also returned when the file does not exist,
        when the agent cannot be initialized, or when the extraction call
        raises; those failures are reported on stdout rather than raised.
    """
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return None

    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        # Initialization swallows its own errors, so re-check the global.
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None

    print(f"🚀 Sending '{file_path}' to LlamaCloud for schema extraction...")
    try:
        result = llama_extract_agent.extract(file_path)
        if result and result.data:
            print("✅ Extraction successful!")
            return result.data
        print("⚠️ Extraction did not return any data.")
        return None
    except Exception as e:
        print(f"\n❌ An error occurred during the API call: {e}")
        print("Please check your API key, network connection, and file format.")
        return None
def process_pdf_chunk(chunk_path: str) -> str:
    """
    Convert one temporary PDF chunk to markdown (ProcessPoolExecutor worker).

    The chunk is loaded with PyMuPDF4LLMLoader and every loaded page is
    emitted under a "## Page N" heading, where N is rebuilt from the chunk's
    filename plus the page value reported in the loader metadata.

    Args:
        chunk_path: Path to a chunk file named "chunk_START-END.pdf".

    Returns:
        The concatenated markdown of the chunk. If anything fails, an
        "Error processing chunk ..." message is returned instead of raising.
    """
    try:
        sections = []
        for page_doc in PyMuPDF4LLMLoader(chunk_path).load():
            # Page value from the loader metadata (-1 when it is missing).
            page_offset = int(page_doc.metadata.get('page', -1))
            # Recover START from the "chunk_START-END.pdf" naming convention.
            chunk_file = os.path.basename(chunk_path)
            first_page = int(chunk_file.split('_')[1].split('-')[0])
            page_number = first_page + page_offset
            body = page_doc.page_content.strip()
            sections.append(f"\n## Page {page_number}\n{body}")
        return "".join(sections)
    except Exception as err:
        # Failures are reported in-band so one bad chunk does not abort the run.
        return f"Error processing chunk {os.path.basename(chunk_path)}: {err}"
def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Split a local PDF into chunks, convert them in parallel, and join the results.

    The source PDF is cut into chunk files of ``chunk_size`` pages, each chunk
    is handed to ``process_pdf_chunk`` in a ProcessPoolExecutor, and the
    per-chunk markdown is concatenated in page order.

    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk.
            Must be at least 1.

    Returns:
        A string containing the full markdown content of the PDF. A chunk that
        fails contributes an error message string instead of its content.

    Raises:
        FileNotFoundError: If ``pdf_path`` does not exist.
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    start_time = time.monotonic()
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ The file '{pdf_path}' was not found.")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}.")

    markdown_text = ""
    try:
        # A unique directory per call: a fixed shared directory would let
        # concurrent conversions overwrite or delete each other's chunk files.
        # The context manager also removes the chunks on success and on error.
        with tempfile.TemporaryDirectory(prefix="temp_pdf_chunks_") as temp_dir:
            chunk_paths = []

            # Step 1: Split the source PDF into smaller chunk files
            print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
            with fitz.open(pdf_path) as doc:
                num_pages = len(doc)
                for chunk_start in range(0, num_pages, chunk_size):
                    chunk_end = min(chunk_start + chunk_size, num_pages)
                    # Create a new blank PDF and insert pages from the source
                    with fitz.open() as chunk_doc:
                        chunk_doc.insert_pdf(doc, from_page=chunk_start, to_page=chunk_end - 1)
                        # process_pdf_chunk parses the 1-based start page out of this name.
                        chunk_path = os.path.join(temp_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
                        chunk_doc.save(chunk_path)
                    chunk_paths.append(chunk_path)
            print(f"✅ Successfully created {len(chunk_paths)} PDF chunks.")

            # Step 2: Process the chunks in parallel
            print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
            results = {}
            with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
                # Map each future to its corresponding chunk path
                future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths}
                for future in concurrent.futures.as_completed(future_to_chunk):
                    chunk_path = future_to_chunk[future]
                    try:
                        results[chunk_path] = future.result()
                    except Exception as e:
                        results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}"

            # Step 3: Combine results in the original chunk (page) order,
            # independent of the order in which the workers finished.
            print("Combining processed chunks...")
            markdown_text = "\n\n".join(results[path] for path in chunk_paths)

        # Step 4: Save to file if an output path is provided
        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(markdown_text)
            print(f"✅ Markdown saved to: {output_path}")
    finally:
        end_time = time.monotonic()
        print(f"⏱️ Total processing time: {end_time - start_time:.2f} seconds")
    return markdown_text
async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    High-level async function to process a PDF from a URL or local path.

    A source starting with "http://" or "https://" (case-insensitive) is
    downloaded to a temporary file first; anything else is treated as a local
    path. The synchronous ``pdf_to_markdown_parallel`` is then run in the
    event loop's default executor so the loop is not blocked.

    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.

    Returns:
        A string containing the full markdown content of the PDF.
    """
    loop = asyncio.get_running_loop()
    is_url = source.lower().startswith(('http://', 'https://'))

    if not is_url:
        print(f"⚙️ Processing local file: {source}")
        return await loop.run_in_executor(
            None,  # Use the default thread pool executor
            pdf_to_markdown_parallel,
            source,
            output_path,
            chunk_size,
        )

    print(f"⬇️ Downloading from URL: {source}")
    temp_pdf_file_path = None
    try:
        # Download the file asynchronously
        async with httpx.AsyncClient() as client:
            response = await client.get(source, timeout=60.0, follow_redirects=True)
            response.raise_for_status()

        # Save the downloaded content to a temporary file
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file:
            # Record the path before writing so the `finally` clause removes
            # the file even if the write itself fails.
            temp_pdf_file_path = temp_file.name
            temp_file.write(response.content)
        print(f"✅ Download complete. Saved to temporary file: {temp_pdf_file_path}")

        return await loop.run_in_executor(
            None,  # Use the default thread pool executor
            pdf_to_markdown_parallel,
            temp_pdf_file_path,
            output_path,
            chunk_size,
        )
    finally:
        # Clean up the temporary file after processing
        if temp_pdf_file_path and os.path.exists(temp_pdf_file_path):
            os.remove(temp_pdf_file_path)
            print(f"🗑️ Cleaned up temporary file: {temp_pdf_file_path}")
# Number of parsed pages grouped into each batch that
# download_and_parse_document_using_llama_index submits to its thread pool.
BATCH_SIZE = 25
def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Merge the text of a batch of LlamaIndex Document objects.

    Each document's ``get_content()`` result is taken in batch order and the
    pieces are separated by a blank line.
    """
    page_texts = [page.get_content() for page in documents_batch]
    return "\n\n".join(page_texts)
def _local_filename_from_url(doc_url: str, default: str = "downloaded_document.pdf") -> str:
    """Derive a directory-free local filename from the path component of a URL."""
    # Decode BEFORE taking the basename: decoding afterwards would let
    # percent-encoded separators (e.g. "..%2F..%2Fx") escape the storage dir.
    decoded_path = unquote(urlparse(doc_url).path).replace("\\", "/")
    filename = os.path.basename(decoded_path)
    if filename in ("", ".", ".."):
        # The URL does not provide a usable filename; fall back to a generic PDF name.
        return default
    return filename


async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it to a local directory,
    and then parses it using PyMuPDFReader from LlamaIndex.

    The parsed pages are split into batches of ``BATCH_SIZE`` whose text is
    joined in a ThreadPoolExecutor; batch results are combined in document
    order. The downloaded file is left in the "data/" directory.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.

    Raises:
        httpx.HTTPStatusError: If the download returns a 4xx/5xx status.
        ValueError: If no text could be extracted from the document.
        Exception: Any other error is printed and re-raised unchanged.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            doc_bytes = response.content
        print("Download successful.")

        # Save the downloaded document under a sanitized filename.
        local_file_path = Path(LOCAL_STORAGE_DIR) / _local_filename_from_url(str(doc_url))
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        print("Parsing document with PyMuPDFReader...")
        # load_data returns a list of Document objects for the file.
        loader = PyMuPDFReader()
        docs0 = loader.load_data(file_path=Path(local_file_path))

        # Measure time for parallel doc_text creation
        start_time_conversion = time.perf_counter()
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = [
                executor.submit(process_page_batch, docs0[i:i + BATCH_SIZE])
                for i in range(0, len(docs0), BATCH_SIZE)
            ]
            # Collect in submission order (not completion order) so the
            # batches are joined in the same order as the document's pages.
            all_extracted_texts = [future.result() for future in futures]

        # Join all extracted texts from the batches into a single string
        doc_text = "\n\n".join(all_extracted_texts)
        end_time_conversion = time.perf_counter()
        elapsed_time_conversion = end_time_conversion - start_time_conversion
        print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")

        if not doc_text:
            raise ValueError("PyMuPDFReader did not extract any content.")

        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        # The local file is intentionally NOT deleted.
        return doc_text
    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise