"""
Email and PDF Processing Module for Bank Statement Analysis
"""
import email
import imaplib
import io
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List, Optional, Tuple

import fitz
import pandas as pd
import PyPDF2

@dataclass
class BankTransaction:
    """One transaction row taken from a bank statement.

    The first three fields are required and positional; the remaining
    fields carry defaults and may be omitted by the caller.
    """

    # Required fields (positional order matters to callers).
    date: datetime
    description: str
    amount: float

    # Optional fields.
    category: str = "Unknown"          # label assigned to the transaction
    account: str = ""                  # account identifier, empty when not known
    balance: Optional[float] = None    # running balance, None when not available
@dataclass
class StatementInfo:
    """Aggregate result of parsing one bank statement.

    All six fields are required and are accepted positionally in the order
    they are declared below.
    """

    # Statement header details.
    bank_name: str
    account_number: str
    statement_period: str

    # Parsed transaction rows.
    transactions: List[BankTransaction]

    # Balances reported for the statement.
    opening_balance: float
    closing_balance: float
class EmailProcessor:
    """Fetch bank e-mails over IMAP and pull PDF attachments out of them.

    Args:
        email_config: Connection settings. The keys read by this class are
            ``'imap_server'``, ``'email'`` and ``'password'``.
    """

    # English month abbreviations for IMAP date literals. ``strftime('%b')``
    # follows the process locale and could emit names a server rejects.
    _IMAP_MONTHS = (
        'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
        'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
    )

    def __init__(self, email_config: Dict):
        self.email_config = email_config
        self.logger = logging.getLogger(__name__)
        # Bank key -> regex matched against a (lower-cased) sender address.
        self.bank_patterns = {
            'chase': r'chase\.com|jpmorgan',
            'bofa': r'bankofamerica\.com|bofa',
            'wells': r'wellsfargo\.com',
            'citi': r'citi\.com|citibank',
            'amex': r'americanexpress\.com|amex',
            'hdfc': r'hdfcbank\.com',
            'icici': r'icicibank\.com',
            'sbi': r'sbi\.co\.in',
            'axis': r'axisbank\.com',
        }

    async def connect_to_email(self) -> imaplib.IMAP4_SSL:
        """Open an SSL IMAP connection and log in.

        Returns:
            The authenticated ``imaplib.IMAP4_SSL`` connection.

        Raises:
            Exception: Whatever ``imaplib`` (or the config lookup) raised;
                the error is logged and re-raised unchanged.
        """
        try:
            mail = imaplib.IMAP4_SSL(self.email_config['imap_server'])
        except Exception as e:
            self.logger.error(f"Failed to connect to email: {e}")
            raise

        try:
            mail.login(self.email_config['email'], self.email_config['password'])
        except Exception as e:
            self.logger.error(f"Failed to connect to email: {e}")
            # Do not leave the socket open when authentication fails.
            self._disconnect(mail)
            raise
        return mail

    def _disconnect(self, mail: imaplib.IMAP4) -> None:
        """Close the selected mailbox (if any) and log out without raising."""
        # close() is only legal while a mailbox is selected; an error from it
        # must not hide the exception that led here, nor skip the logout.
        for step in (mail.close, mail.logout):
            try:
                step()
            except (imaplib.IMAP4.error, OSError) as e:
                self.logger.debug(f"Ignoring error during IMAP teardown: {e}")

    def _sender_search_terms(self) -> List[str]:
        """Return the literal sender substrings encoded in ``bank_patterns``.

        IMAP SEARCH has no regex support, so each pattern is split on ``|``
        and ``\\.`` is unescaped. This assumes the patterns use only those
        two regex features, which holds for the defaults set in ``__init__``.
        """
        terms: List[str] = []
        for pattern in self.bank_patterns.values():
            for alternative in pattern.split('|'):
                term = alternative.replace('\\.', '.')
                if term and term not in terms:
                    terms.append(term)
        return terms

    def _build_search_criteria(self, since: datetime) -> str:
        """Build one IMAP SEARCH string: any known bank sender, on/after *since*."""
        month = self._IMAP_MONTHS[since.month - 1]
        since_clause = f'SINCE "{since.day:02d}-{month}-{since.year:04d}"'

        terms = self._sender_search_terms()
        if not terms:
            return f'({since_clause})'

        # IMAP's OR is a prefix operator taking exactly two search keys, so
        # N alternatives are expressed as a right-nested chain of N-1 ORs.
        sender_clause = f'FROM "{terms[-1]}"'
        for term in reversed(terms[:-1]):
            sender_clause = f'OR FROM "{term}" {sender_clause}'
        return f'({sender_clause} {since_clause})'

    async def fetch_bank_emails(self, days_back: int = 30, max_emails: int = 50) -> List[Message]:
        """Fetch recent inbox messages whose sender matches a known bank.

        Args:
            days_back: Only messages dated within this many days are searched.
            max_emails: Upper bound on the number of messages downloaded; the
                last ``max_emails`` ids returned by the server are used.

        Returns:
            Parsed messages. Empty when the search fails or finds nothing.
        """
        mail = await self.connect_to_email()
        try:
            mail.select('inbox')
            since = datetime.now() - timedelta(days=days_back)

            status, messages = mail.search(None, self._build_search_criteria(since))
            if status != 'OK' or not messages or not messages[0]:
                if status != 'OK':
                    self.logger.warning(f"IMAP search failed with status {status}")
                return []

            email_ids = messages[0].split()
            # A slice of [-0:] would select everything, so guard it.
            recent_ids = email_ids[-max_emails:] if max_emails > 0 else []

            emails = []
            for email_id in recent_ids:
                status, msg_data = mail.fetch(email_id, '(RFC822)')
                # The message body is the second item of the tuple element;
                # other elements of the response are plain bytes.
                raw = None
                for part in msg_data or []:
                    if isinstance(part, tuple) and len(part) > 1:
                        raw = part[1]
                        break
                if status != 'OK' or not isinstance(raw, (bytes, bytearray)):
                    self.logger.warning(f"Skipping message {email_id!r}: unexpected fetch response")
                    continue
                emails.append(email.message_from_bytes(raw))

            return emails
        finally:
            self._disconnect(mail)

    def identify_bank(self, sender_email: str) -> str:
        """Return the ``bank_patterns`` key matching *sender_email*, else ``"unknown"``."""
        sender_lower = sender_email.lower()
        for bank, pattern in self.bank_patterns.items():
            if re.search(pattern, sender_lower):
                return bank
        return "unknown"

    async def extract_attachments(self, msg: Message) -> List[Tuple[str, bytes, str]]:
        """Collect the PDF attachments of *msg*.

        Only parts whose content disposition is ``attachment`` and whose
        filename ends in ``.pdf`` (case-insensitive) are returned.

        Returns:
            A list of ``(filename, content_bytes, 'pdf')`` tuples.
        """
        attachments = []
        self.logger.debug(f"Processing message with type: {type(msg)}")

        for part in msg.walk():
            self.logger.debug(f"Processing part with type: {type(part)}")
            try:
                if part.get_content_disposition() != 'attachment':
                    continue
                filename = part.get_filename()
                if not filename or not filename.lower().endswith('.pdf'):
                    continue
                content = part.get_payload(decode=True)
                if content is None:
                    # Nothing decodable in this part; nothing to hand on.
                    continue
                attachments.append((filename, content, 'pdf'))
            except Exception as e:
                # Best effort: one malformed part must not lose the others.
                self.logger.error(f"Error processing part: {e}, Part type: {type(part)}")
                continue

        return attachments
