"""
utils.py - Advanced utilities with improved processing and caching for LegalMind AI
"""
import os
import re
import hashlib
import json
import time
from typing import Callable, Dict, List, Optional, Any
from datetime import datetime
import shutil
# Langchain imports
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter,
SentenceTransformersTokenTextSplitter
)
from langchain_community.document_loaders import PDFPlumberLoader, PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.schema import Document
# Import project configuration
from config import (
PDFS_DIR,
VECTORSTORE_DIR,
CACHE_DIR,
LOGS_DIR,
EMBEDDING_MODELS
)
def get_embedding_model(model_name="all-MiniLM-L6-v2"):
"""
Get configured embedding model with appropriate settings
Args:
model_name: Name of the HuggingFace model to use
Returns:
Configured embedding model
"""
if model_name not in EMBEDDING_MODELS:
print(f"Warning: Unknown embedding model {model_name}. Defaulting to all-MiniLM-L6-v2.")
model_name = "all-MiniLM-L6-v2"
try:
return HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True} # Improved retrieval with normalized embeddings
)
except Exception as e:
print(f"Error initializing embedding model {model_name}: {e}")
print("Falling back to all-MiniLM-L6-v2 model")
return HuggingFaceEmbeddings(
model_name="all-MiniLM-L6-v2",
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
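
# Example usage (illustrative; downloads the sentence-transformers weights on first run):
#
#   embeddings = get_embedding_model("all-MiniLM-L6-v2")
#   vector = embeddings.embed_query("What notice period does the termination clause require?")
#   print(len(vector))  # 384 dimensions for all-MiniLM-L6-v2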
def get_file_hash(file_path):
"""
Get a unique hash for a file to use for caching
Args:
file_path: Path to the file
Returns:
        MD5 hex digest of the file, or None if the file cannot be read
"""
hasher = hashlib.md5()
try:
with open(file_path, 'rb') as file:
buf = file.read(65536)
while len(buf) > 0:
hasher.update(buf)
buf = file.read(65536)
return hasher.hexdigest()
except Exception as e:
print(f"Error calculating file hash: {e}")
return None
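
# Example (illustrative; "sample.pdf" is a placeholder path):
#
#   digest = get_file_hash("sample.pdf")
#   if digest:
#       print(f"Cache key prefix: {digest}")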
def preprocess_text(text):
"""
Clean and simplify text to improve processing quality
Args:
text: Raw text to process
Returns:
Cleaned text
"""
    # Remove headers/footers (common in legal documents) before collapsing
    # whitespace, since this pattern depends on line boundaries
    text = re.sub(r'^\s*page \d+\s*of \d+\s*$', '', text, flags=re.IGNORECASE | re.MULTILINE)
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    # Fix common OCR errors
    text = re.sub(r'(\w)- (\w)', r'\1\2', text)  # Rejoin hyphenated words split across lines
    # Clean up formatting artifacts
    text = re.sub(r'\.{3,}', '...', text)  # Normalize ellipses
    text = re.sub(r'_{3,}', '___', text)   # Normalize underscore runs
    # Fix quotation marks
    text = re.sub(r'``|\'\'', '"', text)
    return text.strip()
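
# Example (illustrative):
#
#   raw = "This agree- ment shall ...... terminate\nPage 3 of 10\nupon notice."
#   print(preprocess_text(raw))
#   # -> "This agreement shall ... terminate upon notice."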
def extract_metadata_from_pdf(file_path):
"""
Extract metadata from PDF file
Args:
file_path: Path to the PDF file
Returns:
Dictionary of metadata
"""
try:
from pypdf import PdfReader
reader = PdfReader(file_path)
metadata = {
"filename": os.path.basename(file_path),
"num_pages": len(reader.pages),
"pdf_info": {}
}
# Extract more specific metadata if available
if reader.metadata:
for key in reader.metadata:
if reader.metadata[key]:
metadata["pdf_info"][key.lower()] = str(reader.metadata[key])
return metadata
except Exception as e:
print(f"Error extracting PDF metadata: {e}")
return {
"filename": os.path.basename(file_path),
"error": str(e)
}
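
# Example (illustrative; "sample.pdf" is a placeholder path):
#
#   meta = extract_metadata_from_pdf("sample.pdf")
#   print(meta["filename"], meta.get("num_pages", "n/a"))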
def get_text_splitter(chunking_method="standard", chunk_size=1500, chunk_overlap=150):
"""
Get appropriate text splitter based on method
Args:
chunking_method: Method for splitting text ("standard" or "semantic")
chunk_size: Size of each chunk
chunk_overlap: Overlap between chunks
Returns:
Configured text splitter
"""
if chunking_method.lower() == "semantic":
# Semantic chunking is better for preserving meaning across chunks
try:
            # Note: this splitter measures chunks in tokens and caps them at the
            # embedding model's maximum sequence length, so the character-based
            # chunk_size value is not passed through here
            return SentenceTransformersTokenTextSplitter(
                model_name="all-MiniLM-L6-v2",
                chunk_overlap=chunk_overlap
            )
except Exception as e:
print(f"Error initializing semantic text splitter: {e}. Falling back to standard.")
return RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
add_start_index=True
)
else:
# Standard recursive splitter
return RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
add_start_index=True
)
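
# Example (illustrative):
#
#   splitter = get_text_splitter("standard", chunk_size=200, chunk_overlap=20)
#   chunks = splitter.split_text("Section 1. Definitions. " * 50)
#   print(len(chunks), "chunks")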
def detect_document_structure(text):
"""
Detect if a document has a specific structure (like legal sections)
Args:
text: Document text
Returns:
Dictionary with structure information
"""
structure_info = {
"has_sections": False,
"has_headers": False,
"section_pattern": None
}
# Check for section patterns common in legal documents
section_patterns = [
r'Section \d+\.',
r'§ \d+',
r'Article \d+\.',
r'Chapter \d+\.'
]
for pattern in section_patterns:
if re.search(pattern, text):
structure_info["has_sections"] = True
structure_info["section_pattern"] = pattern
break
# Check for markdown-style headers
if re.search(r'#+\s+\w+', text):
structure_info["has_headers"] = True
return structure_info
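
# Example (illustrative):
#
#   info = detect_document_structure("Section 1. Definitions\nSection 2. Term and Termination")
#   # -> {"has_sections": True, "has_headers": False, "section_pattern": "Section \\d+\\."}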
def process_pdf(
file_path,
progress_callback=None,
max_pages=0,
sample_mode=False,
chunking_method="standard",
embedding_model="all-MiniLM-L6-v2",
force_reprocess=False,
chunk_size=1500,
chunk_overlap=150
):
"""
Load and process a PDF file with enhanced features
Args:
file_path: Path to the PDF file
progress_callback: Function to call with progress updates (0-100)
max_pages: Maximum number of pages to process (0 for all)
sample_mode: Whether to use sample mode (process only sample pages)
chunking_method: Method for splitting text ("standard" or "semantic")
embedding_model: Name of the embedding model to use
force_reprocess: Force reprocessing even if cached version exists
chunk_size: Size of each chunk
chunk_overlap: Overlap between chunks
Returns:
FAISS vector database or None if processing failed
"""
if not os.path.exists(file_path):
if progress_callback:
progress_callback(f"File not found: {file_path}", -1)
return None
try:
# Start timing for performance metrics
start_time = time.time()
# Check for cached version first
file_hash = get_file_hash(file_path)
if not file_hash:
if progress_callback:
progress_callback("Could not calculate file hash", -1)
return None
# Build cache path with all parameters
cache_parts = [file_hash]
if sample_mode:
cache_parts.append("sample")
if max_pages > 0:
cache_parts.append(f"max{max_pages}")
cache_parts.append(chunking_method)
cache_parts.append(embedding_model.replace("-", "_"))
cache_name = "_".join(cache_parts)
cache_path = os.path.join(VECTORSTORE_DIR, cache_name)
# Check for cached version unless force reprocessing
        # FAISS.save_local stores each index as a directory containing index.faiss / index.pkl
        if not force_reprocess and os.path.exists(os.path.join(cache_path, "index.faiss")):
try:
if progress_callback:
progress_callback("Loading from cache...", 20)
# Get the embedding model
embeddings = get_embedding_model(embedding_model)
                # Load from cache; allow_dangerous_deserialization is required by recent
                # langchain-community releases because the docstore is pickled on disk
                vector_db = FAISS.load_local(
                    cache_path,
                    embeddings,
                    allow_dangerous_deserialization=True
                )
if progress_callback:
progress_callback("Successfully loaded from cache", 100)
# Log cache hit
print(f"Cache hit for {file_path} with parameters: {cache_name}")
return vector_db
except Exception as e:
print(f"Error loading from cache: {e}, proceeding with normal processing")
# If loading fails, proceed with normal processing
# Extract metadata from PDF
if progress_callback:
progress_callback("Extracting document metadata...", 5)
pdf_metadata = extract_metadata_from_pdf(file_path)
# Update progress
if progress_callback:
progress_callback("Loading document...", 10)
# Try different PDF loaders in case one fails
documents = []
try:
loader = PDFPlumberLoader(file_path)
documents = loader.load()
except Exception as e:
print(f"PDFPlumberLoader failed, trying PyPDFLoader: {e}")
try:
loader = PyPDFLoader(file_path)
documents = loader.load()
except Exception as e2:
if progress_callback:
progress_callback(f"Error loading PDF with multiple loaders: {e2}", -1)
return None
if not documents:
if progress_callback:
progress_callback("No content found in PDF", -1)
return None
# Apply page limits and sampling
total_pages = len(documents)
if max_pages > 0 and total_pages > max_pages:
documents = documents[:max_pages]
processed_pages = max_pages
elif sample_mode:
# In sample mode, take a representative sample throughout the document
sample_size = min(10, total_pages)
# Take pages from throughout the document, not just the beginning
if total_pages > sample_size:
step = total_pages // sample_size
sample_indices = [i * step for i in range(sample_size)]
documents = [documents[i] for i in sample_indices if i < total_pages]
processed_pages = len(documents)
else:
processed_pages = total_pages
# Add progress update
if progress_callback:
progress_callback(f"Processing {processed_pages} pages...", 20)
# Apply document preprocessing to improve quality
for i, doc in enumerate(documents):
# Add page information to metadata
if not hasattr(doc, 'metadata'):
doc.metadata = {}
doc.metadata['page'] = i + 1
doc.metadata['source'] = os.path.basename(file_path)
# Preprocess the content
doc.page_content = preprocess_text(doc.page_content)
# Progress update for large documents
if progress_callback and i % 10 == 0 and total_pages > 20:
progress_percentage = 20 + int((i / total_pages) * 20)
progress_callback(f"Preprocessing page {i+1}/{total_pages}...", progress_percentage)
# Update progress
if progress_callback:
progress_callback("Analyzing document structure...", 40)
# Join all text for structure analysis
all_text = "\n".join([doc.page_content for doc in documents])
structure_info = detect_document_structure(all_text)
# Get appropriate text splitter based on document structure
if progress_callback:
progress_callback(f"Splitting into chunks using {chunking_method} method...", 45)
text_splitter = get_text_splitter(
chunking_method=chunking_method,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
# Split documents into chunks
text_chunks = text_splitter.split_documents(documents)
if not text_chunks:
if progress_callback:
progress_callback("No chunks created from document", -1)
return None
# Add progress update
if progress_callback:
progress_callback(f"Created {len(text_chunks)} chunks. Creating embeddings...", 60)
# Create vector database with the selected embedding model
try:
embeddings = get_embedding_model(embedding_model)
vector_db = FAISS.from_documents(
text_chunks,
embeddings,
                normalize_L2=True  # Normalize vectors so L2 distance behaves like cosine similarity
)
except Exception as e:
if progress_callback:
progress_callback(f"Error creating embeddings: {e}", -1)
return None
# Add metadata to the vector database
processing_metadata = {
"document": pdf_metadata,
"processing": {
"timestamp": datetime.now().isoformat(),
"num_pages": processed_pages,
"total_pages": total_pages,
"num_chunks": len(text_chunks),
"embedding_model": embedding_model,
"chunking_method": chunking_method,
"sample_mode": sample_mode,
"max_pages": max_pages,
"processing_time": time.time() - start_time,
"document_structure": structure_info
}
}
# Save metadata
metadata_path = f"{cache_path}_metadata.json"
try:
with open(metadata_path, "w") as f:
json.dump(processing_metadata, f, indent=2)
except Exception as e:
print(f"Warning: Could not save metadata: {e}")
# Update progress
if progress_callback:
progress_callback("Saving to cache...", 85)
# Save to cache for future use
try:
vector_db.save_local(cache_path)
except Exception as e:
print(f"Warning: Could not save to cache: {e}")
# Continue even if saving to cache fails
# Update progress
if progress_callback:
progress_callback("Processing complete!", 100)
return vector_db
except Exception as e:
print(f"Error processing PDF: {e}")
if progress_callback:
progress_callback(f"Error: {str(e)}", -1) # -1 indicates error
return None
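
# Example usage (illustrative; "contract.pdf" under PDFS_DIR is a placeholder file):
#
#   def show_progress(message, pct):
#       print(f"[{pct:>3}%] {message}" if pct >= 0 else f"[error] {message}")
#
#   vector_db = process_pdf(
#       os.path.join(PDFS_DIR, "contract.pdf"),
#       progress_callback=show_progress,
#       chunking_method="standard",
#       chunk_size=1500,
#       chunk_overlap=150,
#   )
#   if vector_db:
#       for doc in vector_db.similarity_search("termination for convenience", k=3):
#           print(doc.metadata.get("page"), doc.page_content[:80])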
def clean_response(response):
"""
    Clean an LLM response by stripping thinking tags, leftover HTML, and extra whitespace
Args:
response: Raw LLM response
Returns:
Cleaned text
"""
if response is None:
return "No response generated."
if hasattr(response, 'content'):
clean_text = response.content
else:
clean_text = str(response)
# Remove thinking tags and their content
    clean_text = re.sub(r'<think>.*?</think>', '', clean_text, flags=re.DOTALL)
# Remove any remaining HTML tags
clean_text = re.sub(r'<[^>]+>', '', clean_text)
# Remove extra whitespace and newlines
clean_text = re.sub(r'\n\s*\n', '\n\n', clean_text.strip())
return clean_text
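
# Example (illustrative):
#
#   raw = "<think>internal reasoning</think>The clause requires 30 days' notice."
#   print(clean_response(raw))  # -> "The clause requires 30 days' notice."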
def format_legal_document(text, format_type="markdown"):
"""
Format a legal document for better readability
Args:
text: Raw document text
format_type: Output format ("markdown", "html", or "text")
Returns:
Formatted document
"""
# First, clean the text
text = preprocess_text(text)
# Replace section identifiers with formatted versions
# Section numbers
text = re.sub(r'(Section|SECTION)\s+(\d+)', r'## Section \2', text)
# Legal section symbols
text = re.sub(r'§\s*(\d+)', r'## § \1', text)
# Format case names (italics)
if format_type == "markdown":
text = re.sub(r'([A-Z][a-z]+\s+v\.\s+[A-Z][a-z]+)', r'*\1*', text)
elif format_type == "html":
        text = re.sub(r'([A-Z][a-z]+\s+v\.\s+[A-Z][a-z]+)', r'<em>\1</em>', text)
# Format citations
citation_pattern = r'\(\d+\s+[A-Za-z\.]+\s+\d+(?:,\s*\d+)?\)'
if format_type == "markdown":
text = re.sub(citation_pattern, r'`\g<0>`', text)
elif format_type == "html":
        text = re.sub(citation_pattern, r'<code>\g<0></code>', text)
return text
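
# Example (illustrative):
#
#   snippet = "Section 12 applies. See Smith v. Jones (410 U.S. 113, 1973)."
#   print(format_legal_document(snippet, format_type="markdown"))
#   # -> "## Section 12 applies. See *Smith v. Jones* `(410 U.S. 113, 1973)`."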
def list_cached_documents():
"""
List all cached documents
Returns:
List of dictionaries with document information
"""
documents = []
if not os.path.exists(VECTORSTORE_DIR):
return documents
    # Each cached document is a directory created by FAISS.save_local()
    for entry in os.listdir(VECTORSTORE_DIR):
        entry_path = os.path.join(VECTORSTORE_DIR, entry)
        if os.path.isdir(entry_path) and os.path.exists(os.path.join(entry_path, "index.faiss")):
            base_name = entry
            # Look for the sibling metadata file
            metadata_file = os.path.join(VECTORSTORE_DIR, f"{base_name}_metadata.json")
metadata = {}
if os.path.exists(metadata_file):
try:
with open(metadata_file, "r") as f:
metadata = json.load(f)
except Exception:
pass
document_info = {
"id": base_name,
"filename": metadata.get("document", {}).get("filename", base_name),
"processed_at": metadata.get("processing", {}).get("timestamp", "Unknown"),
"pages": metadata.get("processing", {}).get("num_pages", "Unknown"),
"chunks": metadata.get("processing", {}).get("num_chunks", "Unknown"),
"embedding_model": metadata.get("processing", {}).get("embedding_model", "Unknown")
}
documents.append(document_info)
return documents
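
# Example (illustrative):
#
#   for doc in list_cached_documents():
#       print(f"{doc['filename']}: {doc['chunks']} chunks ({doc['embedding_model']})")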
def delete_cached_document(document_id):
"""
Delete a cached document
Args:
document_id: ID of the document to delete
Returns:
True if successful, False otherwise
"""
try:
        # FAISS.save_local stores the index as a directory, so remove the whole folder
        index_dir = os.path.join(VECTORSTORE_DIR, document_id)
        if os.path.isdir(index_dir):
            shutil.rmtree(index_dir)
        # Delete the sibling metadata file
        metadata_file = os.path.join(VECTORSTORE_DIR, f"{document_id}_metadata.json")
        if os.path.exists(metadata_file):
            os.remove(metadata_file)
return True
except Exception as e:
print(f"Error deleting cached document: {e}")
return False
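
# Example (illustrative; removes the cached index and metadata for a single document):
#
#   for doc in list_cached_documents():
#       if doc["filename"] == "old_contract.pdf":
#           delete_cached_document(doc["id"])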
def clear_cache():
"""
Clear all cache files
Returns:
Number of files deleted
"""
count = 0
try:
if os.path.exists(VECTORSTORE_DIR):
for file in os.listdir(VECTORSTORE_DIR):
file_path = os.path.join(VECTORSTORE_DIR, file)
                if os.path.isdir(file_path):
                    # Cached FAISS indexes are stored as directories
                    shutil.rmtree(file_path)
                    count += 1
                elif os.path.isfile(file_path):
                    os.remove(file_path)
                    count += 1
if os.path.exists(CACHE_DIR):
for file in os.listdir(CACHE_DIR):
file_path = os.path.join(CACHE_DIR, file)
if os.path.isfile(file_path):
os.remove(file_path)
count += 1
return count
except Exception as e:
print(f"Error clearing cache: {e}")
return count
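
# Minimal smoke test when this module is run directly (illustrative; it only reads
# the cache directory and prints what it finds, without processing or deleting anything).
if __name__ == "__main__":
    cached = list_cached_documents()
    print(f"{len(cached)} cached document(s) in {VECTORSTORE_DIR}")
    for doc in cached:
        print(f"  - {doc['filename']} ({doc['pages']} pages, {doc['chunks']} chunks)")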