# Spend-Analyzer-MCP / spend_analyzer.py
# Balamurugan Thayalan
# Initial Commit
# 499796e
"""
Spend Analyzer - Financial Intelligence and Analysis Module
"""
import calendar
import json
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
@dataclass
class SpendingInsight:
    """Per-category spending figures produced by the spend analyzer."""

    # Category label the figures below were aggregated for.
    category: str
    # Sum of the (absolute) expense amounts in the category.
    total_amount: float
    # Number of expense transactions in the category.
    transaction_count: int
    # Mean (absolute) expense amount in the category.
    average_transaction: float
    # Share of the overall spending, expressed on a 0-100 scale.
    percentage_of_total: float
    # One of 'increasing', 'decreasing', 'stable'.
    trend: str
    # Most frequent transaction descriptions for the category.
    top_merchants: List[str]
@dataclass
class BudgetAlert:
    """Budget status of one category for the current month."""

    # Category the budget applies to.
    category: str
    # Configured spending limit for the category.
    budget_limit: float
    # Amount spent in the category so far this month.
    current_spending: float
    # current_spending relative to budget_limit, on a 0-100+ scale.
    percentage_used: float
    # One of 'warning', 'critical', 'info'.
    alert_level: str
    # Days left in the current calendar month.
    days_remaining: int
@dataclass
class FinancialSummary:
    """Headline figures describing the loaded transactions as a whole."""

    # Sum of all positive transaction amounts.
    total_income: float
    # Absolute sum of all negative transaction amounts.
    total_expenses: float
    # total_income minus total_expenses.
    net_cash_flow: float
    # Details of the single biggest expense; empty dict when there is none.
    largest_expense: Dict
    # Category occurring most often among expenses; "" when there is none.
    most_frequent_category: str
    # Transactions flagged as outliers, one dict per finding.
    unusual_transactions: List[Dict]
    # Named monthly spending metrics.
    monthly_trends: Dict[str, float]
