"""
Modal.com Deployment Configuration for Spend Analyzer MCP
"""
|
import modal |
|
import os |
|
from typing import Dict, Any, Optional |
|
import json |
|
import asyncio |
|
from datetime import datetime |
|
import logging |
|
|
|
|
|
# Modal application object; every function below registers itself on it.
app = modal.App("spend-analyzer-mcp")

# Container image: Debian slim with Python 3.11. The Python packages are
# installed first, then the Tesseract OCR system packages.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "fastapi",
        "uvicorn",
        "gradio",
        "pandas",
        "numpy",
        "PyPDF2",
        "PyMuPDF",
        "anthropic",
        "python-multipart",
        "aiofiles",
        "python-dotenv",
        "imaplib2",
        "email-validator",
        "pydantic",
        "websockets",
        "asyncio-mqtt",
    )
    .apt_install("tesseract-ocr", "tesseract-ocr-eng")
)

# Named Modal secrets, attached to the functions that pass `secrets=secrets`.
secrets = [
    modal.Secret.from_name(secret_name)
    for secret_name in ("anthropic-api-key", "email-credentials")
]

# Persistent volume (created if it does not exist yet); functions that use it
# mount it at /data.
volume = modal.Volume.from_name("spend-analyzer-data", create_if_missing=True)
|
|
|
@app.function(
    image=image,
    secrets=secrets,
    volumes={"/data": volume},
    timeout=300,
    memory=2048,
    cpu=2.0,
)
def process_bank_statements(email_config: Dict, days_back: int = 30, passwords: Optional[Dict] = None):
    """
    Modal function to process bank statements from email.

    Fetches bank emails, extracts their attachments, parses every PDF
    attachment and runs the spend analysis over all collected transactions.

    Args:
        email_config: Passed unchanged to ``EmailProcessor``; its schema is
            defined by that class (not visible in this file).
        days_back: Forwarded to ``EmailProcessor.fetch_bank_emails``.
        passwords: Optional mapping of attachment filename to PDF password.

    Returns:
        On success, a dict with ``processed_statements``,
        ``total_transactions``, ``analysis`` and an ISO-format ``timestamp``.
        Every entry of ``processed_statements`` carries a ``status`` of
        ``success``, ``password_required`` or ``error``. If the pipeline
        itself fails, ``{'error': <message>}`` is returned instead.
    """
    import sys

    # NOTE(review): presumably email_processor / spend_analyzer are stored on
    # the mounted volume -- confirm. The membership check keeps sys.path from
    # growing when one container serves several calls.
    if "/data" not in sys.path:
        sys.path.append("/data")

    from email_processor import EmailProcessor, PDFProcessor
    from spend_analyzer import SpendAnalyzer

    async def _collect_statements(email_processor, pdf_processor):
        """Fetch the emails and parse their PDF attachments on one event loop."""
        transactions = []
        statements = []

        emails = await email_processor.fetch_bank_emails(days_back)
        for email_msg in emails:
            try:
                attachments = await email_processor.extract_attachments(email_msg)

                for filename, content, file_type in attachments:
                    if file_type != 'pdf':
                        continue

                    password = passwords.get(filename) if passwords else None

                    try:
                        statement_info = await pdf_processor.process_pdf(content, password)
                        transactions.extend(statement_info.transactions)
                        statements.append({
                            'filename': filename,
                            'bank': statement_info.bank_name,
                            'account': statement_info.account_number,
                            'period': statement_info.statement_period,
                            'transaction_count': len(statement_info.transactions),
                            # Same marker the error entries and
                            # analyze_uploaded_statements use, so callers can
                            # read entry['status'] on every item.
                            'status': 'success',
                        })
                    except ValueError as e:
                        # A ValueError that mentions "password" is reported
                        # separately so the caller can ask for the password.
                        needs_password = "password" in str(e).lower()
                        statements.append({
                            'filename': filename,
                            'status': 'password_required' if needs_password else 'error',
                            'error': str(e),
                        })

            except Exception as e:
                # Best effort: a failing email is logged and skipped so the
                # remaining emails are still processed.
                logging.error(f"Error processing email: {e}")

        return transactions, statements

    try:
        email_processor = EmailProcessor(email_config)
        pdf_processor = PDFProcessor()
        analyzer = SpendAnalyzer()

        # One asyncio.run for the whole pipeline: the processors are driven
        # by a single event loop instead of a fresh loop per awaited call.
        all_transactions, processed_statements = asyncio.run(
            _collect_statements(email_processor, pdf_processor)
        )

        if all_transactions:
            analyzer.load_transactions(all_transactions)
            analysis_data = analyzer.export_analysis_data()
        else:
            analysis_data = {'message': 'No transactions found'}

        return {
            'processed_statements': processed_statements,
            'total_transactions': len(all_transactions),
            'analysis': analysis_data,
            'timestamp': datetime.now().isoformat(),
        }

    except Exception as e:
        logging.error(f"Error in process_bank_statements: {e}")
        return {'error': str(e)}
