PercivalFletcher committed
Commit 0b314ed · verified · 1 Parent(s): 6e103a7

Create processing_utility.py

Files changed (1): processing_utility.py +380 -0
processing_utility.py ADDED
import asyncio  # To run synchronous libraries in an async environment.
import concurrent.futures  # Process pool used to parse PDF chunks in parallel.
import os  # To handle file paths and create directories.
import tempfile  # Temporary storage for downloaded PDFs.
import time  # Timing of the parsing steps.
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse  # To get the filename from the URL.

import fitz  # PyMuPDF, used to split PDFs into page-range chunks.
import httpx  # An asynchronous HTTP client.
from dotenv import load_dotenv
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
from llama_index.core import Document  # LlamaIndex Document type used in type hints below.
from llama_index.readers.file import PyMuPDFReader
from pydantic import BaseModel, Field, HttpUrl

# Ensure required libraries are installed.
# You can install them using:
# pip install llama_cloud_services pydantic python-dotenv
from llama_cloud_services import LlamaExtract

# Global variable for the extractor agent
llama_extract_agent = None


class Insurance(BaseModel):
    """
    A Pydantic model to define the data schema for extraction.
    The description helps guide the AI model.
    """

    headings: str = Field(description="An array of headings")


def initialize_llama_extract_agent():
    global llama_extract_agent
    if llama_extract_agent is None:
        print("Initializing LlamaExtract client and getting agent...")
        try:
            extractor = LlamaExtract()
            llama_extract_agent = extractor.get_agent(name="insurance-parser")
            print("LlamaExtract agent initialized.")
        except Exception as e:
            print(f"Error initializing LlamaExtract agent: {e}")
            llama_extract_agent = None  # Ensure it is None if there was an error.


def extract_schema_from_file(file_path: str) -> Optional[Insurance]:
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return None

    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None

    print(f"🚀 Sending '{file_path}' to LlamaCloud for schema extraction...")

    try:
        result = llama_extract_agent.extract(file_path)

        if result and result.data:
            print("✅ Extraction successful!")
            return result.data
        else:
            print("⚠️ Extraction did not return any data.")
            return None

    except Exception as e:
        print(f"\n❌ An error occurred during the API call: {e}")
        print("Please check your API key, network connection, and file format.")
        return None
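

# Example usage (a minimal sketch, not part of the original module; "policy.pdf"
# is a hypothetical local path, and a valid LlamaCloud API key is assumed to be
# available in the environment):
#
#   initialize_llama_extract_agent()
#   extracted = extract_schema_from_file("policy.pdf")
#   if extracted is not None:
#       print(extracted)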


def process_pdf_chunk(chunk_path: str) -> str:
    """
    Worker function for the ProcessPoolExecutor.

    It loads a single PDF chunk using PyMuPDF4LLMLoader and returns the
    extracted markdown content, prepending the original page number.

    Args:
        chunk_path: The file path to a temporary PDF chunk.
    Returns:
        A string containing the markdown content for the chunk.
    """
    try:
        # Load the document chunk using LangChain's loader.
        loader = PyMuPDF4LLMLoader(chunk_path)
        documents = loader.load()

        page_contents = []
        for doc in documents:
            # Reconstruct the original page number from the filename for context.
            original_page_in_doc = int(doc.metadata.get('page', -1))
            base_name = os.path.basename(chunk_path)
            # Filename format is "chunk_START-END.pdf".
            start_page_of_chunk = int(base_name.split('_')[1].split('-')[0])

            # The final page number is the start of the chunk plus the page index within the chunk doc.
            actual_page_num = start_page_of_chunk + original_page_in_doc

            page_contents.append(f"\n## Page {actual_page_num}\n{doc.page_content.strip()}")

        return "".join(page_contents)
    except Exception as e:
        # Return an error message if a chunk fails to process.
        return f"Error processing chunk {os.path.basename(chunk_path)}: {e}"


def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Splits a local PDF into chunks, processes them in parallel using multiple
    CPU cores, and then combines the results.
    This method is ideal for very large documents and runs entirely locally.
    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk.
    Returns:
        A string containing the full markdown content of the PDF.
    """
    start_time = time.monotonic()

    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ The file '{pdf_path}' was not found.")

    temp_dir = "temp_pdf_chunks"
    os.makedirs(temp_dir, exist_ok=True)
    chunk_paths = []
    markdown_text = ""

    try:
        # Step 1: Split the source PDF into smaller chunk files.
        print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
        doc = fitz.open(pdf_path)
        num_pages = len(doc)
        for i in range(0, num_pages, chunk_size):
            chunk_start = i
            chunk_end = min(i + chunk_size, num_pages)

            # Create a new blank PDF and insert pages from the source.
            with fitz.open() as chunk_doc:
                chunk_doc.insert_pdf(doc, from_page=chunk_start, to_page=chunk_end - 1)
                # Naming convention includes page numbers for sorting.
                chunk_path = os.path.join(temp_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
                chunk_doc.save(chunk_path)
                chunk_paths.append(chunk_path)
        doc.close()
        print(f"✅ Successfully created {len(chunk_paths)} PDF chunks.")

        # Step 2: Process the chunks in parallel.
        print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
            # Map each future to its corresponding chunk path.
            future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths}

            for future in concurrent.futures.as_completed(future_to_chunk):
                chunk_path = future_to_chunk[future]
                try:
                    # Store the result from the completed future.
                    results[chunk_path] = future.result()
                except Exception as e:
                    results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}"

        # Step 3: Combine results in the correct order.
        print("Combining processed chunks...")
        # Sort results based on the original chunk path list to maintain order.
        sorted_results = [results[path] for path in chunk_paths]
        markdown_text = "\n\n".join(sorted_results)

        # Step 4: Save to file if an output path is provided.
        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(markdown_text)
            print(f"✅ Markdown saved to: {output_path}")

    finally:
        # Step 5: Clean up temporary chunk files and directory.
        # print("Cleaning up temporary files...")
        for path in chunk_paths:
            if os.path.exists(path):
                os.remove(path)
        if os.path.exists(temp_dir):
            try:
                if not os.listdir(temp_dir):
                    os.rmdir(temp_dir)
            except OSError as e:
                print(f"Could not remove temp directory '{temp_dir}': {e}")

    end_time = time.monotonic()
    print(f"⏱️ Total processing time: {end_time - start_time:.2f} seconds")
    return markdown_text
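

# Example usage (a minimal sketch, not part of the original module; the input
# and output paths are hypothetical placeholders). The function is synchronous,
# so it can be called directly for local files:
#
#   markdown = pdf_to_markdown_parallel("large_report.pdf", output_path="large_report.md", chunk_size=10)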


async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    High-level async function to process a PDF from a URL or local path.
    It downloads the file if a URL is provided, then uses the local parallel
    processor to convert it to markdown.
    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.
    Returns:
        A string containing the full markdown content of the PDF.
    """
    # Check if the source is a URL.
    is_url = source.lower().startswith('http://') or source.lower().startswith('https://')

    if is_url:
        print(f"⬇️ Downloading from URL: {source}")
        temp_pdf_file_path = None
        try:
            # Download the file asynchronously.
            async with httpx.AsyncClient() as client:
                response = await client.get(source, timeout=60.0, follow_redirects=True)
                response.raise_for_status()

            # Save the downloaded content to a temporary file.
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file:
                temp_file.write(response.content)
                temp_pdf_file_path = temp_file.name

            print(f"✅ Download complete. Saved to temporary file: {temp_pdf_file_path}")

            # The parallel function is synchronous, so we run it in an executor
            # to avoid blocking the asyncio event loop.
            loop = asyncio.get_running_loop()
            markdown_content = await loop.run_in_executor(
                None,  # Use the default thread pool executor.
                pdf_to_markdown_parallel,
                temp_pdf_file_path,
                output_path,
                chunk_size
            )
            return markdown_content
        finally:
            # Clean up the temporary file after processing.
            if temp_pdf_file_path and os.path.exists(temp_pdf_file_path):
                os.remove(temp_pdf_file_path)
                print(f"🗑️ Cleaned up temporary file: {temp_pdf_file_path}")
    else:
        # If it's a local path, process it directly.
        print(f"⚙️ Processing local file: {source}")
        loop = asyncio.get_running_loop()
        markdown_content = await loop.run_in_executor(
            None,
            pdf_to_markdown_parallel,
            source,
            output_path,
            chunk_size
        )
        return markdown_content
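

# Example usage (a minimal sketch, not part of the original module; the source
# URL and output path are hypothetical placeholders). Because process_document
# is a coroutine, it is awaited inside async code or driven with asyncio.run:
#
#   markdown = asyncio.run(process_document("https://example.com/report.pdf", output_path="report.md"))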


# Define the batch size for parallel processing.
BATCH_SIZE = 25


def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Helper function to extract content from a batch of LlamaIndex Document objects
    and join them into a single string.
    """
    return "\n\n".join([d.get_content() for d in documents_batch])


async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it to a local directory,
    and then parses it using PyMuPDFReader from LlamaIndex.
    The parsing of page content is parallelized using a ThreadPoolExecutor
    with a specified batch size.
    Args:
        doc_url: The Pydantic-validated URL of the document to process.
    Returns:
        A single string containing the document's extracted text.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx).
            doc_bytes = response.content
            print("Download successful.")

        # --- Logic to determine the local filename ---
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path))
        if not filename:
            # If the URL doesn't provide a filename, create a generic one.
            # Ensure it's a PDF if we're using PyMuPDFReader for PDF parsing.
            filename = "downloaded_document.pdf"
        local_file_path = Path(os.path.join(LOCAL_STORAGE_DIR, filename))

        # Save the downloaded document to the local file.
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")
        print("Parsing document with PyMuPDFReader...")

        # PyMuPDFReader loads the entire document and returns a list of
        # Document objects (one per page).
        loader = PyMuPDFReader()
        docs0 = loader.load_data(file_path=Path(local_file_path))

        # Measure time for parallel doc_text creation.
        start_time_conversion = time.perf_counter()

        all_extracted_texts = []
        # Use a ThreadPoolExecutor for parallel processing of page batches.
        # The max_workers argument can be adjusted based on available CPU cores.
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = []
            # Divide docs0 (list of pages) into batches of BATCH_SIZE.
            for i in range(0, len(docs0), BATCH_SIZE):
                batch = docs0[i:i + BATCH_SIZE]
                # Submit each batch to the executor for processing.
                futures.append(executor.submit(process_page_batch, batch))

            # Collect results in submission order so the batches are joined in
            # document order; future.result() blocks until each batch finishes.
            for future in futures:
                all_extracted_texts.append(future.result())

        # Join all extracted texts from the batches into a single string.
        doc_text = "\n\n".join(all_extracted_texts)

        end_time_conversion = time.perf_counter()
        elapsed_time_conversion = end_time_conversion - start_time_conversion
        print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")

        # The extracted text is now in 'doc_text'.
        if doc_text:
            print(f"Parsing complete. Extracted {len(doc_text)} characters.")
            # The local file is intentionally not deleted.
            return doc_text
        else:
            raise ValueError("PyMuPDFReader did not extract any content.")

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise
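

# A minimal, self-contained sketch of how this module might be exercised end to
# end; it is not part of the original file. The URL below is a hypothetical
# placeholder and must point at a real, reachable PDF for the demo to work. It
# is passed as a plain string, which the function accepts at runtime even though
# its annotation names a Pydantic HttpUrl.
if __name__ == "__main__":
    sample_url = "https://example.com/sample.pdf"  # placeholder document URL
    extracted_text = asyncio.run(download_and_parse_document_using_llama_index(sample_url))
    print(f"Extracted {len(extracted_text)} characters of text.")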