voicemenuspe / app.py
lokesh341's picture
Update app.py
95585ab verified
raw
history blame
4.4 kB
import torch
from flask import Flask, render_template, request, jsonify, redirect, url_for
import os
from transformers import pipeline, AutoConfig
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import time
from waitress import serve
from simple_salesforce import Salesforce
import requests
app = Flask(__name__)
# Use whisper-small for faster processing
device = "cuda" if torch.cuda.is_available() else "cpu"
# Whisper ASR Model Configuration
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update({"timeout": 60}) # Set timeout to 60 seconds
# Salesforce Connection
try:
print("Connecting to Salesforce...")
sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q')
print("Connected to Salesforce!")
except Exception as e:
print(f"Failed to connect to Salesforce: {str(e)}")
# Function to convert audio to WAV format
def convert_to_wav(input_path, output_path):
try:
audio = AudioSegment.from_file(input_path)
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono
audio.export(output_path, format="wav")
except Exception as e:
print(f"Audio conversion failed: {str(e)}")
raise
# Check if audio contains speech
def is_silent_audio(audio_path):
audio = AudioSegment.from_wav(audio_path)
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16)
return len(nonsilent_parts) == 0 # If no speech detected
# Create Salesforce Record
def create_salesforce_record(name, email, phone_number):
try:
response = sf.Customer_Login__c.create({
'Name': name,
'Email__c': email,
'Phone_Number__c': phone_number
})
return {'success': True, 'id': response.get('id')}
except Exception as e:
return {'error': f'Failed to create record: {str(e)}'}
# Registration API
@app.route("/submit", methods=["POST"])
def submit():
data = request.json
name = data.get('name')
email = data.get('email')
phone = data.get('phone')
if not name or not email or not phone:
return jsonify({'error': 'Missing data'}), 400
response = create_salesforce_record(name, email, phone)
return jsonify(response) if "error" not in response else (jsonify(response), 500)
# Updated Login API (Only for Registered Users)
@app.route('/login', methods=['POST'])
def login():
data = request.json
email = data.get('email')
phone_number = data.get('phone_number')
if not email or not phone_number:
return jsonify({'error': 'Email and phone number are required'}), 400
try:
# Print received login details for debugging
print(f"πŸ” Received login request for Email: {email}, Phone: {phone_number}")
# Query Salesforce to check if the user exists
query_result = sf.query(f"""
SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c
WHERE LOWER(Email__c) = '{email.lower()}'
AND Phone_Number__c = '{phone_number}'
""")
# Debugging: Print Salesforce Query Result
print("πŸ” Salesforce Query Result:", query_result)
if query_result['totalSize'] > 0:
print("βœ… Login successful!")
return jsonify({
'success': True,
'message': 'Login successful',
'user': query_result['records'][0],
'redirect': '/dashboard'
}), 200
else:
print("❌ Invalid credentials! User not found in Salesforce.") # Debugging
return jsonify({'error': 'Invalid credentials! Only registered users can log in.', 'refresh': True}), 401
except Exception as e:
print(f"⚠️ Salesforce Query Failed: {str(e)}") # Debugging
return jsonify({'error': f'Salesforce query failed: {str(e)}', 'refresh': True}), 500
# Serve Registration and Login Pages
@app.route("/")
def index():
return render_template("index.html")
@app.route("/login-page")
def login_page():
return render_template("login.html")
@app.route("/dashboard")
def dashboard():
return "<h1>Welcome to the Dashboard</h1><p>You have successfully logged in.</p>"
# Start the Flask App
if __name__ == "__main__":
serve(app, host="0.0.0.0", port=7860)