# Document ingestion router: downloads a document from a URL and dispatches it
# to the appropriate parser (PDF, image, ZIP archive, or generic office format).
# file: ingestion_router.py
import os
import time
import httpx
import zipfile
import io
import asyncio
from PIL import Image
from pathlib import Path
from urllib.parse import urlparse, unquote
from pydantic import HttpUrl
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- Import our custom parsers ---
from pdf_parallel_parser import process_pdf_with_hybrid_parallel_sync
from complex_parser import process_image_element
# --- A simple parser for generic files (DOCX, etc.) using unstructured ---
from unstructured.partition.auto import partition
# --- Configuration ---
LOCAL_STORAGE_DIR = "data/"
# --- Synchronous, CPU-Bound Parsing Functions ---
def _process_generic_file_sync(file_content: bytes, filename: str) -> str:
    """Fallback parser for standard files like DOCX, PPTX, etc., using unstructured.

    Args:
        file_content: Raw bytes of the document to parse.
        filename: Original filename; unstructured uses it to infer the file type.

    Returns:
        The text of all parsed elements joined by blank lines, or "" on failure.
    """
    # Bug fix: both messages previously printed the literal text "(unknown)"
    # instead of interpolating the actual filename that was passed in.
    print(f"Processing '{filename}' with unstructured (standard)...")
    try:
        elements = partition(file=io.BytesIO(file_content), file_filename=filename)
        return "\n\n".join([el.text for el in elements])
    except Exception as e:
        # Best-effort fallback: log and return empty rather than abort the batch.
        print(f"Unstructured failed for '{filename}': {e}")
        return ""
def _process_zip_file_in_parallel(zip_content: bytes, temp_dir: Path) -> str: | |
"""Extracts and processes files from a ZIP archive in parallel.""" | |
print("Initiating parallel processing of ZIP archive...") | |
all_extracted_texts = [] | |
def process_single_zipped_file(zf: zipfile.ZipFile, file_info: zipfile.ZipInfo) -> str: | |
file_content = zf.read(file_info.filename) | |
file_extension = Path(file_info.filename).suffix.lower() | |
# Route to the appropriate synchronous parser | |
if file_extension == '.pdf': | |
temp_file_path = temp_dir / Path(file_info.filename).name | |
temp_file_path.parent.mkdir(parents=True, exist_ok=True) | |
temp_file_path.write_bytes(file_content) | |
return process_pdf_with_hybrid_parallel_sync(temp_file_path) | |
elif file_extension in ['.png', '.jpg', '.jpeg']: | |
return process_image_element(Image.open(io.BytesIO(file_content))) | |
elif file_extension in ['.docx', '.pptx', '.html']: | |
return _process_generic_file_sync(file_content, file_info.filename) | |
else: | |
print(f"Skipping unsupported file in ZIP: {file_info.filename}") | |
return "" | |
with zipfile.ZipFile(io.BytesIO(zip_content)) as zf: | |
file_list = [info for info in zf.infolist() if not info.is_dir()] | |
with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor: | |
future_to_file = {executor.submit(process_single_zipped_file, zf, file_info): file_info for file_info in file_list} | |
for future in as_completed(future_to_file): | |
try: | |
text = future.result() | |
if text: | |
all_extracted_texts.append(f"--- Content from: {future_to_file[future].filename} ---\n{text}") | |
except Exception as e: | |
print(f"Error processing file '{future_to_file[future].filename}' from ZIP: {e}") | |
return "\n\n".join(all_extracted_texts) | |
# --- Main Asynchronous Ingestion and Routing Function ---
async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """Asynchronously downloads and parses a document using the optimal strategy.

    Args:
        doc_url: URL of the document to fetch and parse.

    Returns:
        The extracted text content of the document.

    Raises:
        ValueError: For unsupported file types or when parsing yields no text.
        httpx.HTTPStatusError: When the download fails with an error status.
    """
    print(f"Initiating processing for URL: {doc_url}")
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)
    started = time.perf_counter()
    try:
        # Fetch the full document body; redirects are followed since shared
        # links frequently bounce through an intermediate URL.
        async with httpx.AsyncClient() as client:
            resp = await client.get(str(doc_url), timeout=120.0, follow_redirects=True)
            resp.raise_for_status()
            payload = resp.content
        print("Download successful.")

        # Derive a local filename from the URL path (percent-decoded).
        name = unquote(os.path.basename(urlparse(str(doc_url)).path)) or "downloaded_file"
        target = Path(LOCAL_STORAGE_DIR) / name
        suffix = target.suffix.lower()

        # Dispatch the CPU-bound parse to a worker thread so the event loop
        # stays responsive while extraction runs.
        if suffix == '.pdf':
            target.write_bytes(payload)
            extracted = await asyncio.to_thread(process_pdf_with_hybrid_parallel_sync, target)
        elif suffix == '.zip':
            extracted = await asyncio.to_thread(_process_zip_file_in_parallel, payload, Path(LOCAL_STORAGE_DIR))
        elif suffix in ('.png', '.jpg', '.jpeg'):
            picture = Image.open(io.BytesIO(payload))
            extracted = await asyncio.to_thread(process_image_element, picture)
        elif suffix in ('.docx', '.pptx', '.html'):
            extracted = await asyncio.to_thread(_process_generic_file_sync, payload, name)
        else:
            raise ValueError(f"Unsupported file type: {suffix}")

        print(f"Total processing time: {time.perf_counter() - started:.4f} seconds.")
        if not extracted.strip():
            raise ValueError("Document parsing yielded no content.")
        return extracted
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise
# # Example of how to run the pipeline
# async def main():
#     # Example URL pointing to a PDF with tables
#     pdf_url = HttpUrl("https://www.w3.org/WAI/WCAG21/working-examples/pdf-table-linearized/table.pdf")
#     try:
#         content = await ingest_and_parse_document(pdf_url)
#         print("\n--- FINAL EXTRACTED CONTENT ---")
#         print(content)
#     except Exception as e:
#         print(f"Pipeline failed: {e}")
#
# if __name__ == "__main__":
#     asyncio.run(main())