#!/usr/bin/env python3
"""
PDF Document Ingestion Script

This script processes complex PDF documents (like medical textbooks), extracts text and images,
chunks them intelligently, generates vector embeddings using state-of-the-art local models,
and stores them in a local ChromaDB vector database.

Author: Expert Python Developer
Python Version: 3.9+
"""

import io
import os
import uuid
import hashlib
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

# Third-party imports
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from unstructured.partition.pdf import partition_pdf
from PIL import Image

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# =============================================================================
# CONFIGURATION SECTION
# =============================================================================

# Input/Output Paths
SOURCE_DIRECTORY = "/home/tony/pdf_docs"                # Directory containing PDF files to process
DB_PATH = "/home/tony/chromadb"                         # Path for persistent ChromaDB database
IMAGE_OUTPUT_DIRECTORY = "/home/tony/extracted_images"  # Path for storing extracted images

# Model Configuration
TEXT_EMBEDDING_MODEL = "BAAI/bge-m3"     # State-of-the-art text embedding model (1024-dim vectors)
IMAGE_EMBEDDING_MODEL = "clip-ViT-B-32"  # CLIP model for image embeddings (512-dim vectors)
# Note: the two models emit vectors of different dimensionality, and a single ChromaDB
# collection expects one dimensionality, so mixed text/image batches may be rejected.
# If that happens, consider one collection per modality (see the sketch after
# initialize_chromadb below).

# Database Configuration
COLLECTION_NAME = "medical_library"  # ChromaDB collection name

# Processing Configuration
BATCH_SIZE = 100       # Number of chunks to process in each batch
MAX_CHUNK_SIZE = 1000  # Maximum characters per text chunk (not enforced here; see the splitting sketch below)

# =============================================================================
# INITIALIZATION FUNCTIONS
# =============================================================================

def initialize_chromadb() -> Tuple[chromadb.Client, chromadb.Collection]:
    """
    Initialize and return the ChromaDB client and collection.

    Returns:
        Tuple[chromadb.Client, chromadb.Collection]: The client and collection objects
    """
    try:
        # Ensure database directory exists
        os.makedirs(DB_PATH, exist_ok=True)

        # Initialize ChromaDB client with persistent storage
        client = chromadb.PersistentClient(
            path=DB_PATH,
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True
            )
        )

        # Get or create collection
        try:
            collection = client.get_collection(name=COLLECTION_NAME)
            logger.info(f"Using existing collection: {COLLECTION_NAME}")
        except chromadb.errors.NotFoundError:
            collection = client.create_collection(
                name=COLLECTION_NAME,
                metadata={"description": "Medical textbook PDF content with embeddings"}
            )
            logger.info(f"Created new collection: {COLLECTION_NAME}")

        return client, collection
    except Exception as e:
        logger.error(f"Failed to initialize ChromaDB: {e}")
        raise
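
# --- Optional sketch: separate collections per modality -----------------------
# The single collection above holds both text (bge-m3, 1024-dim) and image
# (clip-ViT-B-32, 512-dim) embeddings. If ChromaDB rejects whichever dimensionality
# arrives second, one workaround is to keep a collection per modality. This is a
# minimal sketch only; the helper and the "<name>_text"/"<name>_images" collection
# names are hypothetical, and nothing else in this script calls it.
def initialize_chromadb_per_modality() -> Tuple[chromadb.Collection, chromadb.Collection]:
    """Return (text_collection, image_collection) backed by the same persistent client."""
    client = chromadb.PersistentClient(path=DB_PATH, settings=Settings(anonymized_telemetry=False))
    text_collection = client.get_or_create_collection(
        name=f"{COLLECTION_NAME}_text",
        metadata={"description": "Text chunks embedded with BAAI/bge-m3"}
    )
    image_collection = client.get_or_create_collection(
        name=f"{COLLECTION_NAME}_images",
        metadata={"description": "Image chunks embedded with clip-ViT-B-32"}
    )
    return text_collection, image_collection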

def initialize_models() -> Tuple[SentenceTransformer, SentenceTransformer]:
    """
    Load and return the text and image embedding models.

    Returns:
        Tuple[SentenceTransformer, SentenceTransformer]: Text and image models
    """
    try:
        logger.info("Loading text embedding model...")
        text_model = SentenceTransformer(TEXT_EMBEDDING_MODEL)

        logger.info("Loading image embedding model...")
        image_model = SentenceTransformer(IMAGE_EMBEDDING_MODEL)

        logger.info("Models loaded successfully!")
        return text_model, image_model
    except Exception as e:
        logger.error(f"Failed to load models: {e}")
        raise

def ensure_directories() -> None:
    """
    Ensure all required directories exist.
    """
    try:
        os.makedirs(SOURCE_DIRECTORY, exist_ok=True)
        os.makedirs(IMAGE_OUTPUT_DIRECTORY, exist_ok=True)
        os.makedirs(DB_PATH, exist_ok=True)
        logger.info("All directories verified/created successfully")
    except Exception as e:
        logger.error(f"Failed to create directories: {e}")
        raise

# =============================================================================
# DEDUPLICATION FUNCTIONS
# =============================================================================

def calculate_file_hash(file_path: str) -> str:
    """
    Calculate SHA-256 hash of a file for deduplication.

    Args:
        file_path (str): Path to the file

    Returns:
        str: SHA-256 hash of the file
    """
    hash_sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_sha256.update(chunk)
    return hash_sha256.hexdigest()

def is_pdf_already_processed(pdf_path: str, collection: chromadb.Collection) -> bool:
    """
    Check if a PDF has already been processed by looking up its hash in the database.

    Args:
        pdf_path (str): Path to the PDF file
        collection (chromadb.Collection): ChromaDB collection

    Returns:
        bool: True if already processed, False otherwise
    """
    try:
        file_hash = calculate_file_hash(pdf_path)

        # Query the collection for any document with this file hash
        result = collection.get(where={"file_hash": file_hash}, limit=1)
        if len(result['ids']) > 0:
            pdf_filename = Path(pdf_path).name
            logger.info(f"PDF {pdf_filename} already processed (hash: {file_hash[:12]}...). Skipping.")
            return True

        return False
    except Exception as e:
        logger.warning(f"Error checking if PDF is already processed: {e}")
        return False

# =============================================================================
# DOCUMENT PROCESSING FUNCTIONS
# =============================================================================

def process_pdf(
    pdf_path: str,
    text_model: SentenceTransformer,
    image_model: SentenceTransformer,
    collection: chromadb.Collection
) -> None:
    """
    Process a single PDF file and store chunks in ChromaDB.

    Args:
        pdf_path (str): Path to the PDF file
        text_model (SentenceTransformer): Text embedding model
        image_model (SentenceTransformer): Image embedding model
        collection (chromadb.Collection): ChromaDB collection
    """
    try:
        pdf_filename = Path(pdf_path).name
        logger.info(f"Processing PDF: {pdf_filename}")

        # Calculate file hash for deduplication
        file_hash = calculate_file_hash(pdf_path)

        # Parse PDF with unstructured
        elements = partition_pdf(
            filename=pdf_path,
            strategy="hi_res",
            extract_images_in_pdf=True,
            infer_table_structure=True
        )

        if not elements:
            logger.warning(f"No elements extracted from {pdf_filename}")
            return

        # Generate chunks from elements
        chunks = create_chunks_from_elements(elements, pdf_filename, file_hash)
        if not chunks:
            logger.warning(f"No chunks created from {pdf_filename}")
            return

        # Process chunks in batches
        process_chunks_in_batches(chunks, text_model, image_model, collection)
        logger.info(f"Successfully processed {pdf_filename}: {len(chunks)} chunks")
    except Exception as e:
        logger.error(f"Error processing PDF {pdf_path}: {e}")
        raise

def create_chunks_from_elements(elements: List, pdf_filename: str, file_hash: str) -> List[Dict[str, Any]]:
    """
    Create chunks from unstructured elements (let unstructured handle the intelligent parsing).

    Args:
        elements (List): List of unstructured elements
        pdf_filename (str): Name of the source PDF file
        file_hash (str): SHA-256 hash of the PDF file for deduplication

    Returns:
        List[Dict[str, Any]]: List of chunk dictionaries
    """
    chunks = []
    for i, element in enumerate(elements):
        try:
            element_type = element.category
            # Default to page 1 when no page number is reported; ChromaDB metadata values must not be None
            page_number = getattr(element.metadata, 'page_number', None) or 1

            # Handle image elements
            if element_type == "Image" and hasattr(element, 'image_bytes'):
                # Save image and create image chunk
                image_path = save_image(element.image_bytes, pdf_filename, i)
                if image_path:
                    chunks.append({
                        'id': f"{pdf_filename}_img_{i}",
                        'content': image_path,
                        'type': 'image',
                        'metadata': {
                            'source_file': pdf_filename,
                            'page_number': page_number,
                            'element_type': element_type,
                            'image_path': image_path,
                            'file_hash': file_hash
                        }
                    })
            # Handle all text elements as individual chunks (unstructured already did the intelligent parsing)
            else:
                text_content = str(element).strip()
                if text_content and len(text_content) > 20:  # Skip very short fragments
                    chunks.append({
                        'id': f"{pdf_filename}_text_{i}",
                        'content': text_content,
                        'type': 'text',
                        'metadata': {
                            'source_file': pdf_filename,
                            'page_number': page_number,
                            'element_type': element_type,
                            'file_hash': file_hash
                        }
                    })
        except Exception as e:
            logger.warning(f"Error processing element {i}: {e}")
            continue
    return chunks
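
# --- Optional sketch: enforcing MAX_CHUNK_SIZE ---------------------------------
# create_chunks_from_elements() stores each unstructured element as one chunk and never
# consults MAX_CHUNK_SIZE. If very long elements need to be capped, a simple character-based
# split like the one below could be applied to each text chunk before embedding. This is a
# hedged sketch only; nothing in this script calls it.
def split_long_text(text: str, max_chars: int = MAX_CHUNK_SIZE) -> List[str]:
    """Split text into pieces of at most max_chars characters, breaking on whitespace where possible."""
    pieces = []
    while len(text) > max_chars:
        # Prefer the last whitespace before the limit so words are not cut in half
        cut = text.rfind(" ", 0, max_chars)
        if cut <= 0:
            cut = max_chars
        piece = text[:cut].strip()
        if piece:
            pieces.append(piece)
        text = text[cut:].strip()
    if text:
        pieces.append(text)
    return pieces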

def save_image(image_bytes: bytes, pdf_filename: str, chunk_index: int) -> Optional[str]:
    """
    Save image bytes to file and return the path.

    Args:
        image_bytes (bytes): Raw image data
        pdf_filename (str): Source PDF filename
        chunk_index (int): Index of the chunk

    Returns:
        Optional[str]: Path to saved image or None if failed
    """
    try:
        # Create unique filename
        image_filename = f"{Path(pdf_filename).stem}_{chunk_index}_{uuid.uuid4().hex[:8]}.png"
        image_path = os.path.join(IMAGE_OUTPUT_DIRECTORY, image_filename)

        # Convert and save image
        image = Image.open(io.BytesIO(image_bytes))
        image.save(image_path, format='PNG')
        return image_path
    except Exception as e:
        logger.warning(f"Failed to save image: {e}")
        return None

def process_chunks_in_batches(
    chunks: List[Dict[str, Any]],
    text_model: SentenceTransformer,
    image_model: SentenceTransformer,
    collection: chromadb.Collection
) -> None:
    """
    Process chunks in batches and store in ChromaDB.

    Args:
        chunks (List[Dict[str, Any]]): List of chunks to process
        text_model (SentenceTransformer): Text embedding model
        image_model (SentenceTransformer): Image embedding model
        collection (chromadb.Collection): ChromaDB collection
    """
    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i + BATCH_SIZE]
        try:
            process_batch(batch, text_model, image_model, collection)
        except Exception as e:
            logger.error(f"Error processing batch {i // BATCH_SIZE + 1}: {e}")
            # Continue with next batch instead of failing completely
            continue

def process_batch(
    batch: List[Dict[str, Any]],
    text_model: SentenceTransformer,
    image_model: SentenceTransformer,
    collection: chromadb.Collection
) -> None:
    """
    Process a single batch of chunks.

    Args:
        batch (List[Dict[str, Any]]): Batch of chunks to process
        text_model (SentenceTransformer): Text embedding model
        image_model (SentenceTransformer): Image embedding model
        collection (chromadb.Collection): ChromaDB collection
    """
    ids = []
    embeddings = []
    metadatas = []
    documents = []

    for chunk in batch:
        try:
            chunk_id = chunk['id']
            content = chunk['content']
            chunk_type = chunk['type']
            metadata = chunk['metadata']

            # Generate embedding based on type
            if chunk_type == 'text':
                embedding = text_model.encode(content).tolist()
                document = content
            elif chunk_type == 'image':
                # For images, encode the image file
                if os.path.exists(content):
                    embedding = image_model.encode(Image.open(content)).tolist()
                    document = f"Image from {metadata['source_file']} page {metadata['page_number']}"
                else:
                    logger.warning(f"Image file not found: {content}")
                    continue
            else:
                logger.warning(f"Unknown chunk type: {chunk_type}")
                continue

            ids.append(chunk_id)
            embeddings.append(embedding)
            metadatas.append(metadata)
            documents.append(document)
        except Exception as e:
            logger.warning(f"Error processing chunk {chunk.get('id', 'unknown')}: {e}")
            continue

    # Add batch to collection
    if ids:
        try:
            collection.add(
                ids=ids,
                embeddings=embeddings,
                metadatas=metadatas,
                documents=documents
            )
            logger.debug(f"Added batch of {len(ids)} chunks to database")
        except Exception as e:
            logger.error(f"Error adding batch to database: {e}")
            raise

# =============================================================================
# MAIN EXECUTION
# =============================================================================

def main():
    """
    Main execution function.
    """
    try:
        logger.info("Starting PDF ingestion process...")

        # Ensure directories exist
        ensure_directories()

        # Initialize models and database
        logger.info("Initializing models and database...")
        text_model, image_model = initialize_models()
        client, collection = initialize_chromadb()

        # Get list of PDF files
        pdf_files = []
        if os.path.exists(SOURCE_DIRECTORY):
            pdf_files = [f for f in os.listdir(SOURCE_DIRECTORY) if f.lower().endswith('.pdf')]

        if not pdf_files:
            logger.warning(f"No PDF files found in {SOURCE_DIRECTORY}")
            logger.info("Please add PDF files to the source directory and run again.")
            return

        logger.info(f"Found {len(pdf_files)} PDF files to process")

        # Process each PDF file with progress bar
        with tqdm(pdf_files, desc="Processing PDFs") as pbar:
            for pdf_file in pbar:
                pdf_path = os.path.join(SOURCE_DIRECTORY, pdf_file)
                pbar.set_description(f"Processing {pdf_file}")

                # Check if this PDF has already been processed
                if is_pdf_already_processed(pdf_path, collection):
                    continue

                try:
                    process_pdf(pdf_path, text_model, image_model, collection)
                except Exception as e:
                    logger.error(f"Failed to process {pdf_file}: {e}")
                    continue

        # Get final statistics
        try:
            count = collection.count()
            logger.info(f"Ingestion complete! Total chunks in database: {count}")
        except Exception as e:
            logger.warning(f"Could not get final count: {e}")

        logger.info("PDF ingestion process completed successfully!")
    except Exception as e:
        logger.error(f"Fatal error in main execution: {e}")
        raise
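
# --- Optional sketch: querying the populated collection ------------------------
# A minimal example of how the stored chunks could be searched afterwards. It assumes
# the same bge-m3 model embeds the query text; the function name and the example
# question are hypothetical, and main() never calls this.
def example_query(question: str = "What are the symptoms of anemia?", n_results: int = 5) -> None:
    """Embed a question with the text model and print the closest stored chunks."""
    _, collection = initialize_chromadb()
    text_model = SentenceTransformer(TEXT_EMBEDDING_MODEL)
    query_embedding = text_model.encode(question).tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results,
        where={"element_type": {"$ne": "Image"}}  # restrict to text chunks; image vectors use a different model
    )
    for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
        print(f"[{meta['source_file']} p.{meta['page_number']}] {doc[:200]}")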

if __name__ == "__main__":
    main()