# Ensure required libraries are installed. You can install them using:
# pip install llama-cloud-services llama-index-readers-file langchain \
#     langchain-pymupdf4llm pymupdf httpx pydantic python-dotenv PyPDF2
import argparse
import asyncio  # To run synchronous libraries in an async environment.
import concurrent.futures
import json
import os  # To handle file paths and create directories.
import re
import tempfile
import time
import uuid  # To generate unique filenames if needed.
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse  # To get the filename from the URL.

import fitz  # PyMuPDF
import httpx  # An asynchronous HTTP client.
import PyPDF2
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
from llama_cloud_services import LlamaExtract
from llama_index.core import Document  # The Document type used by process_page_batch below.
from llama_index.readers.file import PDFReader, PyMuPDFReader
from pydantic import BaseModel, Field, HttpUrl

# Global variable for the extractor agent
llama_extract_agent = None

class Insurance(BaseModel):
    """
    A Pydantic model to define the data schema for extraction.
    The description helps guide the AI model.
    """
    headings: list[str] = Field(description="An array of headings")
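
# Note (an assumption, not from the original script): the agent fetched below
# by name must already exist in LlamaCloud. If it does not, it can be created
# once from the schema above with something like:
#
#     extractor = LlamaExtract()
#     agent = extractor.create_agent(name="insurance-parser", data_schema=Insurance)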

def initialize_llama_extract_agent():
    global llama_extract_agent
    if llama_extract_agent is None:
        print("Initializing LlamaExtract client and getting agent...")
        try:
            extractor = LlamaExtract()
            llama_extract_agent = extractor.get_agent(name="insurance-parser")
            print("LlamaExtract agent initialized.")
        except Exception as e:
            print(f"Error initializing LlamaExtract agent: {e}")
            llama_extract_agent = None  # Ensure it's None if there was an error

def extract_schema_from_file(file_path: str) -> Optional[Insurance]:
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return None
    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None
    print(f"🚀 Sending '{file_path}' to LlamaCloud for schema extraction...")
    try:
        result = llama_extract_agent.extract(file_path)
        if result and result.data:
            print("✅ Extraction successful!")
            return result.data
        else:
            print("⚠️ Extraction did not return any data.")
            return None
    except Exception as e:
        print(f"\n❌ An error occurred during the API call: {e}")
        print("Please check your API key, network connection, and file format.")
        return None
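
# Example usage (a minimal sketch; "policy.pdf" is a placeholder path, not a
# file from the original script). LlamaExtract reads LLAMA_CLOUD_API_KEY from
# the environment, which is why load_dotenv is imported above:
#
#     load_dotenv()
#     initialize_llama_extract_agent()
#     data = extract_schema_from_file("policy.pdf")
#     if data:
#         print(data)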

def process_pdf_chunk(chunk_path: str) -> str:
    """
    Worker function for the ProcessPoolExecutor.
    It loads a single PDF chunk using PyMuPDF4LLMLoader and returns the
    extracted markdown content, prepending the original page number.

    Args:
        chunk_path: The file path to a temporary PDF chunk.

    Returns:
        A string containing the markdown content for the chunk.
    """
    try:
        # Load the document chunk using LangChain's loader
        loader = PyMuPDF4LLMLoader(chunk_path)
        documents = loader.load()
        page_contents = []
        for doc in documents:
            # Reconstruct the original page number from the filename for context
            original_page_in_doc = int(doc.metadata.get('page', -1))
            base_name = os.path.basename(chunk_path)
            # Filename format is "chunk_START-END.pdf"
            start_page_of_chunk = int(base_name.split('_')[1].split('-')[0])
            # The final page number is the start of the chunk + the page within the chunk doc
            actual_page_num = start_page_of_chunk + original_page_in_doc
            page_contents.append(f"\n## Page {actual_page_num}\n{doc.page_content.strip()}")
        return "".join(page_contents)
    except Exception as e:
        # Return an error message if a chunk fails to process
        return f"Error processing chunk {os.path.basename(chunk_path)}: {e}"

def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Splits a local PDF into chunks, processes them in parallel using multiple
    CPU cores, and then combines the results.
    This method is ideal for very large documents and runs entirely locally.

    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk.

    Returns:
        A string containing the full markdown content of the PDF.
    """
    start_time = time.monotonic()
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ The file '{pdf_path}' was not found.")
    temp_dir = "temp_pdf_chunks"
    os.makedirs(temp_dir, exist_ok=True)
    chunk_paths = []
    markdown_text = ""
    try:
        # Step 1: Split the source PDF into smaller chunk files
        print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
        doc = fitz.open(pdf_path)
        num_pages = len(doc)
        for i in range(0, num_pages, chunk_size):
            chunk_start = i
            chunk_end = min(i + chunk_size, num_pages)
            # Create a new blank PDF and insert pages from the source
            with fitz.open() as chunk_doc:
                chunk_doc.insert_pdf(doc, from_page=chunk_start, to_page=chunk_end - 1)
                # Naming convention includes page numbers for sorting
                chunk_path = os.path.join(temp_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
                chunk_doc.save(chunk_path)
                chunk_paths.append(chunk_path)
        doc.close()
        print(f"✅ Successfully created {len(chunk_paths)} PDF chunks.")
        # Step 2: Process the chunks in parallel
        print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
            # Map each future to its corresponding chunk path
            future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths}
            for future in concurrent.futures.as_completed(future_to_chunk):
                chunk_path = future_to_chunk[future]
                try:
                    # Store the result from the completed future
                    results[chunk_path] = future.result()
                except Exception as e:
                    results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}"
        # Step 3: Combine results in the correct order
        print("Combining processed chunks...")
        # Sort results based on the original chunk path list to maintain order
        sorted_results = [results[path] for path in chunk_paths]
        markdown_text = "\n\n".join(sorted_results)
        # Step 4: Save to file if an output path is provided
        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(markdown_text)
            print(f"✅ Markdown saved to: {output_path}")
    finally:
        # Step 5: Clean up temporary chunk files and directory
        # print("Cleaning up temporary files...")
        for path in chunk_paths:
            if os.path.exists(path):
                os.remove(path)
        if os.path.exists(temp_dir):
            try:
                if not os.listdir(temp_dir):
                    os.rmdir(temp_dir)
            except OSError as e:
                print(f"Could not remove temp directory '{temp_dir}': {e}")
    end_time = time.monotonic()
    print(f"⏱️ Total processing time: {end_time - start_time:.2f} seconds")
    return markdown_text
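
# Example usage (a minimal sketch; "sample.pdf" is a placeholder, not a path
# from the original script). The __main__ guard matters here because
# ProcessPoolExecutor re-imports this module in each spawned worker process:
#
#     if __name__ == "__main__":
#         md = pdf_to_markdown_parallel("sample.pdf", output_path="sample.md", chunk_size=10)
#         print(md[:300])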

async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    High-level async function to process a PDF from a URL or local path.
    It downloads the file if a URL is provided, then uses the local parallel
    processor to convert it to markdown.

    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.

    Returns:
        A string containing the full markdown content of the PDF.
    """
    # Check if the source is a URL
    is_url = source.lower().startswith('http://') or source.lower().startswith('https://')
    if is_url:
        print(f"⬇️ Downloading from URL: {source}")
        temp_pdf_file_path = None
        try:
            # Download the file asynchronously
            async with httpx.AsyncClient() as client:
                response = await client.get(source, timeout=60.0, follow_redirects=True)
                response.raise_for_status()
            # Save the downloaded content to a temporary file
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file:
                temp_file.write(response.content)
                temp_pdf_file_path = temp_file.name
            print(f"✅ Download complete. Saved to temporary file: {temp_pdf_file_path}")
            # The parallel function is synchronous, so we run it in an executor
            # to avoid blocking the asyncio event loop.
            loop = asyncio.get_running_loop()
            markdown_content = await loop.run_in_executor(
                None,  # Use the default thread pool executor
                pdf_to_markdown_parallel,
                temp_pdf_file_path,
                output_path,
                chunk_size
            )
            return markdown_content
        finally:
            # Clean up the temporary file after processing
            if temp_pdf_file_path and os.path.exists(temp_pdf_file_path):
                os.remove(temp_pdf_file_path)
                print(f"🗑️ Cleaned up temporary file: {temp_pdf_file_path}")
    else:
        # If it's a local path, process it directly
        print(f"⚙️ Processing local file: {source}")
        loop = asyncio.get_running_loop()
        markdown_content = await loop.run_in_executor(
            None,
            pdf_to_markdown_parallel,
            source,
            output_path,
            chunk_size
        )
        return markdown_content
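
# Example usage (a minimal sketch; the URL is a placeholder, not one from the
# original script):
#
#     if __name__ == "__main__":
#         text = asyncio.run(process_document("https://example.com/sample.pdf"))
#         print(f"Extracted {len(text)} characters of Markdown")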

# Define the batch size for parallel processing
BATCH_SIZE = 25


def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Helper function to extract content from a batch of LlamaIndex Document
    objects and join them into a single string.
    """
    return "\n\n".join([d.get_content() for d in documents_batch])

async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it to a local directory,
    and then parses it using PyMuPDFReader from LlamaIndex.
    The parsing of page content is parallelized using a ThreadPoolExecutor
    with a specified batch size.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            doc_bytes = response.content
        print("Download successful.")
        # --- Logic to determine the local filename ---
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path))
        if not filename:
            # If the URL doesn't provide a filename, create a generic one.
            # Ensure it's a PDF if we're using PyMuPDFReader for PDF parsing.
            filename = "downloaded_document.pdf"
        local_file_path = Path(LOCAL_STORAGE_DIR) / filename
        # Save the downloaded document to the local file.
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")
        print("Parsing document with PyMuPDFReader...")
        # PyMuPDFReader loads the entire document and returns a list of
        # Document objects (one per page).
        loader = PyMuPDFReader()
        docs0 = loader.load_data(file_path=local_file_path)
        # Measure time for parallel doc_text creation
        start_time_conversion = time.perf_counter()
        all_extracted_texts = []
        # Use a ThreadPoolExecutor for parallel processing of page batches.
        # The max_workers argument can be adjusted based on available CPU cores.
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = []
            # Divide docs0 (the list of pages) into batches of BATCH_SIZE
            for i in range(0, len(docs0), BATCH_SIZE):
                batch = docs0[i:i + BATCH_SIZE]
                # Submit each batch to the executor for processing
                futures.append(executor.submit(process_page_batch, batch))
            # Collect results in submission order so pages stay in sequence
            # (iterating with as_completed would yield batches in completion
            # order and could scramble the document text).
            for future in futures:
                all_extracted_texts.append(future.result())
        # Join all extracted texts from the batches into a single string
        doc_text = "\n\n".join(all_extracted_texts)
        end_time_conversion = time.perf_counter()
        elapsed_time_conversion = end_time_conversion - start_time_conversion
        print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")
        if doc_text:
            print(f"Parsing complete. Extracted {len(doc_text)} characters.")
            # The downloaded file is intentionally kept in LOCAL_STORAGE_DIR.
            return doc_text
        else:
            raise ValueError("PyMuPDFReader did not extract any content.")
    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise
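
# --- Minimal demo (an illustrative sketch, not part of the original script) ---
# The URL below is a placeholder; replace it with any publicly reachable PDF.
if __name__ == "__main__":
    demo_url = "https://example.com/sample.pdf"  # hypothetical URL
    try:
        extracted_text = asyncio.run(download_and_parse_document_using_llama_index(demo_url))
        print(extracted_text[:500])
    except Exception as exc:
        print(f"Demo run failed: {exc}")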