"""
Gradio Web Interface for Spend Analyzer MCP - Real PDF Processing
"""
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import json
import os
import asyncio
import requests
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
import time
import tempfile
import threading
# Import our local modules
from email_processor import PDFProcessor
from spend_analyzer import SpendAnalyzer
from secure_storage_utils import SecureStorageManager
from mcp_server import create_mcp_app, run_mcp_server
class RealSpendAnalyzerInterface:
def __init__(self):
    """Initialize interface state, processors, logging, and currency tables."""
    self.current_analysis = None  # latest analyzer export dict (None until PDFs processed)
    self.user_sessions = {}  # per-session settings: budgets, processing settings, ...
    # NOTE(review): default is the symbol '$' here but later code stores the
    # code 'USD' in detected_currency -- confirm which convention is intended.
    self.detected_currency = "$" # Default currency
    self.currency_symbol = "$" # Current currency symbol
    self.logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)
    # Initialize processors
    self.pdf_processor = PDFProcessor()
    self.spend_analyzer = SpendAnalyzer()
    self.secure_storage = SecureStorageManager()
    # MCP server state
    self.mcp_server_thread = None
    self.mcp_server_running = False
    self.mcp_server_logs = []
    # Load API keys from environment or config file on startup
    self._load_initial_api_settings()
    # Currency detection patterns.
    # NOTE(review): insertion order matters -- detect_currency_from_text tries
    # these top to bottom, so USD wins ties and '¥' resolves to JPY before CNY.
    self.currency_patterns = {
        'USD': {'symbols': ['$', 'USD', 'US$'], 'regex': r'\$|USD|US\$'},
        'INR': {'symbols': ['₹', 'Rs', 'Rs.', 'INR'], 'regex': r'₹|Rs\.?|INR'},
        'EUR': {'symbols': ['€', 'EUR'], 'regex': r'€|EUR'},
        'GBP': {'symbols': ['£', 'GBP'], 'regex': r'£|GBP'},
        'CAD': {'symbols': ['C$', 'CAD'], 'regex': r'C\$|CAD'},
        'AUD': {'symbols': ['A$', 'AUD'], 'regex': r'A\$|AUD'},
        'JPY': {'symbols': ['¥', 'JPY'], 'regex': r'¥|JPY'},
        'CNY': {'symbols': ['¥', 'CNY', 'RMB'], 'regex': r'CNY|RMB'},
    }
def create_interface(self):
    """Create the main Gradio interface.

    Builds the tabbed Blocks layout; each tab's contents are delegated to a
    _create_*_tab helper. Returns the gr.Blocks instance (caller launches it).
    """
    with gr.Blocks(
        title="Spend Analyzer MCP - Real PDF Processing",
        css="""
        .main-header { text-align: center; margin: 20px 0; }
        .status-box { padding: 10px; border-radius: 5px; margin: 10px 0; }
        .success-box { background-color: #d4edda; border: 1px solid #c3e6cb; }
        .error-box { background-color: #f8d7da; border: 1px solid #f5c6cb; }
        .warning-box { background-color: #fff3cd; border: 1px solid #ffeaa7; }
        .info-box { background-color: #e7f3ff; border: 1px solid #b3d9ff; }
        """
    ) as interface:
        gr.Markdown("# 💰 Spend Analyzer MCP - Real PDF Processing", elem_classes=["main-header"])
        gr.Markdown("*Analyze your real bank statement PDFs with AI-powered insights*")
        # Info notice.
        # FIX: this literal was unterminated; reconstructed as one valid string
        # wrapped in the info-box style declared in the css above.
        gr.HTML(
            '<div class="status-box info-box">📄 <b>Real PDF Processing:</b> '
            'Upload your actual bank statement PDFs for comprehensive financial analysis.</div>'
        )
        with gr.Tabs():
            # Tab 1: PDF Upload & Processing
            with gr.TabItem("📄 PDF Upload & Analysis"):
                self._create_pdf_processing_tab()
            # Tab 2: Analysis Dashboard
            with gr.TabItem("📊 Analysis Dashboard"):
                self._create_dashboard_tab()
            # Tab 3: AI Financial Advisor
            with gr.TabItem("🤖 AI Financial Advisor"):
                self._create_chat_tab()
            # Tab 4: Transaction Management
            with gr.TabItem("📋 Transaction Management"):
                self._create_transaction_tab()
            # Tab 5: Settings & Export
            with gr.TabItem("⚙️ Settings & Export"):
                self._create_settings_tab()
            # Tab 6: MCP Server
            with gr.TabItem("🔌 MCP Server"):
                self._create_mcp_tab()
        # AI Analysis Disclaimer (FIX: literal was unterminated; reconstructed)
        gr.HTML(
            '<div class="status-box warning-box">⚠️ <b>Important Notice:</b> '
            'AI analysis results are generated automatically and may contain errors. '
            'Please verify all financial insights and recommendations for accuracy '
            'before making any financial decisions.</div>'
        )
    return interface
