|
import torch |
|
from flask import Flask, render_template, request, jsonify, session |
|
import json |
|
import os |
|
from transformers import pipeline |
|
from gtts import gTTS |
|
from pydub import AudioSegment |
|
from pydub.silence import detect_nonsilent |
|
from transformers import AutoConfig |
|
import time |
|
from waitress import serve |
|
from simple_salesforce import Salesforce |
|
import requests |
|
|
|
# Flask application object; every route below is registered on it.
app = Flask(__name__)

# Key Flask uses to sign the session cookie.  A value supplied through the
# FLASK_SECRET_KEY environment variable takes precedence; the literal fallback
# keeps the previous behaviour but is a well-known placeholder.
# TODO(review): set FLASK_SECRET_KEY in every deployment and drop the fallback.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or 'your_secret_key'
|
|
|
|
|
# Pick the compute device: the GPU when PyTorch reports CUDA as available,
# otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
|
|
|
|
|
# Load the configuration of the "openai/whisper-small" checkpoint and store a
# `timeout` entry of 60 on it.
# NOTE(review): nothing in the visible part of this file reads `config` or the
# timeout value - confirm it is consumed elsewhere.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update(dict(timeout=60))
|
|
|
|
|
# Open the module-wide Salesforce session used by the helpers and routes below.
#
# Credentials are taken from SF_USERNAME / SF_PASSWORD / SF_SECURITY_TOKEN when
# those environment variables are set; the literals are kept only as a
# backward-compatible fallback.
# TODO(review): these values are committed to source control - rotate them and
# remove the fallbacks once the environment variables are configured.
#
# `sf` is pre-set to None so the name always exists: if the login fails, later
# code can test `sf is None` instead of hitting a NameError.
sf = None
try:
    print("Attempting to connect to Salesforce...")
    sf = Salesforce(
        username=os.environ.get("SF_USERNAME") or '[email protected]',
        password=os.environ.get("SF_PASSWORD") or 'Sati@1020',
        security_token=os.environ.get("SF_SECURITY_TOKEN") or 'sSSjyhInIsUohKpG8sHzty2q',
    )
    print("Connected to Salesforce successfully!")
except Exception as e:
    # Best effort: the app still starts, but every Salesforce call will fail.
    print(f"Failed to connect to Salesforce: {str(e)}")
|
|
|
|
|
def create_salesforce_record(name, email, phone_number):
    """Create a Customer_Login__c record in Salesforce.

    Args:
        name: Value stored in the record's ``Name`` field.
        email: Value stored in ``Email__c``.
        phone_number: Value stored in ``Phone_Number__c``.

    Returns:
        Whatever ``sf.Customer_Login__c.create`` returns.

    Raises:
        Exception: If the create call fails; the message is
            ``"Failed to create record: "`` followed by the original error text.
    """
    record_fields = {
        'Name': name,
        'Email__c': email,
        'Phone_Number__c': phone_number,
    }
    try:
        return sf.Customer_Login__c.create(record_fields)
    except Exception as err:
        raise Exception(f"Failed to create record: {str(err)}")
|
|
|
def get_menu_items():
    """Query Salesforce for menu items.

    Returns:
        The ``records`` entry of the query result.  The query selects the
        Name, Price__c, Ingredients__c and Category__c fields of Menu_Item__c.
    """
    soql = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c"
    response = sf.query(soql)
    return response['records']
|
|
|
def create_salesforce_order(cart_items, total_price, customer_id):
    """Create an Order__c record in Salesforce.

    Args:
        cart_items: Cart contents; serialised with ``json.dumps`` into
            ``Cart_Items__c``.
        total_price: Value stored in ``Total_Price__c``.
        customer_id: Value stored in ``Customer__c``.

    Returns:
        Whatever ``sf.Order__c.create`` returns.

    Raises:
        Exception: If serialising the cart or the create call fails; the
            message is ``"Failed to create order: "`` plus the original error text.
    """
    try:
        order_fields = {
            'Total_Price__c': total_price,
            'Cart_Items__c': json.dumps(cart_items),
            'Customer__c': customer_id,
        }
        return sf.Order__c.create(order_fields)
    except Exception as err:
        raise Exception(f"Failed to create order: {str(err)}")
