Spaces:
Running
Running
import torch | |
from flask import Flask, render_template, request, jsonify, redirect, session | |
import json | |
import os | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from transformers import AutoConfig | |
import time | |
from waitress import serve | |
from simple_salesforce import Salesforce | |
import requests | |
# Initialize Flask app | |
app = Flask(__name__) | |
app.secret_key = os.urandom(24) # For session handling | |
# Use whisper-small for faster processing and better speed | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Create config object to set timeout and other parameters | |
config = AutoConfig.from_pretrained("openai/whisper-small") | |
config.update({"timeout": 60}) # Set timeout to 60 seconds | |
# Salesforce connection details | |
try: | |
print("Attempting to connect to Salesforce...") | |
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') | |
print("Connected to Salesforce successfully!") | |
print("User Info:", sf.UserInfo) # Log the user info to verify the connection | |
except Exception as e: | |
print(f"Failed to connect to Salesforce: {str(e)}") | |
# Functions for Salesforce operations | |
def create_salesforce_record(sf, name, email, phone_number): | |
try: | |
customer_login = sf.Customer_Login__c.create({ | |
'Name': name, | |
'Email__c': email, | |
'Phone_Number__c': phone_number | |
}) | |
return customer_login | |
except Exception as e: | |
raise Exception(f"Failed to create record: {str(e)}") | |
def get_menu_items(sf): | |
query = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c" | |
result = sf.query(query) | |
return result['records'] | |
# Voice-related functions | |
def generate_audio_prompt(text, filename): | |
try: | |
tts = gTTS(text) | |
tts.save(os.path.join("static", filename)) | |
except gtts.tts.gTTSError as e: | |
print(f"Error: {e}") | |
print("Retrying after 5 seconds...") | |
time.sleep(5) # Wait for 5 seconds before retrying | |
generate_audio_prompt(text, filename) | |
# Utility functions | |
def convert_to_wav(input_path, output_path): | |
try: | |
audio = AudioSegment.from_file(input_path) | |
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono | |
audio.export(output_path, format="wav") | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
raise Exception(f"Audio conversion failed: {str(e)}") | |
def is_silent_audio(audio_path): | |
audio = AudioSegment.from_wav(audio_path) | |
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) # Reduced silence duration | |
print(f"Detected nonsilent parts: {nonsilent_parts}") | |
return len(nonsilent_parts) == 0 # If no speech detected | |
# Routes and Views | |
def index(): | |
return render_template("index.html") | |
def dashboard(): | |
return render_template("dashboard.html") # Render the dashboard template | |
def login(): | |
# Get data from voice bot (name, email, phone number) | |
data = request.json # Assuming voice bot sends JSON data | |
name = data.get('name') | |
email = data.get('email') | |
phone_number = data.get('phone_number') | |
if not name or not email or not phone_number: | |
return jsonify({'error': 'Missing required fields'}), 400 | |
try: | |
customer_login = create_salesforce_record(sf, name, email, phone_number) | |
session['customer_id'] = customer_login['id'] # Store customer ID in session | |
return redirect("/menu") # Redirect to the menu page after successful login | |
except Exception as e: | |
return jsonify({'error': f'Failed to create record in Salesforce: {str(e)}'}), 500 | |
def menu_page(): | |
menu_items = get_menu_items(sf) # Fetch menu items from Salesforce | |
menu_data = [{"name": item['Name'], "price": item['Price__c'], "ingredients": item['Ingredients__c'], "category": item['Category__c']} for item in menu_items] | |
return render_template("menu_page.html", menu_items=menu_data) | |
def cart(): | |
# Retrieve cart items from session | |
cart_items = session.get('cart_items', []) | |
return render_template("cart_page.html", cart_items=cart_items) | |
def order_summary(): | |
# Retrieve order details from session | |
order_details = session.get('cart_items', []) | |
total_price = sum(item['price'] * item['quantity'] for item in order_details) | |
return render_template("order_summary.html", order_details=order_details, total_price=total_price) | |
def final_order(): | |
# Clear cart items from the session after confirming the order | |
session.pop('cart_items', None) | |
return render_template("final_order.html") | |
def add_to_cart(): | |
item_name = request.json.get('item_name') | |
quantity = request.json.get('quantity') | |
# Retrieve the current cart items from session or initialize an empty list | |
cart_items = session.get('cart_items', []) | |
cart_items.append({"name": item_name, "quantity": quantity, "price": 10}) # Assuming a fixed price for now | |
session['cart_items'] = cart_items # Save the updated cart items in session | |
return jsonify({"success": True, "message": f"Added {item_name} to cart."}) | |
def transcribe(): | |
if "audio" not in request.files: | |
print("No audio file provided") | |
return jsonify({"error": "No audio file provided"}), 400 | |
audio_file = request.files["audio"] | |
input_audio_path = os.path.join("static", "temp_input.wav") | |
output_audio_path = os.path.join("static", "temp.wav") | |
audio_file.save(input_audio_path) | |
try: | |
# Convert to WAV | |
convert_to_wav(input_audio_path, output_audio_path) | |
# Check for silence | |
if is_silent_audio(output_audio_path): | |
return jsonify({"error": "No speech detected. Please try again."}), 400 | |
else: | |
print("Audio contains speech, proceeding with transcription.") | |
# Use Whisper ASR model for transcription | |
result = None | |
retry_attempts = 3 | |
for attempt in range(retry_attempts): | |
try: | |
result = pipeline("automatic-speech-recognition", model="openai/whisper-small", device=0 if torch.cuda.is_available() else -1, config=config) | |
print(f"Transcribed text: {result['text']}") | |
break | |
except requests.exceptions.ReadTimeout: | |
print(f"Timeout occurred, retrying attempt {attempt + 1}/{retry_attempts}...") | |
time.sleep(5) | |
if result is None: | |
return jsonify({"error": "Unable to transcribe audio after retries."}), 500 | |
transcribed_text = result["text"].strip().capitalize() | |
print(f"Transcribed text: {transcribed_text}") | |
# Extract name, email, and phone number from the transcribed text | |
parts = transcribed_text.split() | |
name = parts[0] if len(parts) > 0 else "Unknown Name" | |
email = parts[1] if '@' in parts[1] else "[email protected]" | |
phone_number = parts[2] if len(parts) > 2 else "0000000000" | |
print(f"Parsed data - Name: {name}, Email: {email}, Phone Number: {phone_number}") | |
# Confirm details before submission | |
confirmation = f"Is this correct? Name: {name}, Email: {email}, Phone: {phone_number}" | |
generate_audio_prompt(confirmation, "confirmation.mp3") | |
# Simulate confirmation via user action | |
user_confirms = True # Assuming the user confirms, you can replace this with actual user input logic | |
if user_confirms: | |
# Create record in Salesforce | |
salesforce_response = create_salesforce_record(name, email, phone_number) | |
# Log the Salesforce response | |
print(f"Salesforce record creation response: {salesforce_response}") | |
# Check if the response contains an error | |
if "error" in salesforce_response: | |
print(f"Error creating record in Salesforce: {salesforce_response['error']}") | |
return jsonify(salesforce_response), 500 | |
return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response}) | |
except Exception as e: | |
print(f"Error in transcribing or processing: {str(e)}") | |
return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500 | |
# Start Production Server | |
if __name__ == "__main__": | |
serve(app, host="0.0.0.0", port=7860) | |