|
import torch |
|
from flask import Flask, render_template, request, jsonify |
|
import json |
|
import os |
|
from transformers import pipeline |
|
from gtts import gTTS |
|
from pydub import AudioSegment |
|
from pydub.silence import detect_nonsilent |
|
from transformers import AutoConfig |
|
import time |
|
from waitress import serve |
|
from simple_salesforce import Salesforce |
|
import requests |
|
|
|
app = Flask(__name__, template_folder="templates") |
|
app.secret_key = os.urandom(24) |
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
|
|
config = AutoConfig.from_pretrained("openai/whisper-small") |
|
config.update({"timeout": 60}) |
|
|
|
|
|
def generate_audio_prompt(text, filename): |
|
try: |
|
tts = gTTS(text) |
|
tts.save(os.path.join("static", filename)) |
|
except Exception as e: |
|
print(f"Error: {e}") |
|
time.sleep(5) |
|
generate_audio_prompt(text, filename) |
|
|
|
|
|
prompts = { |
|
"welcome": "Welcome to Biryani Hub.", |
|
"ask_name": "Tell me your name.", |
|
"ask_email": "Please provide your email address.", |
|
"thank_you": "Thank you for registration." |
|
} |
|
|
|
for key, text in prompts.items(): |
|
generate_audio_prompt(text, f"{key}.mp3") |
|
|
|
|
|
def convert_to_wav(input_path, output_path): |
|
try: |
|
audio = AudioSegment.from_file(input_path) |
|
audio = audio.set_frame_rate(16000).set_channels(1) |
|
audio.export(output_path, format="wav") |
|
except Exception as e: |
|
raise Exception(f"Audio conversion failed: {str(e)}") |
|
|
|
|
|
def is_silent_audio(audio_path): |
|
audio = AudioSegment.from_wav(audio_path) |
|
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) |
|
return len(nonsilent_parts) == 0 |
|
|
|
|
|
try: |
|
print("Attempting to connect to Salesforce...") |
|
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') |
|
print("Connected to Salesforce successfully!") |
|
except Exception as e: |
|
print(f"Failed to connect to Salesforce: {str(e)}") |
|
|
|
|
|
@app.route("/", methods=["GET"]) |
|
def index(): |
|
return render_template("index.html") |
|
|
|
|
|
@app.route("/dashboard", methods=["GET"]) |
|
def dashboard(): |
|
return render_template("dashboard.html") |
|
|
|
|
|
@app.route("/menu_page", methods=["GET"]) |
|
def menu_page(): |
|
return render_template("menu_page.html") |
|
|
|
|
|
@app.route('/login', methods=['POST']) |
|
def login(): |
|
data = request.json |
|
name = data.get('name') |
|
email = data.get('email') |
|
phone_number = data.get('phone_number') |
|
|
|
if not name or not email or not phone_number: |
|
return jsonify({'error': 'Missing required fields'}), 400 |
|
|
|
try: |
|
customer_login = sf.Customer_Login__c.create({ |
|
'Name': name, |
|
'Email__c': email, |
|
'Phone_Number__c': phone_number |
|
}) |
|
return jsonify({'success': True, 'id': customer_login['id']}), 200 |
|
except Exception as e: |
|
return jsonify({'error': f'Failed to create record in Salesforce: {str(e)}'}), 500 |
|
|
|
|
|
@app.route("/submit", methods=["POST"]) |
|
def submit(): |
|
data = request.json |
|
name = data.get('name') |
|
email = data.get('email') |
|
phone = data.get('phone') |
|
|
|
if not name or not email or not phone: |
|
return jsonify({'error': 'Missing data'}), 400 |
|
|
|
try: |
|
customer_login = sf.Customer_Login__c.create({ |
|
'Name': name, |
|
'Email__c': email, |
|
'Phone_Number__c': phone |
|
}) |
|
return jsonify({'success': True}), 200 |
|
except Exception as e: |
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.route("/transcribe", methods=["POST"]) |
|
def transcribe(): |
|
if "audio" not in request.files: |
|
return jsonify({"error": "No audio file provided"}), 400 |
|
|
|
audio_file = request.files["audio"] |
|
input_audio_path = os.path.join("static", "temp_input.wav") |
|
output_audio_path = os.path.join("static", "temp.wav") |
|
audio_file.save(input_audio_path) |
|
|
|
try: |
|
convert_to_wav(input_audio_path, output_audio_path) |
|
if is_silent_audio(output_audio_path): |
|
return jsonify({"error": "No speech detected. Please try again."}), 400 |
|
|
|
asr_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-small", device=0 if torch.cuda.is_available() else -1, config=config) |
|
result = asr_pipeline(output_audio_path) |
|
|
|
transcribed_text = result["text"].strip().capitalize() |
|
|
|
parts = transcribed_text.split() |
|
name = parts[0] if len(parts) > 0 else "Unknown Name" |
|
email = parts[1] if '@' in parts[1] else "[email protected]" |
|
phone_number = parts[2] if len(parts) > 2 else "0000000000" |
|
|
|
confirmation = f"Is this correct? Name: {name}, Email: {email}, Phone: {phone_number}" |
|
generate_audio_prompt(confirmation, "confirmation.mp3") |
|
|
|
salesforce_response = sf.Customer_Login__c.create({ |
|
'Name': name, |
|
'Email__c': email, |
|
'Phone_Number__c': phone_number |
|
}) |
|
|
|
return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500 |
|
|
|
@app.route("/menu", methods=["GET"]) |
|
def get_menu(): |
|
try: |
|
|
|
query = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c" |
|
result = sf.query(query) |
|
|
|
menu_items = [] |
|
for item in result["records"]: |
|
menu_items.append({ |
|
"name": item["Name"], |
|
"price": item["Price__c"], |
|
"ingredients": item["Ingredients__c"], |
|
"category": item["Category__c"] |
|
}) |
|
|
|
|
|
return render_template("menu_page.html", menu=menu_items) |
|
|
|
except Exception as e: |
|
return jsonify({"error": f"Failed to fetch menu: {str(e)}"}), 500 |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
print("β
Starting Flask API Server on port 7860...") |
|
serve(app, host="0.0.0.0", port=7860) |
|
|