import os import requests from typing import Dict, Any, List # Optionally load from .env if python-dotenv is installed try: from dotenv import load_dotenv load_dotenv() except ImportError: pass WAAPI_BASE_URL = 'https://waapi.app/api/v1' WAAPI_INSTANCE_ID = os.environ.get('WAAPI_INSTANCE_ID', '') # Set in .env WAAPI_API_TOKEN = os.environ.get('WAAPI_API_TOKEN', '') # Set in .env HEADERS = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {WAAPI_API_TOKEN}' } def send_message(to: str, text: str) -> Dict[str, Any]: """Send a WhatsApp message using WaAPI documented endpoint.""" url = f"{WAAPI_BASE_URL}/instances/{WAAPI_INSTANCE_ID}/client/action/send-message" payload = { 'chatId': to, # Should be in format '1234567890@c.us' 'message': text } response = requests.post(url, json=payload, headers=HEADERS) response.raise_for_status() return response.json() def receive_message(payload: Dict[str, Any]) -> Dict[str, Any]: """Parse a WaAPI webhook payload for a new incoming message.""" # See: https://waapi.readme.io/reference/message-event message = payload.get('data', {}) return { 'text': message.get('body', ''), 'sender': message.get('from', ''), 'to': message.get('to', ''), 'message_id': message.get('id', ''), 'event': payload } def mark_message_as_read(chat_id: str) -> Dict[str, Any]: """Mark all messages in a chat as seen using WaAPI documented endpoint.""" url = f"{WAAPI_BASE_URL}/instances/{WAAPI_INSTANCE_ID}/client/action/send-seen" payload = { 'chatId': chat_id } response = requests.post(url, json=payload, headers=HEADERS) response.raise_for_status() return response.json() def get_last_messages(chat_id: str, limit: int = 10) -> List[Dict[str, Any]]: """Fetch the last N messages from a WhatsApp chat using WaAPI documented endpoint.""" url = f"{WAAPI_BASE_URL}/instances/{WAAPI_INSTANCE_ID}/client/action/fetch-messages" payload = { 'chatId': chat_id, 'limit': limit } response = requests.post(url, json=payload, headers=HEADERS) response.raise_for_status() return response.json().get('messages', [])