import os
import io
import json
import uuid
import wave
import tempfile
from datetime import datetime
from typing import Optional, Dict, Any
from pathlib import Path

from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import uvicorn
import requests
import numpy as np
from groq import Groq
import dotenv

# Load environment variables
dotenv.load_dotenv()
app = FastAPI(title="Voice AI Backend")

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
GROQ_MODEL = "whisper-large-v3-turbo"
AI_API_ENDPOINT = "https://nitinbot001-crop-rag-api.hf.space/api/query"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Initialize Groq client
groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None

# Store conversation history (in production, use a database)
conversation_history = []
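
# A minimal sketch of persisting history instead of the in-memory list above,
# assuming the standard-library sqlite3 module and a local database file; the
# names here (HISTORY_DB, save_session) are illustrative, not part of this app:
#
#   import sqlite3
#   HISTORY_DB = "history.db"
#
#   def save_session(session: dict) -> None:
#       with sqlite3.connect(HISTORY_DB) as conn:
#           conn.execute(
#               "CREATE TABLE IF NOT EXISTS sessions "
#               "(session_id TEXT, timestamp TEXT, payload TEXT)"
#           )
#           conn.execute(
#               "INSERT INTO sessions VALUES (?, ?, ?)",
#               (session["session_id"], session["timestamp"], json.dumps(session)),
#           )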
class TranscriptionResponse(BaseModel):
    success: bool
    user_query: str
    ai_response: str
    metadata: Dict[str, Any]
    session_id: str
    timestamp: str
    error: Optional[str] = None


class ConversationHistory(BaseModel):
    sessions: list
@app.get("/")  # route path assumed; serves the frontend page
async def read_root():
    return FileResponse('index.html')
@app.post("/api/process-audio")  # route path assumed; adjust to match the frontend
async def process_audio(audio: UploadFile = File(...)):
    """
    Process an uploaded audio file: transcribe it and get an AI response.
    """
    session_id = str(uuid.uuid4())
    timestamp = datetime.now().isoformat()
    tmp_path = None

    try:
        # Validate file type
        if not audio.filename.endswith(('.wav', '.webm', '.mp3', '.m4a', '.ogg')):
            raise HTTPException(status_code=400, detail="Invalid audio format")

        # Read audio data
        audio_data = await audio.read()

        # Save a temporary file for processing. Browser recordings arrive as
        # webm; Groq accepts webm (and wav/mp3/m4a/ogg) directly, so the bytes
        # are written as-is without conversion.
        suffix = Path(audio.filename).suffix or '.wav'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(audio_data)
            tmp_path = tmp_file.name

        # Transcribe with Groq
        user_query = await transcribe_audio(tmp_path, audio.filename)

        # Get AI response
        ai_response = await get_ai_response(user_query)

        # Create metadata
        metadata = {
            "audio_size": len(audio_data),
            "audio_format": audio.filename.split('.')[-1],
            "transcription_model": GROQ_MODEL,
            "ai_endpoint": AI_API_ENDPOINT,
            "processing_time": datetime.now().isoformat(),
        }

        # Store in history
        conversation_history.append({
            "session_id": session_id,
            "timestamp": timestamp,
            "user_query": user_query,
            "ai_response": ai_response,
            "metadata": metadata
        })

        return TranscriptionResponse(
            success=True,
            user_query=user_query,
            ai_response=ai_response,
            metadata=metadata,
            session_id=session_id,
            timestamp=timestamp
        )
    except Exception as e:
        return TranscriptionResponse(
            success=False,
            user_query="",
            ai_response="",
            metadata={},
            session_id=session_id,
            timestamp=timestamp,
            error=str(e)
        )
    finally:
        # Clean up the temporary file whether or not processing succeeded
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
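
# Example client call for the endpoint above (the route path is an assumption
# made when restoring the decorator; adjust it to whatever the frontend uses).
# A minimal sketch using the requests library already imported here:
#
#   import requests
#
#   with open("recording.webm", "rb") as f:
#       resp = requests.post(
#           "http://localhost:7860/api/process-audio",
#           files={"audio": ("recording.webm", f, "audio/webm")},
#       )
#   print(resp.json())  # fields follow the TranscriptionResponse model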
async def transcribe_audio(file_path: str, original_filename: str) -> str:
    """
    Transcribe audio using Groq Whisper
    """
    if not groq_client:
        raise HTTPException(status_code=500, detail="GROQ_API_KEY not configured")

    try:
        with open(file_path, "rb") as audio_file:
            transcription = groq_client.audio.transcriptions.create(
                file=(original_filename, audio_file.read()),
                model=GROQ_MODEL,
                response_format="text"
            )

        # Handle different response formats
        if hasattr(transcription, 'text'):
            text = transcription.text
        elif isinstance(transcription, dict):
            text = transcription.get('text', '')
        else:
            text = str(transcription)

        return text.strip()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
async def get_ai_response(query: str) -> str:
    """
    Get response from AI API
    """
    try:
        headers = {"Content-Type": "application/json"}
        payload = {"query": query}

        response = requests.post(
            AI_API_ENDPOINT,
            json=payload,
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        result = response.json()

        # Extract text from response (adjust based on actual API response format)
        if isinstance(result, dict):
            # Try different possible response keys
            ai_text = result.get('response',
                      result.get('answer',
                      result.get('text',
                      result.get('message', str(result)))))
        else:
            ai_text = str(result)

        return ai_text
    except requests.exceptions.Timeout:
        return "I'm sorry, the AI service is taking too long to respond. Please try again."
    except Exception as e:
        return f"I encountered an error while processing your request: {str(e)}"
@app.get("/api/history")  # route path assumed
async def get_history():
    """
    Get conversation history
    """
    return ConversationHistory(sessions=conversation_history[-20:])  # Last 20 conversations
@app.delete("/api/history")  # route path and method assumed
async def clear_history():
    """
    Clear conversation history
    """
    global conversation_history
    conversation_history = []
    return {"message": "History cleared"}
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
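
# Dependencies implied by the imports above (names as published on PyPI; pin
# versions as needed): fastapi, uvicorn, requests, numpy, groq, python-dotenv,
# jinja2 (for Jinja2Templates), and python-multipart, which FastAPI requires
# for UploadFile/File form parsing. A GROQ_API_KEY must be present in the
# environment (or a .env file) for transcription to work.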