def detect_currency_from_text(self, text: str) -> Tuple[str, str]:
"""Detect currency from PDF text content"""
import re
text_lower = text.lower()
# Check for currency patterns in order of specificity
for currency_code, currency_info in self.currency_patterns.items():
pattern = currency_info['regex']
if re.search(pattern, text, re.IGNORECASE):
# Return currency code and primary symbol
return currency_code, currency_info['symbols'][0]
# Default fallback based on bank detection
if any(bank in text_lower for bank in ['hdfc', 'icici', 'sbi', 'axis', 'kotak']):
return 'INR', '₹'
elif any(bank in text_lower for bank in ['chase', 'bofa', 'wells', 'citi']):
return 'USD', '$'
elif any(bank in text_lower for bank in ['hsbc', 'barclays', 'lloyds']):
return 'GBP', '£'
# Default to USD
return 'USD', '$'
def update_currency_in_interface(self, currency_code: str, currency_symbol: str):
"""Update currency throughout the interface"""
self.detected_currency = currency_code
self.currency_symbol = currency_symbol
self.logger.info(f"Currency detected: {currency_code} ({currency_symbol})")
def format_amount(self, amount: float) -> str:
"""Format amount with detected currency"""
return f"{self.currency_symbol}{amount:,.2f}"
def _create_pdf_processing_tab(self):
    """Create PDF processing tab.

    Layout: upload + passwords + options on the left column, status and
    results displays on the right. Wires the process button to
    _process_real_pdfs.
    """
    gr.Markdown("## 📄 Upload & Process Bank Statement PDFs")
    gr.Markdown("*Upload your bank statement PDFs for real financial analysis*")
    with gr.Row():
        with gr.Column(scale=2):
            # File upload section
            gr.Markdown("### 📁 File Upload")
            pdf_upload = gr.File(
                label="Upload Bank Statement PDFs",
                file_count="multiple",
                file_types=[".pdf"],
                height=150
            )
            # Password section: maps filename -> password for locked PDFs
            gr.Markdown("### 🔐 PDF Passwords (if needed)")
            pdf_passwords_input = gr.Textbox(
                label="PDF Passwords (JSON format)",
                placeholder='{"statement1.pdf": "password123", "statement2.pdf": "password456"}',
                lines=3
            )
            # Processing options
            gr.Markdown("### ⚙️ Processing Options")
            with gr.Row():
                auto_categorize = gr.Checkbox(
                    label="Auto-categorize transactions",
                    value=True
                )
                detect_duplicates = gr.Checkbox(
                    label="Detect duplicate transactions",
                    value=True
                )
            # Process button
            process_pdf_btn = gr.Button("🚀 Process PDFs", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Status and results
            processing_status = gr.HTML()
            # Processing progress
            gr.Markdown("### 📊 Processing Results")
            processing_results = gr.JSON(
                label="Detailed Results",
                visible=False
            )
            # Quick stats
            quick_stats = gr.HTML()
    # Event handler: _process_real_pdfs returns (status_html, results update,
    # quick_stats_html), matching the outputs list below.
    process_pdf_btn.click(
        fn=self._process_real_pdfs,
        inputs=[pdf_upload, pdf_passwords_input, auto_categorize, detect_duplicates],
        outputs=[processing_status, processing_results, quick_stats]
    )
def _create_dashboard_tab(self):
    """Create analysis dashboard tab.

    Summary numbers, four plots, four insight panels, and a transaction
    table; refresh/export/clear buttons are wired to the corresponding
    instance methods.
    """
    gr.Markdown("## 📊 Financial Analysis Dashboard")
    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh Dashboard")
        export_btn = gr.Button("📤 Export Analysis")
        clear_btn = gr.Button("🗑️ Clear Data", variant="stop")
    # Summary cards
    gr.Markdown("### 💰 Financial Summary")
    with gr.Row():
        total_income = gr.Number(label="Total Income ($)", interactive=False)
        total_expenses = gr.Number(label="Total Expenses ($)", interactive=False)
        net_cashflow = gr.Number(label="Net Cash Flow ($)", interactive=False)
        transaction_count = gr.Number(label="Total Transactions", interactive=False)
    # Charts section
    gr.Markdown("### 📈 Visual Analysis")
    with gr.Row():
        with gr.Column():
            spending_by_category = gr.Plot(label="Spending by Category")
            monthly_trends = gr.Plot(label="Monthly Spending Trends")
        with gr.Column():
            income_vs_expenses = gr.Plot(label="Income vs Expenses")
            top_merchants = gr.Plot(label="Top Merchants")
    # Insights section
    gr.Markdown("### 🎯 Financial Insights")
    with gr.Row():
        with gr.Column():
            budget_alerts = gr.HTML(label="Budget Alerts")
            spending_insights = gr.HTML(label="Spending Insights")
        with gr.Column():
            recommendations = gr.HTML(label="AI Recommendations")
            unusual_transactions = gr.HTML(label="Unusual Transactions")
    # Detailed data
    with gr.Accordion("📋 Detailed Transaction Data", open=False):
        transaction_table = gr.Dataframe(
            headers=["Date", "Description", "Amount", "Category", "Account"],
            interactive=True,
            label="All Transactions"
        )
    # Status displays for clear function
    clear_status = gr.HTML()
    clear_info = gr.HTML()
    # Event handlers.
    # NOTE: the outputs order below must match the 13-tuple returned by
    # _refresh_dashboard exactly.
    refresh_btn.click(
        fn=self._refresh_dashboard,
        outputs=[total_income, total_expenses, net_cashflow, transaction_count,
                 spending_by_category, monthly_trends, income_vs_expenses, top_merchants,
                 budget_alerts, spending_insights, recommendations, unusual_transactions,
                 transaction_table]
    )
    export_btn.click(
        fn=self._export_analysis,
        outputs=[gr.File(label="Analysis Export")]
    )
    clear_btn.click(
        fn=self._clear_data,
        outputs=[clear_status, clear_info]
    )
def _create_chat_tab(self):
    """Create AI chat tab.

    Chat column (provider selector, chatbot, quick-question buttons) plus a
    sidebar with AI status, analysis context, and response-style settings.
    """
    gr.Markdown("## 🤖 AI Financial Advisor")
    gr.Markdown("*Get personalized insights about your spending patterns using configured AI*")
    with gr.Row():
        with gr.Column(scale=3):
            # AI Provider Selection
            gr.Markdown("### 🤖 Select AI Provider")
            with gr.Row():
                ai_provider_selector = gr.Dropdown(
                    choices=["No AI Configured"],
                    label="Available AI Providers",
                    value="No AI Configured",
                    scale=3
                )
                refresh_ai_btn = gr.Button("🔄 Refresh", size="sm", scale=1)
                fetch_models_btn = gr.Button("📥 Fetch Models", size="sm", scale=1, visible=False)
            # Model selection for LM Studio (only shown for that provider)
            lm_studio_models = gr.Dropdown(
                choices=[],
                label="Available LM Studio Models",
                visible=False
            )
            # Chat interface
            chatbot = gr.Chatbot(
                label="Financial Advisor Chat",
                height=400,
                show_label=True
            )
            with gr.Row():
                msg_input = gr.Textbox(
                    placeholder="Ask about your spending patterns, budgets, or financial goals...",
                    label="Your Question",
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)
            # Quick question buttons: each pre-fills the message box
            gr.Markdown("### 🎯 Quick Questions")
            with gr.Row():
                budget_btn = gr.Button("💰 Budget Analysis", size="sm")
                trends_btn = gr.Button("📈 Spending Trends", size="sm")
                tips_btn = gr.Button("💡 Save Money Tips", size="sm")
                unusual_btn = gr.Button("🚨 Unusual Activity", size="sm")
            with gr.Row():
                categories_btn = gr.Button("📊 Category Breakdown", size="sm")
                merchants_btn = gr.Button("🏪 Top Merchants", size="sm")
                monthly_btn = gr.Button("📅 Monthly Analysis", size="sm")
                goals_btn = gr.Button("🎯 Financial Goals", size="sm")
        with gr.Column(scale=1):
            chat_status = gr.HTML()
            # AI Status.
            # FIX: the value literal was unterminated; reconstructed as one
            # valid string using the warning-box style from create_interface.
            gr.Markdown("### 🤖 AI Status")
            ai_status_display = gr.HTML(
                value='<div class="status-box warning-box">⚠️ No AI configured. '
                      'Please configure AI in Settings.</div>'
            )
            # Analysis context
            gr.Markdown("### 📊 Analysis Context")
            context_info = gr.JSON(
                label="Available Data",
                value={"status": "Upload PDFs to start analysis"}
            )
            # Chat settings
            gr.Markdown("### ⚙️ Chat Settings")
            response_style = gr.Radio(
                choices=["Detailed", "Concise", "Technical"],
                label="Response Style",
                value="Detailed"
            )
    # Event handlers
    send_btn.click(
        fn=self._handle_chat_message,
        inputs=[msg_input, chatbot, response_style, ai_provider_selector],
        outputs=[chatbot, msg_input, chat_status]
    )
    msg_input.submit(
        fn=self._handle_chat_message,
        inputs=[msg_input, chatbot, response_style, ai_provider_selector],
        outputs=[chatbot, msg_input, chat_status]
    )
    refresh_ai_btn.click(
        fn=self._refresh_ai_providers,
        outputs=[ai_provider_selector, ai_status_display, fetch_models_btn, lm_studio_models]
    )
    fetch_models_btn.click(
        fn=self._fetch_lm_studio_models,
        inputs=[ai_provider_selector],
        outputs=[lm_studio_models, chat_status]
    )
    ai_provider_selector.change(
        fn=self._on_ai_provider_change,
        inputs=[ai_provider_selector],
        outputs=[fetch_models_btn, lm_studio_models, ai_status_display]
    )
    # Quick question handlers
    budget_btn.click(lambda: "How am I doing with my budget this month?", outputs=[msg_input])
    trends_btn.click(lambda: "What are my spending trends over the last few months?", outputs=[msg_input])
    tips_btn.click(lambda: "What are specific ways I can save money based on my spending?", outputs=[msg_input])
    unusual_btn.click(lambda: "Are there any unusual transactions I should be aware of?", outputs=[msg_input])
    categories_btn.click(lambda: "Break down my spending by category", outputs=[msg_input])
    merchants_btn.click(lambda: "Who are my top merchants and how much do I spend with them?", outputs=[msg_input])
    monthly_btn.click(lambda: "Analyze my monthly spending patterns", outputs=[msg_input])
    goals_btn.click(lambda: "Help me set realistic financial goals based on my spending", outputs=[msg_input])
def _create_transaction_tab(self):
    """Create transaction management tab.

    Filters and per-transaction editing on the left, statistics and category
    management on the right, with a shared filtered-results table below.
    """
    gr.Markdown("## 📋 Transaction Management")
    gr.Markdown("*Review, edit, and categorize your transactions*")
    with gr.Row():
        with gr.Column(scale=2):
            # Transaction filters
            gr.Markdown("### 🔍 Filter Transactions")
            with gr.Row():
                date_from = gr.Textbox(label="From Date (YYYY-MM-DD)", placeholder="2024-01-01")
                date_to = gr.Textbox(label="To Date (YYYY-MM-DD)", placeholder="2024-12-31")
            with gr.Row():
                category_filter = gr.Dropdown(
                    choices=["All", "Food & Dining", "Shopping", "Gas & Transport",
                             "Utilities", "Entertainment", "Healthcare", "Other"],
                    label="Category Filter",
                    value="All"
                )
                amount_filter = gr.Radio(
                    choices=["All", "Income Only", "Expenses Only", "> $100", "> $500"],
                    label="Amount Filter",
                    value="All"
                )
            filter_btn = gr.Button("🔍 Apply Filters", variant="secondary")
            # Transaction editing
            gr.Markdown("### ✏️ Edit Transaction")
            with gr.Row():
                edit_transaction_id = gr.Number(label="Transaction ID", precision=0)
                edit_category = gr.Dropdown(
                    choices=["Food & Dining", "Shopping", "Gas & Transport",
                             "Utilities", "Entertainment", "Healthcare", "Other"],
                    label="New Category"
                )
            update_btn = gr.Button("💾 Update Transaction", variant="primary")
        with gr.Column(scale=1):
            # Transaction stats
            gr.Markdown("### 📊 Transaction Statistics")
            transaction_stats = gr.HTML()
            # Category management
            gr.Markdown("### 🏷️ Category Management")
            add_category = gr.Textbox(label="Add New Category")
            add_category_btn = gr.Button("➕ Add Category")
            category_status = gr.HTML()
    # Filtered transactions table
    filtered_transactions = gr.Dataframe(
        headers=["ID", "Date", "Description", "Amount", "Category", "Account"],
        interactive=False,
        label="Filtered Transactions"
    )
    # Event handlers
    filter_btn.click(
        fn=self._filter_transactions,
        inputs=[date_from, date_to, category_filter, amount_filter],
        outputs=[filtered_transactions, transaction_stats]
    )
    update_btn.click(
        fn=self._update_transaction,
        inputs=[edit_transaction_id, edit_category],
        outputs=[category_status, filtered_transactions]
    )
    # _add_category also refreshes both category dropdowns with the new entry
    add_category_btn.click(
        fn=self._add_category,
        inputs=[add_category],
        outputs=[category_status, edit_category, category_filter]
    )
def _create_settings_tab(self):
    """Create settings and export tab.

    Four sub-tabs: AI API configuration (provider-specific fields shown and
    hidden via update_ai_provider_visibility), budget settings, export
    options, and PDF-processing settings.
    """
    gr.Markdown("## ⚙️ Settings & Export")
    with gr.Tabs():
        with gr.TabItem("AI API Configuration"):
            gr.Markdown("### 🤖 AI API Settings")
            gr.Markdown("*Configure AI providers for enhanced analysis and insights*")
            # Add simple warning about API key persistence
            gr.HTML(self.secure_storage.create_simple_warning_html())
            with gr.Row():
                with gr.Column():
                    # AI Provider Selection
                    ai_provider = gr.Radio(
                        choices=["Claude (Anthropic)", "SambaNova", "LM Studio", "Ollama", "Custom API"],
                        label="AI Provider",
                        value="Claude (Anthropic)"
                    )
                    # API Configuration based on provider; only the fields for
                    # the selected provider are visible at any time.
                    with gr.Group():
                        gr.Markdown("#### API Configuration")
                        # Claude/Anthropic Settings
                        claude_api_key = gr.Textbox(
                            label="Claude API Key",
                            type="password",
                            placeholder="sk-ant-...",
                            visible=True
                        )
                        claude_model = gr.Dropdown(
                            choices=["claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022", "claude-3-opus-20240229"],
                            label="Claude Model",
                            value="claude-3-5-sonnet-20241022",
                            visible=True
                        )
                        # SambaNova Settings
                        sambanova_api_key = gr.Textbox(
                            label="SambaNova API Key",
                            type="password",
                            placeholder="Your SambaNova API key",
                            visible=False
                        )
                        sambanova_model = gr.Dropdown(
                            choices=["Meta-Llama-3.1-8B-Instruct", "Meta-Llama-3.1-70B-Instruct", "Meta-Llama-3.1-405B-Instruct"],
                            label="SambaNova Model",
                            value="Meta-Llama-3.1-70B-Instruct",
                            visible=False
                        )
                        # LM Studio Settings
                        lm_studio_url = gr.Textbox(
                            label="LM Studio URL",
                            placeholder="http://localhost:1234/v1",
                            value="http://localhost:1234/v1",
                            visible=False
                        )
                        lm_studio_model = gr.Textbox(
                            label="LM Studio Model Name",
                            placeholder="local-model",
                            visible=False
                        )
                        # Ollama Settings
                        ollama_url = gr.Textbox(
                            label="Ollama URL",
                            placeholder="http://localhost:11434",
                            value="http://localhost:11434",
                            visible=False
                        )
                        ollama_model = gr.Dropdown(
                            choices=["llama3.1", "llama3.1:70b", "mistral", "codellama", "phi3"],
                            label="Ollama Model",
                            value="llama3.1",
                            visible=False
                        )
                        # Custom API Settings
                        custom_api_url = gr.Textbox(
                            label="Custom API URL",
                            placeholder="https://api.example.com/v1",
                            visible=False
                        )
                        custom_api_key = gr.Textbox(
                            label="Custom API Key",
                            type="password",
                            placeholder="Your custom API key",
                            visible=False
                        )
                        custom_model_list = gr.Textbox(
                            label="Available Models (comma-separated)",
                            placeholder="model1, model2, model3",
                            visible=False
                        )
                        custom_selected_model = gr.Textbox(
                            label="Selected Model",
                            placeholder="model1",
                            visible=False
                        )
                    # AI Settings
                    with gr.Group():
                        gr.Markdown("#### AI Analysis Settings")
                        ai_temperature = gr.Slider(
                            minimum=0.0,
                            maximum=2.0,
                            value=0.7,
                            step=0.1,
                            label="Temperature (Creativity)"
                        )
                        ai_max_tokens = gr.Slider(
                            minimum=100,
                            maximum=4000,
                            value=1000,
                            step=100,
                            label="Max Tokens"
                        )
                        enable_ai_insights = gr.Checkbox(
                            label="Enable AI-powered insights",
                            value=True
                        )
                        enable_ai_recommendations = gr.Checkbox(
                            label="Enable AI recommendations",
                            value=True
                        )
                    save_ai_settings_btn = gr.Button("💾 Save AI Settings", variant="primary")
                with gr.Column():
                    ai_settings_status = gr.HTML()
                    # Test AI Connection
                    gr.Markdown("#### 🔍 Test AI Connection")
                    test_ai_btn = gr.Button("🧪 Test AI Connection", variant="secondary")
                    ai_test_result = gr.HTML()
                    # Current AI Settings Display
                    gr.Markdown("#### 📋 Current AI Configuration")
                    current_ai_settings = gr.JSON(
                        label="Active AI Settings",
                        value={"provider": "None", "status": "Not configured"}
                    )
                    # AI Usage Statistics.
                    # FIX: the value literal was unterminated; reconstructed
                    # as one valid string with the info-box style.
                    gr.Markdown("#### 📊 AI Usage Statistics")
                    ai_usage_stats = gr.HTML(
                        value='<div class="status-box info-box">No usage data available</div>'
                    )
        with gr.TabItem("Budget Settings"):
            gr.Markdown("### 💰 Monthly Budget Configuration")
            with gr.Row():
                with gr.Column():
                    budget_categories = gr.CheckboxGroup(
                        choices=["Food & Dining", "Shopping", "Gas & Transport",
                                 "Utilities", "Entertainment", "Healthcare", "Other"],
                        label="Categories to Budget",
                        value=["Food & Dining", "Shopping", "Gas & Transport"]
                    )
                    budget_amounts = gr.JSON(
                        label="Budget Amounts ($)",
                        value={
                            "Food & Dining": 500,
                            "Shopping": 300,
                            "Gas & Transport": 200,
                            "Utilities": 150,
                            "Entertainment": 100,
                            "Healthcare": 200,
                            "Other": 100
                        }
                    )
                    save_budgets_btn = gr.Button("💾 Save Budget Settings", variant="primary")
                with gr.Column():
                    budget_status = gr.HTML()
                    current_budgets = gr.JSON(label="Current Budget Settings")
        with gr.TabItem("Export Options"):
            gr.Markdown("### 📤 Data Export")
            with gr.Row():
                with gr.Column():
                    export_format = gr.Radio(
                        choices=["JSON", "CSV", "Excel"],
                        label="Export Format",
                        value="CSV"
                    )
                    export_options = gr.CheckboxGroup(
                        choices=["Raw Transactions", "Analysis Summary", "Charts Data", "Recommendations"],
                        label="Include in Export",
                        value=["Raw Transactions", "Analysis Summary"]
                    )
                    date_range_export = gr.CheckboxGroup(
                        choices=["Last 30 days", "Last 90 days", "Last 6 months", "All data"],
                        label="Date Range",
                        value=["All data"]
                    )
                    export_data_btn = gr.Button("📤 Export Data", variant="primary")
                with gr.Column():
                    export_status = gr.HTML()
                    gr.Markdown("### 📊 Export Preview")
                    export_preview = gr.JSON(label="Export Preview")
        with gr.TabItem("Processing Settings"):
            gr.Markdown("### ⚙️ PDF Processing Configuration")
            processing_settings = gr.JSON(
                label="Processing Settings",
                value={
                    "auto_categorize": True,
                    "detect_duplicates": True,
                    "merge_similar_transactions": False,
                    "confidence_threshold": 0.8,
                    "date_format": "auto",
                    "amount_format": "auto"
                }
            )
            save_processing_btn = gr.Button("💾 Save Processing Settings", variant="primary")
            processing_status = gr.HTML()
    # Event handlers
    save_budgets_btn.click(
        fn=self._save_budget_settings,
        inputs=[budget_categories, budget_amounts],
        outputs=[budget_status, current_budgets]
    )
    export_data_btn.click(
        fn=self._export_data,
        inputs=[export_format, export_options, date_range_export],
        outputs=[export_status, export_preview, gr.File(label="Export File")]
    )
    save_processing_btn.click(
        fn=self._save_processing_settings,
        inputs=[processing_settings],
        outputs=[processing_status]
    )
    # AI Configuration Event Handlers
    def update_ai_provider_visibility(provider):
        """Show only the fields belonging to the selected AI provider."""
        claude_visible = provider == "Claude (Anthropic)"
        sambanova_visible = provider == "SambaNova"
        lm_studio_visible = provider == "LM Studio"
        ollama_visible = provider == "Ollama"
        custom_visible = provider == "Custom API"
        return (
            gr.update(visible=claude_visible),     # claude_api_key
            gr.update(visible=claude_visible),     # claude_model
            gr.update(visible=sambanova_visible),  # sambanova_api_key
            gr.update(visible=sambanova_visible),  # sambanova_model
            gr.update(visible=lm_studio_visible),  # lm_studio_url
            gr.update(visible=lm_studio_visible),  # lm_studio_model
            gr.update(visible=ollama_visible),     # ollama_url
            gr.update(visible=ollama_visible),     # ollama_model
            gr.update(visible=custom_visible),     # custom_api_url
            gr.update(visible=custom_visible),     # custom_api_key
            gr.update(visible=custom_visible),     # custom_model_list
            gr.update(visible=custom_visible),     # custom_selected_model
        )
    ai_provider.change(
        fn=update_ai_provider_visibility,
        inputs=[ai_provider],
        outputs=[claude_api_key, claude_model, sambanova_api_key, sambanova_model,
                 lm_studio_url, lm_studio_model, ollama_url, ollama_model,
                 custom_api_url, custom_api_key, custom_model_list, custom_selected_model]
    )
    save_ai_settings_btn.click(
        fn=self._save_ai_settings,
        inputs=[ai_provider, claude_api_key, claude_model, sambanova_api_key, sambanova_model,
                lm_studio_url, lm_studio_model, ollama_url, ollama_model,
                custom_api_url, custom_api_key, custom_model_list, custom_selected_model,
                ai_temperature, ai_max_tokens, enable_ai_insights, enable_ai_recommendations],
        outputs=[ai_settings_status, current_ai_settings]
    )
    test_ai_btn.click(
        fn=self._test_ai_connection,
        inputs=[ai_provider, claude_api_key, sambanova_api_key, lm_studio_url, ollama_url, custom_api_url],
        outputs=[ai_test_result]
    )
# Implementation methods
def _process_real_pdfs(self, files, passwords_json, auto_categorize, detect_duplicates):
    """Process uploaded bank-statement PDFs and load results into the analyzer.

    Args:
        files: list of Gradio file objects (each has a .name temp path).
        passwords_json: dict or JSON string mapping filename -> PDF password.
        auto_categorize, detect_duplicates: processing flags.
            NOTE(review): currently accepted but not forwarded to the PDF
            processor -- confirm whether they should be.

    Returns:
        (status_html, gr.update for the results JSON component, quick_stats_html)

    FIX: all status/stat string literals were unterminated in the previous
    version; they are reconstructed here as valid HTML strings using the
    status-box styles declared in create_interface.
    """
    try:
        if not files:
            return ('<div class="status-box error-box">❌ No files uploaded</div>',
                    gr.update(visible=False), "")
        # Parse passwords if provided (accept a dict or a JSON string)
        passwords = {}
        if isinstance(passwords_json, dict):
            passwords = passwords_json
        elif passwords_json.strip():
            try:
                passwords = json.loads(passwords_json)
            except json.JSONDecodeError:
                return ('<div class="status-box error-box">❌ Invalid JSON format for passwords</div>',
                        gr.update(visible=False), "")
        all_transactions = []
        processed_files = []
        # Process each PDF; per-file errors are recorded, not fatal
        for file in files:
            try:
                # Read file content
                with open(file.name, 'rb') as f:
                    pdf_content = f.read()
                # Get password for this file
                file_password = passwords.get(os.path.basename(file.name))
                # Process PDF (the processor is async; run it to completion)
                statement_info = asyncio.run(
                    self.pdf_processor.process_pdf(pdf_content, file_password)
                )
                # Detect currency once, from the first successfully read PDF
                if not getattr(self, '_currency_detected', False):
                    try:
                        import fitz  # PyMuPDF, used only for raw text extraction
                        doc = fitz.open(stream=pdf_content, filetype="pdf")
                        text = "".join(page.get_text() for page in doc)
                        doc.close()
                        currency_code, currency_symbol = self.detect_currency_from_text(text)
                        self.update_currency_in_interface(currency_code, currency_symbol)
                    except Exception as e:
                        self.logger.warning(f"Currency detection failed: {e}")
                        # Fallback to bank-based detection
                        bank_name = statement_info.bank_name.lower()
                        if any(bank in bank_name for bank in ['hdfc', 'icici', 'sbi', 'axis', 'kotak']):
                            self.update_currency_in_interface('INR', '₹')
                        else:
                            self.update_currency_in_interface('USD', '$')
                    self._currency_detected = True
                # Add transactions
                all_transactions.extend(statement_info.transactions)
                processed_files.append({
                    'filename': os.path.basename(file.name),
                    'bank': statement_info.bank_name,
                    'account': statement_info.account_number,
                    'period': statement_info.statement_period,
                    'transaction_count': len(statement_info.transactions),
                    'opening_balance': statement_info.opening_balance,
                    'closing_balance': statement_info.closing_balance,
                    'status': 'success'
                })
            except Exception as e:
                processed_files.append({
                    'filename': os.path.basename(file.name),
                    'status': 'error',
                    'error': str(e)
                })
        if not all_transactions:
            return ('<div class="status-box warning-box">⚠️ No transactions found in uploaded files</div>',
                    gr.update(value={"processed_files": processed_files}, visible=True), "")
        # Load transactions into analyzer and generate the analysis
        self.spend_analyzer.load_transactions(all_transactions)
        self.current_analysis = self.spend_analyzer.export_analysis_data()
        status_html = (f'<div class="status-box success-box">✅ Successfully processed '
                       f'{len(processed_files)} files with {len(all_transactions)} transactions</div>')
        # Quick stats: positive amounts are income, negative are expenses
        total_income = sum(t.amount for t in all_transactions if t.amount > 0)
        total_expenses = abs(sum(t.amount for t in all_transactions if t.amount < 0))
        quick_stats_html = f'''
        <div class="status-box info-box">
        <h4>📊 Quick Statistics</h4>
        <ul>
        <li><b>Currency Detected:</b> {self.detected_currency} ({self.currency_symbol})</li>
        <li><b>Total Income:</b> {self.format_amount(total_income)}</li>
        <li><b>Total Expenses:</b> {self.format_amount(total_expenses)}</li>
        <li><b>Net Cash Flow:</b> {self.format_amount(total_income - total_expenses)}</li>
        <li><b>Transaction Count:</b> {len(all_transactions)}</li>
        </ul>
        </div>
        '''
        results = {
            "processed_files": processed_files,
            "total_transactions": len(all_transactions),
            "analysis_summary": {
                "total_income": total_income,
                "total_expenses": total_expenses,
                "net_cash_flow": total_income - total_expenses
            }
        }
        return (status_html,
                gr.update(value=results, visible=True),
                quick_stats_html)
    except Exception as e:
        error_html = f'<div class="status-box error-box">❌ Processing error: {str(e)}</div>'
        return error_html, gr.update(visible=False), ""
def _refresh_dashboard(self):
"""Refresh dashboard with current analysis"""
if not self.current_analysis:
empty_return = (0, 0, 0, 0, None, None, None, None,
' No analysis data available
',
' Process PDFs first
',
' No recommendations available
',
' No unusual transactions detected
',
pd.DataFrame())
return empty_return
try:
summary = self.current_analysis.get('financial_summary', {})
insights = self.current_analysis.get('spending_insights', [])
# Summary metrics
total_income = summary.get('total_income', 0)
total_expenses = summary.get('total_expenses', 0)
net_cashflow = summary.get('net_cash_flow', 0)
transaction_count = self.current_analysis.get('transaction_count', 0)
# Create charts
charts = self._create_charts(insights, summary)
# Create insights HTML
insights_html = self._create_insights_html()
# Create transaction table
transaction_df = self._create_transaction_dataframe()
return (total_income, total_expenses, net_cashflow, transaction_count,
charts['spending_by_category'], charts['monthly_trends'],
charts['income_vs_expenses'], charts['top_merchants'],
insights_html['budget_alerts'], insights_html['spending_insights'],
insights_html['recommendations'], insights_html['unusual_transactions'],
transaction_df)
except Exception as e:
error_msg = f' Dashboard error: {str(e)}
'
empty_return = (0, 0, 0, 0, None, None, None, None,
error_msg, error_msg, error_msg, error_msg, pd.DataFrame())
return empty_return
def _create_charts(self, insights, summary):
"""Create visualization charts"""
charts = {}
# Spending by category chart
if insights:
categories = [insight['category'] for insight in insights]
amounts = [insight['total_amount'] for insight in insights]
charts['spending_by_category'] = px.pie(
values=amounts,
names=categories,
title="Spending by Category"
)
else:
charts['spending_by_category'] = None
# Monthly trends (placeholder)
charts['monthly_trends'] = None
charts['income_vs_expenses'] = None
charts['top_merchants'] = None
return charts
def _create_insights_html(self):
"""Create insights HTML sections"""
insights = {}
if not self.current_analysis:
# Return empty insights if no analysis available
insights['budget_alerts'] = ' No analysis data available
'
insights['spending_insights'] = ' No analysis data available
'
insights['recommendations'] = ' No analysis data available
'
insights['unusual_transactions'] = ' No analysis data available
'
return insights
# Budget alerts
budget_alerts = self.current_analysis.get('budget_alerts', [])
if budget_alerts:
alerts_html = ' Budget Alerts:
'
for alert in budget_alerts:
if isinstance(alert, dict):
alerts_html += f'- {alert.get("category", "Unknown")}: {alert.get("percentage_used", 0):.1f}% used
'
alerts_html += '
'
else:
alerts_html = ' All budgets on track
'
insights['budget_alerts'] = alerts_html
# Spending insights
spending_insights = self.current_analysis.get('spending_insights', [])
if spending_insights:
insights_html = ' Spending Insights:
'
for insight in spending_insights[:3]:
if isinstance(insight, dict):
insights_html += f'- {insight.get("category", "Unknown")}: ${insight.get("total_amount", 0):.2f} ({insight.get("percentage_of_total", 0):.1f}%)
'
insights_html += '
'
else:
insights_html = 'No spending insights available
'
insights['spending_insights'] = insights_html
# Recommendations
recommendations = self.current_analysis.get('recommendations', [])
if recommendations:
rec_html = ' Recommendations:
'
for rec in recommendations[:3]:
if rec: # Check if recommendation is not None/empty
rec_html += f'- {rec}
'
rec_html += '
'
else:
rec_html = 'No specific recommendations available
'
insights['recommendations'] = rec_html
# Unusual transactions
financial_summary = self.current_analysis.get('financial_summary', {})
unusual = financial_summary.get('unusual_transactions', []) if financial_summary else []
if unusual:
unusual_html = ' Unusual Transactions:
'
for trans in unusual[:3]:
if isinstance(trans, dict):
desc = trans.get("description", "Unknown")
amount = trans.get("amount", 0)
unusual_html += f'- {desc}: ${amount:.2f}
'
unusual_html += '
'
else:
unusual_html = ' No unusual transactions detected
'
insights['unusual_transactions'] = unusual_html
return insights
def _create_transaction_dataframe(self):
"""Create transaction dataframe for display"""
# This would create a dataframe from the actual transactions
# For now, return empty dataframe
return pd.DataFrame(columns=["Date", "Description", "Amount", "Category", "Account"])
def _filter_transactions(self, date_from, date_to, category_filter, amount_filter):
"""Filter transactions based on criteria"""
# Placeholder implementation
return pd.DataFrame(), 'Filtering functionality would be implemented here
'
def _update_transaction(self, transaction_id, new_category):
"""Update transaction category"""
return ' Transaction updated
', pd.DataFrame()
def _add_category(self, new_category):
    """Add a new transaction category (stub).

    Returns:
        (status HTML, dropdown update, dropdown update). FIX: the status
        literal was unterminated; reconstructed as a valid success-box string.
    """
    return ('<div class="status-box success-box">✅ Category added</div>',
            gr.update(), gr.update())
def _save_budget_settings(self, categories, amounts):
"""Save budget settings"""
try:
budget_settings = {cat: amounts.get(cat, 0) for cat in categories}
self.user_sessions['budgets'] = budget_settings
# Apply budgets to analyzer
self.spend_analyzer.set_budgets(budget_settings)
status_html = ' Budget settings saved and applied
'
return status_html, budget_settings
except Exception as e:
error_html = f' Error saving budgets: {str(e)}
'
return error_html, {}
def _export_data(self, export_format, export_options, date_range):
"""Export analysis data"""
if not self.current_analysis:
return ' No data to export
', {}, None
try:
# Create export data
export_data = {}
if "Analysis Summary" in export_options:
export_data['summary'] = self.current_analysis.get('financial_summary', {})
if "Raw Transactions" in export_options:
export_data['transactions'] = [] # Would populate with actual transaction data
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{export_format.lower()}', delete=False) as f:
if export_format == "JSON":
json.dump(export_data, f, indent=2, default=str)
elif export_format == "CSV":
# Would create CSV format
f.write("Export functionality would create CSV here")
file_path = f.name
status_html = ' Data exported successfully
'
return status_html, export_data, file_path
except Exception as e:
error_html = f' Export error: {str(e)}
'
return error_html, {}, None
def _save_processing_settings(self, settings):
"""Save processing settings"""
try:
self.user_sessions['processing_settings'] = settings
return ' Processing settings saved
'
except Exception as e:
return f' Error saving settings: {str(e)}
'
def _export_analysis(self):
"""Export current analysis"""
if not self.current_analysis:
return None
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(self.current_analysis, f, indent=2, default=str)
return f.name
except Exception as e:
self.logger.error(f"Export error: {e}")
return None
def _clear_data(self):
    """Reset all loaded analysis state.

    Returns:
        (status_message, upload_status_message) pair for the UI.
    """
    self.current_analysis = None
    # A fresh analyzer discards cached transactions and budgets alike.
    self.spend_analyzer = SpendAnalyzer()
    # BUGFIX: original status strings were unterminated literals.
    return ('All data cleared', 'Ready for new PDF upload')
def _save_ai_settings(self, ai_provider, claude_api_key, claude_model, sambanova_api_key, sambanova_model,
lm_studio_url, lm_studio_model, ollama_url, ollama_model,
custom_api_url, custom_api_key, custom_model_list, custom_selected_model,
ai_temperature, ai_max_tokens, enable_ai_insights, enable_ai_recommendations):
"""Save AI API settings"""
try:
# Create AI settings dictionary
ai_settings = {
"provider": ai_provider,
"temperature": ai_temperature,
"max_tokens": ai_max_tokens,
"enable_insights": enable_ai_insights,
"enable_recommendations": enable_ai_recommendations,
"timestamp": datetime.now().isoformat()
}
# Add provider-specific settings
if ai_provider == "Claude (Anthropic)":
ai_settings.update({
"api_key": claude_api_key if claude_api_key else "",
"model": claude_model,
"api_url": "https://api.anthropic.com"
})
elif ai_provider == "SambaNova":
ai_settings.update({
"api_key": sambanova_api_key if sambanova_api_key else "",
"model": sambanova_model,
"api_url": "https://api.sambanova.ai"
})
elif ai_provider == "LM Studio":
ai_settings.update({
"api_url": lm_studio_url,
"model": lm_studio_model,
"api_key": "" # LM Studio typically doesn't require API key
})
elif ai_provider == "Ollama":
ai_settings.update({
"api_url": ollama_url,
"model": ollama_model,
"api_key": "" # Ollama typically doesn't require API key
})
elif ai_provider == "Custom API":
ai_settings.update({
"api_url": custom_api_url,
"api_key": custom_api_key if custom_api_key else "",
"model": custom_selected_model,
"available_models": [m.strip() for m in custom_model_list.split(",") if m.strip()] if custom_model_list else []
})
# Save to user sessions
self.user_sessions['ai_settings'] = ai_settings
# Try to save to secure storage if enabled
storage_saved = False
try:
# This would integrate with the JavaScript secure storage
# For now, we'll just indicate the option is available
storage_saved = True # Placeholder
except Exception as e:
self.logger.warning(f"Secure storage save failed: {e}")
# Create status message
if storage_saved:
status_html = f'''
✅ AI settings saved successfully for {ai_provider}
💡 Enable browser secure storage to persist across sessions
'''
else:
status_html = f'''
✅ AI settings saved for {ai_provider}
⚠️ Warning: Settings will be lost on page reload.
Consider using environment variables or secure storage.
'''
# Create current settings display (without sensitive data)
display_settings = ai_settings.copy()
if 'api_key' in display_settings and display_settings['api_key']:
display_settings['api_key'] = "***" + display_settings['api_key'][-4:] if len(display_settings['api_key']) > 4 else "***"
display_settings['status'] = 'Configured'
display_settings['storage_warning'] = 'Settings stored in memory only - will be lost on page reload'
return status_html, display_settings
except Exception as e:
error_html = f'❌ Error saving AI settings: {str(e)}
'
return error_html, {"provider": "None", "status": "Error", "error": str(e)}
def _test_ai_connection(self, ai_provider, claude_api_key, sambanova_api_key, lm_studio_url, ollama_url, custom_api_url):
"""Test AI API connection"""
try:
if ai_provider == "Claude (Anthropic)":
if not claude_api_key:
return '❌ Claude API key is required
'
# Here you would implement actual API test
return '✅ Claude API connection test successful
'
elif ai_provider == "SambaNova":
if not sambanova_api_key:
return '❌ SambaNova API key is required
'
# Here you would implement actual API test
return '✅ SambaNova API connection test successful
'
elif ai_provider == "LM Studio":
if not lm_studio_url:
return '❌ LM Studio URL is required
'
# Test connection and fetch models
try:
response = requests.get(f"{lm_studio_url}/v1/models", timeout=10)
if response.status_code == 200:
models_data = response.json()
model_count = len(models_data.get('data', []))
return f'✅ LM Studio connection successful! Found {model_count} models
'
else:
return f'❌ LM Studio connection failed: {response.status_code}
'
except Exception as e:
return f'❌ LM Studio connection failed: {str(e)}
'
elif ai_provider == "Ollama":
if not ollama_url:
return '❌ Ollama URL is required
'
# Here you would implement actual connection test
return '✅ Ollama connection test successful
'
elif ai_provider == "Custom API":
if not custom_api_url:
return '❌ Custom API URL is required
'
# Here you would implement actual API test
return '✅ Custom API connection test successful
'
else:
return '⚠️ Please select an AI provider first
'
except Exception as e:
return f'❌ Connection test failed: {str(e)}
'
def _fetch_lm_studio_models_settings(self, lm_studio_url):
    """Query LM Studio's /v1/models endpoint and populate the model dropdown.

    Returns:
        (gr.update for the dropdown, status_message).
    """
    # BUGFIX: status string literals were unterminated; reconstructed.
    try:
        if not lm_studio_url:
            return gr.update(choices=[]), '❌ LM Studio URL is required'
        # Normalize: the /v1 path is appended below, so strip it if present.
        base_url = lm_studio_url.rstrip('/').replace('/v1', '')
        response = requests.get(f"{base_url}/v1/models", timeout=10)
        if response.status_code == 200:
            models_data = response.json()
            model_names = [model['id'] for model in models_data.get('data', [])]
            if model_names:
                return (
                    gr.update(choices=model_names, value=model_names[0] if model_names else None),
                    f'✅ Found {len(model_names)} models'
                )
            else:
                return (
                    gr.update(choices=["No models found"]),
                    '⚠️ No models found in LM Studio'
                )
        else:
            return (
                gr.update(choices=["Connection failed"]),
                f'❌ Failed to connect to LM Studio: {response.status_code}'
            )
    except Exception as e:
        return (
            gr.update(choices=["Error"]),
            f'❌ Error fetching models: {str(e)}'
        )
def _handle_chat_message(self, message, chat_history, response_style, selected_ai_provider):
"""Handle chat messages with AI integration"""
if not message.strip():
return chat_history, "", ' Please enter a message
'
# Check if AI is configured
ai_settings = self.user_sessions.get('ai_settings')
if not ai_settings or selected_ai_provider == "No AI Configured":
response = "Please configure an AI provider in Settings first to get personalized insights."
status_html = ' No AI configured
'
elif not self.current_analysis:
response = "Please upload and process your PDF statements first to get personalized financial insights."
status_html = ' No data available
'
else:
# Generate AI response
try:
response = self._generate_ai_response(message, response_style, ai_settings)
status_html = ' AI response generated
'
except Exception as e:
response = f"Error generating AI response: {str(e)}. Using fallback response."
summary = self.current_analysis.get('financial_summary', {})
response += f" Based on your financial data: Total income ${summary.get('total_income', 0):.2f}, Total expenses ${summary.get('total_expenses', 0):.2f}."
status_html = ' AI error, using fallback
'
# Add to chat history
chat_history = chat_history or []
chat_history.append([message, response])
return chat_history, "", status_html
def _generate_ai_response(self, message: str, response_style: str, ai_settings: dict) -> str:
"""Generate AI response using configured provider"""
# Prepare financial context
financial_context = self._prepare_financial_context()
# Create prompt based on response style
prompt = self._create_financial_prompt(message, financial_context, response_style)
# Call appropriate AI provider
provider = ai_settings.get('provider', '')
if provider == "Claude (Anthropic)":
return self._call_claude_api(prompt, ai_settings)
elif provider == "SambaNova":
return self._call_sambanova_api(prompt, ai_settings)
elif provider == "LM Studio":
return self._call_lm_studio_api(prompt, ai_settings)
elif provider == "Ollama":
return self._call_ollama_api(prompt, ai_settings)
elif provider == "Custom API":
return self._call_custom_api(prompt, ai_settings)
else:
return "AI provider not supported. Please check your configuration."
def _prepare_financial_context(self) -> str:
"""Prepare financial context for AI prompt"""
if not self.current_analysis:
return "No financial data available."
summary = self.current_analysis.get('financial_summary', {})
insights = self.current_analysis.get('spending_insights', [])
context = f"""
Financial Summary:
- Total Income: {self.format_amount(summary.get('total_income', 0))}
- Total Expenses: {self.format_amount(summary.get('total_expenses', 0))}
- Net Cash Flow: {self.format_amount(summary.get('net_cash_flow', 0))}
- Currency: {self.detected_currency}
Spending Insights:
"""
for insight in insights[:5]:
if isinstance(insight, dict):
context += f"- {insight.get('category', 'Unknown')}: {self.format_amount(insight.get('total_amount', 0))} ({insight.get('percentage_of_total', 0):.1f}%)\n"
return context
def _create_financial_prompt(self, user_message: str, financial_context: str, response_style: str) -> str:
"""Create AI prompt for financial analysis"""
style_instructions = {
"Detailed": "Provide a comprehensive and detailed analysis with specific recommendations.",
"Concise": "Provide a brief, to-the-point response focusing on key insights.",
"Technical": "Provide a technical analysis with specific numbers and financial metrics."
}
prompt = f"""You are a professional financial advisor analyzing a user's spending data.
{financial_context}
User Question: {user_message}
Response Style: {style_instructions.get(response_style, 'Provide a helpful response.')}
Please provide personalized financial insights and recommendations based on the data above. Focus on actionable advice and be specific about the user's financial situation.
"""
return prompt
def _call_claude_api(self, prompt: str, ai_settings: dict) -> str:
"""Call Claude API"""
try:
import anthropic
client = anthropic.Anthropic(api_key=ai_settings.get('api_key'))
response = client.messages.create(
model=ai_settings.get('model', 'claude-3-5-sonnet-20241022'),
max_tokens=ai_settings.get('max_tokens', 1000),
temperature=ai_settings.get('temperature', 0.7),
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
except Exception as e:
return f"Claude API error: {str(e)}"
def _call_sambanova_api(self, prompt: str, ai_settings: dict) -> str:
"""Call SambaNova API"""
try:
headers = {
"Authorization": f"Bearer {ai_settings.get('api_key')}",
"Content-Type": "application/json"
}
data = {
"model": ai_settings.get('model', 'Meta-Llama-3.1-70B-Instruct'),
"messages": [{"role": "user", "content": prompt}],
"temperature": ai_settings.get('temperature', 0.7),
"max_tokens": ai_settings.get('max_tokens', 1000)
}
response = requests.post(
f"{ai_settings.get('api_url', 'https://api.sambanova.ai')}/v1/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
return f"SambaNova API error: {response.status_code} - {response.text}"
except Exception as e:
return f"SambaNova API error: {str(e)}"
def _call_lm_studio_api(self, prompt: str, ai_settings: dict) -> str:
"""Call LM Studio API"""
try:
headers = {"Content-Type": "application/json"}
data = {
"model": ai_settings.get('model', 'local-model'),
"messages": [{"role": "user", "content": prompt}],
"temperature": ai_settings.get('temperature', 0.7),
"max_tokens": ai_settings.get('max_tokens', 1000)
}
response = requests.post(
f"{ai_settings.get('api_url', 'http://localhost:1234')}/v1/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
return f"LM Studio API error: {response.status_code} - {response.text}"
except Exception as e:
return f"LM Studio API error: {str(e)}"
def _call_ollama_api(self, prompt: str, ai_settings: dict) -> str:
"""Call Ollama API"""
try:
data = {
"model": ai_settings.get('model', 'llama3.1'),
"prompt": prompt,
"stream": False,
"options": {
"temperature": ai_settings.get('temperature', 0.7),
"num_predict": ai_settings.get('max_tokens', 1000)
}
}
response = requests.post(
f"{ai_settings.get('api_url', 'http://localhost:11434')}/api/generate",
json=data,
timeout=30
)
if response.status_code == 200:
return response.json()['response']
else:
return f"Ollama API error: {response.status_code} - {response.text}"
except Exception as e:
return f"Ollama API error: {str(e)}"
def _call_custom_api(self, prompt: str, ai_settings: dict) -> str:
"""Call Custom API"""
try:
headers = {
"Content-Type": "application/json"
}
if ai_settings.get('api_key'):
headers["Authorization"] = f"Bearer {ai_settings.get('api_key')}"
data = {
"model": ai_settings.get('model', 'default'),
"messages": [{"role": "user", "content": prompt}],
"temperature": ai_settings.get('temperature', 0.7),
"max_tokens": ai_settings.get('max_tokens', 1000)
}
response = requests.post(
f"{ai_settings.get('api_url')}/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
return f"Custom API error: {response.status_code} - {response.text}"
except Exception as e:
return f"Custom API error: {str(e)}"
def _refresh_ai_providers(self):
    """Rebuild the chat tab's provider dropdown from saved AI settings.

    Returns:
        (provider dropdown gr.update, status_message,
        fetch-models button visibility gr.update,
        models dropdown visibility gr.update).
    """
    # BUGFIX: status string literals were unterminated; reconstructed.
    try:
        ai_settings = self.user_sessions.get('ai_settings')
        if ai_settings and ai_settings.get('provider'):
            provider_name = ai_settings['provider']
            model_name = ai_settings.get('model', 'default')
            provider_display = f"{provider_name} ({model_name})"
            # Model fetching is only supported for LM Studio.
            show_lm_controls = provider_name == "LM Studio"
            return (
                gr.update(choices=[provider_display], value=provider_display),
                f'✅ AI Provider: {provider_name}',
                gr.update(visible=show_lm_controls),
                gr.update(visible=show_lm_controls)
            )
        else:
            return (
                gr.update(choices=["No AI Configured"], value="No AI Configured"),
                '⚠️ No AI configured. Please configure AI in Settings.',
                gr.update(visible=False),
                gr.update(visible=False)
            )
    except Exception as e:
        return (
            gr.update(choices=["Error"], value="Error"),
            f'❌ Error refreshing AI providers: {str(e)}',
            gr.update(visible=False),
            gr.update(visible=False)
        )
def _fetch_lm_studio_models(self, selected_provider):
    """Fetch available models from the configured LM Studio instance.

    Returns:
        (gr.update for the models dropdown, status_message).
    """
    # BUGFIX: status string literals were unterminated; reconstructed.
    try:
        ai_settings = self.user_sessions.get('ai_settings')
        if not ai_settings or ai_settings.get('provider') != "LM Studio":
            return gr.update(choices=[]), '❌ LM Studio not configured'
        api_url = ai_settings.get('api_url', 'http://localhost:1234')
        response = requests.get(f"{api_url}/v1/models", timeout=10)
        if response.status_code == 200:
            models_data = response.json()
            model_names = [model['id'] for model in models_data.get('data', [])]
            if model_names:
                return (
                    gr.update(choices=model_names, visible=True),
                    f'✅ Found {len(model_names)} models'
                )
            else:
                return (
                    gr.update(choices=["No models found"], visible=True),
                    '⚠️ No models found in LM Studio'
                )
        else:
            return (
                gr.update(choices=["Connection failed"], visible=True),
                f'❌ Failed to connect to LM Studio: {response.status_code}'
            )
    except Exception as e:
        return (
            gr.update(choices=["Error"], visible=True),
            f'❌ Error fetching models: {str(e)}'
        )
def _on_ai_provider_change(self, selected_provider):
    """React to an AI provider selection change in the chat tab.

    Returns:
        (fetch-models button visibility gr.update,
        models dropdown visibility gr.update, status_message).
    """
    # BUGFIX: status string literals were unterminated; reconstructed.
    try:
        ai_settings = self.user_sessions.get('ai_settings')
        if selected_provider == "No AI Configured" or not ai_settings:
            return (
                gr.update(visible=False),
                gr.update(visible=False),
                '⚠️ No AI configured. Please configure AI in Settings.'
            )
        provider_name = ai_settings.get('provider', '')
        # LM Studio is the only provider with a model-fetch workflow.
        show_lm_controls = provider_name == "LM Studio"
        return (
            gr.update(visible=show_lm_controls),
            gr.update(visible=show_lm_controls),
            f'✅ Selected: {selected_provider}'
        )
    except Exception as e:
        return (
            gr.update(visible=False),
            gr.update(visible=False),
            f'❌ Error: {str(e)}'
        )
def _create_mcp_tab(self):
    """Build the MCP server tab: status/controls, logs, tester, and docs.

    Must be called inside a gr.Blocks() context; wires start/stop/test
    buttons to the corresponding server-management handlers.
    """
    gr.Markdown("## 🔌 Model Context Protocol (MCP) Server")
    gr.Markdown("*Manage the MCP server for integration with Claude and other AI systems*")
    with gr.Row():
        with gr.Column(scale=2):
            # Server status and controls
            gr.Markdown("### 🖥️ Server Status & Controls")
            # BUGFIX: the original value string was unterminated.
            mcp_status = gr.HTML(value='MCP Server is not running')
            with gr.Row():
                mcp_host = gr.Textbox(label="Host", value="0.0.0.0")
                mcp_port = gr.Number(label="Port", value=8000, precision=0)
            with gr.Row():
                start_mcp_btn = gr.Button("🚀 Start MCP Server", variant="primary")
                stop_mcp_btn = gr.Button("⏹️ Stop MCP Server", variant="stop")
            # Server logs
            gr.Markdown("### 📋 Server Logs")
            mcp_logs = gr.Textbox(
                label="Server Logs",
                lines=10,
                max_lines=20,
                interactive=False
            )
            # Test server
            gr.Markdown("### 🧪 Test MCP Server")
            test_mcp_btn = gr.Button("🔍 Test MCP Connection", variant="secondary")
            test_result = gr.HTML()
        with gr.Column(scale=1):
            # MCP Info
            gr.Markdown("### ℹ️ MCP Server Information")
            gr.HTML('''
What is MCP?
The Model Context Protocol (MCP) allows AI systems like Claude to interact with your financial data and analysis tools.
Available Endpoints:
- /mcp - Main MCP protocol endpoint
- /docs - API documentation
Registered Tools:
- process_email_statements - Process bank statements from email
- analyze_pdf_statements - Analyze uploaded PDF statements
- get_ai_analysis - Get AI financial analysis
Registered Resources:
- spending-insights - Current spending insights by category
- budget-alerts - Current budget alerts and overspending warnings
- financial-summary - Comprehensive financial summary
''')
            # Usage example
            gr.Markdown("### 📝 Usage Example")
            gr.Code(
                label="Python Example",
                value='''
import requests
import json
# Initialize MCP
init_msg = {
    "jsonrpc": "2.0",
    "id": "1",
    "method": "initialize"
}
response = requests.post(
    "http://localhost:8000/mcp",
    json=init_msg
)
print(json.dumps(response.json(), indent=2))
# List available tools
tools_msg = {
    "jsonrpc": "2.0",
    "id": "2",
    "method": "tools/list"
}
response = requests.post(
    "http://localhost:8000/mcp",
    json=tools_msg
)
print(json.dumps(response.json(), indent=2))
''',
                language="python"
            )
    # Event handlers
    start_mcp_btn.click(
        fn=self._start_mcp_server,
        inputs=[mcp_host, mcp_port],
        outputs=[mcp_status, mcp_logs]
    )
    stop_mcp_btn.click(
        fn=self._stop_mcp_server,
        outputs=[mcp_status, mcp_logs]
    )
    test_mcp_btn.click(
        fn=self._test_mcp_server,
        inputs=[mcp_host, mcp_port],
        outputs=[test_result]
    )
def _start_mcp_server(self, host, port):
    """Start the MCP server on a daemon thread.

    Returns:
        (status_message, joined_log_text) for the UI.
    """
    # BUGFIX: status string literals were unterminated; reconstructed.
    if self.mcp_server_thread and self.mcp_server_thread.is_alive():
        return (
            'MCP Server is already running',
            "\n".join(self.mcp_server_logs)
        )
    try:
        self.mcp_server_logs = []
        self.mcp_server_logs.append(f"Starting MCP server on {host}:{port}...")

        def run_server_with_logs():
            # Runs in the daemon thread; flips the running flag around the
            # blocking server call so the UI can report state.
            try:
                self.mcp_server_running = True
                self.mcp_server_logs.append("MCP server started successfully")
                self.mcp_server_logs.append(f"MCP endpoint available at: http://{host}:{port}/mcp")
                self.mcp_server_logs.append(f"API documentation available at: http://{host}:{port}/docs")
                run_mcp_server(host=host, port=port)
            except Exception as e:
                self.mcp_server_logs.append(f"Error in MCP server: {str(e)}")
            finally:
                self.mcp_server_running = False
                self.mcp_server_logs.append("MCP server stopped")

        self.mcp_server_thread = threading.Thread(target=run_server_with_logs)
        self.mcp_server_thread.daemon = True
        self.mcp_server_thread.start()
        time.sleep(1)  # give the server a moment to come up before reporting
        if self.mcp_server_running:
            return (
                f'✅ MCP Server running on {host}:{port}',
                "\n".join(self.mcp_server_logs)
            )
        else:
            return (
                '❌ Failed to start MCP Server',
                "\n".join(self.mcp_server_logs)
            )
    except Exception as e:
        error_msg = f"Error starting MCP server: {str(e)}"
        self.mcp_server_logs.append(error_msg)
        return (
            f'❌ {error_msg}',
            "\n".join(self.mcp_server_logs)
        )
def _stop_mcp_server(self):
"""Stop the MCP server"""
if not self.mcp_server_thread or not self.mcp_server_thread.is_alive():
return (
'MCP Server is not running
',
"\n".join(self.mcp_server_logs)
)
try:
# There's no clean way to stop a uvicorn server in a thread
# This is a workaround that will be improved in the future
self.mcp_server_logs.append("Stopping MCP server...")
self.mcp_server_running = False
# In a real implementation, we would use a proper shutdown mechanism
# For now, we'll just update the UI to show it's stopped
return (
'MCP Server stopping... Please restart the application to fully stop the server
',
"\n".join(self.mcp_server_logs)
)
except Exception as e:
error_msg = f"Error stopping MCP server: {str(e)}"
self.mcp_server_logs.append(error_msg)
return (
f'❌ {error_msg}
',
"\n".join(self.mcp_server_logs)
)
def _test_mcp_server(self, host, port):
    """Send an MCP 'initialize' request and report the outcome as text."""
    try:
        import requests
        import json
        payload = {
            "jsonrpc": "2.0",
            "id": "test",
            "method": "initialize"
        }
        resp = requests.post(
            f"http://{host}:{port}/mcp",
            json=payload,
            timeout=5
        )
        # Non-200 responses are reported verbatim to aid debugging.
        if resp.status_code != 200:
            return f'''
❌ MCP Server connection failed with status code: {resp.status_code}
Response: {resp.text}
'''
        result = resp.json()
        if "result" not in result:
            return f'''
⚠️ MCP Server responded but with unexpected format:
{json.dumps(result, indent=2)}
'''
        server_info = result["result"].get("serverInfo", {})
        return f'''
✅ MCP Server connection successful!
Server: {server_info.get("name", "Unknown")}
Version: {server_info.get("version", "Unknown")}
Protocol: {result["result"].get("protocolVersion", "Unknown")}
'''
    except requests.exceptions.ConnectionError:
        return '''
❌ Connection error: MCP Server is not running or not accessible at the specified host/port
'''
    except Exception as e:
        return f'''
❌ Error testing MCP server: {str(e)}
'''
def _load_initial_api_settings(self):
"""Load API settings from environment variables or config file on startup"""
try:
# Try to load from environment variables first
env_config = self.secure_storage.load_from_environment()
if env_config:
self.user_sessions['env_api_settings'] = env_config
self.logger.info(f"Loaded API settings from environment for: {list(env_config.keys())}")
# Try to load from config file
config_file = self.secure_storage.load_config_from_file()
if config_file:
self.user_sessions['file_api_settings'] = config_file
self.logger.info("Loaded API settings from config file")
except Exception as e:
self.logger.warning(f"Failed to load initial API settings: {e}")
# Launch the interface
def launch_interface():
    """Instantiate the analyzer UI and serve it on 0.0.0.0:7860."""
    analyzer_ui = RealSpendAnalyzerInterface()
    demo = analyzer_ui.create_interface()
    for banner_line in (
        " Starting Spend Analyzer MCP - Real PDF Processing",
        " Upload your bank statement PDFs for analysis",
        " Opening in browser...",
    ):
        print(banner_line)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True,
        show_error=True,
        inbrowser=True,
    )
# Script entry point: build and launch the Gradio web UI.
if __name__ == "__main__":
    launch_interface()