|
""" |
|
Gradio Web Interface for Spend Analyzer MCP - Real PDF Processing |
|
""" |
|
import gradio as gr |
|
import pandas as pd |
|
import plotly.express as px |
|
import plotly.graph_objects as go |
|
import json |
|
import os |
|
import asyncio |
|
import requests |
|
from typing import Dict, List, Optional, Tuple |
|
from datetime import datetime, timedelta |
|
import logging |
|
import time |
|
import tempfile |
|
import threading |
|
|
|
|
|
from email_processor import PDFProcessor |
|
from spend_analyzer import SpendAnalyzer |
|
from secure_storage_utils import SecureStorageManager |
|
from mcp_server import create_mcp_app, run_mcp_server |
|
|
|
class RealSpendAnalyzerInterface: |
|
def __init__(self): |
|
self.current_analysis = None |
|
self.user_sessions = {} |
|
self.detected_currency = "$" |
|
self.currency_symbol = "$" |
|
self.logger = logging.getLogger(__name__) |
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
self.pdf_processor = PDFProcessor() |
|
self.spend_analyzer = SpendAnalyzer() |
|
self.secure_storage = SecureStorageManager() |
|
|
|
|
|
self.mcp_server_thread = None |
|
self.mcp_server_running = False |
|
self.mcp_server_logs = [] |
|
|
|
|
|
self._load_initial_api_settings() |
|
|
|
|
|
self.currency_patterns = { |
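            # Checked in insertion order; the first regex that matches wins.
            # The CNY pattern deliberately omits the ¥ symbol so that yen
            # amounts resolve to JPY rather than CNY.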
|
'USD': {'symbols': ['$', 'USD', 'US$'], 'regex': r'\$|USD|US\$'}, |
|
'INR': {'symbols': ['₹', 'Rs', 'Rs.', 'INR'], 'regex': r'₹|Rs\.?|INR'}, |
|
'EUR': {'symbols': ['€', 'EUR'], 'regex': r'€|EUR'}, |
|
'GBP': {'symbols': ['£', 'GBP'], 'regex': r'£|GBP'}, |
|
'CAD': {'symbols': ['C$', 'CAD'], 'regex': r'C\$|CAD'}, |
|
'AUD': {'symbols': ['A$', 'AUD'], 'regex': r'A\$|AUD'}, |
|
'JPY': {'symbols': ['¥', 'JPY'], 'regex': r'¥|JPY'}, |
|
'CNY': {'symbols': ['¥', 'CNY', 'RMB'], 'regex': r'CNY|RMB'}, |
|
} |
|
|
|
def create_interface(self): |
|
"""Create the main Gradio interface""" |
|
with gr.Blocks( |
|
title="Spend Analyzer MCP - Real PDF Processing", |
|
css=""" |
|
.main-header { text-align: center; margin: 20px 0; } |
|
.status-box { padding: 10px; border-radius: 5px; margin: 10px 0; } |
|
.success-box { background-color: #d4edda; border: 1px solid #c3e6cb; } |
|
.error-box { background-color: #f8d7da; border: 1px solid #f5c6cb; } |
|
.warning-box { background-color: #fff3cd; border: 1px solid #ffeaa7; } |
|
.info-box { background-color: #e7f3ff; border: 1px solid #b3d9ff; } |
|
""" |
|
) as interface: |
|
gr.Markdown("# 💰 Spend Analyzer MCP - Real PDF Processing", elem_classes=["main-header"]) |
|
gr.Markdown("*Analyze your real bank statement PDFs with AI-powered insights*") |
|
|
|
|
|
|
|
gr.HTML('<div class="info-box">📄 <strong>Real PDF Processing:</strong> Upload your actual bank statement PDFs for comprehensive financial analysis.</div>') |
|
|
|
with gr.Tabs(): |
|
|
|
with gr.TabItem("📄 PDF Upload & Analysis"): |
|
self._create_pdf_processing_tab() |
|
|
|
|
|
with gr.TabItem("📊 Analysis Dashboard"): |
|
self._create_dashboard_tab() |
|
|
|
|
|
with gr.TabItem("🤖 AI Financial Advisor"): |
|
self._create_chat_tab() |
|
|
|
|
|
with gr.TabItem("📋 Transaction Management"): |
|
self._create_transaction_tab() |
|
|
|
|
|
with gr.TabItem("⚙️ Settings & Export"): |
|
self._create_settings_tab() |
|
|
|
|
|
with gr.TabItem("🔌 MCP Server"): |
|
self._create_mcp_tab() |
|
|
|
|
|
gr.HTML(''' |
|
<div class="warning-box" style="margin-top: 20px; text-align: center;"> |
|
⚠️ <strong>Important Notice:</strong> AI analysis results are generated automatically and may contain errors. |
|
Please verify all financial insights and recommendations for accuracy before making any financial decisions. |
|
</div> |
|
''') |
|
|
|
return interface |
|
|
|
def detect_currency_from_text(self, text: str) -> Tuple[str, str]: |
|
"""Detect currency from PDF text content""" |
|
import re |
|
|
|
text_lower = text.lower() |
|
|
|
|
|
for currency_code, currency_info in self.currency_patterns.items(): |
|
pattern = currency_info['regex'] |
|
if re.search(pattern, text, re.IGNORECASE): |
|
|
|
return currency_code, currency_info['symbols'][0] |
|
|
|
|
|
if any(bank in text_lower for bank in ['hdfc', 'icici', 'sbi', 'axis', 'kotak']): |
|
return 'INR', '₹' |
|
elif any(bank in text_lower for bank in ['chase', 'bofa', 'wells', 'citi']): |
|
return 'USD', '$' |
|
elif any(bank in text_lower for bank in ['hsbc', 'barclays', 'lloyds']): |
|
return 'GBP', '£' |
|
|
|
|
|
return 'USD', '$' |
|
|
|
def update_currency_in_interface(self, currency_code: str, currency_symbol: str): |
|
"""Update currency throughout the interface""" |
|
self.detected_currency = currency_code |
|
self.currency_symbol = currency_symbol |
|
self.logger.info(f"Currency detected: {currency_code} ({currency_symbol})") |
|
|
|
def format_amount(self, amount: float) -> str: |
|
"""Format amount with detected currency""" |
|
return f"{self.currency_symbol}{amount:,.2f}" |
|
|
|
def _create_pdf_processing_tab(self): |
|
"""Create PDF processing tab""" |
|
gr.Markdown("## 📄 Upload & Process Bank Statement PDFs") |
|
gr.Markdown("*Upload your bank statement PDFs for real financial analysis*") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
|
|
gr.Markdown("### 📁 File Upload") |
|
pdf_upload = gr.File( |
|
label="Upload Bank Statement PDFs", |
|
file_count="multiple", |
|
file_types=[".pdf"], |
|
height=150 |
|
) |
|
|
|
|
|
gr.Markdown("### 🔐 PDF Passwords (if needed)") |
|
pdf_passwords_input = gr.Textbox( |
|
label="PDF Passwords (JSON format)", |
|
placeholder='{"statement1.pdf": "password123", "statement2.pdf": "password456"}', |
|
lines=3 |
|
) |
|
|
|
|
|
gr.Markdown("### ⚙️ Processing Options") |
|
with gr.Row(): |
|
auto_categorize = gr.Checkbox( |
|
label="Auto-categorize transactions", |
|
value=True |
|
) |
|
detect_duplicates = gr.Checkbox( |
|
label="Detect duplicate transactions", |
|
value=True |
|
) |
|
|
|
|
|
process_pdf_btn = gr.Button("🚀 Process PDFs", variant="primary", size="lg") |
|
|
|
with gr.Column(scale=1): |
|
|
|
processing_status = gr.HTML() |
|
|
|
|
|
gr.Markdown("### 📊 Processing Results") |
|
processing_results = gr.JSON( |
|
label="Detailed Results", |
|
visible=False |
|
) |
|
|
|
|
|
quick_stats = gr.HTML() |
|
|
|
|
|
process_pdf_btn.click( |
|
fn=self._process_real_pdfs, |
|
inputs=[pdf_upload, pdf_passwords_input, auto_categorize, detect_duplicates], |
|
outputs=[processing_status, processing_results, quick_stats] |
|
) |
|
|
|
def _create_dashboard_tab(self): |
|
"""Create analysis dashboard tab""" |
|
gr.Markdown("## 📊 Financial Analysis Dashboard") |
|
|
|
with gr.Row(): |
|
refresh_btn = gr.Button("🔄 Refresh Dashboard") |
|
export_btn = gr.Button("📤 Export Analysis") |
|
clear_btn = gr.Button("🗑️ Clear Data", variant="stop") |
|
|
|
|
|
gr.Markdown("### 💰 Financial Summary") |
|
with gr.Row(): |
|
total_income = gr.Number(label="Total Income ($)", interactive=False) |
|
total_expenses = gr.Number(label="Total Expenses ($)", interactive=False) |
|
net_cashflow = gr.Number(label="Net Cash Flow ($)", interactive=False) |
|
transaction_count = gr.Number(label="Total Transactions", interactive=False) |
|
|
|
|
|
gr.Markdown("### 📈 Visual Analysis") |
|
with gr.Row(): |
|
with gr.Column(): |
|
spending_by_category = gr.Plot(label="Spending by Category") |
|
monthly_trends = gr.Plot(label="Monthly Spending Trends") |
|
|
|
with gr.Column(): |
|
income_vs_expenses = gr.Plot(label="Income vs Expenses") |
|
top_merchants = gr.Plot(label="Top Merchants") |
|
|
|
|
|
gr.Markdown("### 🎯 Financial Insights") |
|
with gr.Row(): |
|
with gr.Column(): |
|
budget_alerts = gr.HTML(label="Budget Alerts") |
|
spending_insights = gr.HTML(label="Spending Insights") |
|
|
|
with gr.Column(): |
|
recommendations = gr.HTML(label="AI Recommendations") |
|
unusual_transactions = gr.HTML(label="Unusual Transactions") |
|
|
|
|
|
with gr.Accordion("📋 Detailed Transaction Data", open=False): |
|
transaction_table = gr.Dataframe( |
|
headers=["Date", "Description", "Amount", "Category", "Account"], |
|
interactive=True, |
|
label="All Transactions" |
|
) |
|
|
|
|
|
clear_status = gr.HTML() |
|
clear_info = gr.HTML() |
|
|
|
|
|
refresh_btn.click( |
|
fn=self._refresh_dashboard, |
|
outputs=[total_income, total_expenses, net_cashflow, transaction_count, |
|
spending_by_category, monthly_trends, income_vs_expenses, top_merchants, |
|
budget_alerts, spending_insights, recommendations, unusual_transactions, |
|
transaction_table] |
|
) |
|
|
|
export_btn.click( |
|
fn=self._export_analysis, |
|
outputs=[gr.File(label="Analysis Export")] |
|
) |
|
|
|
clear_btn.click( |
|
fn=self._clear_data, |
|
outputs=[clear_status, clear_info] |
|
) |
|
|
|
def _create_chat_tab(self): |
|
"""Create AI chat tab""" |
|
gr.Markdown("## 🤖 AI Financial Advisor") |
|
gr.Markdown("*Get personalized insights about your spending patterns using configured AI*") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
|
|
gr.Markdown("### 🤖 Select AI Provider") |
|
with gr.Row(): |
|
ai_provider_selector = gr.Dropdown( |
|
choices=["No AI Configured"], |
|
label="Available AI Providers", |
|
value="No AI Configured", |
|
scale=3 |
|
) |
|
refresh_ai_btn = gr.Button("🔄 Refresh", size="sm", scale=1) |
|
fetch_models_btn = gr.Button("📥 Fetch Models", size="sm", scale=1, visible=False) |
|
|
|
|
|
lm_studio_models = gr.Dropdown( |
|
choices=[], |
|
label="Available LM Studio Models", |
|
visible=False |
|
) |
|
|
|
|
|
chatbot = gr.Chatbot( |
|
label="Financial Advisor Chat", |
|
height=400, |
|
show_label=True |
|
) |
|
|
|
with gr.Row(): |
|
msg_input = gr.Textbox( |
|
placeholder="Ask about your spending patterns, budgets, or financial goals...", |
|
label="Your Question", |
|
scale=4 |
|
) |
|
send_btn = gr.Button("Send", variant="primary", scale=1) |
|
|
|
|
|
gr.Markdown("### 🎯 Quick Questions") |
|
with gr.Row(): |
|
budget_btn = gr.Button("💰 Budget Analysis", size="sm") |
|
trends_btn = gr.Button("📈 Spending Trends", size="sm") |
|
tips_btn = gr.Button("💡 Save Money Tips", size="sm") |
|
unusual_btn = gr.Button("🚨 Unusual Activity", size="sm") |
|
|
|
with gr.Row(): |
|
categories_btn = gr.Button("📊 Category Breakdown", size="sm") |
|
merchants_btn = gr.Button("🏪 Top Merchants", size="sm") |
|
monthly_btn = gr.Button("📅 Monthly Analysis", size="sm") |
|
goals_btn = gr.Button("🎯 Financial Goals", size="sm") |
|
|
|
with gr.Column(scale=1): |
|
chat_status = gr.HTML() |
|
|
|
|
|
gr.Markdown("### 🤖 AI Status") |
|
ai_status_display = gr.HTML( |
|
value='<div class="warning-box">⚠️ No AI configured. Please configure AI in Settings.</div>' |
|
) |
|
|
|
|
|
gr.Markdown("### 📊 Analysis Context") |
|
context_info = gr.JSON( |
|
label="Available Data", |
|
value={"status": "Upload PDFs to start analysis"} |
|
) |
|
|
|
|
|
gr.Markdown("### ⚙️ Chat Settings") |
|
response_style = gr.Radio( |
|
choices=["Detailed", "Concise", "Technical"], |
|
label="Response Style", |
|
value="Detailed" |
|
) |
|
|
|
|
|
send_btn.click( |
|
fn=self._handle_chat_message, |
|
inputs=[msg_input, chatbot, response_style, ai_provider_selector], |
|
outputs=[chatbot, msg_input, chat_status] |
|
) |
|
|
|
msg_input.submit( |
|
fn=self._handle_chat_message, |
|
inputs=[msg_input, chatbot, response_style, ai_provider_selector], |
|
outputs=[chatbot, msg_input, chat_status] |
|
) |
|
|
|
refresh_ai_btn.click( |
|
fn=self._refresh_ai_providers, |
|
outputs=[ai_provider_selector, ai_status_display, fetch_models_btn, lm_studio_models] |
|
) |
|
|
|
fetch_models_btn.click( |
|
fn=self._fetch_lm_studio_models, |
|
inputs=[ai_provider_selector], |
|
outputs=[lm_studio_models, chat_status] |
|
) |
|
|
|
ai_provider_selector.change( |
|
fn=self._on_ai_provider_change, |
|
inputs=[ai_provider_selector], |
|
outputs=[fetch_models_btn, lm_studio_models, ai_status_display] |
|
) |
|
|
|
|
|
budget_btn.click(lambda: "How am I doing with my budget this month?", outputs=[msg_input]) |
|
trends_btn.click(lambda: "What are my spending trends over the last few months?", outputs=[msg_input]) |
|
tips_btn.click(lambda: "What are specific ways I can save money based on my spending?", outputs=[msg_input]) |
|
unusual_btn.click(lambda: "Are there any unusual transactions I should be aware of?", outputs=[msg_input]) |
|
categories_btn.click(lambda: "Break down my spending by category", outputs=[msg_input]) |
|
merchants_btn.click(lambda: "Who are my top merchants and how much do I spend with them?", outputs=[msg_input]) |
|
monthly_btn.click(lambda: "Analyze my monthly spending patterns", outputs=[msg_input]) |
|
goals_btn.click(lambda: "Help me set realistic financial goals based on my spending", outputs=[msg_input]) |
|
|
|
def _create_transaction_tab(self): |
|
"""Create transaction management tab""" |
|
gr.Markdown("## 📋 Transaction Management") |
|
gr.Markdown("*Review, edit, and categorize your transactions*") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
|
|
gr.Markdown("### 🔍 Filter Transactions") |
|
with gr.Row(): |
|
date_from = gr.Textbox(label="From Date (YYYY-MM-DD)", placeholder="2024-01-01") |
|
date_to = gr.Textbox(label="To Date (YYYY-MM-DD)", placeholder="2024-12-31") |
|
|
|
with gr.Row(): |
|
category_filter = gr.Dropdown( |
|
choices=["All", "Food & Dining", "Shopping", "Gas & Transport", |
|
"Utilities", "Entertainment", "Healthcare", "Other"], |
|
label="Category Filter", |
|
value="All" |
|
) |
|
amount_filter = gr.Radio( |
|
choices=["All", "Income Only", "Expenses Only", "> $100", "> $500"], |
|
label="Amount Filter", |
|
value="All" |
|
) |
|
|
|
filter_btn = gr.Button("🔍 Apply Filters", variant="secondary") |
|
|
|
|
|
gr.Markdown("### ✏️ Edit Transaction") |
|
with gr.Row(): |
|
edit_transaction_id = gr.Number(label="Transaction ID", precision=0) |
|
edit_category = gr.Dropdown( |
|
choices=["Food & Dining", "Shopping", "Gas & Transport", |
|
"Utilities", "Entertainment", "Healthcare", "Other"], |
|
label="New Category" |
|
) |
|
|
|
update_btn = gr.Button("💾 Update Transaction", variant="primary") |
|
|
|
with gr.Column(scale=1): |
|
|
|
gr.Markdown("### 📊 Transaction Statistics") |
|
transaction_stats = gr.HTML() |
|
|
|
|
|
gr.Markdown("### 🏷️ Category Management") |
|
add_category = gr.Textbox(label="Add New Category") |
|
add_category_btn = gr.Button("➕ Add Category") |
|
|
|
category_status = gr.HTML() |
|
|
|
|
|
filtered_transactions = gr.Dataframe( |
|
headers=["ID", "Date", "Description", "Amount", "Category", "Account"], |
|
interactive=False, |
|
label="Filtered Transactions" |
|
) |
|
|
|
|
|
filter_btn.click( |
|
fn=self._filter_transactions, |
|
inputs=[date_from, date_to, category_filter, amount_filter], |
|
outputs=[filtered_transactions, transaction_stats] |
|
) |
|
|
|
update_btn.click( |
|
fn=self._update_transaction, |
|
inputs=[edit_transaction_id, edit_category], |
|
outputs=[category_status, filtered_transactions] |
|
) |
|
|
|
add_category_btn.click( |
|
fn=self._add_category, |
|
inputs=[add_category], |
|
outputs=[category_status, edit_category, category_filter] |
|
) |
|
|
|
def _create_settings_tab(self): |
|
"""Create settings and export tab""" |
|
gr.Markdown("## ⚙️ Settings & Export") |
|
|
|
with gr.Tabs(): |
|
with gr.TabItem("AI API Configuration"): |
|
gr.Markdown("### 🤖 AI API Settings") |
|
gr.Markdown("*Configure AI providers for enhanced analysis and insights*") |
|
|
|
|
|
gr.HTML(self.secure_storage.create_simple_warning_html()) |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
|
|
ai_provider = gr.Radio( |
|
choices=["Claude (Anthropic)", "SambaNova", "LM Studio", "Ollama", "Custom API"], |
|
label="AI Provider", |
|
value="Claude (Anthropic)" |
|
) |
|
|
|
|
|
with gr.Group(): |
|
gr.Markdown("#### API Configuration") |
|
|
|
|
|
claude_api_key = gr.Textbox( |
|
label="Claude API Key", |
|
type="password", |
|
placeholder="sk-ant-...", |
|
visible=True |
|
) |
|
claude_model = gr.Dropdown( |
|
choices=["claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022", "claude-3-opus-20240229"], |
|
label="Claude Model", |
|
value="claude-3-5-sonnet-20241022", |
|
visible=True |
|
) |
|
|
|
|
|
sambanova_api_key = gr.Textbox( |
|
label="SambaNova API Key", |
|
type="password", |
|
placeholder="Your SambaNova API key", |
|
visible=False |
|
) |
|
sambanova_model = gr.Dropdown( |
|
choices=["Meta-Llama-3.1-8B-Instruct", "Meta-Llama-3.1-70B-Instruct", "Meta-Llama-3.1-405B-Instruct"], |
|
label="SambaNova Model", |
|
value="Meta-Llama-3.1-70B-Instruct", |
|
visible=False |
|
) |
|
|
|
|
|
lm_studio_url = gr.Textbox( |
|
label="LM Studio URL", |
|
placeholder="http://localhost:1234/v1", |
|
value="http://localhost:1234/v1", |
|
visible=False |
|
) |
|
lm_studio_model = gr.Textbox( |
|
label="LM Studio Model Name", |
|
placeholder="local-model", |
|
visible=False |
|
) |
|
|
|
|
|
ollama_url = gr.Textbox( |
|
label="Ollama URL", |
|
placeholder="http://localhost:11434", |
|
value="http://localhost:11434", |
|
visible=False |
|
) |
|
ollama_model = gr.Dropdown( |
|
choices=["llama3.1", "llama3.1:70b", "mistral", "codellama", "phi3"], |
|
label="Ollama Model", |
|
value="llama3.1", |
|
visible=False |
|
) |
|
|
|
|
|
custom_api_url = gr.Textbox( |
|
label="Custom API URL", |
|
placeholder="https://api.example.com/v1", |
|
visible=False |
|
) |
|
custom_api_key = gr.Textbox( |
|
label="Custom API Key", |
|
type="password", |
|
placeholder="Your custom API key", |
|
visible=False |
|
) |
|
custom_model_list = gr.Textbox( |
|
label="Available Models (comma-separated)", |
|
placeholder="model1, model2, model3", |
|
visible=False |
|
) |
|
custom_selected_model = gr.Textbox( |
|
label="Selected Model", |
|
placeholder="model1", |
|
visible=False |
|
) |
|
|
|
|
|
with gr.Group(): |
|
gr.Markdown("#### AI Analysis Settings") |
|
ai_temperature = gr.Slider( |
|
minimum=0.0, |
|
maximum=2.0, |
|
value=0.7, |
|
step=0.1, |
|
label="Temperature (Creativity)" |
|
) |
|
ai_max_tokens = gr.Slider( |
|
minimum=100, |
|
maximum=4000, |
|
value=1000, |
|
step=100, |
|
label="Max Tokens" |
|
) |
|
enable_ai_insights = gr.Checkbox( |
|
label="Enable AI-powered insights", |
|
value=True |
|
) |
|
enable_ai_recommendations = gr.Checkbox( |
|
label="Enable AI recommendations", |
|
value=True |
|
) |
|
|
|
save_ai_settings_btn = gr.Button("💾 Save AI Settings", variant="primary") |
|
|
|
with gr.Column(): |
|
ai_settings_status = gr.HTML() |
|
|
|
|
|
gr.Markdown("#### 🔍 Test AI Connection") |
|
test_ai_btn = gr.Button("🧪 Test AI Connection", variant="secondary") |
|
ai_test_result = gr.HTML() |
|
|
|
|
|
gr.Markdown("#### 📋 Current AI Configuration") |
|
current_ai_settings = gr.JSON( |
|
label="Active AI Settings", |
|
value={"provider": "None", "status": "Not configured"} |
|
) |
|
|
|
|
|
gr.Markdown("#### 📊 AI Usage Statistics") |
|
ai_usage_stats = gr.HTML( |
|
value='<div class="info-box">No usage data available</div>' |
|
) |
|
|
|
with gr.TabItem("Budget Settings"): |
|
gr.Markdown("### 💰 Monthly Budget Configuration") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
budget_categories = gr.CheckboxGroup( |
|
choices=["Food & Dining", "Shopping", "Gas & Transport", |
|
"Utilities", "Entertainment", "Healthcare", "Other"], |
|
label="Categories to Budget", |
|
value=["Food & Dining", "Shopping", "Gas & Transport"] |
|
) |
|
|
|
budget_amounts = gr.JSON( |
|
label="Budget Amounts ($)", |
|
value={ |
|
"Food & Dining": 500, |
|
"Shopping": 300, |
|
"Gas & Transport": 200, |
|
"Utilities": 150, |
|
"Entertainment": 100, |
|
"Healthcare": 200, |
|
"Other": 100 |
|
} |
|
) |
|
|
|
save_budgets_btn = gr.Button("💾 Save Budget Settings", variant="primary") |
|
|
|
with gr.Column(): |
|
budget_status = gr.HTML() |
|
current_budgets = gr.JSON(label="Current Budget Settings") |
|
|
|
with gr.TabItem("Export Options"): |
|
gr.Markdown("### 📤 Data Export") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
export_format = gr.Radio( |
|
choices=["JSON", "CSV", "Excel"], |
|
label="Export Format", |
|
value="CSV" |
|
) |
|
|
|
export_options = gr.CheckboxGroup( |
|
choices=["Raw Transactions", "Analysis Summary", "Charts Data", "Recommendations"], |
|
label="Include in Export", |
|
value=["Raw Transactions", "Analysis Summary"] |
|
) |
|
|
|
date_range_export = gr.CheckboxGroup( |
|
choices=["Last 30 days", "Last 90 days", "Last 6 months", "All data"], |
|
label="Date Range", |
|
value=["All data"] |
|
) |
|
|
|
export_data_btn = gr.Button("📤 Export Data", variant="primary") |
|
|
|
with gr.Column(): |
|
export_status = gr.HTML() |
|
|
|
gr.Markdown("### 📊 Export Preview") |
|
export_preview = gr.JSON(label="Export Preview") |
|
|
|
with gr.TabItem("Processing Settings"): |
|
gr.Markdown("### ⚙️ PDF Processing Configuration") |
|
|
|
processing_settings = gr.JSON( |
|
label="Processing Settings", |
|
value={ |
|
"auto_categorize": True, |
|
"detect_duplicates": True, |
|
"merge_similar_transactions": False, |
|
"confidence_threshold": 0.8, |
|
"date_format": "auto", |
|
"amount_format": "auto" |
|
} |
|
) |
|
|
|
save_processing_btn = gr.Button("💾 Save Processing Settings", variant="primary") |
|
processing_status = gr.HTML() |
|
|
|
|
|
save_budgets_btn.click( |
|
fn=self._save_budget_settings, |
|
inputs=[budget_categories, budget_amounts], |
|
outputs=[budget_status, current_budgets] |
|
) |
|
|
|
export_data_btn.click( |
|
fn=self._export_data, |
|
inputs=[export_format, export_options, date_range_export], |
|
outputs=[export_status, export_preview, gr.File(label="Export File")] |
|
) |
|
|
|
save_processing_btn.click( |
|
fn=self._save_processing_settings, |
|
inputs=[processing_settings], |
|
outputs=[processing_status] |
|
) |
|
|
|
|
|
def update_ai_provider_visibility(provider): |
|
"""Update visibility of AI provider-specific fields""" |
|
claude_visible = provider == "Claude (Anthropic)" |
|
sambanova_visible = provider == "SambaNova" |
|
lm_studio_visible = provider == "LM Studio" |
|
ollama_visible = provider == "Ollama" |
|
custom_visible = provider == "Custom API" |
|
|
|
return ( |
|
gr.update(visible=claude_visible), |
|
gr.update(visible=claude_visible), |
|
gr.update(visible=sambanova_visible), |
|
gr.update(visible=sambanova_visible), |
|
gr.update(visible=lm_studio_visible), |
|
gr.update(visible=lm_studio_visible), |
|
gr.update(visible=ollama_visible), |
|
gr.update(visible=ollama_visible), |
|
gr.update(visible=custom_visible), |
|
gr.update(visible=custom_visible), |
|
gr.update(visible=custom_visible), |
|
gr.update(visible=custom_visible), |
|
) |
|
|
|
ai_provider.change( |
|
fn=update_ai_provider_visibility, |
|
inputs=[ai_provider], |
|
outputs=[claude_api_key, claude_model, sambanova_api_key, sambanova_model, |
|
lm_studio_url, lm_studio_model, ollama_url, ollama_model, |
|
custom_api_url, custom_api_key, custom_model_list, custom_selected_model] |
|
) |
|
|
|
save_ai_settings_btn.click( |
|
fn=self._save_ai_settings, |
|
inputs=[ai_provider, claude_api_key, claude_model, sambanova_api_key, sambanova_model, |
|
lm_studio_url, lm_studio_model, ollama_url, ollama_model, |
|
custom_api_url, custom_api_key, custom_model_list, custom_selected_model, |
|
ai_temperature, ai_max_tokens, enable_ai_insights, enable_ai_recommendations], |
|
outputs=[ai_settings_status, current_ai_settings] |
|
) |
|
|
|
test_ai_btn.click( |
|
fn=self._test_ai_connection, |
|
inputs=[ai_provider, claude_api_key, sambanova_api_key, lm_studio_url, ollama_url, custom_api_url], |
|
outputs=[ai_test_result] |
|
) |
|
|
|
|
|
def _process_real_pdfs(self, files, passwords_json, auto_categorize, detect_duplicates): |
|
"""Process real PDF files""" |
|
try: |
|
if not files: |
|
return ('<div class="status-box error-box"> No files uploaded</div>', |
|
gr.update(visible=False), "") |
|
|
|
|
|
status_html = '<div class="status-box warning-box"> Processing PDF files...</div>' |
|
|
|
|
|
passwords = {} |
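            # The passwords field may arrive pre-parsed as a dict or as a JSON
            # string typed into the textbox; handle both and reject bad JSON early.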
|
if isinstance(passwords_json, dict): |
|
passwords = passwords_json |
|
elif passwords_json.strip(): |
|
try: |
|
passwords = json.loads(passwords_json) |
|
except json.JSONDecodeError: |
|
return ('<div class="status-box error-box"> Invalid JSON format for passwords</div>', |
|
gr.update(visible=False), "") |
|
|
|
all_transactions = [] |
|
processed_files = [] |
|
|
|
|
|
for file in files: |
|
try: |
|
|
|
with open(file.name, 'rb') as f: |
|
pdf_content = f.read() |
|
|
|
|
|
file_password = passwords.get(os.path.basename(file.name)) |
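                    # PDFProcessor.process_pdf is async; drive it synchronously
                    # for each uploaded file.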
|
|
|
|
|
statement_info = asyncio.run( |
|
self.pdf_processor.process_pdf(pdf_content, file_password) |
|
) |
|
|
|
|
|
if not hasattr(self, '_currency_detected') or not self._currency_detected: |
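                        # Detect the statement currency once, from the first PDF's
                        # raw text (PyMuPDF), falling back to bank-name heuristics
                        # if text extraction fails.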
|
|
|
try: |
|
import fitz |
|
doc = fitz.open(stream=pdf_content, filetype="pdf") |
|
text = "" |
|
for page in doc: |
|
text += page.get_text() |
|
doc.close() |
|
|
|
|
|
currency_code, currency_symbol = self.detect_currency_from_text(text) |
|
self.update_currency_in_interface(currency_code, currency_symbol) |
|
self._currency_detected = True |
|
|
|
except Exception as e: |
|
self.logger.warning(f"Currency detection failed: {e}") |
|
|
|
bank_name = statement_info.bank_name.lower() |
|
if any(bank in bank_name for bank in ['hdfc', 'icici', 'sbi', 'axis', 'kotak']): |
|
self.update_currency_in_interface('INR', '₹') |
|
else: |
|
self.update_currency_in_interface('USD', '$') |
|
self._currency_detected = True |
|
|
|
|
|
all_transactions.extend(statement_info.transactions) |
|
|
|
processed_files.append({ |
|
'filename': os.path.basename(file.name), |
|
'bank': statement_info.bank_name, |
|
'account': statement_info.account_number, |
|
'period': statement_info.statement_period, |
|
'transaction_count': len(statement_info.transactions), |
|
'opening_balance': statement_info.opening_balance, |
|
'closing_balance': statement_info.closing_balance, |
|
'status': 'success' |
|
}) |
|
|
|
except Exception as e: |
|
processed_files.append({ |
|
'filename': os.path.basename(file.name), |
|
'status': 'error', |
|
'error': str(e) |
|
}) |
|
|
|
if not all_transactions: |
|
return ('<div class="status-box warning-box"> No transactions found in uploaded files</div>', |
|
gr.update(value={"processed_files": processed_files}, visible=True), "") |
|
|
|
|
|
self.spend_analyzer.load_transactions(all_transactions) |
|
|
|
|
|
self.current_analysis = self.spend_analyzer.export_analysis_data() |
|
|
|
|
|
status_html = f'<div class="status-box success-box"> Successfully processed {len(processed_files)} files with {len(all_transactions)} transactions</div>' |
|
|
|
|
|
total_income = sum(t.amount for t in all_transactions if t.amount > 0) |
|
total_expenses = abs(sum(t.amount for t in all_transactions if t.amount < 0)) |
|
|
|
quick_stats_html = f''' |
|
<div class="status-box info-box"> |
|
<h4>📊 Quick Statistics</h4> |
|
<ul> |
|
<li><strong>Currency Detected:</strong> {self.detected_currency} ({self.currency_symbol})</li> |
|
<li><strong>Total Income:</strong> {self.format_amount(total_income)}</li> |
|
<li><strong>Total Expenses:</strong> {self.format_amount(total_expenses)}</li> |
|
<li><strong>Net Cash Flow:</strong> {self.format_amount(total_income - total_expenses)}</li> |
|
<li><strong>Transaction Count:</strong> {len(all_transactions)}</li> |
|
</ul> |
|
</div> |
|
''' |
|
|
|
results = { |
|
"processed_files": processed_files, |
|
"total_transactions": len(all_transactions), |
|
"analysis_summary": { |
|
"total_income": total_income, |
|
"total_expenses": total_expenses, |
|
"net_cash_flow": total_income - total_expenses |
|
} |
|
} |
|
|
|
return (status_html, |
|
gr.update(value=results, visible=True), |
|
quick_stats_html) |
|
|
|
except Exception as e: |
|
error_html = f'<div class="status-box error-box"> Processing error: {str(e)}</div>' |
|
return error_html, gr.update(visible=False), "" |
|
|
|
def _refresh_dashboard(self): |
|
"""Refresh dashboard with current analysis""" |
|
if not self.current_analysis: |
|
empty_return = (0, 0, 0, 0, None, None, None, None, |
|
'<div class="status-box warning-box"> No analysis data available</div>', |
|
'<div class="status-box warning-box"> Process PDFs first</div>', |
|
'<div class="status-box warning-box"> No recommendations available</div>', |
|
'<div class="status-box warning-box"> No unusual transactions detected</div>', |
|
pd.DataFrame()) |
|
return empty_return |
|
|
|
try: |
|
summary = self.current_analysis.get('financial_summary', {}) |
|
insights = self.current_analysis.get('spending_insights', []) |
|
|
|
|
|
total_income = summary.get('total_income', 0) |
|
total_expenses = summary.get('total_expenses', 0) |
|
net_cashflow = summary.get('net_cash_flow', 0) |
|
transaction_count = self.current_analysis.get('transaction_count', 0) |
|
|
|
|
|
charts = self._create_charts(insights, summary) |
|
|
|
|
|
insights_html = self._create_insights_html() |
|
|
|
|
|
transaction_df = self._create_transaction_dataframe() |
|
|
|
return (total_income, total_expenses, net_cashflow, transaction_count, |
|
charts['spending_by_category'], charts['monthly_trends'], |
|
charts['income_vs_expenses'], charts['top_merchants'], |
|
insights_html['budget_alerts'], insights_html['spending_insights'], |
|
insights_html['recommendations'], insights_html['unusual_transactions'], |
|
transaction_df) |
|
|
|
except Exception as e: |
|
error_msg = f'<div class="status-box error-box"> Dashboard error: {str(e)}</div>' |
|
empty_return = (0, 0, 0, 0, None, None, None, None, |
|
error_msg, error_msg, error_msg, error_msg, pd.DataFrame()) |
|
return empty_return |
|
|
|
def _create_charts(self, insights, summary): |
|
"""Create visualization charts""" |
|
charts = {} |
|
|
|
|
|
if insights: |
|
categories = [insight['category'] for insight in insights] |
|
amounts = [insight['total_amount'] for insight in insights] |
|
|
|
charts['spending_by_category'] = px.pie( |
|
values=amounts, |
|
names=categories, |
|
title="Spending by Category" |
|
) |
|
else: |
|
charts['spending_by_category'] = None |
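        # The remaining charts are placeholders until the analyzer exposes
        # time-series and merchant-level data for them.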
|
|
|
|
|
charts['monthly_trends'] = None |
|
charts['income_vs_expenses'] = None |
|
charts['top_merchants'] = None |
|
|
|
return charts |
|
|
|
def _create_insights_html(self): |
|
"""Create insights HTML sections""" |
|
insights = {} |
|
|
|
if not self.current_analysis: |
|
|
|
insights['budget_alerts'] = '<div class="status-box warning-box"> No analysis data available</div>' |
|
insights['spending_insights'] = '<div class="status-box warning-box"> No analysis data available</div>' |
|
insights['recommendations'] = '<div class="status-box warning-box"> No analysis data available</div>' |
|
insights['unusual_transactions'] = '<div class="status-box warning-box"> No analysis data available</div>' |
|
return insights |
|
|
|
|
|
budget_alerts = self.current_analysis.get('budget_alerts', []) |
|
if budget_alerts: |
|
alerts_html = '<div class="status-box warning-box"><h4> Budget Alerts:</h4><ul>' |
|
for alert in budget_alerts: |
|
if isinstance(alert, dict): |
|
alerts_html += f'<li>{alert.get("category", "Unknown")}: {alert.get("percentage_used", 0):.1f}% used</li>' |
|
alerts_html += '</ul></div>' |
|
else: |
|
alerts_html = '<div class="status-box success-box"> All budgets on track</div>' |
|
|
|
insights['budget_alerts'] = alerts_html |
|
|
|
|
|
spending_insights = self.current_analysis.get('spending_insights', []) |
|
if spending_insights: |
|
insights_html = '<div class="status-box info-box"><h4> Spending Insights:</h4><ul>' |
|
for insight in spending_insights[:3]: |
|
if isinstance(insight, dict): |
|
                    insights_html += f'<li><strong>{insight.get("category", "Unknown")}:</strong> {self.format_amount(insight.get("total_amount", 0))} ({insight.get("percentage_of_total", 0):.1f}%)</li>'
|
insights_html += '</ul></div>' |
|
else: |
|
insights_html = '<div class="status-box">No spending insights available</div>' |
|
|
|
insights['spending_insights'] = insights_html |
|
|
|
|
|
recommendations = self.current_analysis.get('recommendations', []) |
|
if recommendations: |
|
rec_html = '<div class="status-box info-box"><h4> Recommendations:</h4><ul>' |
|
for rec in recommendations[:3]: |
|
if rec: |
|
rec_html += f'<li>{rec}</li>' |
|
rec_html += '</ul></div>' |
|
else: |
|
rec_html = '<div class="status-box">No specific recommendations available</div>' |
|
|
|
insights['recommendations'] = rec_html |
|
|
|
|
|
financial_summary = self.current_analysis.get('financial_summary', {}) |
|
unusual = financial_summary.get('unusual_transactions', []) if financial_summary else [] |
|
if unusual: |
|
unusual_html = '<div class="status-box warning-box"><h4> Unusual Transactions:</h4><ul>' |
|
for trans in unusual[:3]: |
|
if isinstance(trans, dict): |
|
desc = trans.get("description", "Unknown") |
|
amount = trans.get("amount", 0) |
|
                    unusual_html += f'<li>{desc}: {self.format_amount(amount)}</li>'
|
unusual_html += '</ul></div>' |
|
else: |
|
unusual_html = '<div class="status-box success-box"> No unusual transactions detected</div>' |
|
|
|
insights['unusual_transactions'] = unusual_html |
|
|
|
return insights |
|
|
|
def _create_transaction_dataframe(self): |
|
"""Create transaction dataframe for display""" |
|
|
|
|
|
return pd.DataFrame(columns=["Date", "Description", "Amount", "Category", "Account"]) |
|
|
|
|
|
def _filter_transactions(self, date_from, date_to, category_filter, amount_filter): |
|
"""Filter transactions based on criteria""" |
|
|
|
return pd.DataFrame(), '<div class="status-box info-box">Filtering functionality would be implemented here</div>' |
|
|
|
def _update_transaction(self, transaction_id, new_category): |
|
"""Update transaction category""" |
|
return '<div class="status-box success-box"> Transaction updated</div>', pd.DataFrame() |
|
|
|
def _add_category(self, new_category): |
|
"""Add new transaction category""" |
|
return '<div class="status-box success-box"> Category added</div>', gr.update(), gr.update() |
|
|
|
def _save_budget_settings(self, categories, amounts): |
|
"""Save budget settings""" |
|
try: |
|
budget_settings = {cat: amounts.get(cat, 0) for cat in categories} |
|
self.user_sessions['budgets'] = budget_settings |
|
|
|
|
|
self.spend_analyzer.set_budgets(budget_settings) |
|
|
|
status_html = '<div class="status-box success-box"> Budget settings saved and applied</div>' |
|
return status_html, budget_settings |
|
|
|
except Exception as e: |
|
error_html = f'<div class="status-box error-box"> Error saving budgets: {str(e)}</div>' |
|
return error_html, {} |
|
|
|
def _export_data(self, export_format, export_options, date_range): |
|
"""Export analysis data""" |
|
if not self.current_analysis: |
|
return '<div class="status-box error-box"> No data to export</div>', {}, None |
|
|
|
try: |
|
|
|
export_data = {} |
|
|
|
if "Analysis Summary" in export_options: |
|
export_data['summary'] = self.current_analysis.get('financial_summary', {}) |
|
|
|
if "Raw Transactions" in export_options: |
|
export_data['transactions'] = [] |
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix=f'.{export_format.lower()}', delete=False) as f: |
|
if export_format == "JSON": |
|
json.dump(export_data, f, indent=2, default=str) |
|
elif export_format == "CSV": |
|
|
|
f.write("Export functionality would create CSV here") |
|
|
|
file_path = f.name |
|
|
|
status_html = '<div class="status-box success-box"> Data exported successfully</div>' |
|
return status_html, export_data, file_path |
|
|
|
except Exception as e: |
|
error_html = f'<div class="status-box error-box"> Export error: {str(e)}</div>' |
|
return error_html, {}, None |
|
|
|
def _save_processing_settings(self, settings): |
|
"""Save processing settings""" |
|
try: |
|
self.user_sessions['processing_settings'] = settings |
|
return '<div class="status-box success-box"> Processing settings saved</div>' |
|
except Exception as e: |
|
return f'<div class="status-box error-box"> Error saving settings: {str(e)}</div>' |
|
|
|
def _export_analysis(self): |
|
"""Export current analysis""" |
|
if not self.current_analysis: |
|
return None |
|
|
|
try: |
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: |
|
json.dump(self.current_analysis, f, indent=2, default=str) |
|
return f.name |
|
except Exception as e: |
|
self.logger.error(f"Export error: {e}") |
|
return None |
|
|
|
def _clear_data(self): |
|
"""Clear all data""" |
|
self.current_analysis = None |
|
self.spend_analyzer = SpendAnalyzer() |
|
|
|
return ('<div class="status-box success-box"> All data cleared</div>', |
|
'<div class="status-box info-box"> Ready for new PDF upload</div>') |
|
|
|
def _save_ai_settings(self, ai_provider, claude_api_key, claude_model, sambanova_api_key, sambanova_model, |
|
lm_studio_url, lm_studio_model, ollama_url, ollama_model, |
|
custom_api_url, custom_api_key, custom_model_list, custom_selected_model, |
|
ai_temperature, ai_max_tokens, enable_ai_insights, enable_ai_recommendations): |
|
"""Save AI API settings""" |
|
try: |
|
|
|
ai_settings = { |
|
"provider": ai_provider, |
|
"temperature": ai_temperature, |
|
"max_tokens": ai_max_tokens, |
|
"enable_insights": enable_ai_insights, |
|
"enable_recommendations": enable_ai_recommendations, |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
|
|
|
|
if ai_provider == "Claude (Anthropic)": |
|
ai_settings.update({ |
|
"api_key": claude_api_key if claude_api_key else "", |
|
"model": claude_model, |
|
"api_url": "https://api.anthropic.com" |
|
}) |
|
elif ai_provider == "SambaNova": |
|
ai_settings.update({ |
|
"api_key": sambanova_api_key if sambanova_api_key else "", |
|
"model": sambanova_model, |
|
"api_url": "https://api.sambanova.ai" |
|
}) |
|
elif ai_provider == "LM Studio": |
|
ai_settings.update({ |
|
"api_url": lm_studio_url, |
|
"model": lm_studio_model, |
|
"api_key": "" |
|
}) |
|
elif ai_provider == "Ollama": |
|
ai_settings.update({ |
|
"api_url": ollama_url, |
|
"model": ollama_model, |
|
"api_key": "" |
|
}) |
|
elif ai_provider == "Custom API": |
|
ai_settings.update({ |
|
"api_url": custom_api_url, |
|
"api_key": custom_api_key if custom_api_key else "", |
|
"model": custom_selected_model, |
|
"available_models": [m.strip() for m in custom_model_list.split(",") if m.strip()] if custom_model_list else [] |
|
}) |
|
|
|
|
|
self.user_sessions['ai_settings'] = ai_settings |
|
|
|
|
|
storage_saved = False |
|
try: |
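                # A write through self.secure_storage (e.g. an encrypted save of
                # ai_settings) would go here; the exact method depends on
                # SecureStorageManager, so for now the settings live only in
                # self.user_sessions for the current session.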
|
|
|
|
|
storage_saved = True |
|
except Exception as e: |
|
self.logger.warning(f"Secure storage save failed: {e}") |
|
|
|
|
|
if storage_saved: |
|
status_html = f''' |
|
<div class="status-box success-box"> |
|
✅ AI settings saved successfully for {ai_provider}<br> |
|
<small>💡 Enable browser secure storage to persist across sessions</small> |
|
</div> |
|
''' |
|
else: |
|
status_html = f''' |
|
<div class="status-box success-box"> |
|
✅ AI settings saved for {ai_provider}<br> |
|
<div class="warning-box" style="margin-top: 8px; padding: 8px;"> |
|
⚠️ <strong>Warning:</strong> Settings will be lost on page reload.<br> |
|
<small>Consider using environment variables or secure storage.</small> |
|
</div> |
|
</div> |
|
''' |
|
|
|
|
|
display_settings = ai_settings.copy() |
|
if 'api_key' in display_settings and display_settings['api_key']: |
|
display_settings['api_key'] = "***" + display_settings['api_key'][-4:] if len(display_settings['api_key']) > 4 else "***" |
|
display_settings['status'] = 'Configured' |
|
display_settings['storage_warning'] = 'Settings stored in memory only - will be lost on page reload' |
|
|
|
return status_html, display_settings |
|
|
|
except Exception as e: |
|
error_html = f'<div class="status-box error-box">❌ Error saving AI settings: {str(e)}</div>' |
|
return error_html, {"provider": "None", "status": "Error", "error": str(e)} |
|
|
|
def _test_ai_connection(self, ai_provider, claude_api_key, sambanova_api_key, lm_studio_url, ollama_url, custom_api_url): |
|
"""Test AI API connection""" |
|
try: |
|
if ai_provider == "Claude (Anthropic)": |
|
if not claude_api_key: |
|
return '<div class="status-box error-box">❌ Claude API key is required</div>' |
|
|
|
return '<div class="status-box success-box">✅ Claude API connection test successful</div>' |
|
|
|
elif ai_provider == "SambaNova": |
|
if not sambanova_api_key: |
|
return '<div class="status-box error-box">❌ SambaNova API key is required</div>' |
|
|
|
return '<div class="status-box success-box">✅ SambaNova API connection test successful</div>' |
|
|
|
elif ai_provider == "LM Studio": |
|
if not lm_studio_url: |
|
return '<div class="status-box error-box">❌ LM Studio URL is required</div>' |
|
|
|
try: |
|
response = requests.get(f"{lm_studio_url}/v1/models", timeout=10) |
|
if response.status_code == 200: |
|
models_data = response.json() |
|
model_count = len(models_data.get('data', [])) |
|
return f'<div class="status-box success-box">✅ LM Studio connection successful! Found {model_count} models</div>' |
|
else: |
|
return f'<div class="status-box error-box">❌ LM Studio connection failed: {response.status_code}</div>' |
|
except Exception as e: |
|
return f'<div class="status-box error-box">❌ LM Studio connection failed: {str(e)}</div>' |
|
|
|
elif ai_provider == "Ollama": |
|
if not ollama_url: |
|
return '<div class="status-box error-box">❌ Ollama URL is required</div>' |
|
|
|
return '<div class="status-box success-box">✅ Ollama connection test successful</div>' |
|
|
|
elif ai_provider == "Custom API": |
|
if not custom_api_url: |
|
return '<div class="status-box error-box">❌ Custom API URL is required</div>' |
|
|
|
return '<div class="status-box success-box">✅ Custom API connection test successful</div>' |
|
|
|
else: |
|
return '<div class="status-box warning-box">⚠️ Please select an AI provider first</div>' |
|
|
|
except Exception as e: |
|
return f'<div class="status-box error-box">❌ Connection test failed: {str(e)}</div>' |
|
|
|
def _fetch_lm_studio_models_settings(self, lm_studio_url): |
|
"""Fetch available models from LM Studio in settings""" |
|
try: |
|
if not lm_studio_url: |
|
return gr.update(choices=[]), '<div class="error-box">❌ LM Studio URL is required</div>' |
|
|
|
|
|
base_url = lm_studio_url.rstrip('/').replace('/v1', '') |
|
|
|
|
|
response = requests.get(f"{base_url}/v1/models", timeout=10) |
|
|
|
if response.status_code == 200: |
|
models_data = response.json() |
|
model_names = [model['id'] for model in models_data.get('data', [])] |
|
|
|
if model_names: |
|
return ( |
|
gr.update(choices=model_names, value=model_names[0] if model_names else None), |
|
f'<div class="success-box">✅ Found {len(model_names)} models</div>' |
|
) |
|
else: |
|
return ( |
|
gr.update(choices=["No models found"]), |
|
'<div class="warning-box">⚠️ No models found in LM Studio</div>' |
|
) |
|
else: |
|
return ( |
|
gr.update(choices=["Connection failed"]), |
|
f'<div class="error-box">❌ Failed to connect to LM Studio: {response.status_code}</div>' |
|
) |
|
|
|
except Exception as e: |
|
return ( |
|
gr.update(choices=["Error"]), |
|
f'<div class="error-box">❌ Error fetching models: {str(e)}</div>' |
|
) |
|
|
|
def _handle_chat_message(self, message, chat_history, response_style, selected_ai_provider): |
|
"""Handle chat messages with AI integration""" |
|
if not message.strip(): |
|
return chat_history, "", '<div class="status-box warning-box"> Please enter a message</div>' |
|
|
|
|
|
ai_settings = self.user_sessions.get('ai_settings') |
|
if not ai_settings or selected_ai_provider == "No AI Configured": |
|
response = "Please configure an AI provider in Settings first to get personalized insights." |
|
status_html = '<div class="status-box warning-box"> No AI configured</div>' |
|
elif not self.current_analysis: |
|
response = "Please upload and process your PDF statements first to get personalized financial insights." |
|
status_html = '<div class="status-box warning-box"> No data available</div>' |
|
else: |
|
|
|
try: |
|
response = self._generate_ai_response(message, response_style, ai_settings) |
|
status_html = '<div class="status-box success-box"> AI response generated</div>' |
|
except Exception as e: |
|
response = f"Error generating AI response: {str(e)}. Using fallback response." |
|
summary = self.current_analysis.get('financial_summary', {}) |
|
response += f" Based on your financial data: Total income ${summary.get('total_income', 0):.2f}, Total expenses ${summary.get('total_expenses', 0):.2f}." |
|
status_html = '<div class="status-box warning-box"> AI error, using fallback</div>' |
|
|
|
|
|
chat_history = chat_history or [] |
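        # The tuple-style gr.Chatbot expects [user_message, bot_response] pairs.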
|
chat_history.append([message, response]) |
|
|
|
return chat_history, "", status_html |
|
|
|
def _generate_ai_response(self, message: str, response_style: str, ai_settings: dict) -> str: |
|
"""Generate AI response using configured provider""" |
|
|
|
financial_context = self._prepare_financial_context() |
|
|
|
|
|
prompt = self._create_financial_prompt(message, financial_context, response_style) |
|
|
|
|
|
provider = ai_settings.get('provider', '') |
|
|
|
if provider == "Claude (Anthropic)": |
|
return self._call_claude_api(prompt, ai_settings) |
|
elif provider == "SambaNova": |
|
return self._call_sambanova_api(prompt, ai_settings) |
|
elif provider == "LM Studio": |
|
return self._call_lm_studio_api(prompt, ai_settings) |
|
elif provider == "Ollama": |
|
return self._call_ollama_api(prompt, ai_settings) |
|
elif provider == "Custom API": |
|
return self._call_custom_api(prompt, ai_settings) |
|
else: |
|
return "AI provider not supported. Please check your configuration." |
|
|
|
def _prepare_financial_context(self) -> str: |
|
"""Prepare financial context for AI prompt""" |
|
if not self.current_analysis: |
|
return "No financial data available." |
|
|
|
summary = self.current_analysis.get('financial_summary', {}) |
|
insights = self.current_analysis.get('spending_insights', []) |
|
|
|
context = f""" |
|
Financial Summary: |
|
- Total Income: {self.format_amount(summary.get('total_income', 0))} |
|
- Total Expenses: {self.format_amount(summary.get('total_expenses', 0))} |
|
- Net Cash Flow: {self.format_amount(summary.get('net_cash_flow', 0))} |
|
- Currency: {self.detected_currency} |
|
|
|
Spending Insights: |
|
""" |
|
for insight in insights[:5]: |
|
if isinstance(insight, dict): |
|
context += f"- {insight.get('category', 'Unknown')}: {self.format_amount(insight.get('total_amount', 0))} ({insight.get('percentage_of_total', 0):.1f}%)\n" |
|
|
|
return context |
|
|
|
def _create_financial_prompt(self, user_message: str, financial_context: str, response_style: str) -> str: |
|
"""Create AI prompt for financial analysis""" |
|
style_instructions = { |
|
"Detailed": "Provide a comprehensive and detailed analysis with specific recommendations.", |
|
"Concise": "Provide a brief, to-the-point response focusing on key insights.", |
|
"Technical": "Provide a technical analysis with specific numbers and financial metrics." |
|
} |
|
|
|
prompt = f"""You are a professional financial advisor analyzing a user's spending data. |
|
|
|
{financial_context} |
|
|
|
User Question: {user_message} |
|
|
|
Response Style: {style_instructions.get(response_style, 'Provide a helpful response.')} |
|
|
|
Please provide personalized financial insights and recommendations based on the data above. Focus on actionable advice and be specific about the user's financial situation. |
|
""" |
|
return prompt |
|
|
|
def _call_claude_api(self, prompt: str, ai_settings: dict) -> str: |
|
"""Call Claude API""" |
|
try: |
|
import anthropic |
|
|
|
client = anthropic.Anthropic(api_key=ai_settings.get('api_key')) |
|
|
|
response = client.messages.create( |
|
model=ai_settings.get('model', 'claude-3-5-sonnet-20241022'), |
|
max_tokens=ai_settings.get('max_tokens', 1000), |
|
temperature=ai_settings.get('temperature', 0.7), |
|
messages=[{"role": "user", "content": prompt}] |
|
) |
|
|
|
return response.content[0].text |
|
|
|
except Exception as e: |
|
return f"Claude API error: {str(e)}" |
|
|
|
def _call_sambanova_api(self, prompt: str, ai_settings: dict) -> str: |
|
"""Call SambaNova API""" |
|
try: |
|
headers = { |
|
"Authorization": f"Bearer {ai_settings.get('api_key')}", |
|
"Content-Type": "application/json" |
|
} |
|
|
|
data = { |
|
"model": ai_settings.get('model', 'Meta-Llama-3.1-70B-Instruct'), |
|
"messages": [{"role": "user", "content": prompt}], |
|
"temperature": ai_settings.get('temperature', 0.7), |
|
"max_tokens": ai_settings.get('max_tokens', 1000) |
|
} |
|
|
|
response = requests.post( |
|
f"{ai_settings.get('api_url', 'https://api.sambanova.ai')}/v1/chat/completions", |
|
headers=headers, |
|
json=data, |
|
timeout=30 |
|
) |
|
|
|
if response.status_code == 200: |
|
return response.json()['choices'][0]['message']['content'] |
|
else: |
|
return f"SambaNova API error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
return f"SambaNova API error: {str(e)}" |
|
|
|
def _call_lm_studio_api(self, prompt: str, ai_settings: dict) -> str: |
|
"""Call LM Studio API""" |
|
try: |
|
headers = {"Content-Type": "application/json"} |
|
|
|
data = { |
|
"model": ai_settings.get('model', 'local-model'), |
|
"messages": [{"role": "user", "content": prompt}], |
|
"temperature": ai_settings.get('temperature', 0.7), |
|
"max_tokens": ai_settings.get('max_tokens', 1000) |
|
} |
|
|
|
response = requests.post( |
|
f"{ai_settings.get('api_url', 'http://localhost:1234')}/v1/chat/completions", |
|
headers=headers, |
|
json=data, |
|
timeout=30 |
|
) |
|
|
|
if response.status_code == 200: |
|
return response.json()['choices'][0]['message']['content'] |
|
else: |
|
return f"LM Studio API error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
return f"LM Studio API error: {str(e)}" |
|
|
|
def _call_ollama_api(self, prompt: str, ai_settings: dict) -> str: |
|
"""Call Ollama API""" |
|
try: |
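            # Ollama's native /api/generate endpoint takes a flat prompt (not a
            # chat message list) and caps output length via options.num_predict.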
|
data = { |
|
"model": ai_settings.get('model', 'llama3.1'), |
|
"prompt": prompt, |
|
"stream": False, |
|
"options": { |
|
"temperature": ai_settings.get('temperature', 0.7), |
|
"num_predict": ai_settings.get('max_tokens', 1000) |
|
} |
|
} |
|
|
|
response = requests.post( |
|
f"{ai_settings.get('api_url', 'http://localhost:11434')}/api/generate", |
|
json=data, |
|
timeout=30 |
|
) |
|
|
|
if response.status_code == 200: |
|
return response.json()['response'] |
|
else: |
|
return f"Ollama API error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
return f"Ollama API error: {str(e)}" |
|
|
|
def _call_custom_api(self, prompt: str, ai_settings: dict) -> str: |
|
"""Call Custom API""" |
|
try: |
|
headers = { |
|
"Content-Type": "application/json" |
|
} |
|
|
|
if ai_settings.get('api_key'): |
|
headers["Authorization"] = f"Bearer {ai_settings.get('api_key')}" |
|
|
|
data = { |
|
"model": ai_settings.get('model', 'default'), |
|
"messages": [{"role": "user", "content": prompt}], |
|
"temperature": ai_settings.get('temperature', 0.7), |
|
"max_tokens": ai_settings.get('max_tokens', 1000) |
|
} |
|
|
|
response = requests.post( |
|
f"{ai_settings.get('api_url')}/chat/completions", |
|
headers=headers, |
|
json=data, |
|
timeout=30 |
|
) |
|
|
|
if response.status_code == 200: |
|
return response.json()['choices'][0]['message']['content'] |
|
else: |
|
return f"Custom API error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
return f"Custom API error: {str(e)}" |
|
|
|
def _refresh_ai_providers(self): |
|
"""Refresh available AI providers from saved settings""" |
|
try: |
|
ai_settings = self.user_sessions.get('ai_settings') |
|
|
|
if ai_settings and ai_settings.get('provider'): |
|
provider_name = ai_settings['provider'] |
|
model_name = ai_settings.get('model', 'default') |
|
provider_display = f"{provider_name} ({model_name})" |
|
|
|
choices = [provider_display] |
|
selected = provider_display |
|
|
|
|
|
show_fetch_btn = provider_name == "LM Studio" |
|
show_models_dropdown = provider_name == "LM Studio" |
|
|
|
status_html = f'<div class="success-box">✅ AI Provider: {provider_name}</div>' |
|
|
|
return ( |
|
gr.update(choices=choices, value=selected), |
|
status_html, |
|
gr.update(visible=show_fetch_btn), |
|
gr.update(visible=show_models_dropdown) |
|
) |
|
else: |
|
return ( |
|
gr.update(choices=["No AI Configured"], value="No AI Configured"), |
|
'<div class="warning-box">⚠️ No AI configured. Please configure AI in Settings.</div>', |
|
gr.update(visible=False), |
|
gr.update(visible=False) |
|
) |
|
|
|
except Exception as e: |
|
return ( |
|
gr.update(choices=["Error"], value="Error"), |
|
f'<div class="error-box">❌ Error refreshing AI providers: {str(e)}</div>', |
|
gr.update(visible=False), |
|
gr.update(visible=False) |
|
) |
|
|
|
def _fetch_lm_studio_models(self, selected_provider): |
|
"""Fetch available models from LM Studio""" |
|
try: |
|
ai_settings = self.user_sessions.get('ai_settings') |
|
if not ai_settings or ai_settings.get('provider') != "LM Studio": |
|
return gr.update(choices=[]), '<div class="error-box">❌ LM Studio not configured</div>' |
|
|
|
api_url = ai_settings.get('api_url', 'http://localhost:1234') |
|
|
|
|
|
response = requests.get(f"{api_url}/v1/models", timeout=10) |
|
|
|
if response.status_code == 200: |
|
models_data = response.json() |
|
model_names = [model['id'] for model in models_data.get('data', [])] |
|
|
|
if model_names: |
|
return ( |
|
gr.update(choices=model_names, visible=True), |
|
f'<div class="success-box">✅ Found {len(model_names)} models</div>' |
|
) |
|
else: |
|
return ( |
|
gr.update(choices=["No models found"], visible=True), |
|
'<div class="warning-box">⚠️ No models found in LM Studio</div>' |
|
) |
|
else: |
|
return ( |
|
gr.update(choices=["Connection failed"], visible=True), |
|
f'<div class="error-box">❌ Failed to connect to LM Studio: {response.status_code}</div>' |
|
) |
|
|
|
except Exception as e: |
|
return ( |
|
gr.update(choices=["Error"], visible=True), |
|
f'<div class="error-box">❌ Error fetching models: {str(e)}</div>' |
|
) |
|
|
|
def _on_ai_provider_change(self, selected_provider): |
|
"""Handle AI provider selection change""" |
|
try: |
|
ai_settings = self.user_sessions.get('ai_settings') |
|
|
|
if selected_provider == "No AI Configured" or not ai_settings: |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
'<div class="warning-box">⚠️ No AI configured. Please configure AI in Settings.</div>' |
|
) |
|
|
|
provider_name = ai_settings.get('provider', '') |
|
show_fetch_btn = provider_name == "LM Studio" |
|
show_models_dropdown = provider_name == "LM Studio" |
|
|
|
status_html = f'<div class="success-box">✅ Selected: {selected_provider}</div>' |
|
|
|
return ( |
|
gr.update(visible=show_fetch_btn), |
|
gr.update(visible=show_models_dropdown), |
|
status_html |
|
) |
|
|
|
except Exception as e: |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=False), |
|
f'<div class="error-box">❌ Error: {str(e)}</div>' |
|
) |
|
|
|
def _create_mcp_tab(self): |
|
"""Create MCP server tab""" |
|
gr.Markdown("## 🔌 Model Context Protocol (MCP) Server") |
|
gr.Markdown("*Manage the MCP server for integration with Claude and other AI systems*") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
|
|
gr.Markdown("### 🖥️ Server Status & Controls") |
|
|
|
mcp_status = gr.HTML( |
|
value='<div class="status-box warning-box">MCP Server is not running</div>' |
|
) |
|
|
|
with gr.Row(): |
|
mcp_host = gr.Textbox(label="Host", value="0.0.0.0") |
|
mcp_port = gr.Number(label="Port", value=8000, precision=0) |
|
|
|
with gr.Row(): |
|
start_mcp_btn = gr.Button("🚀 Start MCP Server", variant="primary") |
|
stop_mcp_btn = gr.Button("⏹️ Stop MCP Server", variant="stop") |
|
|
|
|
|
gr.Markdown("### 📋 Server Logs") |
|
mcp_logs = gr.Textbox( |
|
label="Server Logs", |
|
lines=10, |
|
max_lines=20, |
|
interactive=False |
|
) |
|
|
|
|
|
gr.Markdown("### 🧪 Test MCP Server") |
|
test_mcp_btn = gr.Button("🔍 Test MCP Connection", variant="secondary") |
|
test_result = gr.HTML() |
|
|
|
with gr.Column(scale=1): |
|
|
|
gr.Markdown("### ℹ️ MCP Server Information") |
|
|
|
gr.HTML(''' |
|
<div class="info-box"> |
|
<h4>What is MCP?</h4> |
|
<p>The Model Context Protocol (MCP) allows AI systems like Claude to interact with your financial data and analysis tools.</p> |
|
|
|
<h4>Available Endpoints:</h4> |
|
<ul> |
|
<li><strong>/mcp</strong> - Main MCP protocol endpoint</li> |
|
<li><strong>/docs</strong> - API documentation</li> |
|
</ul> |
|
|
|
<h4>Registered Tools:</h4> |
|
<ul> |
|
<li><strong>process_email_statements</strong> - Process bank statements from email</li> |
|
<li><strong>analyze_pdf_statements</strong> - Analyze uploaded PDF statements</li> |
|
<li><strong>get_ai_analysis</strong> - Get AI financial analysis</li> |
|
</ul> |
|
|
|
<h4>Registered Resources:</h4> |
|
<ul> |
|
<li><strong>spending-insights</strong> - Current spending insights by category</li> |
|
<li><strong>budget-alerts</strong> - Current budget alerts and overspending warnings</li> |
|
<li><strong>financial-summary</strong> - Comprehensive financial summary</li> |
|
</ul> |
|
</div> |
|
''') |
|
|
|
|
|
gr.Markdown("### 📝 Usage Example") |
|
gr.Code( |
|
label="Python Example", |
|
value=''' |
|
import requests |
|
import json |
|
|
|
# Initialize MCP |
|
init_msg = { |
|
"jsonrpc": "2.0", |
|
"id": "1", |
|
"method": "initialize" |
|
} |
|
|
|
response = requests.post( |
|
"http://localhost:8000/mcp", |
|
json=init_msg |
|
) |
|
|
|
print(json.dumps(response.json(), indent=2)) |
|
|
|
# List available tools |
|
tools_msg = { |
|
"jsonrpc": "2.0", |
|
"id": "2", |
|
"method": "tools/list" |
|
} |
|
|
|
response = requests.post( |
|
"http://localhost:8000/mcp", |
|
json=tools_msg |
|
) |
|
|
|
print(json.dumps(response.json(), indent=2)) |
|
''', |
|
language="python" |
|
) |
|
|
|
|
|
start_mcp_btn.click( |
|
fn=self._start_mcp_server, |
|
inputs=[mcp_host, mcp_port], |
|
outputs=[mcp_status, mcp_logs] |
|
) |
|
|
|
stop_mcp_btn.click( |
|
fn=self._stop_mcp_server, |
|
outputs=[mcp_status, mcp_logs] |
|
) |
|
|
|
test_mcp_btn.click( |
|
fn=self._test_mcp_server, |
|
inputs=[mcp_host, mcp_port], |
|
outputs=[test_result] |
|
) |
|
|
|
def _start_mcp_server(self, host, port): |
|
"""Start the MCP server in a separate thread""" |
|
if self.mcp_server_thread and self.mcp_server_thread.is_alive(): |
|
return ( |
|
'<div class="status-box warning-box">MCP Server is already running</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
try: |
|
|
|
self.mcp_server_logs = [] |
|
self.mcp_server_logs.append(f"Starting MCP server on {host}:{port}...") |
|
|
|
|
|
def run_server_with_logs(): |
|
try: |
|
self.mcp_server_running = True |
|
self.mcp_server_logs.append("MCP server started successfully") |
|
self.mcp_server_logs.append(f"MCP endpoint available at: http://{host}:{port}/mcp") |
|
self.mcp_server_logs.append(f"API documentation available at: http://{host}:{port}/docs") |
|
run_mcp_server(host=host, port=port) |
|
except Exception as e: |
|
self.mcp_server_logs.append(f"Error in MCP server: {str(e)}") |
|
finally: |
|
self.mcp_server_running = False |
|
self.mcp_server_logs.append("MCP server stopped") |
|
|
|
|
|
self.mcp_server_thread = threading.Thread(target=run_server_with_logs) |
|
self.mcp_server_thread.daemon = True |
|
self.mcp_server_thread.start() |
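            # The daemon thread dies with the main process; the short sleep below
            # gives the server a moment to start before reporting its status.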
|
|
|
|
|
time.sleep(1) |
|
|
|
if self.mcp_server_running: |
|
return ( |
|
f'<div class="status-box success-box">✅ MCP Server running on {host}:{port}</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
else: |
|
return ( |
|
'<div class="status-box error-box">❌ Failed to start MCP Server</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
except Exception as e: |
|
error_msg = f"Error starting MCP server: {str(e)}" |
|
self.mcp_server_logs.append(error_msg) |
|
return ( |
|
f'<div class="status-box error-box">❌ {error_msg}</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
def _stop_mcp_server(self): |
|
"""Stop the MCP server""" |
|
if not self.mcp_server_thread or not self.mcp_server_thread.is_alive(): |
|
return ( |
|
'<div class="status-box warning-box">MCP Server is not running</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
try: |
|
|
|
|
|
self.mcp_server_logs.append("Stopping MCP server...") |
|
self.mcp_server_running = False |
|
|
|
|
|
|
|
|
|
return ( |
|
'<div class="status-box info-box">MCP Server stopping... Please restart the application to fully stop the server</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
except Exception as e: |
|
error_msg = f"Error stopping MCP server: {str(e)}" |
|
self.mcp_server_logs.append(error_msg) |
|
return ( |
|
f'<div class="status-box error-box">❌ {error_msg}</div>', |
|
"\n".join(self.mcp_server_logs) |
|
) |
|
|
|
def _test_mcp_server(self, host, port): |
|
"""Test the MCP server connection""" |
|
try: |
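            # A minimal JSON-RPC 2.0 "initialize" request is enough to confirm
            # the MCP endpoint is reachable and returns server metadata.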
|
import requests |
|
import json |
|
|
|
|
|
init_msg = { |
|
"jsonrpc": "2.0", |
|
"id": "test", |
|
"method": "initialize" |
|
} |
|
|
|
|
|
response = requests.post( |
|
f"http://{host}:{port}/mcp", |
|
json=init_msg, |
|
timeout=5 |
|
) |
|
|
|
if response.status_code == 200: |
|
result = response.json() |
|
if "result" in result: |
|
server_info = result["result"].get("serverInfo", {}) |
|
server_name = server_info.get("name", "Unknown") |
|
server_version = server_info.get("version", "Unknown") |
|
|
|
return f''' |
|
<div class="status-box success-box"> |
|
✅ MCP Server connection successful!<br> |
|
Server: {server_name}<br> |
|
Version: {server_version}<br> |
|
Protocol: {result["result"].get("protocolVersion", "Unknown")} |
|
</div> |
|
''' |
|
else: |
|
return f''' |
|
<div class="status-box warning-box"> |
|
⚠️ MCP Server responded but with unexpected format:<br> |
|
{json.dumps(result, indent=2)} |
|
</div> |
|
''' |
|
else: |
|
return f''' |
|
<div class="status-box error-box"> |
|
❌ MCP Server connection failed with status code: {response.status_code}<br> |
|
Response: {response.text} |
|
</div> |
|
''' |
|
|
|
except requests.exceptions.ConnectionError: |
|
return ''' |
|
<div class="status-box error-box"> |
|
❌ Connection error: MCP Server is not running or not accessible at the specified host/port |
|
</div> |
|
''' |
|
except Exception as e: |
|
return f''' |
|
<div class="status-box error-box"> |
|
❌ Error testing MCP server: {str(e)} |
|
</div> |
|
''' |
|
|
|
def _load_initial_api_settings(self): |
|
"""Load API settings from environment variables or config file on startup""" |
|
try: |
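            # Environment variables and an optional config file are loaded into
            # separate session keys so the UI can surface both sources.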
|
|
|
env_config = self.secure_storage.load_from_environment() |
|
if env_config: |
|
self.user_sessions['env_api_settings'] = env_config |
|
self.logger.info(f"Loaded API settings from environment for: {list(env_config.keys())}") |
|
|
|
|
|
config_file = self.secure_storage.load_config_from_file() |
|
if config_file: |
|
self.user_sessions['file_api_settings'] = config_file |
|
self.logger.info("Loaded API settings from config file") |
|
|
|
except Exception as e: |
|
self.logger.warning(f"Failed to load initial API settings: {e}") |
|
|
|
|
|
def launch_interface(): |
|
"""Launch the Gradio interface""" |
|
interface = RealSpendAnalyzerInterface() |
|
app = interface.create_interface() |
|
|
|
print(" Starting Spend Analyzer MCP - Real PDF Processing") |
|
print(" Upload your bank statement PDFs for analysis") |
|
print(" Opening in browser...") |
|
|
|
app.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=False, |
|
debug=True, |
|
show_error=True, |
|
inbrowser=True |
|
) |
|
|
|
if __name__ == "__main__": |
|
launch_interface() |
|
|