# Spaces: Sleeping
# (Page-header residue from the hosting site's file viewer; not part of the program.)
import os | |
import time | |
import torch | |
from flask import Flask, render_template, request, jsonify | |
from simple_salesforce import Salesforce | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from transformers import AutoConfig # Import AutoConfig for the config object | |
from waitress import serve | |
# Flask setup: HTML templates are looked up in the local "templates" folder.
app = Flask(__name__, template_folder="templates")
# The secret key is drawn fresh from the OS on every start of the process.
app.secret_key = os.urandom(24)

# Check for available GPU; CPU is the fallback.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Set up Whisper model config
config = AutoConfig.from_pretrained("openai/whisper-small")
# NOTE(review): this stores a "timeout" value of 60 on the config object;
# confirm that something downstream actually reads it.
config.update({"timeout": 60})
# Salesforce connection
# Credentials are read from the environment so that no secret lives in source
# control. Required variables: SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN.
# `sf` is always defined: it stays None when the login fails, so the helper
# functions below hit their own error handling instead of an undefined name.
sf = None
try:
    print("Attempting to connect to Salesforce...")
    sf = Salesforce(
        username=os.environ["SF_USERNAME"],
        password=os.environ["SF_PASSWORD"],
        security_token=os.environ["SF_SECURITY_TOKEN"],
    )
    print("Connected to Salesforce successfully!")
except KeyError as e:
    # A missing variable is a configuration problem, not a network one.
    print(f"Failed to connect to Salesforce: missing environment variable {e}")
except Exception as e:
    print(f"Failed to connect to Salesforce: {str(e)}")
# Function to generate audio prompt using gTTS
def generate_audio_prompt(text, filename, retries=3, delay=5):
    """Synthesize *text* with gTTS and save it as ``static/<filename>``.

    Args:
        text: The sentence to speak.
        filename: File name (not a path) to create inside ``static``.
        retries: Maximum number of attempts before giving up.
        delay: Seconds to wait between failed attempts.

    Returns:
        True when the file was written, False when every attempt failed.
    """
    target = os.path.join("static", filename)
    for attempt in range(1, retries + 1):
        try:
            # The output folder may not exist yet on a fresh checkout.
            os.makedirs("static", exist_ok=True)
            gTTS(text).save(target)
            return True
        except Exception as e:
            print(f"Error generating audio prompt: {e}")
            # A bounded loop replaces the former unbounded self-recursion,
            # which kept retrying forever when the failure was permanent.
            if attempt < retries:
                time.sleep(delay)
    return False
# Spoken prompts for the voice interaction, keyed by the stem of the audio
# file each one is rendered to.
prompts = {
    "welcome": "Welcome to Biryani Hub.",
    "ask_name": "Tell me your name.",
    "ask_email": "Please provide your email address.",
    "thank_you": "Thank you for registration.",
}

# Render every prompt to "<key>.mp3" when the module is loaded.
for prompt_key in prompts:
    generate_audio_prompt(prompts[prompt_key], f"{prompt_key}.mp3")
# Function to check if the audio is silent
def is_silent_audio(audio_path):
    """Return True when no non-silent stretch is detected in the WAV file.

    The silence threshold is relative: 16 dB below the clip's own average
    level (``dBFS``), with a minimum silence length of 500 as passed to
    ``detect_nonsilent``.
    """
    segment = AudioSegment.from_wav(audio_path)
    threshold = segment.dBFS - 16
    voiced_ranges = detect_nonsilent(
        segment,
        min_silence_len=500,
        silence_thresh=threshold,
    )
    return not voiced_ranges