|
|
|
|
|
def generate_audio_prompt(text, filename, max_retries=3, retry_delay=5):
    """Synthesise *text* with gTTS and save it under the ``static`` directory.

    Args:
        text: Text to convert to speech.
        filename: File name (relative to ``static``) to write the audio to.
        max_retries: How many times to retry after a gTTS failure before
            giving up.
        retry_delay: Seconds to wait between attempts.

    Raises:
        gtts.tts.gTTSError: If synthesis still fails after ``max_retries``
            retries.
    """
    # Imported locally: the module only imports the `gTTS` class, so the
    # previous `except gtts.tts.gTTSError` raised NameError instead of retrying.
    from gtts.tts import gTTSError

    target_path = os.path.join("static", filename)
    failures = 0
    # A bounded loop replaces the former unbounded recursive retry.
    while True:
        try:
            gTTS(text).save(target_path)
            return
        except gTTSError as e:
            failures += 1
            print(f"Error: {e}")
            if failures > max_retries:
                raise
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
|
|
|
|
|
def convert_to_wav(input_path, output_path):
    """Re-encode an audio file as a 16000 Hz, single-channel WAV file.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the WAV file is written to.

    Raises:
        Exception: If loading, resampling or exporting fails; the error is
            printed and re-raised as ``"Audio conversion failed: ..."``.
    """
    try:
        clip = AudioSegment.from_file(input_path)
        clip = clip.set_frame_rate(16000)
        clip = clip.set_channels(1)
        clip.export(output_path, format="wav")
    except Exception as err:
        print(f"Error: {str(err)}")
        raise Exception(f"Audio conversion failed: {str(err)}")
|
|
|
def is_silent_audio(audio_path):
    """Report whether a WAV file contains no non-silent section.

    Args:
        audio_path: Path of the WAV file to inspect.

    Returns:
        True when ``detect_nonsilent`` finds no non-silent ranges, else False.
    """
    clip = AudioSegment.from_wav(audio_path)
    # The silence threshold is relative: 16 dB below the clip's own dBFS level.
    threshold = clip.dBFS - 16
    voiced_ranges = detect_nonsilent(clip, min_silence_len=500, silence_thresh=threshold)
    print(f"Detected nonsilent parts: {voiced_ranges}")
    return not voiced_ranges
|
|
|
|
|
@app.route("/")
def index():
    """Render the ``index.html`` template for the site root."""
    landing_page = render_template("index.html")
    return landing_page
|
|
|
@app.route("/dashboard", methods=["GET"])
def dashboard():
    """Render the ``dashboard.html`` template."""
    dashboard_page = render_template("dashboard.html")
    return dashboard_page
|
|
|
@app.route('/submit', methods=['POST'])
def submit():
    """Register a customer by creating a Customer_Login__c record.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        A JSON response with ``success`` and ``message`` keys:

        * 200 when Salesforce accepts the record;
        * 400 when the body is not a JSON object or a required field is
          missing or empty;
        * 500 when the Salesforce call (or anything else) fails; ``error``
          carries the exception text except in the storage-limit case.
    """
    try:
        # silent=True returns None for malformed or non-JSON bodies, so they get
        # the 400 below instead of raising inside this try and becoming a 500.
        data = request.get_json(silent=True)
        print("Received Data:", data)

        # A JSON array/string/number has no .get(); treat it as invalid input.
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

        name = data.get("name")
        email = data.get("email")
        phone = data.get("phone")

        if not name or not email or not phone:
            return jsonify({"success": False, "message": "Missing required fields"}), 400

        salesforce_data = {
            "Name": name,
            "Email__c": email,
            "Phone_Number__c": phone,
        }

        try:
            result = sf.Customer_Login__c.create(salesforce_data)
            print(f"Salesforce Response: {result}")
            return jsonify({"success": True, "message": "Data submitted successfully"})
        except Exception as e:
            # The storage-limit failure gets a dedicated, user-readable message.
            if "STORAGE_LIMIT_EXCEEDED" in str(e):
                return jsonify({"success": False, "message": "Salesforce storage limit exceeded. Please clean up or contact support."}), 500
            print(f"Salesforce Insertion Error: {str(e)}")
            return jsonify({"success": False, "message": "Salesforce submission failed", "error": str(e)}), 500
    except Exception as e:
        print(f"Server Error: {str(e)}")
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
|
|
|
def _escape_soql_literal(value):
    """Return *value* as text safe to embed in a single-quoted SOQL string."""
    text = str(value)
    # Backslash is handled first so the escapes added afterwards are not doubled.
    for raw, escaped in (
        ("\\", "\\\\"),
        ("'", "\\'"),
        ('"', '\\"'),
        ("\n", "\\n"),
        ("\r", "\\r"),
    ):
        text = text.replace(raw, escaped)
    return text