class PDFProcessor:
    """Extract text from PDF bank statements and parse it into structured data.

    Text is extracted with PyMuPDF (``fitz``) first and with ``PyPDF2`` as a
    fallback; the parsing helpers work on plain text and use regular
    expressions only.
    """

    _DATE_RE = r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}'
    # Optional "(" / "-" / "$" decorations, digits with thousands separators,
    # optional 1-2 decimals. Kept in step with what parse_amount() accepts.
    _AMOUNT_RE = r'\(?-?\$?-?[\d,]*\d(?:\.\d{1,2})?\)?'

    # Named groups replace position-based guessing of which capture is the
    # amount and which is the description.
    _TRANSACTION_PATTERNS = (
        # <date> <description> <amount>
        re.compile(rf'(?P<date>{_DATE_RE})\s+(?P<description>.+?)\s+(?P<amount>{_AMOUNT_RE})$'),
        # <date> <amount> <description>
        re.compile(rf'(?P<date>{_DATE_RE})\s+(?P<amount>{_AMOUNT_RE})\s+(?P<description>.+)$'),
    )

    # Month-first formats are tried first (the original behaviour). Day-first
    # formats are only reached when the month-first reading is impossible
    # (e.g. 25/12/2023); ambiguous dates are still read month-first.
    _DATE_FORMATS = (
        '%m/%d/%Y', '%m-%d-%Y', '%m/%d/%y', '%m-%d-%y',
        '%d/%m/%Y', '%d-%m-%Y', '%d/%m/%y', '%d-%m-%y',
    )

    # Checked in order, first hit wins. Word boundaries stop "purchase"
    # matching Chase and "cities"/"citizen" matching Citibank.
    _BANK_SIGNATURES = (
        ('Chase', re.compile(r'\b(?:chase\b|jpmorgan)', re.IGNORECASE)),
        ('Bank of America', re.compile(r'\b(?:bank\s*of\s*america|bofa)\b', re.IGNORECASE)),
        ('Wells Fargo', re.compile(r'\bwells\s*fargo\b', re.IGNORECASE)),
        ('Citibank', re.compile(r'\bciti(?:bank|cards?|group|corp)?\b', re.IGNORECASE)),
        ('American Express', re.compile(r'\b(?:american\s*express|amex)\b', re.IGNORECASE)),
        ('HDFC Bank', re.compile(r'\bhdfc', re.IGNORECASE)),
        ('ICICI Bank', re.compile(r'\bicici', re.IGNORECASE)),
        ('State Bank of India', re.compile(r'\b(?:state\s+bank\s+of\s+india|sbi)\b', re.IGNORECASE)),
        ('Axis Bank', re.compile(r'\baxis\s*bank\b', re.IGNORECASE)),
    )

    _ACCOUNT_PATTERNS = (
        # Labelled number, masked (****1234) or plain digits.
        r'Account\s*(?:Number|#)?\s*:\s*(\*+\d{4}|\d{4,})',
        r'Account\s+(\d{4,})',
        # Bare masked number anywhere in the text.
        r'(\*+\d{4})',
    )

    # Checked in order, first hit wins. Utilities precedes Gas & Transport so
    # the 'gas bill' keyword is not shadowed by the shorter 'gas'; 'coffee'
    # is listed under food so it is not caught by the 'fee' keyword below.
    _CATEGORY_KEYWORDS = (
        ('Food & Dining', ('restaurant', 'mcdonalds', 'starbucks', 'food', 'dining', 'cafe', 'pizza', 'coffee')),
        ('Shopping', ('amazon', 'walmart', 'target', 'shopping', 'store', 'retail')),
        ('Utilities', ('electric', 'water', 'gas bill', 'internet', 'phone', 'utility')),
        ('Gas & Transport', ('shell', 'exxon', 'gas', 'fuel', 'uber', 'lyft', 'taxi')),
        ('Entertainment', ('netflix', 'spotify', 'movie', 'entertainment', 'gaming')),
        ('Healthcare', ('pharmacy', 'doctor', 'hospital', 'medical', 'health')),
        ('Banking', ('atm', 'fee', 'interest', 'transfer', 'deposit')),
    )

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.transaction_patterns = {
            'date': r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
            'amount': r'([\$\-]?[\d,]+\.?\d{0,2})',
            'description': r'([A-Za-z0-9\s\*\#\-_]+)'
        }

    async def process_pdf(self, pdf_content: bytes, password: Optional[str] = None) -> "StatementInfo":
        """Parse a PDF statement, falling back to PyPDF2 if PyMuPDF fails.

        Args:
            pdf_content: Raw bytes of the PDF file.
            password: Password for encrypted PDFs, if any.

        Returns:
            The parsed ``StatementInfo``.

        Raises:
            Exception: Whatever ``process_pdf_fallback`` raises when both
                extraction paths fail.
        """
        try:
            doc = fitz.open(stream=pdf_content, filetype="pdf")
            try:
                if doc.needs_pass:
                    if not password:
                        raise ValueError("PDF requires password")
                    if not doc.authenticate(password):
                        raise ValueError("Invalid PDF password")
                text = "".join(page.get_text() for page in doc)
            finally:
                # Release the document even when extraction raised.
                doc.close()

            return await self.parse_statement_text(text)

        except Exception as e:
            self.logger.error(f"Error processing PDF: {e}")
            return await self.process_pdf_fallback(pdf_content, password)

    async def process_pdf_fallback(self, pdf_content: bytes, password: Optional[str] = None) -> "StatementInfo":
        """Parse a PDF statement using PyPDF2.

        Raises:
            ValueError: If the PDF is encrypted and *password* is missing or
                does not decrypt it.
            Exception: Any other extraction/parsing error, logged and re-raised.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))

            if pdf_reader.is_encrypted:
                if not password:
                    raise ValueError("PDF requires password")
                # decrypt() reports failure through a falsy return value
                # rather than an exception, so it has to be checked.
                if not pdf_reader.decrypt(password):
                    raise ValueError("Invalid PDF password")

            # Guard against extract_text() yielding None for a page.
            text = "".join(page.extract_text() or "" for page in pdf_reader.pages)

            return await self.parse_statement_text(text)

        except Exception as e:
            self.logger.error(f"Fallback PDF processing failed: {e}")
            raise

    async def parse_statement_text(self, text: str) -> "StatementInfo":
        """Build a ``StatementInfo`` from the extracted statement text.

        Every line is offered to ``parse_transaction_line``; lines that do
        not look like a transaction are ignored.
        """
        transactions = []
        for line in text.split('\n'):
            transaction = self.parse_transaction_line(line)
            if transaction:
                transactions.append(transaction)

        return StatementInfo(
            bank_name=self.detect_bank_from_text(text),
            account_number=self.extract_account_number(text),
            statement_period=self.extract_statement_period(text),
            transactions=transactions,
            opening_balance=self.extract_opening_balance(text),
            closing_balance=self.extract_closing_balance(text),
        )

    def detect_bank_from_text(self, text: str) -> str:
        """Return the display name of the first known bank mentioned in *text*.

        Returns ``'Unknown Bank'`` when no known bank name is found.
        """
        for bank_name, signature in self._BANK_SIGNATURES:
            if signature.search(text):
                return bank_name
        return 'Unknown Bank'

    def extract_account_number(self, text: str) -> str:
        """Return the first account number found in *text*, else ``"Unknown"``.

        Masked numbers are returned as written (e.g. ``****1234``).
        """
        for pattern in self._ACCOUNT_PATTERNS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(1)
        return "Unknown"

    def extract_statement_period(self, text: str) -> str:
        """Return ``"<start> to <end>"`` for the first date range in *text*.

        Returns ``"Unknown Period"`` when no ``<date> to|through|- <date>``
        range is present.
        """
        pattern = rf'({self._DATE_RE})\s*(?:to|through|-)\s*({self._DATE_RE})'
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return f"{match.group(1)} to {match.group(2)}"
        return "Unknown Period"

    def parse_transaction_line(self, line: str) -> Optional["BankTransaction"]:
        """Parse one statement line into a ``BankTransaction``.

        Two layouts are recognised: ``<date> <description> <amount>`` and
        ``<date> <amount> <description>``.

        Returns:
            The transaction, or ``None`` when the line matches neither layout
            or its date cannot be interpreted.
        """
        candidate = line.strip()
        for pattern in self._TRANSACTION_PATTERNS:
            match = pattern.search(candidate)
            if not match:
                continue

            transaction_date = self._parse_date_or_none(match.group('date'))
            if transaction_date is None:
                # A date-shaped token that is not a real date: skip rather
                # than record the row under a made-up date.
                self.logger.debug(f"Failed to parse transaction line: {line}, Error: unrecognised date")
                continue

            description = match.group('description').strip()
            return BankTransaction(
                date=transaction_date,
                description=description,
                amount=self.parse_amount(match.group('amount')),
                category=self.categorize_transaction(description),
            )
        return None

    def _parse_date_or_none(self, date_str: str) -> Optional[datetime]:
        """Return *date_str* parsed with the first fitting format, else ``None``."""
        for fmt in self._DATE_FORMATS:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        return None

    def parse_date(self, date_str: str) -> datetime:
        """Parse a ``M/D/Y``-style date string (``/`` or ``-`` separated).

        Month-first is assumed; day-first is used only when the month-first
        reading is not a valid date.

        Returns:
            The parsed date, or the current time when no format fits (kept
            for backward compatibility; a warning is logged in that case).
        """
        parsed = self._parse_date_or_none(date_str)
        if parsed is None:
            self.logger.warning(f"Unrecognised date {date_str!r}; falling back to current time")
            return datetime.now()
        return parsed

    def parse_amount(self, amount_str: str) -> float:
        """Convert an amount string such as ``$1,234.56`` to a float.

        A leading ``-`` or ``(`` (after removing ``$`` and ``,``) marks the
        amount as negative.

        Returns:
            The signed amount, or ``0.0`` when the text is not numeric.
        """
        clean_amount = amount_str.replace('$', '').replace(',', '').strip()
        is_negative = clean_amount.startswith(('-', '('))
        clean_amount = clean_amount.replace('-', '').replace('(', '').replace(')', '')

        try:
            amount = float(clean_amount)
        except ValueError:
            return 0.0
        return -amount if is_negative else amount

    def categorize_transaction(self, description: str) -> str:
        """Return the first category with a keyword contained in *description*.

        Matching is a case-insensitive substring test; ``'Other'`` is
        returned when no keyword matches.
        """
        desc_lower = description.lower()
        for category, keywords in self._CATEGORY_KEYWORDS:
            if any(keyword in desc_lower for keyword in keywords):
                return category
        return 'Other'

    def _extract_balance(self, text: str, labels: Tuple[str, ...]) -> float:
        """Return the amount following the first ``<label> Balance:`` in *text*.

        Labels are tried in the given order; ``0.0`` is returned when none
        of them is present.
        """
        for label in labels:
            pattern = rf'{label}\s+Balance\s*:\s*({self._AMOUNT_RE})'
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                # parse_amount() handles "$", ",", signs and never raises.
                return self.parse_amount(match.group(1))
        return 0.0

    def extract_opening_balance(self, text: str) -> float:
        """Return the Beginning/Opening/Previous balance, or ``0.0`` if absent."""
        return self._extract_balance(text, ('Beginning', 'Opening', 'Previous'))

    def extract_closing_balance(self, text: str) -> float:
        """Return the Ending/Closing/Current balance, or ``0.0`` if absent."""
        return self._extract_balance(text, ('Ending', 'Closing', 'Current'))
if __name__ == "__main__":
    # Smoke check when run as a script: make sure the processor can be
    # instantiated, then report success.
    pdf_processor = PDFProcessor()
    print("PDF Processor initialized successfully")
|