# Function to fetch menu items from Salesforce
def get_menu_items():
    """Query every ``Menu_Item__c`` record and return them as plain dicts.

    Returns:
        A list of dicts with the keys ``name``, ``price``, ``ingredients``
        and ``category``; an empty list when the query fails for any reason
        (the error is printed).
    """
    # Salesforce query to fetch all menu items
    soql = """
SELECT Name, Price__c, Ingredients__c, Category__c
FROM Menu_Item__c
"""
    try:
        records = sf.query(soql)["records"]
        return [
            {
                "name": record["Name"],
                "price": record["Price__c"],
                "ingredients": record["Ingredients__c"],
                "category": record["Category__c"],
            }
            for record in records
        ]
    except Exception as e:
        print(f"Error fetching menu items: {str(e)}")
        return []
# Function to check if the customer exists in Salesforce (login check)
def get_customer_login(name, email, phone_number):
    """Look up a ``Customer_Login__c`` record matching all three fields.

    Args:
        name: Customer name to match against ``Name``.
        email: Address to match against ``Email__c``.
        phone_number: Number to match against ``Phone_Number__c``.

    Returns:
        A dict with ``id``, ``name``, ``email`` and ``phone`` for the first
        matching record, or None when nothing matches or the lookup fails
        (the error is printed).
    """
    try:
        # Imported here so this function carries its own dependency.
        from simple_salesforce import format_soql

        # The values come straight from the request body, so they are bound
        # through format_soql, which quotes and escapes them, instead of
        # being pasted into the query text. Casting to str keeps each value
        # rendered as a quoted literal, as the query always did.
        query = format_soql(
            "SELECT Id, Name, Email__c, Phone_Number__c "
            "FROM Customer_Login__c "
            "WHERE Name = {name} "
            "AND Email__c = {email} "
            "AND Phone_Number__c = {phone_number}",
            name=str(name),
            email=str(email),
            phone_number=str(phone_number),
        )
        result = sf.query(query)
        if not result["records"]:
            return None
        customer = result["records"][0]
        return {
            "id": customer["Id"],
            "name": customer["Name"],
            "email": customer["Email__c"],
            "phone": customer["Phone_Number__c"],
        }
    except Exception as e:
        print(f"Error fetching customer login details: {str(e)}")
        return None
# Function to create a new customer login in Salesforce
def create_customer_login(name, email, phone):
    """Insert a ``Customer_Login__c`` record and return the API response.

    Returns:
        Whatever ``sf.Customer_Login__c.create`` returns, or None when the
        call raises (the error is printed).
    """
    record_fields = {
        'Name': name,
        'Email__c': email,
        'Phone_Number__c': phone,
    }
    try:
        return sf.Customer_Login__c.create(record_fields)
    except Exception as e:
        print(f"Error creating customer login: {str(e)}")
    return None
# Home Route (loads index.html)
# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def index():
    """Render the ``index.html`` template."""
    page = render_template("index.html")
    return page
# Dashboard Route
# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def dashboard():
    """Render the ``dashboard.html`` template."""
    page = render_template("dashboard.html")
    return page
# Menu Page Route
# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def menu_page():
    """Render ``menu_page.html`` with the current menu passed as ``menu``."""
    return render_template("menu_page.html", menu=get_menu_items())
# Login API
# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def login():
    """Check whether the customer described in the JSON body exists.

    Expects a JSON object with ``name``, ``email`` and ``phone_number``.

    Returns:
        200 with the customer on a match, 404 when no record matches, and
        400 when a field is missing or the body is not a JSON object.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body; anything that is not an object is treated as empty so the
    # caller gets the 400 below rather than a crash on .get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    name = data.get('name')
    email = data.get('email')
    phone_number = data.get('phone_number')
    if not (name and email and phone_number):
        return jsonify({'error': 'Missing required fields'}), 400

    # Check if the customer exists in Salesforce
    customer = get_customer_login(name, email, phone_number)
    if customer:
        return jsonify({'success': True, 'customer': customer}), 200
    return jsonify({'error': 'Customer not found'}), 404
# Register API (Create customer login)
# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def submit():
    """Create a customer login record from the JSON body.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        200 on success, 500 when the record could not be created, and 400
        when a field is missing or the body is not a JSON object.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body; anything that is not an object is treated as empty so the
    # caller gets the 400 below rather than a crash on .get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    name = data.get('name')
    email = data.get('email')
    phone = data.get('phone')
    if not (name and email and phone):
        return jsonify({'error': 'Missing data'}), 400

    # Create customer login record in Salesforce
    customer_login = create_customer_login(name, email, phone)
    if customer_login:
        return jsonify({'success': True}), 200
    return jsonify({'error': 'Failed to create customer record'}), 500