class SpendAnalyzer:
    """Analyze personal-finance transactions held in a pandas DataFrame.

    Records are expected to provide ``date``, ``description``, ``amount`` and
    ``category`` fields. Throughout the class a negative ``amount`` is treated
    as an expense and a positive ``amount`` as income.
    """

    # Returned when recommendations are requested before any data is loaded.
    _NO_DATA_MESSAGE = "No transaction data available for analysis"

    def __init__(self):
        """Create an analyzer with no transactions and no budgets."""
        self.logger = logging.getLogger(__name__)
        self.transactions_df = pd.DataFrame()
        self.budgets = {}

    def load_transactions(self, transactions: List) -> None:
        """Load transactions into a pandas DataFrame for analysis.

        The frame is sorted by date and gains the derived columns ``month``,
        ``week`` and ``day_of_week``. ``self.transactions_df`` is only
        replaced once the whole conversion succeeded.

        Args:
            transactions: Records to load. Each record may be a dict, a
                dataclass instance, or a plain object whose attributes live
                in ``__dict__``; anything else is handed to pandas unchanged.

        Raises:
            Exception: Any error raised while building the frame or parsing
                the dates is logged and re-raised.
        """
        try:
            records = []
            for trans in transactions:
                if isinstance(trans, dict):
                    records.append(trans)
                elif is_dataclass(trans) and not isinstance(trans, type):
                    records.append(asdict(trans))
                elif hasattr(trans, '__dict__'):
                    # asdict() rejects non-dataclass objects, so copy the
                    # instance attributes instead.
                    records.append(dict(vars(trans)))
                else:
                    records.append(trans)

            frame = pd.DataFrame(records)
            if not frame.empty:
                # Ensure the date column is datetime and order chronologically.
                frame['date'] = pd.to_datetime(frame['date'])
                frame = frame.sort_values('date')
                # Derived columns used by the grouping logic elsewhere.
                frame['month'] = frame['date'].dt.to_period('M')
                frame['week'] = frame['date'].dt.to_period('W')
                frame['day_of_week'] = frame['date'].dt.day_name()

            self.transactions_df = frame
            self.logger.info("Loaded %d transactions", len(frame))
        except Exception as e:
            self.logger.error(f"Error loading transactions: {e}")
            raise

    def set_budgets(self, budgets: Dict[str, float]) -> None:
        """Set budget limits for categories.

        Args:
            budgets: Mapping of category name to its monthly limit.
        """
        self.budgets = budgets

    def analyze_spending_by_category(self, months_back: int = 6) -> "List[SpendingInsight]":
        """Analyze spending patterns by category.

        Args:
            months_back: Size of the look-back window in months; a month is
                approximated as 30 days.

        Returns:
            One ``SpendingInsight`` per expense category inside the window,
            sorted by total amount in descending order. Empty when no
            transactions are loaded or nothing was spent in the window.
        """
        df = self.transactions_df
        if df.empty:
            return []

        cutoff_date = datetime.now() - timedelta(days=months_back * 30)
        # Keep only recent expenses (negative amounts) and make them positive.
        expenses_df = df[(df['date'] >= cutoff_date) & (df['amount'] < 0)].copy()
        expenses_df['amount'] = expenses_df['amount'].abs()

        total_spending = float(expenses_df['amount'].sum())
        if total_spending == 0:
            self.logger.warning("Total spending is zero; no insights can be generated.")
            return []

        category_stats = expenses_df.groupby('category').agg({
            'amount': ['sum', 'count', 'mean'],
            'description': lambda x: list(x.value_counts().head(3).index),
        }).round(2)
        category_stats.columns = ['total', 'count', 'average', 'top_merchants']

        insights = []
        for category, stats in category_stats.iterrows():
            # Convert numpy scalars to built-in types so that the exported
            # insights stay JSON-serializable.
            total = float(stats['total'])
            insights.append(SpendingInsight(
                category=category,
                total_amount=total,
                transaction_count=int(stats['count']),
                average_transaction=float(stats['average']),
                percentage_of_total=(total / total_spending) * 100,
                trend=self._calculate_trend(expenses_df, category),
                top_merchants=list(stats['top_merchants'])[:3],
            ))

        insights.sort(key=lambda insight: insight.total_amount, reverse=True)
        return insights

    def _calculate_trend(self, df: pd.DataFrame, category: str) -> str:
        """Calculate the spending trend for a category.

        Fits a straight line through the category's monthly totals and
        classifies its slope.

        Args:
            df: Frame with ``category``, ``month`` and ``amount`` columns.
            category: Category to evaluate.

        Returns:
            'increasing', 'decreasing' or 'stable'. 'stable' is also returned
            when fewer than two months are available or the fit fails.
        """
        try:
            monthly_spending = df[df['category'] == category].groupby('month')['amount'].sum()
            if len(monthly_spending) < 2:
                return 'stable'
            x = np.arange(len(monthly_spending))
            slope = np.polyfit(x, monthly_spending.values, 1)[0]
        except Exception:
            # Best effort: a failed fit must not break the whole analysis,
            # but leave a trace for debugging.
            self.logger.debug(
                "Could not compute trend for category %r", category, exc_info=True
            )
            return 'stable'

        if slope > 0.1:
            return 'increasing'
        if slope < -0.1:
            return 'decreasing'
        return 'stable'

    def check_budget_alerts(self) -> "List[BudgetAlert]":
        """Check for budget alerts and overspending in the current month.

        Returns:
            One ``BudgetAlert`` per configured budget. The level is
            'critical' from 100% usage, 'warning' from 80%, else 'info'.
            Empty when no transactions or no budgets are set.
        """
        if self.transactions_df.empty or not self.budgets:
            return []

        # Read the clock once so every derived value refers to the same instant.
        now = datetime.now()
        # Midnight on the 1st: keeping the time of day would drop
        # transactions made earlier on the first day of the month.
        month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        days_in_month = calendar.monthrange(now.year, now.month)[1]
        next_month_start = month_start + timedelta(days=days_in_month)
        days_remaining = days_in_month - now.day

        df = self.transactions_df
        month_df = df[
            (df['date'] >= month_start)
            & (df['date'] < next_month_start)  # ignore future-dated entries
            & (df['amount'] < 0)  # only expenses
        ].copy()
        month_df['amount'] = month_df['amount'].abs()

        alerts = []
        for category, budget_limit in self.budgets.items():
            current_spending = float(
                month_df.loc[month_df['category'] == category, 'amount'].sum()
            )
            if budget_limit > 0:
                percentage_used = (current_spending / budget_limit) * 100
            else:
                # A non-positive budget is exhausted by any spending at all.
                percentage_used = float('inf') if current_spending > 0 else 0.0

            if percentage_used >= 100:
                alert_level = 'critical'
            elif percentage_used >= 80:
                alert_level = 'warning'
            else:
                alert_level = 'info'

            alerts.append(BudgetAlert(
                category=category,
                budget_limit=budget_limit,
                current_spending=current_spending,
                percentage_used=percentage_used,
                alert_level=alert_level,
                days_remaining=days_remaining,
            ))
        return alerts

    def generate_financial_summary(self) -> "FinancialSummary":
        """Generate a comprehensive financial summary.

        Returns:
            A ``FinancialSummary`` over all loaded transactions; an all-empty
            summary when nothing is loaded.
        """
        df = self.transactions_df
        if df.empty:
            return FinancialSummary(0, 0, 0, {}, "", [], {})

        income_df = df[df['amount'] > 0]
        expense_df = df[df['amount'] < 0]
        total_income = float(income_df['amount'].sum())
        total_expenses = float(abs(expense_df['amount'].sum()))

        if expense_df.empty:
            largest_expense = {}
            most_frequent_category = ""
        else:
            # Expenses are negative, so the minimum is the largest expense.
            largest_row = expense_df.loc[expense_df['amount'].idxmin()]
            largest_expense = {
                'amount': float(abs(largest_row['amount'])),
                'description': largest_row['description'],
                'date': largest_row['date'].strftime('%Y-%m-%d'),
                'category': largest_row['category'],
            }
            # mode() is empty when every category value is missing.
            modes = expense_df['category'].mode()
            most_frequent_category = modes.iloc[0] if not modes.empty else ""

        return FinancialSummary(
            total_income=total_income,
            total_expenses=total_expenses,
            net_cash_flow=total_income - total_expenses,
            largest_expense=largest_expense,
            most_frequent_category=most_frequent_category,
            unusual_transactions=self._detect_unusual_transactions(),
            monthly_trends=self._calculate_monthly_trends(),
        )

    def _detect_unusual_transactions(self) -> List[Dict]:
        """Detect unusual transactions using statistical methods.

        Two checks are run: expense amounts outside 1.5 * IQR of their
        category (categories with fewer than five expenses are skipped), and
        more than three transactions with the same description on one day.

        Returns:
            At most ten findings, amount outliers first, each as a dict with
            a human-readable ``reason``.
        """
        df = self.transactions_df
        if df.empty:
            return []

        unusual = []

        # Amount outliers, evaluated per category.
        for category in df['category'].unique():
            category_df = df[(df['category'] == category) & (df['amount'] < 0)]
            if len(category_df) < 5:  # need sufficient data
                continue

            amounts = category_df['amount'].abs()
            q1 = amounts.quantile(0.25)
            q3 = amounts.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outliers = category_df[(amounts < lower_bound) | (amounts > upper_bound)]
            for _, row in outliers.iterrows():
                amount = float(abs(row['amount']))
                # Report the direction that actually triggered the finding.
                direction = 'higher' if amount > upper_bound else 'lower'
                unusual.append({
                    'date': row['date'].strftime('%Y-%m-%d'),
                    'description': row['description'],
                    'amount': amount,
                    'category': row['category'],
                    'reason': f'Amount significantly {direction} than usual for this category',
                })

        # Frequency outliers: many transactions with one merchant on one day.
        daily_merchant = df.groupby([df['date'].dt.date, 'description']).size()
        frequent_same_day = daily_merchant[daily_merchant > 3]
        for (date, merchant), count in frequent_same_day.items():
            count = int(count)
            unusual.append({
                'date': str(date),
                'description': merchant,
                'count': count,
                'reason': f'{count} transactions with same merchant on same day',
            })

        return unusual[:10]  # cap the report at the first ten findings

    def _calculate_monthly_trends(self) -> Dict[str, float]:
        """Calculate monthly spending trends.

        Only the six most recent months that contain expenses are used.

        Returns:
            Dict with ``average_monthly``, ``highest_month`` and
            ``lowest_month``; ``overall_trend`` (regression slope) and
            ``month_over_month_change`` (percent) are added when at least two
            months are available. Empty when there are no expenses.
        """
        df = self.transactions_df
        if df.empty:
            return {}

        expense_df = df[df['amount'] < 0]
        if expense_df.empty:
            # Without expenses every metric would be NaN.
            return {}

        monthly_spending = expense_df['amount'].abs().groupby(expense_df['month']).sum()
        recent_months = monthly_spending.tail(6)

        trends = {}
        if len(recent_months) >= 2:
            x = np.arange(len(recent_months))
            trends['overall_trend'] = float(np.polyfit(x, recent_months.values, 1)[0])

            current_month = recent_months.iloc[-1]
            previous_month = recent_months.iloc[-2]
            if previous_month != 0:  # guard against a zero baseline
                trends['month_over_month_change'] = float(
                    ((current_month - previous_month) / previous_month) * 100
                )

        trends['average_monthly'] = float(recent_months.mean())
        trends['highest_month'] = float(recent_months.max())
        trends['lowest_month'] = float(recent_months.min())
        return trends

    def predict_future_spending(self, months_ahead: int = 3) -> Dict[str, Dict[str, float]]:
        """Predict future spending based on historical trends.

        A straight line is fitted through each category's monthly totals and
        extrapolated; negative forecasts are clamped to zero.

        Args:
            months_ahead: Number of future months to forecast.

        Returns:
            Mapping of category to a dict with ``next_month``,
            ``total_predicted`` and ``average_predicted``. Empty when there
            are no expenses or fewer than three months of data.
        """
        df = self.transactions_df
        if df.empty:
            return {}

        expense_df = df[df['amount'] < 0].copy()
        if expense_df.empty:
            return {}
        expense_df['amount'] = expense_df['amount'].abs()

        # Rows are months, columns are categories; missing cells count as 0.
        monthly_category_spending = (
            expense_df.groupby(['month', 'category'])['amount'].sum().unstack(fill_value=0)
        )
        month_count = len(monthly_category_spending)
        if month_count < 3:  # need at least 3 months of data
            return {}

        x = np.arange(month_count)
        future_x = np.arange(month_count, month_count + months_ahead)

        predictions = {}
        for category in monthly_category_spending.columns:
            slope, intercept = np.polyfit(x, monthly_category_spending[category].values, 1)
            # Don't predict negative spending.
            forecast = np.clip(slope * future_x + intercept, 0, None)
            has_forecast = forecast.size > 0
            predictions[category] = {
                'next_month': float(forecast[0]) if has_forecast else 0.0,
                'total_predicted': float(forecast.sum()),
                'average_predicted': float(forecast.mean()) if has_forecast else 0.0,
            }
        return predictions

    def get_spending_recommendations(self) -> List[str]:
        """Generate spending recommendations based on analysis.

        Returns:
            Human-readable recommendations; a single notice when no
            transactions are loaded.
        """
        if self.transactions_df.empty:
            return [self._NO_DATA_MESSAGE]

        insights = self.analyze_spending_by_category()
        budget_alerts = self.check_budget_alerts()
        summary = self.generate_financial_summary()
        return self._build_recommendations(insights, budget_alerts, summary)

    def _build_recommendations(self, insights, budget_alerts, summary) -> List[str]:
        """Turn already-computed analysis results into recommendation texts."""
        recommendations = []

        # Categories that went over budget.
        for alert in budget_alerts:
            if alert.percentage_used > 100:
                recommendations.append(
                    f"You've exceeded your {alert.category} budget by "
                    f"${alert.current_spending - alert.budget_limit:.2f} this month. "
                    f"Consider reducing spending in this category."
                )

        # A single category dominating the spending.
        if insights:
            top_category = insights[0]
            if top_category.percentage_of_total > 40:
                recommendations.append(
                    f"{top_category.category} accounts for {top_category.percentage_of_total:.1f}% "
                    f"of your spending. Consider if this allocation aligns with your priorities."
                )

        # Negative cash flow.
        if summary.net_cash_flow < 0:
            recommendations.append(
                f"Your expenses (${summary.total_expenses:.2f}) exceed your income "
                f"(${summary.total_income:.2f}) by ${abs(summary.net_cash_flow):.2f}. "
                f"Focus on reducing expenses or increasing income."
            )

        # Largest category with an upward trend.
        increasing_categories = [i for i in insights if i.trend == 'increasing']
        if increasing_categories:
            top_increasing = increasing_categories[0]
            recommendations.append(
                f"Your {top_increasing.category} spending is trending upward. "
                f"Monitor this category to avoid budget overruns."
            )

        # Unusual transaction patterns.
        if summary.unusual_transactions:
            recommendations.append(
                f"Found {len(summary.unusual_transactions)} unusual transactions. "
                f"Review these for potential errors or unauthorized charges."
            )

        # Positive reinforcement.
        decreasing_categories = [i for i in insights if i.trend == 'decreasing']
        if decreasing_categories:
            recommendations.append(
                f"Great job reducing {decreasing_categories[0].category} spending! "
                f"This trend is helping improve your financial health."
            )

        if not recommendations:
            recommendations.append("Your spending patterns look healthy. Keep up the good work!")
        return recommendations

    def export_analysis_data(self) -> Dict:
        """Export all analysis data for Claude API integration.

        Returns:
            Dict bundling insights, budget alerts, the financial summary,
            predictions, recommendations, the transaction count and the
            analysis timestamp (ISO format).
        """
        # Compute each analysis once and reuse it for the recommendations.
        insights = self.analyze_spending_by_category()
        budget_alerts = self.check_budget_alerts()
        summary = self.generate_financial_summary()
        if self.transactions_df.empty:
            recommendations = [self._NO_DATA_MESSAGE]
        else:
            recommendations = self._build_recommendations(insights, budget_alerts, summary)

        return {
            'spending_insights': [asdict(insight) for insight in insights],
            'budget_alerts': [asdict(alert) for alert in budget_alerts],
            'financial_summary': asdict(summary),
            'predictions': self.predict_future_spending(),
            'recommendations': recommendations,
            'transaction_count': len(self.transactions_df),
            'analysis_date': datetime.now().isoformat(),
        }
# Example usage and testing
if __name__ == "__main__":
    # Smoke-test the analyzer with a handful of in-memory transactions:
    # (days ago, description, amount, category).
    demo_rows = (
        (5, 'Amazon Purchase', -45.67, 'Shopping'),
        (10, 'Grocery Store', -120.50, 'Food & Dining'),
        (15, 'Salary Deposit', 3000.00, 'Income'),
    )
    sample_transactions = [
        {
            'date': datetime.now() - timedelta(days=days_ago),
            'description': description,
            'amount': amount,
            'category': category,
        }
        for days_ago, description, amount, category in demo_rows
    ]

    analyzer = SpendAnalyzer()
    analyzer.load_transactions(sample_transactions)
    analyzer.set_budgets({'Shopping': 100, 'Food & Dining': 200})

    insights = analyzer.analyze_spending_by_category()
    print(f"Generated {len(insights)} spending insights")

    recommendations = analyzer.get_spending_recommendations()
    print(f"Generated {len(recommendations)} recommendations")