|
|
|
@app.function(image=image, secrets=secrets, timeout=60)
def analyze_uploaded_statements(pdf_contents: Dict[str, bytes], passwords: Optional[Dict] = None):
    """
    Modal function to analyze directly uploaded PDF statements.

    Args:
        pdf_contents: Mapping of filename to the PDF content handed to
            ``PDFProcessor.process_pdf``.
        passwords: Optional mapping of filename to PDF password.

    Returns:
        A dict with ``processed_files`` (one ``success`` or ``error`` entry
        per input file), ``total_transactions`` and ``analysis``; or
        ``{'error': <message>}`` if the run as a whole fails.
    """
    from pdf_processor import PDFProcessor
    from spend_analyzer import SpendAnalyzer

    try:
        parser = PDFProcessor()
        spend_analyzer = SpendAnalyzer()

        # A missing or empty password mapping behaves like "no password".
        password_lookup = passwords or {}

        collected = []
        file_reports = []

        for filename, content in pdf_contents.items():
            try:
                pdf_password = password_lookup.get(filename)
                statement = asyncio.run(parser.process_pdf(content, pdf_password))

                collected.extend(statement.transactions)
                report = {
                    'filename': filename,
                    'bank': statement.bank_name,
                    'account': statement.account_number,
                    'transaction_count': len(statement.transactions),
                    'status': 'success',
                }
            except Exception as e:
                # Any failure is recorded against the file; the remaining
                # files are still processed.
                report = {
                    'filename': filename,
                    'status': 'error',
                    'error': str(e),
                }
            file_reports.append(report)

        if not collected:
            analysis = {'message': 'No transactions found'}
        else:
            spend_analyzer.load_transactions(collected)
            analysis = spend_analyzer.export_analysis_data()

        return {
            'processed_files': file_reports,
            'total_transactions': len(collected),
            'analysis': analysis,
        }

    except Exception as e:
        return {'error': str(e)}