# Transcribe Audio API

# Fallback values used when a field cannot be taken from the transcript.
# NOTE(review): the e-mail fallback looks like redaction residue rather than
# a real address; confirm the intended value.
_DEFAULT_NAME = "Unknown Name"
_DEFAULT_EMAIL = "[email protected]"
_DEFAULT_PHONE = "0000000000"

# Whisper pipeline shared by all requests; built on first use.
_asr_pipeline = None


def _get_asr_pipeline():
    """Return the shared speech-recognition pipeline, building it once."""
    global _asr_pipeline
    if _asr_pipeline is None:
        _asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device=0 if torch.cuda.is_available() else -1,
            config=config,
        )
    return _asr_pipeline


def _convert_to_wav(input_path, output_path):
    """Decode *input_path* with pydub and write it to *output_path* as WAV."""
    AudioSegment.from_file(input_path).export(output_path, format="wav")


def _extract_registration_fields(transcribed_text):
    """Split a transcript into ``(name, email, phone_number)``.

    The transcript is assumed to be whitespace-separated in the order
    name, e-mail, phone; any piece that is absent (or, for the e-mail,
    lacks an ``@``) is replaced by its fallback value.
    """
    parts = transcribed_text.split()
    name = parts[0] if len(parts) > 0 else _DEFAULT_NAME
    # Guard the length first: a one-word transcript has no parts[1].
    email = parts[1] if len(parts) > 1 and '@' in parts[1] else _DEFAULT_EMAIL
    phone_number = parts[2] if len(parts) > 2 else _DEFAULT_PHONE
    return name, email, phone_number


# NOTE(review): no @app.route decorator is visible on this view in this
# listing; confirm that it is registered with the app.
def transcribe():
    """Transcribe an uploaded recording and register the speaker.

    Reads the ``audio`` file from the request, converts it to WAV, rejects
    recordings with no detected speech, transcribes the rest with Whisper,
    extracts name / e-mail / phone from the text, generates a spoken
    confirmation and creates a ``Customer_Login__c`` record.

    Returns:
        200 with the transcript and the Salesforce response; 400 when no
        file was sent or no speech was detected; 500 on any other failure.
    """
    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]
    input_audio_path = os.path.join("static", "temp_input.wav")
    output_audio_path = os.path.join("static", "temp.wav")
    audio_file.save(input_audio_path)

    try:
        # Convert the audio to WAV format and check if it contains speech
        _convert_to_wav(input_audio_path, output_audio_path)
        if is_silent_audio(output_audio_path):
            return jsonify({"error": "No speech detected. Please try again."}), 400

        # Transcribe the audio using the shared Whisper pipeline; loading
        # the model per request would dominate the response time.
        result = _get_asr_pipeline()(output_audio_path)
        # capitalize() upper-cases the first character and lower-cases the rest.
        transcribed_text = result["text"].strip().capitalize()

        # Extract details from transcribed text (Assumed format: Name Email Phone)
        name, email, phone_number = _extract_registration_fields(transcribed_text)

        confirmation = f"Is this correct? Name: {name}, Email: {email}, Phone: {phone_number}"
        generate_audio_prompt(confirmation, "confirmation.mp3")

        # Create a customer login record
        salesforce_response = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone_number,
        })
        return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response})
    except Exception as e:
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500
# Start the Flask server
if __name__ == "__main__":
    # Announce the port, then hand the app to waitress on all interfaces.
    print("Starting Flask API Server on port 7860...")
    serve(app, port=7860, host="0.0.0.0")