# file: ingestion_router.py
import os
import time
import httpx
import zipfile
import io
import asyncio
from PIL import Image
from pathlib import Path
from urllib.parse import urlparse, unquote
from pydantic import HttpUrl
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- Import our custom parsers ---
from pdf_parallel_parser import process_pdf_with_hybrid_parallel_sync
from complex_parser import process_image_element
# --- A simple parser for generic files (DOCX, etc.) using unstructured ---
from unstructured.partition.auto import partition
# --- Configuration ---
LOCAL_STORAGE_DIR = "data/"
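# NOTE: this directory is created on demand by ingest_and_parse_document() before any files are written.
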
# --- Synchronous, CPU-Bound Parsing Functions ---
def _process_generic_file_sync(file_content: bytes, filename: str) -> str:
"""Fallback parser for standard files like DOCX, PPTX, etc., using unstructured."""
print(f"Processing '{filename}' with unstructured (standard)...")
try:
elements = partition(file=io.BytesIO(file_content), file_filename=filename)
return "\n\n".join([el.text for el in elements])
except Exception as e:
print(f"Unstructured failed for {filename}: {e}")
return ""
def _process_zip_file_in_parallel(zip_content: bytes, temp_dir: Path) -> str:
"""Extracts and processes files from a ZIP archive in parallel."""
print("Initiating parallel processing of ZIP archive...")
all_extracted_texts = []
def process_single_zipped_file(zf: zipfile.ZipFile, file_info: zipfile.ZipInfo) -> str:
file_content = zf.read(file_info.filename)
file_extension = Path(file_info.filename).suffix.lower()
# Route to the appropriate synchronous parser
if file_extension == '.pdf':
temp_file_path = temp_dir / Path(file_info.filename).name
temp_file_path.parent.mkdir(parents=True, exist_ok=True)
temp_file_path.write_bytes(file_content)
return process_pdf_with_hybrid_parallel_sync(temp_file_path)
elif file_extension in ['.png', '.jpg', '.jpeg']:
return process_image_element(Image.open(io.BytesIO(file_content)))
elif file_extension in ['.docx', '.pptx', '.html']:
return _process_generic_file_sync(file_content, file_info.filename)
else:
print(f"Skipping unsupported file in ZIP: {file_info.filename}")
return ""
with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
file_list = [info for info in zf.infolist() if not info.is_dir()]
with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
future_to_file = {executor.submit(process_single_zipped_file, zf, file_info): file_info for file_info in file_list}
for future in as_completed(future_to_file):
try:
text = future.result()
if text:
all_extracted_texts.append(f"--- Content from: {future_to_file[future].filename} ---\n{text}")
except Exception as e:
print(f"Error processing file '{future_to_file[future].filename}' from ZIP: {e}")
return "\n\n".join(all_extracted_texts)
# --- Main Asynchronous Ingestion and Routing Function ---
async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
"""Asynchronously downloads and parses a document using the optimal strategy."""
print(f"Initiating processing for URL: {doc_url}")
os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)
start_time = time.perf_counter()
try:
async with httpx.AsyncClient() as client:
response = await client.get(str(doc_url), timeout=120.0, follow_redirects=True)
response.raise_for_status()
doc_bytes = response.content
print("Download successful.")
filename = unquote(os.path.basename(urlparse(str(doc_url)).path)) or "downloaded_file"
local_file_path = Path(LOCAL_STORAGE_DIR) / filename
file_extension = local_file_path.suffix.lower()
# Run the appropriate CPU-bound parsing function in a separate thread
if file_extension == '.pdf':
local_file_path.write_bytes(doc_bytes)
doc_text = await asyncio.to_thread(process_pdf_with_hybrid_parallel_sync, local_file_path)
elif file_extension == '.zip':
doc_text = await asyncio.to_thread(_process_zip_file_in_parallel, doc_bytes, Path(LOCAL_STORAGE_DIR))
elif file_extension in ['.png', '.jpg', '.jpeg']:
image = Image.open(io.BytesIO(doc_bytes))
doc_text = await asyncio.to_thread(process_image_element, image)
elif file_extension in ['.docx', '.pptx', '.html']:
doc_text = await asyncio.to_thread(_process_generic_file_sync, doc_bytes, filename)
else:
raise ValueError(f"Unsupported file type: {file_extension}")
elapsed_time = time.perf_counter() - start_time
print(f"Total processing time: {elapsed_time:.4f} seconds.")
if not doc_text.strip():
raise ValueError("Document parsing yielded no content.")
return doc_text
except Exception as e:
print(f"An unexpected error occurred: {e}")
raise
# # Example of how to run the pipeline
# async def main():
#     # Example URL pointing to a PDF with tables
#     pdf_url = HttpUrl("https://www.w3.org/WAI/WCAG21/working-examples/pdf-table-linearized/table.pdf")
#     try:
#         content = await ingest_and_parse_document(pdf_url)
#         print("\n--- FINAL EXTRACTED CONTENT ---")
#         print(content)
#     except Exception as e:
#         print(f"Pipeline failed: {e}")
#
# if __name__ == "__main__":
#     asyncio.run(main())
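
# # Hypothetical sketch (not from the original repo): because ingest_and_parse_document
# # is async and offloads CPU-bound parsing with asyncio.to_thread, several documents
# # can be ingested concurrently. The URLs below are placeholders.
# async def ingest_many(urls: list[HttpUrl]) -> list[str]:
#     return await asyncio.gather(*(ingest_and_parse_document(u) for u in urls))
#
# # asyncio.run(ingest_many([HttpUrl("https://example.com/a.pdf"),
# #                          HttpUrl("https://example.com/b.docx")]))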