""" Email and PDF Processing Module for Bank Statement Analysis """ import imaplib from email.message import Message import os import io import re import pandas as pd from typing import List, Dict, Optional, Tuple from dataclasses import dataclass from datetime import datetime, timedelta import PyPDF2 import fitz # PyMuPDF from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import logging @dataclass class BankTransaction: date: datetime description: str amount: float category: str = "Unknown" account: str = "" balance: Optional[float] = None @dataclass class StatementInfo: bank_name: str account_number: str statement_period: str transactions: List[BankTransaction] opening_balance: float closing_balance: float class EmailProcessor: def __init__(self, email_config: Dict): self.email_config = email_config self.logger = logging.getLogger(__name__) self.bank_patterns = { 'chase': r'chase\.com|jpmorgan', 'bofa': r'bankofamerica\.com|bofa', 'wells': r'wellsfargo\.com', 'citi': r'citi\.com|citibank', 'amex': r'americanexpress\.com|amex', 'hdfc': r'hdfcbank\.com', 'icici': r'icicibank\.com', 'sbi': r'sbi\.co\.in', 'axis': r'axisbank\.com', } async def connect_to_email(self) -> imaplib.IMAP4_SSL: """Connect to email server""" try: mail = imaplib.IMAP4_SSL(self.email_config['imap_server']) mail.login(self.email_config['email'], self.email_config['password']) return mail except Exception as e: self.logger.error(f"Failed to connect to email: {e}") raise async def fetch_bank_emails(self, days_back: int = 30) -> List[Message]: """Fetch emails from banks containing statements""" mail = await self.connect_to_email() mail.select('inbox') # Calculate date range end_date = datetime.now() start_date = end_date - timedelta(days=days_back) # Search for bank emails bank_domains = '|'.join(self.bank_patterns.values()) search_criteria = f'(FROM "{bank_domains}" SINCE "{start_date.strftime("%d-%b-%Y")}")' try: status, messages = mail.search(None, search_criteria) email_ids = messages[0].split() emails = [] for email_id in email_ids[-50:]: # Limit to recent 50 emails status, msg_data = mail.fetch(email_id, '(RFC822)') msg = Message.from_bytes(msg_data[0][1]) emails.append(msg) return emails finally: mail.close() mail.logout() def identify_bank(self, sender_email: str) -> str: """Identify bank from sender email""" sender_lower = sender_email.lower() for bank, pattern in self.bank_patterns.items(): if re.search(pattern, sender_lower): return bank return "unknown" async def extract_attachments(self, msg: Message) -> List[Tuple[str, bytes, str]]: """Extract PDF attachments from email""" attachments = [] self.logger.debug(f"Processing message with type: {type(msg)}") for part in msg.walk(): self.logger.debug(f"Processing part with type: {type(part)}") try: if part.get_content_disposition() == 'attachment': filename = part.get_filename() if filename and filename.lower().endswith('.pdf'): content = part.get_payload(decode=True) attachments.append((filename, content, 'pdf')) except Exception as e: self.logger.error(f"Error processing part: {e}, Part type: {type(part)}") continue return attachments class PDFProcessor: def __init__(self): self.logger = logging.getLogger(__name__) self.transaction_patterns = { 'date': r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', 'amount': r'([\$\-]?[\d,]+\.?\d{0,2})', 'description': r'([A-Za-z0-9\s\*\#\-_]+)' } async def process_pdf(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo: """Process PDF bank statement""" try: # Try PyMuPDF first doc = fitz.open(stream=pdf_content, filetype="pdf") if doc.needs_pass and password: if not doc.authenticate(password): raise ValueError("Invalid PDF password") elif doc.needs_pass and not password: raise ValueError("PDF requires password") text = "" for page in doc: text += page.get_text() doc.close() return await self.parse_statement_text(text) except Exception as e: self.logger.error(f"Error processing PDF: {e}") # Fallback to PyPDF2 return await self.process_pdf_fallback(pdf_content, password) async def process_pdf_fallback(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo: """Fallback PDF processing with PyPDF2""" try: pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content)) if pdf_reader.is_encrypted: if password: pdf_reader.decrypt(password) else: raise ValueError("PDF requires password") text = "" for page in pdf_reader.pages: text += page.extract_text() return await self.parse_statement_text(text) except Exception as e: self.logger.error(f"Fallback PDF processing failed: {e}") raise async def parse_statement_text(self, text: str) -> StatementInfo: """Parse bank statement text to extract transactions""" lines = text.split('\n') transactions = [] # Bank-specific parsing logic bank_name = self.detect_bank_from_text(text) account_number = self.extract_account_number(text) statement_period = self.extract_statement_period(text) # Check if this is HDFC format and use multi-line parsing if 'hdfc' in bank_name.lower(): transactions = self.parse_hdfc_multiline_transactions(lines) else: # Extract transactions based on patterns for other banks for line in lines: transaction = self.parse_transaction_line(line) if transaction: transactions.append(transaction) # Extract balances opening_balance = self.extract_opening_balance(text) closing_balance = self.extract_closing_balance(text) return StatementInfo( bank_name=bank_name, account_number=account_number, statement_period=statement_period, transactions=transactions, opening_balance=opening_balance, closing_balance=closing_balance ) def detect_bank_from_text(self, text: str) -> str: """Detect bank from statement text""" text_lower = text.lower() if 'hdfc bank' in text_lower or 'hdfc' in text_lower: return 'HDFC Bank' elif 'icici bank' in text_lower or 'icici' in text_lower: return 'ICICI Bank' elif 'state bank of india' in text_lower or 'sbi' in text_lower: return 'State Bank of India' elif 'axis bank' in text_lower or 'axis' in text_lower: return 'Axis Bank' elif 'kotak' in text_lower: return 'Kotak Mahindra Bank' elif 'chase' in text_lower or 'jpmorgan' in text_lower: return 'Chase' elif 'bank of america' in text_lower or 'bofa' in text_lower: return 'Bank of America' elif 'wells fargo' in text_lower: return 'Wells Fargo' elif 'citibank' in text_lower or 'citi' in text_lower: return 'Citibank' elif 'american express' in text_lower or 'amex' in text_lower: return 'American Express' return 'Unknown Bank' def extract_account_number(self, text: str) -> str: """Extract account number from statement""" # Look for account number patterns patterns = [ r':\s*(\d{14,18})\s*$', # HDFC actual format (18691610049835) - line ending with colon and number r'Account\s+Number\s*:\s*(\d{14,18})', # HDFC actual format (18691610049835) r'Account\s+Number\s*:\s*(\d+)', # HDFC format r'Account\s+(?:Number|#)?\s*:\s*(\*+\d{4})', # Masked format r'Account\s+(\d{4,})', r'(\*+\d{4})', r'A/c\s+No\.?\s*:\s*(\d+)', # Alternative format ] # Look for the specific pattern in the HDFC statement lines = text.split('\n') for i, line in enumerate(lines): if 'Account Number' in line and i + 1 < len(lines): next_line = lines[i + 1].strip() # Check if next line contains the account number if re.match(r':\s*(\d{14,18})', next_line): match = re.search(r':\s*(\d{14,18})', next_line) if match: return match.group(1) for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) if match: return match.group(1) return "Unknown" def extract_statement_period(self, text: str) -> str: """Extract statement period""" # Look for date ranges pattern = r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\s*(?:to|through|-)\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})' match = re.search(pattern, text, re.IGNORECASE) if match: return f"{match.group(1)} to {match.group(2)}" return "Unknown Period" def parse_transaction_line(self, line: str) -> Optional[BankTransaction]: """Parse individual transaction line""" # Skip header lines, empty lines, and reference lines if not line.strip(): return None line_lower = line.lower() if any(header in line_lower for header in ['txn date', 'narration', 'withdrawals', 'deposits', 'closing balance', 'ref ', 'value dt']): return None # Skip lines that are just reference numbers or continuation lines if re.match(r'^\s*\d{10,}\s*$', line.strip()) or line.strip().startswith('Ref '): return None # HDFC Bank specific patterns - exact format from the actual statement hdfc_patterns = [ # Format from actual HDFC statement: Date, Description, Withdrawals, Deposits, Closing Balance r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$', # Alternative format with no commas in amounts r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+(\d+\.\d{2})\s+(\d+\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$', # Format for salary/deposits with description at the end r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+Value\s+Dt\s+\d{2}/\d{2}/\d{4}(?:\s+Ref\s+\d+)?\s+(\d+\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$', ] # Try HDFC patterns first for pattern in hdfc_patterns: match = re.search(pattern, line.strip()) if match: try: date_str = match.group(1) description = match.group(2).strip() # Check if this is a standard format or the salary format if "Value Dt" in line and len(match.groups()) >= 5: # This is the salary/deposit format withdrawal_str = "0.00" deposit_str = match.group(3) closing_balance_str = match.group(4) else: # Standard format withdrawal_str = match.group(3) deposit_str = match.group(4) closing_balance_str = match.group(5) # Parse amounts withdrawal = float(withdrawal_str.replace(',', '')) if withdrawal_str != '0.00' else 0 deposit = float(deposit_str.replace(',', '')) if deposit_str != '0.00' else 0 closing_balance = float(closing_balance_str.replace(',', '')) # Skip if both withdrawal and deposit are zero if withdrawal == 0 and deposit == 0: continue # Determine amount (negative for withdrawals, positive for deposits) if withdrawal > 0 and deposit == 0: amount = -withdrawal elif deposit > 0 and withdrawal == 0: amount = deposit else: # If both have values, something is wrong with parsing continue # Parse date transaction_date = self.parse_date(date_str) # Clean up description - remove extra whitespace and continuation text description = re.sub(r'\s+', ' ', description).strip() # Categorize transaction category = self.categorize_transaction(description) return BankTransaction( date=transaction_date, description=description, amount=amount, category=category, balance=closing_balance ) except Exception as e: self.logger.debug(f"Failed to parse HDFC transaction line: {line}, Error: {e}") continue # Try to match multi-line transactions (where the line continues) # This is common in the actual HDFC statement format if re.match(r'^\d{2}/\d{2}/\d{4}\s+', line.strip()): # This looks like the start of a transaction but didn't match our patterns # It might be a multi-line transaction try: parts = line.strip().split() if len(parts) >= 1 and re.match(r'\d{2}/\d{2}/\d{4}', parts[0]): date_str = parts[0] description = ' '.join(parts[1:]) # We don't have amount info in this line, so we can't create a full transaction # But we can log it for debugging self.logger.debug(f"Potential multi-line transaction start: {line}") except Exception as e: self.logger.debug(f"Failed to parse potential multi-line transaction: {line}, Error: {e}") return None def parse_date(self, date_str: str) -> datetime: """Parse date string to datetime object""" # Try different date formats (Indian banks typically use DD/MM/YYYY) formats = ['%d/%m/%Y', '%d-%m-%Y', '%d/%m/%y', '%d-%m-%y', '%m/%d/%Y', '%m-%d-%Y', '%m/%d/%y', '%m-%d-%y'] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue # If all fails, return current date return datetime.now() def parse_amount(self, amount_str: str) -> float: """Parse amount string to float""" # Clean amount string clean_amount = amount_str.replace('$', '').replace(',', '').strip() # Handle negative amounts is_negative = clean_amount.startswith('-') or clean_amount.startswith('(') clean_amount = clean_amount.replace('-', '').replace('(', '').replace(')', '') try: amount = float(clean_amount) return -amount if is_negative else amount except ValueError: return 0.0 def categorize_transaction(self, description: str) -> str: """Categorize transaction based on description""" desc_lower = description.lower() # Check for UPI transactions first if 'upi' in desc_lower: # Extract merchant/payee name from UPI description if any(food_keyword in desc_lower for food_keyword in ['swiggy', 'zomato', 'dominos', 'pizza', 'restaurant', 'food', 'bhavan', 'chaupati', 'cafe', 'hotel', 'kitchen', 'biryani']): return 'Food & Dining' elif any(shop_keyword in desc_lower for shop_keyword in ['amazon', 'flipkart', 'myntra', 'shopping', 'store']): return 'Shopping' elif any(transport_keyword in desc_lower for transport_keyword in ['uber', 'ola', 'rapido', 'metro', 'petrol', 'fuel']): return 'Gas & Transport' elif any(util_keyword in desc_lower for util_keyword in ['electricity', 'water', 'gas', 'internet', 'mobile', 'recharge']): return 'Utilities' elif any(ent_keyword in desc_lower for ent_keyword in ['netflix', 'spotify', 'prime', 'hotstar', 'movie']): return 'Entertainment' else: return 'UPI Transfer' categories = { 'Food & Dining': ['restaurant', 'mcdonalds', 'starbucks', 'food', 'dining', 'cafe', 'pizza', 'swiggy', 'zomato', 'dominos'], 'Shopping': ['amazon', 'walmart', 'target', 'shopping', 'store', 'retail', 'flipkart', 'myntra', 'ajio'], 'Gas & Transport': ['shell', 'exxon', 'gas', 'fuel', 'uber', 'lyft', 'taxi', 'ola', 'rapido', 'metro', 'petrol'], 'Utilities': ['electric', 'water', 'gas bill', 'internet', 'phone', 'utility', 'mobile', 'recharge', 'electricity'], 'Entertainment': ['netflix', 'spotify', 'movie', 'entertainment', 'gaming', 'prime', 'hotstar', 'youtube'], 'Healthcare': ['pharmacy', 'doctor', 'hospital', 'medical', 'health', 'apollo', 'medplus'], 'Banking': ['atm', 'fee', 'interest', 'transfer', 'deposit', 'charges', 'penalty'], 'Investment': ['mutual fund', 'sip', 'equity', 'stock', 'zerodha', 'groww', 'investment'], 'Insurance': ['insurance', 'premium', 'policy', 'lic', 'hdfc life', 'icici prudential'] } for category, keywords in categories.items(): if any(keyword in desc_lower for keyword in keywords): return category return 'Other' def extract_opening_balance(self, text: str) -> float: """Extract opening balance from statement""" patterns = [ r'Opening\s+Balance\s*:\s*Rs\.?\s*([\d,]+\.?\d{0,2})', # HDFC format r'Opening\s+Balance\s*:\s*([\d,]+\.?\d{0,2})', # HDFC format without Rs r'Beginning\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Previous\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Balance\s+B/F\s*:\s*Rs\.?\s*([\d,]+\.?\d{0,2})', # Balance brought forward ] # Look for the specific pattern in the HDFC statement lines = text.split('\n') for i, line in enumerate(lines): if 'Opening Balance' in line and i + 1 < len(lines): next_line = lines[i + 1].strip() # Check if next line contains the balance balance_match = re.match(r':\s*([\d,]+\.?\d{0,2})', next_line) if balance_match: return float(balance_match.group(1).replace(',', '')) for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return float(match.group(1).replace(',', '')) return 0.0 def extract_closing_balance(self, text: str) -> float: """Extract closing balance from statement""" patterns = [ r'Closing\s+Balance\s*:\s*([\d,]+\.?\d{0,2})', # HDFC format r'Ending\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', r'Current\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})', # Look for the final balance in the summary section r'2,41,657\.95', # The specific closing balance from this statement ] # First try to find the last transaction's balance lines = text.split('\n') for i in range(len(lines) - 1, -1, -1): line = lines[i].strip() # Look for the pattern of a balance amount balance_match = re.match(r'^([\d,]+\.?\d{0,2})$', line) if balance_match: balance_str = balance_match.group(1) # Check if this looks like a reasonable balance (not a small amount) try: balance = float(balance_str.replace(',', '')) if balance > 1000: # Reasonable account balance return balance except ValueError: continue # Fallback to pattern matching for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return float(match.group(1).replace(',', '')) return 0.0 def parse_hdfc_multiline_transactions(self, lines: List[str]) -> List[BankTransaction]: """Parse HDFC bank statement transactions that span multiple lines""" transactions = [] i = 0 while i < len(lines): line = lines[i].strip() # Skip empty lines and headers if not line or any(header in line.lower() for header in ['txn date', 'narration', 'withdrawals', 'deposits', 'closing balance', 'page ', 'customer id', 'account number', 'statement from', 'hdfc bank']): i += 1 continue # Look for date pattern at start of line date_match = re.match(r'^(\d{2}/\d{2}/\d{4})$', line) if date_match: date_str = date_match.group(1) # Collect description lines and look for amounts description_lines = [] withdrawal = 0 deposit = 0 closing_balance = 0 j = i + 1 while j < len(lines): next_line = lines[j].strip() # Check if we hit another date (start of next transaction) if re.match(r'^\d{2}/\d{2}/\d{4}$', next_line): break # Check if this line is just an amount (withdrawal or deposit) amount_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', next_line) if amount_match: amount_value = float(amount_match.group(1).replace(',', '')) # Look ahead to see if there's another amount (0.00) or balance if j + 1 < len(lines): next_next_line = lines[j + 1].strip() next_amount_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', next_next_line) if next_amount_match: second_amount = float(next_amount_match.group(1).replace(',', '')) # Look for closing balance (third amount) if j + 2 < len(lines): balance_line = lines[j + 2].strip() balance_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', balance_line) if balance_match: closing_balance = float(balance_match.group(1).replace(',', '')) # Determine which is withdrawal and which is deposit if amount_value > 0 and second_amount == 0: withdrawal = amount_value deposit = 0 elif amount_value == 0 and second_amount > 0: withdrawal = 0 deposit = second_amount else: # Both have values, need to determine based on context # For now, assume first non-zero is the transaction amount if amount_value > second_amount: withdrawal = amount_value deposit = 0 else: withdrawal = 0 deposit = second_amount # We found a complete transaction, break j += 3 # Skip the amount lines break else: # Only two amounts, second might be balance if second_amount > amount_value: # Second amount is likely the balance closing_balance = second_amount if amount_value > 0: withdrawal = amount_value deposit = 0 else: # First amount might be balance, second is transaction closing_balance = amount_value if second_amount > 0: deposit = second_amount withdrawal = 0 j += 2 break else: # Only one more amount, treat as balance closing_balance = second_amount if amount_value > 0: withdrawal = amount_value deposit = 0 j += 2 break else: # Only one amount, might be transaction amount # Look for balance in subsequent lines withdrawal = amount_value deposit = 0 # Continue looking for balance j += 1 continue else: # Last line, treat as transaction amount withdrawal = amount_value deposit = 0 j += 1 break # If not an amount, treat as description elif next_line and not re.match(r'^\d+$', next_line): # Not just a number description_lines.append(next_line) j += 1 else: j += 1 # Create transaction if we have valid data if description_lines and (withdrawal > 0 or deposit > 0): # Combine description lines description = ' '.join(description_lines).strip() # Clean up description description = re.sub(r'\s+', ' ', description) description = re.sub(r'Value\s+Dt\s+\d{2}/\d{2}/\d{4}(?:\s+Ref\s+\d+)?', '', description) description = description.strip() # Determine final amount (negative for withdrawals, positive for deposits) if withdrawal > 0: amount = -withdrawal else: amount = deposit # Parse date transaction_date = self.parse_date(date_str) # Categorize transaction category = self.categorize_transaction(description) transaction = BankTransaction( date=transaction_date, description=description, amount=amount, category=category, balance=closing_balance if closing_balance > 0 else None ) transactions.append(transaction) self.logger.debug(f"Parsed transaction: {date_str} | {description} | {amount}") # Move to next transaction i = j else: i += 1 self.logger.info(f"Parsed {len(transactions)} transactions from HDFC statement") return transactions # Example usage if __name__ == "__main__": # Test PDF processing pdf_processor = PDFProcessor() # Example test with sample PDF content print("PDF Processor initialized successfully")