Spaces:
Runtime error
Runtime error
| """ | |
| MCP Server for Spend Analysis - Core Protocol Implementation | |
| """ | |
| import json | |
| import asyncio | |
| import uvicorn | |
| from fastapi import FastAPI, Request | |
| from typing import Dict, List, Any, Optional, Callable | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import logging | |
| from spend_analyzer import SpendAnalyzer | |
| # MCP Protocol Types | |
class MessageType(Enum):
    """Kinds of JSON-RPC message exchanged over MCP.

    Each member's value is the lowercase wire name of the message kind.
    """

    # A call that carries an id and expects a reply.
    REQUEST = "request"
    # The reply to a request.
    RESPONSE = "response"
    # A one-way message.
    NOTIFICATION = "notification"
@dataclass
class MCPMessage:
    """Container for the fields of a single JSON-RPC 2.0 / MCP message.

    Every field has a default, so an instance can be built with only the
    fields relevant to the message at hand (for example ``id`` and ``method``
    for a request, or ``id`` and ``result`` for a response).

    The ``@dataclass`` decorator is required: without it the annotated names
    are plain class attributes and the class accepts no constructor
    arguments.
    """

    jsonrpc: str = "2.0"            # protocol version tag
    id: Optional[str] = None        # request identifier, echoed in the reply
    method: Optional[str] = None    # method name (requests / notifications)
    params: Optional[Dict] = None   # method arguments
    result: Optional[Any] = None    # success payload (responses)
    error: Optional[Dict] = None    # error object (responses)
