""" Email and PDF Processing Module for Bank Statement Analysis """ import imaplib from email.message import Message import os import io import re import pandas as pd from typing import List, Dict, Optional, Tuple from dataclasses import dataclass from datetime import datetime, timedelta import PyPDF2 import fitz # PyMuPDF from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import logging @dataclass class BankTransaction: date: datetime description: str amount: float category: str = "Unknown" account: str = "" balance: Optional[float] = None @dataclass class StatementInfo: bank_name: str account_number: str statement_period: str transactions: List[BankTransaction] opening_balance: float closing_balance: float class EmailProcessor: def __init__(self, email_config: Dict): self.email_config = email_config self.logger = logging.getLogger(__name__) self.bank_patterns = { 'chase': r'chase\.com|jpmorgan', 'bofa': r'bankofamerica\.com|bofa', 'wells': r'wellsfargo\.com', 'citi': r'citi\.com|citibank', 'amex': r'americanexpress\.com|amex', 'hdfc': r'hdfcbank\.com', 'icici': r'icicibank\.com', 'sbi': r'sbi\.co\.in', 'axis': r'axisbank\.com', } async def connect_to_email(self) -> imaplib.IMAP4_SSL: """Connect to email server""" try: mail = imaplib.IMAP4_SSL(self.email_config['imap_server']) mail.login(self.email_config['email'], self.email_config['password']) return mail except Exception as e: self.logger.error(f"Failed to connect to email: {e}") raise async def fetch_bank_emails(self, days_back: int = 30) -> List[Message]: """Fetch emails from banks containing statements""" mail = await self.connect_to_email() mail.select('inbox') # Calculate date range end_date = datetime.now() start_date = end_date - timedelta(days=days_back) # Search for bank emails bank_domains = '|'.join(self.bank_patterns.values()) search_criteria = f'(FROM "{bank_domains}" SINCE "{start_date.strftime("%d-%b-%Y")}")' try: status, messages = mail.search(None, search_criteria) email_ids = messages[0].split() emails = [] for email_id in email_ids[-50:]: # Limit to recent 50 emails status, msg_data = mail.fetch(email_id, '(RFC822)') msg = Message.from_bytes(msg_data[0][1]) emails.append(msg) return emails finally: mail.close() mail.logout() def identify_bank(self, sender_email: str) -> str: """Identify bank from sender email""" sender_lower = sender_email.lower() for bank, pattern in self.bank_patterns.items(): if re.search(pattern, sender_lower): return bank return "unknown" async def extract_attachments(self, msg: Message) -> List[Tuple[str, bytes, str]]: """Extract PDF attachments from email""" attachments = [] self.logger.debug(f"Processing message with type: {type(msg)}") for part in msg.walk(): self.logger.debug(f"Processing part with type: {type(part)}") try: if part.get_content_disposition() == 'attachment': filename = part.get_filename() if filename and filename.lower().endswith('.pdf'): content = part.get_payload(decode=True) attachments.append((filename, content, 'pdf')) except Exception as e: self.logger.error(f"Error processing part: {e}, Part type: {type(part)}") continue return attachments class PDFProcessor: def __init__(self): self.logger = logging.getLogger(__name__) self.transaction_patterns = { 'date': r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', 'amount': r'([\$\-]?[\d,]+\.?\d{0,2})', 'description': r'([A-Za-z0-9\s\*\#\-_]+)' } async def process_pdf(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo: """Process PDF bank statement""" try: # Try PyMuPDF first doc = fitz.open(stream=pdf_content, filetype="pdf") if doc.needs_pass and password: if not doc.authenticate(password): raise ValueError("Invalid PDF password") elif doc.needs_pass and not password: raise ValueError("PDF requires password") text = "" for page in doc: text += page.get_text() doc.close() return await self.parse_statement_text(text) except Exception as e: self.logger.error(f"Error processing PDF: {e}") # Fallback to PyPDF2 return await self.process_pdf_fallback(pdf_content, password) async def process_pdf_fallback(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo: """Fallback PDF processing with PyPDF2""" try: pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content)) if pdf_reader.is_encrypted: if password: pdf_reader.decrypt(password) else: raise ValueError("PDF requires password") text = "" for page in pdf_reader.pages: text += page.extract_text() return await self.parse_statement_text(text) except Exception as e: self.logger.error(f"Fallback PDF processing failed: {e}") raise async def parse_statement_text(self, text: str) -> StatementInfo: """Parse bank statement text to extract transactions""" lines = text.split('\n') transactions = [] # Bank-specific parsing logic bank_name = self.detect_bank_from_text(text) account_number = self.extract_account_number(text) statement_period = self.extract_statement_period(text) # Extract transactions based on patterns for line in lines: transaction = self.parse_transaction_line(line) if transaction: transactions.append(transaction) # Extract balances opening_balance = self.extract_opening_balance(text) closing_balance = self.extract_closing_balance(text) return StatementInfo( bank_name=bank_name, account_number=account_number, statement_period=statement_period, transactions=transactions, opening_balance=opening_balance, closing_balance=closing_balance ) def detect_bank_from_text(self, text: str) -> str: """Detect bank from statement text""" text_lower = text.lower() if 'chase' in text_lower or 'jpmorgan' in text_lower: return 'Chase' elif 'bank of america' in text_lower or 'bofa' in text_lower: return 'Bank of America' elif 'wells fargo' in text_lower: return 'Wells Fargo' elif 'citibank' in text_lower or 'citi' in text_lower: return 'Citibank' elif 'american express' in text_lower or 'amex' in text_lower: return 'American Express' return 'Unknown Bank' def extract_account_number(self, text: str) -> str: """Extract account number from statement""" # Look for account number patterns patterns = [ r'Account\s+(?:Number|#)?\s*:\s*(\*\+\d{4})', r'Account\s+(\d{4,})', r'(\*\+\d{4})' ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(1) return "Unknown" def extract_statement_period(self, text: str) -> str: """Extract statement period""" # Look for date ranges pattern = r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\s*(?:to|through|-)\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})' match = re.search(pattern, text, re.IGNORECASE) if match: return f"{match.group(1)} to {match.group(2)}" return "Unknown Period" def parse_transaction_line(self, line: str) -> Optional[BankTransaction]: """Parse individual transaction line""" # Common transaction line patterns patterns = [ # Date, Description, Amount r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\s+(.+?)\s+([\$\-]?[\d,]+\.?\d{0,2})$', # Date, Amount, Description r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\s+([\$\-]?[\d,]+\.?\d{0,2})\s+(.+)$' ] for pattern in patterns: match = re.search(pattern, line.strip()) if match: try: date_str = match.group(1) if len(match.groups()) == 3: if '$' in match.group(2) or match.group(2).replace('-', '').replace('.', '').replace(',', '').isdigit(): # Pattern: Date, Amount, Description amount_str = match.group(2) description = match.group(3) else: # Pattern: Date, Description, Amount description = match.group(2) amount_str = match.group(3) # Parse date transaction_date = self.parse_date(date_str) # Parse amount amount = self.parse_amount(amount_str) # Categorize transaction category = self.categorize_transaction(description) return BankTransaction( date=transaction_date, description=description.strip(), amount=amount, category=category ) except Exception as e: self.logger.debug(f"Failed to parse transaction line: {line}, Error: {e}") continue return None def parse_date(self, date_str: str) -> datetime: """Parse date string to datetime object""" # Try different date formats formats = ['%m/%d/%Y', '%m-%d-%Y', '%m/%d/%y', '%m-%d-%y'] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue # If all fails, return current date return datetime.now() def parse_amount(self, amount_str: str) -> float: """Parse amount string to float""" # Clean amount string clean_amount = amount_str.replace('$', '').replace(',', '').strip() # Handle negative amounts is_negative = clean_amount.startswith('-') or clean_amount.startswith('(') clean_amount = clean_amount.replace('-', '').replace('(', '').replace(')', '') try: amount = float(clean_amount) return -amount if is_negative else amount except ValueError: return 0.0 def categorize_transaction(self, description: str) -> str: """Categorize transaction based on description""" desc_lower = description.lower() categories = { 'Food & Dining': ['restaurant', 'mcdonalds', 'starbucks', 'food', 'dining', 'cafe', 'pizza'], 'Shopping': ['amazon', 'walmart', 'target', 'shopping', 'store', 'retail'], 'Gas & Transport': ['shell', 'exxon', 'gas', 'fuel', 'uber', 'lyft', 'taxi'], 'Utilities': ['electric', 'water', 'gas bill', 'internet', 'phone', 'utility'], 'Entertainment': ['netflix', 'spotify', 'movie', 'entertainment', 'gaming'], 'Healthcare': ['pharmacy', 'doctor', 'hospital', 'medical', 'health'], 'Banking': ['atm', 'fee', 'interest', 'transfer', 'deposit'] } for category, keywords in categories.items(): if any(keyword in desc_lower for keyword in keywords): return category return 'Other' def extract_opening_balance(self, text: str) -> float: """Extract opening balance from statement""" patterns = [ r'Beginning\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Opening\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Previous\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})' ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return float(match.group(1).replace(',', '')) return 0.0 def extract_closing_balance(self, text: str) -> float: """Extract closing balance from statement""" patterns = [ r'Ending\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Closing\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Current\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})' ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return float(match.group(1).replace(',', '')) return 0.0 # Example usage if __name__ == "__main__": # Test PDF processing pdf_processor = PDFProcessor() # Example test with sample PDF content print("PDF Processor initialized successfully")