|
|
|
@app.function(
    image=image,
    secrets=secrets,
    volumes={"/data": volume},
    timeout=30,
)
def get_claude_analysis(
    analysis_data: Dict,
    user_question: str = "",
    model: str = "claude-3-sonnet-20240229",
    max_tokens: int = 1500,
):
    """
    Modal function to get Claude's analysis of spending data.

    Args:
        analysis_data: Analysis payload; serialised with ``json.dumps``
            (non-JSON values are stringified) and embedded in the prompt.
        user_question: Optional question from the user. When empty, a
            generic request for a comprehensive analysis is used.
        model: Anthropic model id to call. Defaults to the id this function
            has always used.
            NOTE(review): dated model ids get retired over time -- confirm
            the default is still served before deploying.
        max_tokens: Upper bound on generated tokens for the reply.

    Returns:
        ``{'claude_analysis': <text>, 'usage': <input + output tokens>}`` on
        success, otherwise ``{'error': 'Claude API error: ...'}``.
    """
    import anthropic

    try:
        # Report a missing secret explicitly instead of surfacing a bare
        # KeyError message.
        api_key = os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            return {'error': "Claude API error: ANTHROPIC_API_KEY is not set"}

        client = anthropic.Anthropic(api_key=api_key)

        question = (
            user_question
            if user_question
            else "Please provide a comprehensive analysis of my spending patterns and recommendations."
        )

        context = (
            "\n"
            "Financial Analysis Data:\n"
            f"{json.dumps(analysis_data, indent=2, default=str)}\n"
            "\n"
            f"User Question: {question}\n"
        )

        prompt = (
            "\n"
            "You are a financial advisor analyzing bank statement data.\n"
            "Based on the provided financial data, give insights about:\n"
            "\n"
            "1. Spending patterns and trends\n"
            "2. Budget adherence and alerts\n"
            "3. Unusual transactions that need attention\n"
            "4. Specific recommendations for improvement\n"
            "5. Answer to the user's specific question if provided\n"
            "\n"
            "Be specific, actionable, and highlight both positive aspects and areas for improvement.\n"
            "\n"
            f"{context}\n"
        )

        response = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=[
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
        )

        return {
            'claude_analysis': response.content[0].text,
            'usage': response.usage.input_tokens + response.usage.output_tokens,
        }

    except Exception as e:
        return {'error': f"Claude API error: {str(e)}"}
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    timeout=30,
)
def save_user_data(user_id: str, data: Dict):
    """
    Save user analysis data to persistent storage.

    Writes ``analysis.json`` and ``last_updated.txt`` into the user's
    directory under ``/data/users`` on the mounted volume, then commits the
    volume.

    Args:
        user_id: Identifier used as the directory name below ``/data/users``.
        data: Payload to store; non-JSON values are stringified.

    Returns:
        ``{"status": "saved", "path": <user directory>}`` on success,
        otherwise ``{"error": <message>}``.
    """
    try:
        users_root = "/data/users"
        user_dir = os.path.normpath(os.path.join(users_root, str(user_id)))

        # user_id is caller-supplied: refuse ids that resolve to the users
        # root itself or to anything outside it (e.g. "", "..", "/etc").
        if not user_dir.startswith(users_root + "/"):
            return {"error": f"Invalid user_id: {user_id!r}"}

        # Serialise before opening the file, so a serialisation failure does
        # not truncate a previously saved analysis.
        payload = json.dumps(data, indent=2, default=str)

        os.makedirs(user_dir, exist_ok=True)

        with open(os.path.join(user_dir, "analysis.json"), "w", encoding="utf-8") as f:
            f.write(payload)

        with open(os.path.join(user_dir, "last_updated.txt"), "w", encoding="utf-8") as f:
            f.write(datetime.now().isoformat())

        # Persist the writes now, so other containers can see them, rather
        # than waiting for a background commit.
        volume.commit()

        return {"status": "saved", "path": user_dir}

    except Exception as e:
        return {"error": str(e)}
