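# NOTE: the original first lines read "Spaces: Running Running" — this appears
# to be status-banner residue from the web page the file was copied from and
# is not part of the program.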
import json
import os
import time

import requests
import torch
from flask import Flask, render_template, request, jsonify
from gtts import gTTS
from gtts.tts import gTTSError
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from simple_salesforce import Salesforce
from simple_salesforce import format_soql
from simple_salesforce.exceptions import SalesforceResourceNotFound
from transformers import AutoConfig
from transformers import pipeline
from waitress import serve
app = Flask(__name__)

# Torch device string: "cuda" when a CUDA device is available, "cpu" otherwise.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Load the whisper-small model configuration and attach a 60-second
# "timeout" entry to it.
# NOTE(review): confirm that the speech-recognition pipeline actually honours
# a "timeout" attribute on the config object.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update(dict(timeout=60))
# Salesforce credentials are read from the environment so that no secret is
# stored in the source file. Required variables:
#   SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN
# NOTE(review): the password and security token that were previously
# hard-coded here have been exposed and should be rotated.
sf = None  # remains None when the connection cannot be established
try:
    print("Attempting to connect to Salesforce...")
    _sf_env_names = ("SF_USERNAME", "SF_PASSWORD", "SF_SECURITY_TOKEN")
    _sf_missing = [var for var in _sf_env_names if not os.environ.get(var)]
    if _sf_missing:
        raise RuntimeError(
            f"missing environment variables: {', '.join(_sf_missing)}"
        )
    sf = Salesforce(
        username=os.environ["SF_USERNAME"],
        password=os.environ["SF_PASSWORD"],
        security_token=os.environ["SF_SECURITY_TOKEN"],
    )
    print("Connected to Salesforce successfully!")
except Exception as e:
    # Best-effort start-up: the app still boots, but `sf` stays None so every
    # Salesforce-backed function fails with an explicit error instead of a
    # NameError on an undefined global.
    print(f"Failed to connect to Salesforce: {str(e)}")
# Function for Salesforce operations | |
def create_salesforce_record(name, email, phone_number):
    """Create a ``Customer_Login__c`` record in Salesforce.

    Args:
        name: Value stored in the record's ``Name`` field.
        email: Value stored in ``Email__c``.
        phone_number: Value stored in ``Phone_Number__c``.

    Returns:
        Whatever ``sf.Customer_Login__c.create`` returns for the new record.

    Raises:
        Exception: With a "Salesforce resource not found" message when the
            client raises ``SalesforceResourceNotFound``, or a "Failed to
            create record" message for any other failure. The original error
            is attached as ``__cause__``.
    """
    try:
        return sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone_number,
        })
    # SalesforceResourceNotFound must be imported at file level; without it
    # this clause itself raises NameError whenever create() fails.
    except SalesforceResourceNotFound as e:
        raise Exception(f"Salesforce resource not found: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Failed to create record: {str(e)}") from e
def get_menu_items():
    """Query Salesforce for menu items and return the ``records`` list.

    Selects Name, Price__c, Ingredients__c and Category__c from
    ``Menu_Item__c`` using the module-level ``sf`` client.
    """
    soql = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c"
    return sf.query(soql)['records']
# Voice-related functions | |
def generate_audio_prompt(text, filename, max_attempts=3, retry_delay=5):
    """Synthesize ``text`` with gTTS and save it under ``static/``.

    Args:
        text: Text to convert to speech.
        filename: File name (inside the ``static`` directory) to write to.
        max_attempts: Total number of tries before giving up (at least one
            attempt is always made).
        retry_delay: Seconds to wait between failed attempts.

    Raises:
        gTTSError: Re-raised when the last attempt also fails.
    """
    target_path = os.path.join("static", filename)
    attempts = max(1, max_attempts)
    # Bounded loop instead of unbounded recursion: a persistent gTTS outage
    # can no longer block the request forever or exhaust the call stack.
    for attempt in range(1, attempts + 1):
        try:
            gTTS(text).save(target_path)
            return
        except gTTSError as e:
            print(f"Error: {e}")
            if attempt == attempts:
                raise
            time.sleep(retry_delay)
# Utility functions | |
def convert_to_wav(input_path, output_path):
    """Re-encode an audio file as a 16 kHz, single-channel WAV.

    Reads ``input_path`` with pydub, resamples to 16000 Hz mono and writes the
    result to ``output_path``. Any failure is printed and re-raised as a
    generic ``Exception`` with an "Audio conversion failed" message.
    """
    try:
        source_clip = AudioSegment.from_file(input_path)
        resampled = source_clip.set_frame_rate(16000)
        mono_clip = resampled.set_channels(1)
        mono_clip.export(output_path, format="wav")
    except Exception as e:
        reason = str(e)
        print(f"Error: {reason}")
        raise Exception(f"Audio conversion failed: {reason}")
def is_silent_audio(audio_path):
    """Return True when pydub finds no non-silent section in the WAV file.

    The silence threshold is set 16 dB below the clip's own average loudness
    (``dBFS``), and a silent stretch must last at least 500 ms.
    """
    clip = AudioSegment.from_wav(audio_path)
    threshold = clip.dBFS - 16
    speech_ranges = detect_nonsilent(
        clip,
        min_silence_len=500,
        silence_thresh=threshold,
    )
    # An empty list means no speech was detected.
    return not speech_ranges
# Routes and Views | |
def index():
    """Render the ``index.html`` template.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    page = render_template("index.html")
    return page
def dashboard():
    """Render the ``dashboard.html`` template.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    page = render_template("dashboard.html")
    return page
def submit():
    """Create a ``Customer_Login__c`` record from a JSON request body.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        A JSON response: success on creation; 400 when the body is missing,
        malformed, not an object, or lacks a required field; 500 when the
        Salesforce call or anything else fails.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    try:
        # silent=True yields None for a malformed body or wrong content type,
        # so those cases reach the 400 branch below instead of raising.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

        name = data.get("name")
        email = data.get("email")
        phone = data.get("phone")
        if not (name and email and phone):
            return jsonify({"success": False, "message": "Missing required fields"}), 400

        # Insert data into Salesforce
        salesforce_data = {"Name": name, "Email__c": email, "Phone_Number__c": phone}
        try:
            sf.Customer_Login__c.create(salesforce_data)
        except Exception as e:
            return jsonify({"success": False, "message": "Salesforce submission failed", "error": str(e)}), 500
        return jsonify({"success": True, "message": "Data submitted successfully"})
    except Exception as e:
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
def menu_page():
    """Render ``menu_page.html`` with the menu items fetched from Salesforce.

    Each Salesforce record is reduced to a dict with the keys ``name``,
    ``price``, ``ingredients`` and ``category``.
    """
    # Template key -> Salesforce field name, in the order the keys are emitted.
    field_map = {
        "name": 'Name',
        "price": 'Price__c',
        "ingredients": 'Ingredients__c',
        "category": 'Category__c',
    }
    menu_data = []
    for record in get_menu_items():
        entry = {key: record[sf_field] for key, sf_field in field_map.items()}
        menu_data.append(entry)
    return render_template("menu_page.html", menu_items=menu_data)
def validate_login():
    """Check an email / mobile pair against ``Customer_Login__c`` records.

    Expects a JSON object with ``email`` and ``mobile``.

    Returns:
        A JSON response: success with the customer's name when a matching
        record exists; 400 when the body or a field is missing; 401 when no
        record matches; 500 on any unexpected error.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    try:
        # silent=True yields None for a missing or malformed body, so it is
        # reported as a 400 rather than crashing on `.get`.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "Missing email or mobile number"}), 400

        login_email = data.get("email")
        login_mobile = data.get("mobile")
        # Validate if email and mobile are present
        if not login_email or not login_mobile:
            return jsonify({"success": False, "message": "Missing email or mobile number"}), 400

        # Build the query with format_soql so user-supplied values are quoted
        # and escaped; interpolating them directly allowed SOQL injection.
        # Values are coerced to str so they are always rendered as quoted
        # literals, as the original query did.
        query = format_soql(
            "SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c "
            "WHERE Email__c = {} AND Phone_Number__c = {}",
            str(login_email),
            str(login_mobile),
        )
        result = sf.query(query)

        if result['records']:
            # If a matching record is found, return success and the name
            user_name = result['records'][0]['Name']
            return jsonify({"success": True, "message": "Login successful", "name": user_name})
        # If no matching record is found
        return jsonify({"success": False, "message": "Invalid email or mobile number"}), 401
    except Exception as e:
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
def place_order():
    """Create an ``Order__c`` record plus one ``Order_Item__c`` per item.

    Expects a JSON object with ``customerName``, ``customerEmail``,
    ``customerPhone``, ``customerAddress``, ``items`` (a list of objects with
    ``name``, ``quantity`` and ``price``) and ``totalAmount``.

    Returns:
        A JSON response: success when every record is created; 400 when the
        body, a required field, or an item field is missing; 500 when a
        Salesforce call fails.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    NOTE(review): the order and its items are created with separate calls, so
    a failure part-way through leaves a partially populated order.
    """
    # silent=True yields None for a missing or malformed body; that case is
    # answered with a 400 instead of an unhandled AttributeError.
    order_data = request.get_json(silent=True)
    if not isinstance(order_data, dict):
        return jsonify({"success": False, "message": "Missing required fields"}), 400

    # Get order details from the request
    customer_name = order_data.get("customerName")
    customer_email = order_data.get("customerEmail")
    customer_phone = order_data.get("customerPhone")
    customer_address = order_data.get("customerAddress")
    items = order_data.get("items")
    total_amount = order_data.get("totalAmount")

    # Ensure all required fields are present
    required = (customer_name, customer_email, customer_phone,
                customer_address, items, total_amount)
    if not all(required):
        return jsonify({"success": False, "message": "Missing required fields"}), 400

    # Validate every item before writing anything, so a malformed item cannot
    # leave a parent order behind without its line items.
    item_keys = ("name", "quantity", "price")
    if not isinstance(items, list) or not all(
        isinstance(item, dict) and all(key in item for key in item_keys)
        for item in items
    ):
        return jsonify({"success": False, "message": "Missing required fields"}), 400

    try:
        # Create the order in Salesforce (custom object Order__c)
        order = sf.Order__c.create({
            "Customer_Name__c": customer_name,
            "Customer_Email__c": customer_email,
            "Customer_Phone__c": customer_phone,
            "Customer_Address__c": customer_address,
            "Total_Amount__c": total_amount,
        })
        # Create one line item per entry, linked to the parent order
        for item in items:
            sf.Order_Item__c.create({
                "Order__c": order['id'],  # Link to the parent order
                "Item__c": item['name'],
                "Quantity__c": item['quantity'],
                "Price__c": item['price'],
            })
        # Return success response
        return jsonify({"success": True, "message": "Order placed successfully."})
    except Exception as e:
        # If an error occurs, return an error message
        return jsonify({"success": False, "message": f"Error placing order: {str(e)}"}), 500
def cart():
    """Render ``cart_page.html`` with an empty placeholder cart.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    # Placeholder: the cart is always rendered empty.
    return render_template("cart_page.html", cart_items=list())
def order_summary():
    """Render ``order_summary.html`` with an empty placeholder order.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    """
    # Placeholder: the order details are always rendered empty.
    return render_template("order_summary.html", order_details=list())
def transcribe():
    """Transcribe an uploaded audio file and store the spoken contact details.

    Reads the ``audio`` file from the request, converts it to WAV, rejects
    silent recordings, runs Whisper speech recognition, takes the first three
    words of the transcript as name / email / phone, generates a spoken
    confirmation prompt and creates the Salesforce record.

    Returns:
        A JSON response: the transcript and Salesforce result on success;
        400 when no audio is supplied or no speech is detected; 500 on any
        processing error.

    NOTE(review): no route decorator is visible on this view here — confirm
    how it is registered with ``app``.
    NOTE(review): the upload is written to fixed file names under ``static``,
    so concurrent requests overwrite each other's files.
    """
    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]
    input_audio_path = os.path.join("static", "temp_input.wav")
    output_audio_path = os.path.join("static", "temp.wav")
    audio_file.save(input_audio_path)

    try:
        convert_to_wav(input_audio_path, output_audio_path)
        if is_silent_audio(output_audio_path):
            return jsonify({"error": "No speech detected. Please try again."}), 400

        # Run the pipeline on the converted file. Previously the pipeline
        # object was built but never called, so indexing it with "text" failed.
        result = _get_asr_pipeline()(output_audio_path)
        transcribed_text = result["text"].strip().capitalize()

        # Extract name, email, and phone number from the transcribed text
        name, email, phone_number = _extract_contact_fields(transcribed_text)

        confirmation = f"Is this correct? Name: {name}, Email: {email}, Phone: {phone_number}"
        generate_audio_prompt(confirmation, "confirmation.mp3")

        user_confirms = True  # Assuming user confirms, replace with actual logic
        if user_confirms:
            salesforce_response = create_salesforce_record(name, email, phone_number)
            if "error" in salesforce_response:
                return jsonify(salesforce_response), 500
            return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response})
    except Exception as e:
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500


# Lazily created speech-recognition pipeline, shared across requests so the
# model is loaded once instead of on every call.
_asr_pipeline = None


def _get_asr_pipeline():
    """Return the shared whisper-small pipeline, building it on first use."""
    global _asr_pipeline
    if _asr_pipeline is None:
        _asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device=0 if torch.cuda.is_available() else -1,
            config=config,
        )
    return _asr_pipeline


def _extract_contact_fields(transcribed_text):
    """Split a transcript into (name, email, phone), with fallback values."""
    parts = transcribed_text.split()
    name = parts[0] if len(parts) > 0 else "Unknown Name"
    # The length check prevents an IndexError on one-word transcripts.
    email = parts[1] if len(parts) > 1 and '@' in parts[1] else "[email protected]"
    phone_number = parts[2] if len(parts) > 2 else "0000000000"
    return name, email, phone_number
if __name__ == "__main__":
    # Serve the Flask app with waitress on every interface, port 7860.
    serve(app, port=7860, host="0.0.0.0")