@app.route('/validate-login', methods=['POST'])
def validate_login():
    """Check an email / mobile pair against Customer_Login__c.

    Expects a JSON object with ``email`` and ``mobile``.

    Returns:
        A JSON response with ``success`` and ``message`` keys:

        * 200 plus ``name`` when a matching record exists;
        * 400 when the body is not a JSON object or a field is missing;
        * 401 when no record matches;
        * 500 (with ``error``) when the Salesforce query fails.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None rather than an
        # exception that the handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        login_email = data.get("email")
        login_mobile = data.get("mobile")

        if not login_email or not login_mobile:
            return jsonify({"success": False, "message": "Missing email or mobile number"}), 400

        # Both values come from the client: escape them so a quote in the input
        # cannot terminate the string literal and alter the query.
        safe_email = _escape_soql_literal(login_email)
        safe_mobile = _escape_soql_literal(login_mobile)
        query = (
            "SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c "
            f"WHERE Email__c = '{safe_email}' AND Phone_Number__c = '{safe_mobile}'"
        )
        result = sf.query(query)

        if result['records']:
            record = result['records'][0]
            # /place_order reads session['customer_id']; none of the routes
            # visible in this file set it, so remember the matched record here.
            session['customer_id'] = record['Id']
            return jsonify({"success": True, "message": "Login successful", "name": record['Name']})
        return jsonify({"success": False, "message": "Invalid email or mobile number"}), 401
    except Exception as e:
        print(f"Error validating login: {str(e)}")
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
|
|
|
@app.route("/menu", methods=["GET"])
def menu_page():
    """Render ``menu_page.html`` with the menu items returned by Salesforce.

    Each record from ``get_menu_items`` is reduced to a dict with the keys
    ``name``, ``price``, ``ingredients`` and ``category``.
    """
    menu_data = []
    for record in get_menu_items():
        menu_data.append({
            "name": record['Name'],
            "price": record['Price__c'],
            "ingredients": record['Ingredients__c'],
            "category": record['Category__c'],
        })
    return render_template("menu_page.html", menu_items=menu_data)
|
|
|
@app.route("/add_to_cart", methods=["POST"])
def add_to_cart():
    """Append an item to the cart kept in the session.

    Expects a JSON object with ``item_name`` and, optionally, ``quantity``
    (defaults to 1 when absent).

    Returns:
        A JSON response with ``success`` and ``message`` keys: 200 when the
        item was added, 400 when the body is not a JSON object, the item name
        is missing, or the quantity is not a positive whole number.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

    item_name = payload.get('item_name')
    if not item_name:
        return jsonify({"success": False, "message": "Missing item_name"}), 400

    # /place_order multiplies the stored quantity, so it must be a real integer;
    # storing strings or None there made the later arithmetic fail.
    try:
        quantity = int(payload.get('quantity', 1))
    except (TypeError, ValueError):
        quantity = 0
    if quantity < 1:
        return jsonify({"success": False, "message": "Quantity must be a positive whole number"}), 400

    cart_items = session.get('cart_items', [])
    cart_items.append({"name": item_name, "quantity": quantity})
    # Re-assign so the session registers the change to the list.
    session['cart_items'] = cart_items

    return jsonify({"success": True, "message": f"Added {item_name} to your cart."})
|
|
|
@app.route("/view_cart", methods=["GET"])
def view_cart():
    """Render ``cart_page.html`` with the cart stored in the session.

    An empty list is passed to the template when the session has no cart.
    """
    current_cart = session.get('cart_items', [])
    cart_page = render_template("cart_page.html", cart_items=current_cart)
    return cart_page
|
|
|
@app.route("/place_order", methods=["POST"])
def place_order():
    """Create an Order__c record from the session cart and empty the cart.

    Returns:
        A JSON response with a ``message`` key:

        * 200 plus ``order`` (the Salesforce create result) on success;
        * 400 when no ``customer_id`` is stored in the session or the cart
          is empty;
        * 500 when pricing the cart or the Salesforce call fails.
    """
    customer_id = session.get('customer_id')
    if not customer_id:
        return jsonify({"message": "User not logged in"}), 400

    cart_items = session.get('cart_items', [])
    # Do not create a zero-value order for an empty cart.
    if not cart_items:
        return jsonify({"message": "Cart is empty"}), 400

    try:
        # Computed inside the try so a malformed cart entry produces the JSON
        # error below rather than an unhandled exception.
        # NOTE(review): every unit is priced at a flat 100; the cart entries
        # carry no price - confirm this is intended and not a placeholder.
        total_price = sum(item['quantity'] * 100 for item in cart_items)
        order_data = {
            "Total_Price__c": total_price,
            "Cart_Items__c": json.dumps(cart_items),
            "Customer__c": customer_id,
        }
        order = sf.Order__c.create(order_data)
        # Clear the cart only after Salesforce accepted the order.
        session.pop('cart_items', None)
        return jsonify({"message": "Order placed successfully!", "order": order})
    except Exception as e:
        return jsonify({"message": f"Failed to create order: {str(e)}"}), 500
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: start Flask's built-in server with debug mode on.
    # NOTE(review): debug mode is intended for local development only; `serve`
    # from waitress is imported at the top of the file but not used here -
    # confirm how the app is launched in production.
    server_options = {"debug": True}
    app.run(**server_options)
|
|