class MCPServer:
    """Minimal JSON-RPC 2.0 dispatcher for the Model Context Protocol.

    Tools are stored by name and resources by URI in plain dictionaries.
    ``handle_message`` routes one decoded message to the matching
    ``_handle_*`` method and always returns a JSON-RPC response dictionary.
    """

    def __init__(self):
        self.tools = {}
        self.resources = {}
        self.prompts = {}  # reserved; nothing in this class populates it
        self.logger = logging.getLogger(__name__)

    def register_tool(self, name: str, description: str, handler, input_schema=None):
        """Register a tool that Claude can call.

        Args:
            name: Key under which the tool is listed and invoked.
            description: Human-readable summary returned by ``tools/list``.
            handler: Callable taking the arguments dict. It may be a coroutine
                function or a plain function.
            input_schema: JSON Schema for the arguments. Defaults to an
                object schema with no properties.
        """
        if input_schema is None:
            input_schema = {
                "type": "object",
                "properties": {},
                "required": []
            }
        self.tools[name] = {
            "description": description,
            "handler": handler,
            "input_schema": input_schema
        }

    def register_resource(self, uri: str, name: str, description: str, handler):
        """Register a resource that provides data.

        Args:
            uri: Key under which the resource is listed and read.
            name: Display name returned by ``resources/list``.
            description: Human-readable summary.
            handler: Zero-argument callable producing JSON-serializable
                content. It may be a coroutine function or a plain function.
        """
        self.resources[uri] = {
            "name": name,
            "description": description,
            "handler": handler,
            "mimeType": "application/json"
        }

    async def handle_message(self, message: Dict) -> Dict:
        """Handle one incoming MCP message and return the response dict.

        Unknown methods yield error -32601, a non-object message yields
        -32600, non-object ``params`` yield -32602, and any unexpected
        exception is logged and reported as -32603.
        """
        # Guard first: everything below relies on dict access, including the
        # error path that echoes the message id.
        if not isinstance(message, dict):
            return self._error_response(
                None, -32600, "Invalid Request: expected a JSON object"
            )

        msg_id = message.get("id")
        try:
            method = message.get("method")
            # An explicit JSON null for "params" is treated like an absent key.
            params = message.get("params") or {}
            if not isinstance(params, dict):
                return self._error_response(
                    msg_id, -32602, "Invalid params: expected an object"
                )

            if method == "initialize":
                return self._handle_initialize(msg_id)
            elif method == "tools/list":
                return self._handle_list_tools(msg_id)
            elif method == "tools/call":
                return await self._handle_call_tool(msg_id, params)
            elif method == "resources/list":
                return self._handle_list_resources(msg_id)
            elif method == "resources/read":
                return await self._handle_read_resource(msg_id, params)
            else:
                return self._error_response(msg_id, -32601, f"Method not found: {method}")
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            self.logger.exception("Error handling message: %s", e)
            return self._error_response(msg_id, -32603, str(e))

    @staticmethod
    async def _invoke(handler, *args):
        """Call ``handler`` and await its result only if it is awaitable."""
        import inspect  # local import: not among the module-level imports

        outcome = handler(*args)
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    def _handle_initialize(self, msg_id: Optional[str]) -> Dict:
        """Build the response to ``initialize``: version, capabilities, server info."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {},
                    "resources": {},
                    "prompts": {}
                },
                "serverInfo": {
                    "name": "spend-analyzer-mcp-bmt",
                    "version": "1.0.0"
                }
            }
        }

    def _handle_list_tools(self, msg_id: Optional[str]) -> Dict:
        """Build the response to ``tools/list`` from the registered tools."""
        tools_list = [
            {
                "name": name,
                "description": tool["description"],
                "inputSchema": tool["input_schema"]
            }
            for name, tool in self.tools.items()
        ]
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {"tools": tools_list}
        }

    async def _handle_call_tool(self, msg_id: Optional[str], params: Dict) -> Dict:
        """Execute the tool named in ``params`` and wrap its result as text content.

        Returns error -32602 when the tool is not registered and -32603 when
        the handler (or JSON encoding of its result) raises.
        """
        tool_name = params.get("name")
        # An explicit JSON null for "arguments" is treated like an absent key.
        arguments = params.get("arguments") or {}
        if tool_name not in self.tools:
            return self._error_response(msg_id, -32602, f"Tool not found: {tool_name}")
        try:
            handler = self.tools[tool_name]["handler"]
            result = await self._invoke(handler, arguments)
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": {
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps(result)
                        }
                    ]
                }
            }
        except Exception as e:
            return self._error_response(msg_id, -32603, f"Tool execution failed: {str(e)}")

    def _handle_list_resources(self, msg_id: Optional[str]) -> Dict:
        """Build the response to ``resources/list`` from the registered resources."""
        resources_list = [
            {
                "uri": uri,
                "name": resource["name"],
                "description": resource["description"],
                "mimeType": resource["mimeType"]
            }
            for uri, resource in self.resources.items()
        ]
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "result": {"resources": resources_list}
        }

    async def _handle_read_resource(self, msg_id: Optional[str], params: Dict) -> Dict:
        """Read the resource named by ``params['uri']`` and return it as JSON text.

        Returns error -32602 when the URI is not registered and -32603 when
        the handler (or JSON encoding of its content) raises.
        """
        uri = params.get("uri")
        if uri not in self.resources:
            return self._error_response(msg_id, -32602, f"Resource not found: {uri}")
        try:
            resource = self.resources[uri]
            content = await self._invoke(resource["handler"])
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": {
                    "contents": [
                        {
                            "uri": uri,
                            # Report the registered type so list and read agree.
                            "mimeType": resource["mimeType"],
                            "text": json.dumps(content, indent=2)
                        }
                    ]
                }
            }
        except Exception as e:
            return self._error_response(msg_id, -32603, f"Resource read failed: {str(e)}")

    def _error_response(self, msg_id: Optional[str], code: int, message: str) -> Dict:
        """Create a JSON-RPC error response with the given code and message."""
        return {
            "jsonrpc": "2.0",
            "id": msg_id,
            "error": {
                "code": code,
                "message": message
            }
        }
| # Register all tools for the MCP server | |
def _analyze_transactions(transactions):
    """Run SpendAnalyzer over ``transactions`` and return its exported data.

    Returns the ``{'message': 'No transactions found'}`` placeholder when the
    list is empty, without touching the analyzer.
    """
    if not transactions:
        return {'message': 'No transactions found'}
    analyzer = SpendAnalyzer()
    analyzer.load_transactions(transactions)
    return analyzer.export_analysis_data()


def _extract_claude_text(response):
    """Return the text of the first content item of a Claude response.

    Handles a content item that is either a dict or an object exposing its
    fields through ``__dict__``; anything else is stringified. A response
    with no (or empty) ``content`` is stringified as a whole.
    """
    items = getattr(response, 'content', None)
    if not items:
        return str(response)
    first = items[0]
    if isinstance(first, dict):
        return first['text'] if 'text' in first else str(first)
    if hasattr(first, '__dict__'):
        fields = vars(first)
        return fields['text'] if 'text' in fields else str(first)
    return str(first)


def _build_analysis_prompt(analysis_data, user_question):
    """Assemble the advisor prompt sent to the AI provider."""
    question = (
        user_question
        if user_question
        else "Please provide a comprehensive analysis of my spending patterns and recommendations."
    )
    context = (
        "\n"
        "Financial Analysis Data:\n"
        f"{json.dumps(analysis_data, indent=2, default=str)}\n"
        f"User Question: {question}\n"
    )
    return (
        "\n"
        "You are a financial advisor analyzing bank statement data.\n"
        "Based on the provided financial data, give insights about:\n"
        "1. Spending patterns and trends\n"
        "2. Budget adherence and alerts\n"
        "3. Unusual transactions that need attention\n"
        "4. Specific recommendations for improvement\n"
        "5. Answer to the user's specific question if provided\n"
        "Be specific, actionable, and highlight both positive aspects and areas for improvement.\n"
        f"{context}\n"
    )


def register_all_tools(server: MCPServer):
    """Register all tools with the MCP server.

    Registers ``process_email_statements``, ``analyze_pdf_statements`` and
    ``get_ai_analysis`` together with their JSON input schemas. Each tool
    handler takes the arguments dict and returns a JSON-serializable dict;
    failures inside a handler's processing are reported as
    ``{'error': ...}`` rather than raised.
    """

    async def process_email_statements_tool(args: Dict) -> Dict:
        """Process bank statements from email.

        Fetches recent bank emails, parses every PDF attachment and analyzes
        the combined transactions. Per-file failures are recorded in
        ``processed_statements`` and do not abort the run.
        """
        from email_processor import EmailProcessor, PDFProcessor

        email_config = args.get('email_config') or {}
        days_back = args.get('days_back', 30)
        passwords = args.get('passwords') or {}
        try:
            email_processor = EmailProcessor(email_config)
            pdf_processor = PDFProcessor()

            emails = await email_processor.fetch_bank_emails(days_back)
            all_transactions = []
            processed_statements = []
            for email_msg in emails:
                attachments = await email_processor.extract_attachments(email_msg)
                for filename, content, file_type in attachments:
                    if file_type != 'pdf':
                        continue
                    # Password lookup is by attachment file name; None if absent.
                    password = passwords.get(filename)
                    try:
                        statement_info = await pdf_processor.process_pdf(content, password)
                        all_transactions.extend(statement_info.transactions)
                        processed_statements.append({
                            'filename': filename,
                            'bank': statement_info.bank_name,
                            'account': statement_info.account_number,
                            'transaction_count': len(statement_info.transactions),
                            # Same marker the PDF tool emits, so callers can
                            # tell success from error entries uniformly.
                            'status': 'success'
                        })
                    except Exception as e:
                        processed_statements.append({
                            'filename': filename,
                            'status': 'error',
                            'error': str(e)
                        })

            return {
                'processed_statements': processed_statements,
                'total_transactions': len(all_transactions),
                'analysis': _analyze_transactions(all_transactions)
            }
        except Exception as e:
            return {'error': str(e)}

    async def analyze_pdf_statements_tool(args: Dict) -> Dict:
        """Analyze uploaded PDF statements.

        ``pdf_contents`` maps file name to PDF content and ``passwords`` maps
        file name to password. Per-file failures are recorded in
        ``processed_files`` and do not abort the run.
        """
        from email_processor import PDFProcessor

        # NOTE(review): the schema declares the contents as strings; whether
        # process_pdf expects raw bytes or an encoded string is not visible
        # here — confirm against PDFProcessor.
        pdf_contents = args.get('pdf_contents') or {}
        passwords = args.get('passwords') or {}
        try:
            pdf_processor = PDFProcessor()
            all_transactions = []
            processed_files = []
            for filename, content in pdf_contents.items():
                try:
                    password = passwords.get(filename)
                    statement_info = await pdf_processor.process_pdf(content, password)
                    all_transactions.extend(statement_info.transactions)
                    processed_files.append({
                        'filename': filename,
                        'bank': statement_info.bank_name,
                        'account': statement_info.account_number,
                        'transaction_count': len(statement_info.transactions),
                        'status': 'success'
                    })
                except Exception as e:
                    processed_files.append({
                        'filename': filename,
                        'status': 'error',
                        'error': str(e)
                    })

            return {
                'processed_files': processed_files,
                'total_transactions': len(all_transactions),
                'analysis': _analyze_transactions(all_transactions)
            }
        except Exception as e:
            return {'error': str(e)}

    async def get_ai_analysis_tool(args: Dict) -> Dict:
        """Get AI financial analysis.

        Sends the analysis data and optional user question to the selected
        provider (``claude`` by default, or ``sambanova``) and returns the
        generated text with the provider and model names. API keys are read
        from ``ANTHROPIC_API_KEY`` / ``SAMBANOVA_API_KEY``.
        """
        import os

        analysis_data = args.get('analysis_data', {})
        user_question = args.get('user_question', '')
        provider = args.get('provider', 'claude')
        try:
            prompt = _build_analysis_prompt(analysis_data, user_question)
            provider_key = provider.lower()

            # NOTE(review): both SDK calls below are made without await inside
            # this coroutine, so they appear to be blocking calls — consider
            # moving them to a worker thread if the server must stay responsive.
            if provider_key == "claude":
                claude_model = "claude-3-sonnet-20240229"
                try:
                    import anthropic

                    client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY", ""))
                    response = client.messages.create(
                        model=claude_model,
                        max_tokens=1500,
                        messages=[{"role": "user", "content": prompt}]
                    )
                    try:
                        analysis_text = _extract_claude_text(response)
                    except Exception as e:
                        analysis_text = f"Error parsing Claude response: {str(e)}"
                    return {
                        'ai_analysis': analysis_text,
                        'provider': 'claude',
                        'model': claude_model
                    }
                except Exception as e:
                    return {'error': f"Claude API error: {str(e)}"}

            if provider_key == "sambanova":
                sambanova_model = "Meta-Llama-3.1-8B-Instruct"
                try:
                    import openai

                    # SambaNova uses an OpenAI-compatible API.
                    client = openai.OpenAI(
                        api_key=os.environ.get("SAMBANOVA_API_KEY", ""),
                        base_url="https://api.sambanova.ai/v1"
                    )
                    response = client.chat.completions.create(
                        model=sambanova_model,
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=1500,
                        temperature=0.7
                    )
                    return {
                        'ai_analysis': response.choices[0].message.content,
                        'provider': 'sambanova',
                        'model': sambanova_model
                    }
                except Exception as e:
                    return {'error': f"SambaNova API error: {str(e)}"}

            return {'error': f"Unsupported provider: {provider}"}
        except Exception as e:
            return {'error': f"AI API error: {str(e)}"}

    # Register tools with proper input schemas
    server.register_tool(
        "process_email_statements",
        "Process bank statements from email",
        process_email_statements_tool,
        input_schema={
            "type": "object",
            "properties": {
                "email_config": {
                    "type": "object",
                    "properties": {
                        "email": {"type": "string"},
                        "password": {"type": "string"},
                        "imap_server": {"type": "string"}
                    },
                    "required": ["email", "password", "imap_server"]
                },
                "days_back": {"type": "integer", "default": 30},
                "passwords": {
                    "type": "object",
                    "additionalProperties": {"type": "string"}
                }
            },
            "required": ["email_config"]
        }
    )
    server.register_tool(
        "analyze_pdf_statements",
        "Analyze uploaded PDF statements",
        analyze_pdf_statements_tool,
        input_schema={
            "type": "object",
            "properties": {
                "pdf_contents": {
                    "type": "object",
                    "additionalProperties": {"type": "string", "format": "binary"}
                },
                "passwords": {
                    "type": "object",
                    "additionalProperties": {"type": "string"}
                }
            },
            "required": ["pdf_contents"]
        }
    )
    server.register_tool(
        "get_ai_analysis",
        "Get AI financial analysis (Claude or SambaNova)",
        get_ai_analysis_tool,
        input_schema={
            "type": "object",
            "properties": {
                "analysis_data": {"type": "object"},
                "user_question": {"type": "string"},
                "provider": {
                    "type": "string",
                    "enum": ["claude", "sambanova"],
                    "default": "claude"
                }
            },
            "required": ["analysis_data"]
        }
    )
| # Register all resources for the MCP server | |
def _load_sample_json(filename):
    """Return the parsed JSON of ``sample_data/<filename>`` beside this module.

    Returns None when the file does not exist; read and parse errors
    propagate to the caller.
    """
    import os  # local import: not among the module-level imports

    path = os.path.join(os.path.dirname(__file__), "sample_data", filename)
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _load_sample_analyzer(with_budgets=False):
    """Return a SpendAnalyzer loaded with the sample data, or None.

    None means there is nothing to analyze: a required sample file is
    missing, or loading failed (in which case a warning is logged). With
    ``with_budgets`` the budgets file is required too and is applied via
    ``set_budgets``.
    """
    try:
        transactions = _load_sample_json("transactions.json")
        if transactions is None:
            return None
        budgets = _load_sample_json("budgets.json") if with_budgets else None
        if with_budgets and budgets is None:
            return None
        analyzer = SpendAnalyzer()
        analyzer.load_transactions(transactions)
        if with_budgets:
            analyzer.set_budgets(budgets)
        return analyzer
    except Exception as e:
        logging.warning(f"Could not load sample data: {e}")
        return None


def register_all_resources(server: MCPServer):
    """Register all resources with the MCP server.

    Registers ``spending-insights``, ``budget-alerts`` and
    ``financial-summary``. Each handler analyzes the bundled sample data and
    returns an empty result when that data is unavailable.
    """
    from dataclasses import asdict

    async def get_spending_insights_resource():
        """Resource handler for spending insights (list of dicts, possibly empty)."""
        analyzer = _load_sample_analyzer()
        if analyzer is None:
            return []
        # Convert the insight objects to plain dictionaries for JSON encoding.
        return [asdict(insight) for insight in analyzer.analyze_spending_by_category()]

    async def get_budget_alerts_resource():
        """Resource handler for budget alerts (list of dicts, possibly empty)."""
        analyzer = _load_sample_analyzer(with_budgets=True)
        if analyzer is None:
            return []
        # Convert the alert objects to plain dictionaries for JSON encoding.
        return [asdict(alert) for alert in analyzer.check_budget_alerts()]

    async def get_financial_summary_resource():
        """Resource handler for the financial summary (dict; zeroed when no data)."""
        analyzer = _load_sample_analyzer()
        if analyzer is None:
            return {
                "total_income": 0,
                "total_expenses": 0,
                "net_cash_flow": 0,
                "largest_expense": {},
                "most_frequent_category": "",
                "unusual_transactions": [],
                "monthly_trends": {}
            }
        # Convert the summary object to a plain dictionary for JSON encoding.
        return asdict(analyzer.generate_financial_summary())

    # Register resources
    server.register_resource(
        uri="spending-insights",
        name="Spending Insights",
        description="Current spending insights by category",
        handler=get_spending_insights_resource
    )
    server.register_resource(
        uri="budget-alerts",
        name="Budget Alerts",
        description="Current budget alerts and overspending warnings",
        handler=get_budget_alerts_resource
    )
    server.register_resource(
        uri="financial-summary",
        name="Financial Summary",
        description="Comprehensive financial summary and analysis",
        handler=get_financial_summary_resource
    )
| # Create FastAPI app for MCP server | |
def create_mcp_app():
    """Create a FastAPI app for the MCP server.

    Builds an ``MCPServer``, registers every tool and resource on it, and
    exposes it through two routes:

    * ``POST /mcp`` — decodes the JSON body and passes it to
      ``MCPServer.handle_message``.
    * ``GET /`` — static server information.

    Returns:
        The configured ``FastAPI`` application.
    """
    app = FastAPI(title="Spend Analyzer MCP Server")
    server = MCPServer()

    # Register tools and resources
    register_all_tools(server)
    register_all_resources(server)

    # The route decorators are what attach these handlers to the app; without
    # them the functions are defined but no endpoint is ever served.
    @app.post("/mcp")
    async def handle_mcp_request(request: Request):
        """Handle MCP protocol requests"""
        try:
            data = await request.json()
            return await server.handle_message(data)
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": -32700,
                    "message": f"Parse error: {str(e)}"
                }
            }

    @app.get("/")
    async def root():
        """Root endpoint with server info"""
        return {
            "name": "Spend Analyzer MCP Server",
            "version": "1.0.0",
            "description": "MCP server for financial analysis",
            "endpoints": {
                "/mcp": "MCP protocol endpoint",
                "/docs": "API documentation"
            }
        }

    return app
| # Run standalone MCP server | |
def run_mcp_server(host='0.0.0.0', port=8000):
    """Build the MCP FastAPI application and serve it with uvicorn.

    Args:
        host: Interface address handed to ``uvicorn.run``.
        port: TCP port handed to ``uvicorn.run``.
    """
    uvicorn.run(create_mcp_app(), host=host, port=port)
| # Example usage and testing | |
if __name__ == "__main__":
    # Script entry point: print the startup banner, then launch the
    # standalone MCP server with its default host and port.
    startup_banner = (
        "Starting Spend Analyzer MCP Server...",
        "MCP endpoint will be available at: http://localhost:8000/mcp",
        "API documentation will be available at: http://localhost:8000/docs",
    )
    for banner_line in startup_banner:
        print(banner_line)
    run_mcp_server()