Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify, session | |
from simple_salesforce import Salesforce | |
import json | |
import os | |
import time | |
from transformers import pipeline | |
import torch | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from transformers import AutoConfig # Import AutoConfig for the config object | |
from waitress import serve | |
app = Flask(__name__) | |
app.secret_key = 'your_secret_key_here' # Secret key for session management | |
# Sample menu data | |
menu_data = { | |
"Main Course": [ | |
{"name": "Chicken Biryani", "price": 250}, | |
{"name": "Veg Biryani", "price": 200}, | |
{"name": "Mutton Biryani", "price": 300}, | |
], | |
"Appetizers": [ | |
{"name": "Paneer Tikka", "price": 180}, | |
{"name": "Chicken Wings", "price": 220}, | |
] | |
} | |
# Salesforce credentials (Replace with actual values) | |
try: | |
print("Attempting to connect to Salesforce...") | |
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') | |
print("Connected to Salesforce successfully!") | |
print("User Info:", sf.UserInfo) # Log the user info to verify the connection | |
except Exception as e: | |
print(f"Failed to connect to Salesforce: {str(e)}") | |
# Setup Whisper ASR model and Salesforce credentials | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
config = AutoConfig.from_pretrained("openai/whisper-small") | |
config.update({"timeout": 60}) | |
# Route for the Welcome Page (Page 1) | |
def index(): | |
return render_template("index.html") | |
# Route for the Menu Pages (Page 2) | |
def menu_page(): | |
category = request.args.get('category', 'Main Course') | |
if category not in menu_data: | |
return jsonify({"error": "Category not found"}), 404 | |
return render_template("menu_page.html", menu_items=menu_data[category]) | |
# Route to add items to the cart | |
def add_to_cart(): | |
item_data = request.get_json() | |
item_name = item_data.get('name') | |
item_price = item_data.get('price') | |
# If cart is not in session, initialize it | |
if 'cart' not in session: | |
session['cart'] = [] | |
# Add item to the cart | |
session['cart'].append({"name": item_name, "price": item_price}) | |
return jsonify({"message": f"{item_name} added to cart!"}), 200 | |
# Route for Cart Page (Page 3) | |
def cart_page(): | |
if 'cart' not in session or len(session['cart']) == 0: | |
return jsonify({"message": "Your cart is empty"}), 200 | |
return render_template("cart_page.html", cart_items=session['cart']) | |
# Route to place an order (API integration for placing order) | |
def place_order(): | |
if 'cart' not in session or len(session['cart']) == 0: | |
return jsonify({"message": "Cart is empty, cannot place order."}), 400 | |
order_data = session['cart'] | |
# Here you can integrate with your API or Salesforce | |
# Example: Call Salesforce API to store the order | |
salesforce_response = send_to_salesforce(order_data) | |
# Simulate order processing with a success message | |
session['cart'] = [] # Clear the cart after order | |
return jsonify({"message": "Order placed successfully!", "order_data": order_data, "salesforce_response": salesforce_response}), 200 | |
# Salesforce integration function | |
def send_to_salesforce(order_data): | |
try: | |
# Salesforce authentication | |
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') | |
# Example: Create an Order record in Salesforce | |
order = sf.Order__c.create({ | |
'Name': 'New Order', | |
'Items__c': str(order_data) # Example field for items (this could be a custom Salesforce field) | |
}) | |
return order | |
except Exception as e: | |
print(f"Error: {e}") | |
return None | |
# Route to register a new user | |
def register(): | |
try: | |
data = request.get_json() | |
name = data.get('name') | |
email = data.get('email') | |
phone = data.get('phone') | |
if not name or not email or not phone: | |
return jsonify({"success": False, "message": "All fields are required."}), 400 | |
# Create a new record in Salesforce's Customer_Login__c object | |
customer_login = sf.Customer_Login__c.create({ | |
'Name': name, | |
'Email__c': email, | |
'Phone_Number__c': phone | |
}) | |
return jsonify({"success": True, "message": "Registration successful!", "salesforce_response": customer_login}), 201 | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
return jsonify({"success": False, "message": f"Registration failed: {str(e)}"}), 500 | |
# Route to validate login credentials | |
def validate_login(): | |
try: | |
data = request.get_json() | |
login_email = data.get("email") | |
login_mobile = data.get("mobile") | |
if not login_email or not login_mobile: | |
return jsonify({"success": False, "message": "Missing email or mobile number"}), 400 | |
# Query Salesforce to check if the record exists | |
query = f"SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c WHERE Email__c = '{login_email}' AND Phone_Number__c = '{login_mobile}'" | |
result = sf.query(query) | |
if result['records']: | |
# If a matching record is found, return success and the name | |
user_name = result['records'][0]['Name'] | |
return jsonify({"success": True, "message": "Login successful", "name": user_name}) | |
else: | |
return jsonify({"success": False, "message": "Invalid email or mobile number"}), 401 | |
except Exception as e: | |
print(f"Error validating login: {str(e)}") | |
return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500 | |
# Voice-related functions | |
def generate_audio_prompt(text, filename): | |
try: | |
tts = gTTS(text) | |
tts.save(os.path.join("static", filename)) | |
except gtts.tts.gTTSError as e: | |
print(f"Error: {e}") | |
print("Retrying after 5 seconds...") | |
time.sleep(5) # Wait for 5 seconds before retrying | |
generate_audio_prompt(text, filename) | |
# Utility functions | |
def convert_to_wav(input_path, output_path): | |
try: | |
audio = AudioSegment.from_file(input_path) | |
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono | |
audio.export(output_path, format="wav") | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
raise Exception(f"Audio conversion failed: {str(e)}") | |
def is_silent_audio(audio_path): | |
audio = AudioSegment.from_wav(audio_path) | |
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) # Reduced silence duration | |
print(f"Detected nonsilent parts: {nonsilent_parts}") | |
return len(nonsilent_parts) == 0 # If no speech detected | |
# Start Production Server | |
if __name__ == "__main__": | |
serve(app, host="0.0.0.0", port=7860) | |