Spaces:
Sleeping
Sleeping
import torch | |
from flask import Flask, render_template, request, jsonify, session, redirect, url_for | |
import json | |
import os | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from transformers import AutoConfig | |
import time | |
from waitress import serve | |
from simple_salesforce import Salesforce | |
import requests # Import requests for exception handling | |
app = Flask(__name__) | |
# Use whisper-small for faster processing and better speed | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Create config object to set timeout and other parameters | |
config = AutoConfig.from_pretrained("openai/whisper-small") | |
config.update({"timeout": 60}) # Set timeout to 60 seconds | |
# Salesforce credentials (Replace with actual values) | |
try: | |
print("Attempting to connect to Salesforce...") | |
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') | |
print("Connected to Salesforce successfully!") | |
except Exception as e: | |
print(f"Failed to connect to Salesforce: {str(e)}") | |
# Function for Salesforce operations | |
def create_salesforce_record(name, email, phone_number): | |
try: | |
# Create a new record in Salesforce's Customer_Login__c object | |
customer_login = sf.Customer_Login__c.create({ | |
'Name': name, | |
'Email__c': email, | |
'Phone_Number__c': phone_number | |
}) | |
return customer_login | |
except SalesforceResourceNotFound as e: | |
raise Exception(f"Salesforce resource not found: {str(e)}") | |
except Exception as e: | |
raise Exception(f"Failed to create record: {str(e)}") | |
def get_menu_items(): | |
query = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c" | |
result = sf.query(query) | |
return result['records'] | |
# Voice-related functions | |
def generate_audio_prompt(text, filename): | |
try: | |
tts = gTTS(text) | |
tts.save(os.path.join("static", filename)) | |
except gtts.tts.gTTSError as e: | |
print(f"Error: {e}") | |
print("Retrying after 5 seconds...") | |
time.sleep(5) # Wait for 5 seconds before retrying | |
generate_audio_prompt(text, filename) | |
# Utility functions | |
def convert_to_wav(input_path, output_path): | |
try: | |
audio = AudioSegment.from_file(input_path) | |
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono | |
audio.export(output_path, format="wav") | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
raise Exception(f"Audio conversion failed: {str(e)}") | |
def is_silent_audio(audio_path): | |
audio = AudioSegment.from_wav(audio_path) | |
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) # Reduced silence duration | |
print(f"Detected nonsilent parts: {nonsilent_parts}") | |
return len(nonsilent_parts) == 0 # If no speech detected | |
# Routes and Views | |
def index(): | |
return render_template("index.html") | |
# Route for Registering a User | |
def register(): | |
if request.method == 'POST': | |
data = request.form | |
name = data.get('name') | |
email = data.get('email') | |
phone = data.get('phone') | |
# Clean phone number: remove unwanted characters (commas, spaces) | |
phone = ''.join(e for e in phone if e.isdigit()) # Keep only digits | |
# Validate phone number to ensure it's exactly 10 digits | |
if len(phone) != 10: | |
return jsonify({"success": False, "message": "Phone number must be 10 digits."}), 400 | |
if not name or not email or not phone: | |
return jsonify({"success": False, "message": "All fields are required."}), 400 | |
try: | |
# Create a new record in Salesforce's Customer_Login__c object | |
customer_login = sf.Customer_Login__c.create({ | |
'Name': name, | |
'Email__c': email, | |
'Phone_Number__c': phone | |
}) | |
return redirect(url_for('login')) # Redirect to login page after successful registration | |
except Exception as e: | |
return jsonify({"success": False, "message": f"Registration failed: {str(e)}"}), 500 | |
return render_template("register.html") # Registration page | |
# Route for Login Validation | |
def login(): | |
try: | |
data = request.form | |
login_email = data.get("email") | |
login_mobile = data.get("mobile") | |
# Clean phone number: remove unwanted characters (commas, spaces) | |
login_mobile = ''.join(e for e in login_mobile if e.isdigit()) | |
if not login_email or not login_mobile: | |
return jsonify({"success": False, "message": "Missing email or mobile number"}), 400 | |
query = f"SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c WHERE Email__c = '{login_email}' AND Phone_Number__c = '{login_mobile}'" | |
result = sf.query(query) | |
if result['records']: | |
user_name = result['records'][0]['Name'] | |
session['user_email'] = login_email # Store user email in session | |
return redirect(url_for('menu_page')) # Redirect to menu page after successful login | |
else: | |
return jsonify({"success": False, "message": "Invalid email or mobile number"}), 401 | |
except Exception as e: | |
return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500 | |
# Route to Get Menu Items | |
def menu_page(): | |
try: | |
menu_items = get_menu_items() | |
menu_data = [{"name": item['Name'], "price": item['Price__c'], "ingredients": item['Ingredients__c'], "category": item['Category__c']} for item in menu_items] | |
return render_template("menu_page.html", menu_items=menu_data) | |
except Exception as e: | |
return jsonify({"success": False, "message": "Error fetching menu items", "error": str(e)}), 500 | |
# Route for Cart Items | |
def add_to_cart(): | |
try: | |
data = request.get_json() | |
customer_id = data.get("customer_id") | |
item_id = data.get("item_id") | |
quantity = data.get("quantity") | |
if not customer_id or not item_id or not quantity: | |
return jsonify({"success": False, "message": "Missing required fields"}), 400 | |
cart_item_data = { | |
"Customer__c": customer_id, | |
"Menu_Item__c": item_id, | |
"Quantity__c": quantity, | |
"Status__c": "In Cart", | |
} | |
sf.Cart_Item__c.create(cart_item_data) | |
return jsonify({"success": True, "message": "Item added to cart"}) | |
except Exception as e: | |
return jsonify({"success": False, "message": "Error adding item to cart", "error": str(e)}), 500 | |
# Route to Create an Order | |
def place_order(): | |
try: | |
data = request.get_json() | |
customer_id = data.get("customer_id") | |
items = data.get("items") # List of ordered items: [{"item_name": "Item 1", "quantity": 2}, ...] | |
if not customer_id or not items: | |
return jsonify({"success": False, "message": "Missing required fields"}), 400 | |
order_data = { | |
"Customer__c": customer_id, | |
"Status__c": "New", # Example status | |
} | |
order_result = sf.Order__c.create(order_data) | |
order_id = order_result['id'] | |
# Create related Order_Items | |
for item in items: | |
order_item_data = { | |
"Order__c": order_id, | |
"Menu_Item__c": item['item_id'], | |
"Quantity__c": item['quantity'], | |
} | |
sf.Order_Item__c.create(order_item_data) | |
return jsonify({"success": True, "message": "Order placed successfully", "order_id": order_id}) | |
except Exception as e: | |
return jsonify({"success": False, "message": "Error placing order", "error": str(e)}), 500 | |
# Route for Order History | |
def get_order_history(): | |
try: | |
customer_id = request.args.get("customer_id") | |
if not customer_id: | |
return jsonify({"success": False, "message": "Missing customer_id"}), 400 | |
query = f"SELECT Id, Order_Date__c, Status__c, Total_Amount__c FROM Order_History__c WHERE Customer__c = '{customer_id}'" | |
result = sf.query(query) | |
# Prepare order history data | |
order_history = [{"order_id": item['Id'], "date": item['Order_Date__c'], "status": item['Status__c'], "total": item['Total_Amount__c']} for item in result['records']] | |
# Return the order history as JSON | |
return jsonify({"success": True, "order_history": order_history}) | |
except Exception as e: | |
return jsonify({"success": False, "message": "Error fetching order history", "error": str(e)}), 500 | |
# Route for Logout (Optional) | |
def logout(): | |
session.clear() # Clear the session | |
return redirect(url_for('index')) # Redirect to home page | |
# Start Production Server | |
if __name__ == "__main__": | |
serve(app, host="0.0.0.0", port=7860) | |