import os
import uuid
import tempfile
from datetime import datetime
from typing import Optional, Dict, Any
from pathlib import Path

from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import requests
from groq import Groq
import dotenv

# Load environment variables
dotenv.load_dotenv()

app = FastAPI(title="Voice AI Backend")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
GROQ_MODEL = "whisper-large-v3-turbo"
AI_API_ENDPOINT = "https://nitinbot001-crop-rag-api.hf.space/api/query"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Initialize Groq client
groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None

# Store conversation history (in production, use a database)
conversation_history = []


class TranscriptionResponse(BaseModel):
    success: bool
    user_query: str
    ai_response: str
    metadata: Dict[str, Any]
    session_id: str
    timestamp: str
    error: Optional[str] = None


class ConversationHistory(BaseModel):
    sessions: list


@app.get("/")
async def root():
    return {"message": "Voice AI Backend API", "status": "online"}


@app.post("/api/process-audio", response_model=TranscriptionResponse)
async def process_audio(audio: UploadFile = File(...)):
    """
    Process an uploaded audio file: transcribe it and get an AI response.
    """
    session_id = str(uuid.uuid4())
    timestamp = datetime.now().isoformat()
    tmp_path = None

    try:
        # Validate file type
        if not audio.filename.endswith(('.wav', '.webm', '.mp3', '.m4a', '.ogg')):
            raise HTTPException(status_code=400, detail="Invalid audio format")

        # Read audio data
        audio_data = await audio.read()

        # Save a temporary copy for processing. Groq accepts webm (from the
        # browser) and wav directly, so the bytes are written as-is.
        suffix = Path(audio.filename).suffix or '.wav'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(audio_data)
            tmp_path = tmp_file.name

        # Transcribe with Groq
        user_query = await transcribe_audio(tmp_path, audio.filename)

        # Get AI response
        ai_response = await get_ai_response(user_query)

        # Create metadata
        metadata = {
            "audio_size": len(audio_data),
            "audio_format": audio.filename.split('.')[-1],
            "transcription_model": GROQ_MODEL,
            "ai_endpoint": AI_API_ENDPOINT,
            "processing_time": datetime.now().isoformat(),
        }

        # Store in history
        conversation_history.append({
            "session_id": session_id,
            "timestamp": timestamp,
            "user_query": user_query,
            "ai_response": ai_response,
            "metadata": metadata,
        })

        return TranscriptionResponse(
            success=True,
            user_query=user_query,
            ai_response=ai_response,
            metadata=metadata,
            session_id=session_id,
            timestamp=timestamp,
        )

    except HTTPException:
        # Let explicit HTTP errors (e.g. the 400 for invalid formats) propagate
        raise
    except Exception as e:
        return TranscriptionResponse(
            success=False,
            user_query="",
            ai_response="",
            metadata={},
            session_id=session_id,
            timestamp=timestamp,
            error=str(e),
        )
    finally:
        # Clean up the temporary file even if processing failed
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


async def transcribe_audio(file_path: str, original_filename: str) -> str:
    """
    Transcribe audio using Groq Whisper.
    """
    if not groq_client:
        raise HTTPException(status_code=500, detail="GROQ_API_KEY not configured")

    try:
        with open(file_path, "rb") as audio_file:
            transcription = groq_client.audio.transcriptions.create(
                file=(original_filename, audio_file.read()),
                model=GROQ_MODEL,
                response_format="text",
            )

        # Handle different response formats
        if hasattr(transcription, 'text'):
            text = transcription.text
        elif isinstance(transcription, dict):
            text = transcription.get('text', '')
        else:
            text = str(transcription)

        return text.strip()

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")


async def get_ai_response(query: str) -> str:
    """
    Get a response from the AI API.
    """
    try:
        headers = {"Content-Type": "application/json"}
        payload = {"query": query}

        response = requests.post(
            AI_API_ENDPOINT,
            json=payload,
            headers=headers,
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()

        # Extract text from the response (adjust based on the actual API response format)
        if isinstance(result, dict):
            # Try the likely response keys in order
            for key in ('response', 'answer', 'text', 'message'):
                if key in result:
                    return str(result[key])
            return str(result)
        return str(result)

    except requests.exceptions.Timeout:
        return "I'm sorry, the AI service is taking too long to respond. Please try again."
    except Exception as e:
        return f"I encountered an error while processing your request: {str(e)}"


@app.get("/api/history", response_model=ConversationHistory)
async def get_history():
    """
    Get the conversation history.
    """
    return ConversationHistory(sessions=conversation_history[-20:])  # Last 20 conversations


@app.delete("/api/history")
async def clear_history():
    """
    Clear the conversation history.
    """
    global conversation_history
    conversation_history = []
    return {"message": "History cleared"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
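
# --- Example client call (illustrative sketch, not part of the server) ---
# Shows one way to exercise the /api/process-audio endpoint with `requests`.
# The local URL/port and the `sample.wav` file are assumptions for this
# example only; adjust them to your deployment.
#
#   import requests
#
#   with open("sample.wav", "rb") as f:
#       resp = requests.post(
#           "http://localhost:7860/api/process-audio",
#           files={"audio": ("sample.wav", f, "audio/wav")},
#       )
#   data = resp.json()
#   print(data["user_query"], "->", data["ai_response"])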