voicemenuspe / app.py
lokesh341's picture
Update app.py
37d87bb verified
raw
history blame
5.89 kB
import torch
from flask import Flask, render_template, request, jsonify
import json
import os
from transformers import pipeline
from gtts import gTTS
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from transformers import AutoConfig
import time
from waitress import serve
from simple_salesforce import Salesforce
import requests
# Flask application object; the route decorators further down register
# their handlers on it.
app = Flask(__name__)
# Use whisper-small for faster processing and better speed
# Device label derived from CUDA availability. NOTE(review): the visible code
# never reads `device` again (the transcription code derives its own device
# index) - confirm whether it is still needed.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Create config object to set timeout and other parameters
# Loads the whisper-small model configuration at import time and stores a
# `timeout` attribute of 60 on it. NOTE(review): presumably seconds; whether
# the ASR pipeline honours this attribute is not shown here - confirm.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update({"timeout": 60})
# Generate required voice prompts
# Maps a prompt key to the sentence to be spoken. The key doubles as the
# audio file stem: each entry is rendered to "<key>.mp3" further down.
prompts = {
"welcome": "Welcome to Biryani Hub.",
"ask_name": "Tell me your name.",
"ask_email": "Please provide your email address.",
"thank_you": "Thank you for registration."
}
# Function to generate and save audio prompts
def generate_audio_prompt(text, filename, max_retries=3, retry_delay=5):
    """Synthesize *text* with gTTS and save it as ``static/<filename>``.

    A gTTS failure is retried a bounded number of times instead of
    recursing without limit, so a persistent outage cannot hang start-up
    or exhaust the call stack.

    Args:
        text: Sentence to convert to speech.
        filename: Name of the file to write inside the ``static`` directory.
        max_retries: Number of additional attempts after the first failure.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        True if the audio file was written, False if every attempt failed.
    """
    # Only the gTTS class is imported at module level, so the bare name
    # ``gtts`` is undefined; the error type must be imported explicitly or
    # the ``except`` clause itself raises NameError.
    from gtts.tts import gTTSError

    # Make sure the target directory exists before gTTS tries to write to it.
    os.makedirs("static", exist_ok=True)
    target_path = os.path.join("static", filename)

    for attempt in range(max_retries + 1):
        try:
            gTTS(text).save(target_path)
            return True
        except gTTSError as e:
            print(f"Error: {e}")
            if attempt < max_retries:
                print(f"Retrying after {retry_delay} seconds...")
                time.sleep(retry_delay)

    # Best effort: report the failure and let the caller carry on.
    print(f"Giving up on {filename} after {max_retries + 1} attempts.")
    return False
# Render every entry of `prompts` to an MP3 named after its key when the
# module is imported.
for key, text in prompts.items():
    generate_audio_prompt(text, f"{key}.mp3")
# Symbol mapping for proper recognition
# Maps a spoken phrase to the character it stands for.
# NOTE(review): nothing in the visible code references this table; presumably
# it is meant for post-processing transcripts (e.g. dictated email addresses)
# - confirm before relying on it.
# "at the rate" is listed before "at"; if replacements are applied in dict
# order the longer phrase is handled first - TODO confirm with the consumer.
SYMBOL_MAPPING = {
"at the rate": "@",
"at": "@",
"dot": ".",
"underscore": "_",
"hash": "#",
"plus": "+",
"dash": "-",
"comma": ",",
"space": " "
}
# Function to convert audio to WAV format
def convert_to_wav(input_path, output_path):
    """Re-encode the audio at *input_path* as a 16 kHz mono WAV file.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the WAV file is written to.

    Raises:
        Exception: With the message ``"Audio conversion failed: ..."`` if
            loading, resampling or exporting fails. The original error is
            attached as ``__cause__`` so the root cause stays visible in
            tracebacks.
    """
    try:
        audio = AudioSegment.from_file(input_path)
        audio = audio.set_frame_rate(16000).set_channels(1)
        audio.export(output_path, format="wav")
    except Exception as e:
        # Chain explicitly so the underlying failure is not mistaken for an
        # error raised while handling another error.
        raise Exception(f"Audio conversion failed: {str(e)}") from e
# Function to check if audio contains actual speech
def is_silent_audio(audio_path):
    """Return True when the WAV file at *audio_path* has no non-silent range.

    Silence is judged relative to the clip itself: anything 16 dB below the
    clip's own dBFS level counts as silent, with a minimum silence length
    of 500 passed to pydub's detector.
    """
    segment = AudioSegment.from_wav(audio_path)
    threshold = segment.dBFS - 16
    voiced_ranges = detect_nonsilent(
        segment,
        min_silence_len=500,
        silence_thresh=threshold,
    )
    return not voiced_ranges
# Salesforce connection details
# SECURITY: credentials must not live in source control. The environment
# variables SF_USERNAME / SF_PASSWORD / SF_SECURITY_TOKEN take precedence;
# the literals below remain only as a backward-compatible fallback and should
# be rotated and removed once the deployment sets the variables.
try:
    print("Attempting to connect to Salesforce...")
    sf = Salesforce(
        username=os.environ.get("SF_USERNAME", '[email protected]'),
        password=os.environ.get("SF_PASSWORD", 'Sati@1020'),
        security_token=os.environ.get("SF_SECURITY_TOKEN", 'sSSjyhInIsUohKpG8sHzty2q'),
    )
    print("Connected to Salesforce successfully!")