|
|
|
@app.function(
    image=image,
    volumes={"/data": volume},
    timeout=30,
)
def load_user_data(user_id: str):
    """
    Load user analysis data from persistent storage.

    Args:
        user_id: Identifier used as the directory name below ``/data/users``.

    Returns:
        ``{"data": ..., "last_updated": <str or None>, "status": "found"}``
        when ``analysis.json`` exists for the user, ``{"status":
        "not_found"}`` when it does not, and ``{"error": <message>}`` on
        failure or for an invalid ``user_id``.
    """
    try:
        users_root = "/data/users"
        user_dir = os.path.normpath(os.path.join(users_root, str(user_id)))

        # user_id is caller-supplied: refuse ids that resolve to the users
        # root itself or to anything outside it (e.g. "", "..", "/etc").
        if not user_dir.startswith(users_root + "/"):
            return {"error": f"Invalid user_id: {user_id!r}"}

        # Pick up the latest committed state of the volume; a container that
        # was already running would otherwise keep serving its older view.
        volume.reload()

        analysis_file = os.path.join(user_dir, "analysis.json")
        if not os.path.exists(analysis_file):
            return {"status": "not_found"}

        with open(analysis_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        # The timestamp file is optional; report None when it is missing.
        last_updated = None
        stamp_file = os.path.join(user_dir, "last_updated.txt")
        if os.path.exists(stamp_file):
            with open(stamp_file, "r", encoding="utf-8") as f:
                last_updated = f.read().strip()

        return {
            "data": data,
            "last_updated": last_updated,
            "status": "found",
        }

    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
@app.function(
    image=image,
    secrets=secrets,
    volumes={"/data": volume},
)
@modal.web_endpoint(method="POST")
def mcp_webhook(request_data: Dict):
    """
    Webhook endpoint for MCP protocol messages.

    Builds an ``MCPServer``, registers three tools that delegate to the
    Modal functions in this file, and lets the server handle the incoming
    message.

    Args:
        request_data: The decoded request body; passed unchanged to
            ``MCPServer.handle_message``.

    Returns:
        Whatever ``MCPServer.handle_message`` returns, or a JSON-RPC 2.0
        error object (code -32603, "Internal error") if anything raises.
    """
    try:
        from mcp_server import MCPServer

        server = MCPServer()

        # The tool handlers are coroutines, so they use the awaitable
        # `.remote.aio()` form instead of the blocking `.remote()` call,
        # which would stall the event loop while the remote function runs.
        async def process_statements_tool(args):
            email_config = args.get('email_config', {})
            days_back = args.get('days_back', 30)
            passwords = args.get('passwords', {})

            return await process_bank_statements.remote.aio(email_config, days_back, passwords)

        async def analyze_pdf_tool(args):
            # NOTE(review): values arrive through a JSON body, so they cannot
            # be raw bytes here -- confirm how callers encode PDF content.
            pdf_contents = args.get('pdf_contents', {})
            passwords = args.get('passwords', {})

            return await analyze_uploaded_statements.remote.aio(pdf_contents, passwords)

        async def get_analysis_tool(args):
            analysis_data = args.get('analysis_data', {})
            user_question = args.get('user_question', '')

            return await get_claude_analysis.remote.aio(analysis_data, user_question)

        server.register_tool("process_email_statements", "Process bank statements from email", process_statements_tool)
        server.register_tool("analyze_pdf_statements", "Analyze uploaded PDF statements", analyze_pdf_tool)
        server.register_tool("get_claude_analysis", "Get Claude's financial analysis", get_analysis_tool)

        response = asyncio.run(server.handle_message(request_data))
        return response

    except Exception as e:
        logging.error(f"Error in mcp_webhook: {e}")

        # The payload may not be a dict (that can be what failed), so read
        # the request id defensively instead of raising inside the handler.
        request_id = request_data.get("id") if isinstance(request_data, dict) else None

        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": -32603,
                "message": str(e),
            },
        }
|
|
|
|
|
@app.local_entrypoint()
def main():
    """
    Local entrypoint for testing Modal functions.

    Sends a small hand-written payload to ``get_claude_analysis`` and prints
    whatever comes back.
    """
    print("Testing Modal deployment...")

    # Minimal stand-in for real analysis output.
    sample_payload = dict(
        spending_insights=[],
        recommendations=["Test recommendation"],
    )
    question = "What do you think about my spending?"

    print("Claude analysis result:", get_claude_analysis.remote(sample_payload, question))
|
|
|
if __name__ == "__main__":
    # NOTE(review): the `modal` package does not appear to export a top-level
    # `run` callable, so `modal.run(main)` would fail with AttributeError --
    # confirm against the installed Modal version. When this file is executed
    # directly, start an ephemeral app run and call the entrypoint inside it,
    # so the `.remote()` call in `main` has a running app to target.
    with app.run():
        main()