except Exception as e:
    # Keep the name defined so later code sees a clear "no connection" value
    # instead of an undefined global.
    sf = None
    print(f"Failed to connect to Salesforce: {str(e)}")
# Function to handle login against Salesforce
@app.route('/login', methods=['POST'])
def login():
    """Look up a ``Customer_Login__c`` record matching the posted credentials.

    Expects a JSON object with ``email`` and ``phone_number``.

    Returns:
        200 with ``user_id`` and ``name`` when a record matches both values,
        400 when either field is missing (including a missing or non-object
        JSON body), 401 when no record matches, and 500 when the Salesforce
        query fails.
    """
    # Local import keeps this handler self-contained; format_soql quotes and
    # escapes values so request data cannot alter the query structure.
    from simple_salesforce import format_soql

    # silent=True yields None for a missing or invalid JSON body instead of
    # raising, so such requests fall through to the 400 response below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get('email')
    phone_number = data.get('phone_number')
    if not email or not phone_number:
        return jsonify({'error': 'Missing email or phone number'}), 400

    try:
        # Check if user already exists. Values are cast to str so they are
        # always emitted as quoted SOQL string literals, as before.
        query = format_soql(
            "SELECT Id, Name FROM Customer_Login__c "
            "WHERE Email__c = {email} AND Phone_Number__c = {phone} LIMIT 1",
            email=str(email),
            phone=str(phone_number),
        )
        result = sf.query(query)
        if result['totalSize'] > 0:
            user_data = result['records'][0]
            return jsonify({
                'success': True,
                'message': 'Login successful',
                'user_id': user_data['Id'],
                'name': user_data['Name'],
            }), 200
        return jsonify({'error': 'Invalid email or phone number. User not found'}), 401
    except requests.exceptions.RequestException as req_error:
        return jsonify({'error': f'Salesforce connection error: {str(req_error)}'}), 500
    except Exception as e:
        return jsonify({'error': f'Unexpected error: {str(e)}'}), 500
@app.route("/submit", methods=["POST"])
def submit():
    """Register a new ``Customer_Login__c`` record from the posted JSON.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        200 with the new ``user_id`` on success, 400 when any field is
        missing (including a missing or non-object JSON body), 409 when a
        record with the same email and phone already exists, and 500 when
        Salesforce reports no id or any error occurs.
    """
    # Local import keeps this handler self-contained; format_soql quotes and
    # escapes values so request data cannot alter the query structure.
    from simple_salesforce import format_soql

    # silent=True yields None for a missing or invalid JSON body instead of
    # raising, so such requests fall through to the 400 response below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    name = data.get('name')
    email = data.get('email')
    phone = data.get('phone')
    if not name or not email or not phone:
        return jsonify({'error': 'Missing data'}), 400

    try:
        # Check if user already exists. Values are cast to str so they are
        # always emitted as quoted SOQL string literals, as before.
        query = format_soql(
            "SELECT Id FROM Customer_Login__c "
            "WHERE Email__c = {email} AND Phone_Number__c = {phone} LIMIT 1",
            email=str(email),
            phone=str(phone),
        )
        existing_user = sf.query(query)
        if existing_user['totalSize'] > 0:
            return jsonify({'error': 'User already exists'}), 409  # Conflict

        # Create new user
        customer_login = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone,
        })
        if customer_login.get('id'):
            return jsonify({'success': True, 'user_id': customer_login['id']}), 200
        return jsonify({'error': 'Failed to create record'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route("/")
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
# Process-wide cache for the Whisper pipeline; filled on first use.
_asr_pipeline = None


def _get_asr_pipeline():
    """Return the shared Whisper ASR pipeline, building it on first call."""
    global _asr_pipeline
    if _asr_pipeline is None:
        _asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device=0 if torch.cuda.is_available() else -1,
            config=config,
        )
    return _asr_pipeline


@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Transcribe the uploaded ``audio`` file with Whisper.

    Expects a multipart upload with an ``audio`` file field.

    Returns:
        200 with ``{"text": ...}`` (stripped and capitalized) on success,
        400 when no file is provided or no speech is detected, and 500 with
        a "Speech recognition error" message when any step fails.
    """
    import tempfile

    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]
    try:
        # A private temporary directory per request: concurrent uploads no
        # longer overwrite each other's files, and the files are removed
        # when the block exits.
        with tempfile.TemporaryDirectory() as work_dir:
            input_audio_path = os.path.join(work_dir, "temp_input.wav")
            output_audio_path = os.path.join(work_dir, "temp.wav")
            audio_file.save(input_audio_path)

            # Convert to WAV
            convert_to_wav(input_audio_path, output_audio_path)

            # Check for silence
            if is_silent_audio(output_audio_path):
                return jsonify({"error": "No speech detected. Please try again."}), 400

            # Use Whisper ASR model for transcription; the model is loaded
            # once and reused instead of being rebuilt on every request.
            recognizer = _get_asr_pipeline()
            transcribed_text = recognizer(output_audio_path)["text"].strip().capitalize()

        return jsonify({"text": transcribed_text})
    except Exception as e:
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500
# Start Production Server
if __name__ == "__main__":
    # Only when run as a script: serve the Flask app through Waitress on
    # every interface, port 7860.
    serve(app, host="0.0.0.0", port=7860)