import json
import os
import tempfile
import time

import requests
import torch
from flask import Flask, render_template, request, jsonify
from gtts import gTTS
from gtts import gTTSError
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from simple_salesforce import Salesforce
from simple_salesforce import format_soql
from transformers import AutoConfig
from transformers import pipeline
from waitress import serve


# Flask application object; the route decorators in this module register
# their handlers on it.
app = Flask(__name__)

# Torch device string: "cuda" when a CUDA device is available, else "cpu".
device = "cuda" if torch.cuda.is_available() else "cpu"

# Configuration of the Whisper checkpoint used for speech recognition.
config = AutoConfig.from_pretrained("openai/whisper-small")
# Stores a "timeout" attribute on the config object.
# NOTE(review): presumably intended as a 60-second limit — confirm that
# something actually reads this attribute.
config.update({"timeout": 60})


# Spoken prompts, keyed by the name that is also used for the generated
# audio file (``static/<key>.mp3``).
prompts = {
    "welcome": "Welcome to Biryani Hub.",
    "ask_name": "Tell me your name.",
    "ask_email": "Please provide your email address.",
    "thank_you": "Thank you for registration.",
}


def generate_audio_prompt(text, filename, max_retries=3, retry_delay=5):
    """Synthesize *text* with gTTS and save it as ``static/<filename>``.

    Args:
        text: Sentence to convert to speech.
        filename: Name of the audio file to write inside ``static``.
        max_retries: Number of additional attempts after a gTTS failure.
        retry_delay: Seconds to sleep between attempts.

    Raises:
        gTTSError: If the first attempt and every retry fail.
    """
    target_path = os.path.join("static", filename)
    retries_left = max_retries

    # A bounded loop replaces the former unbounded recursion, so a permanent
    # outage surfaces as an error instead of retrying forever.
    while True:
        try:
            gTTS(text).save(target_path)
            return
        except gTTSError as e:
            print(f"Error: {e}")
            if retries_left <= 0:
                raise
            retries_left -= 1
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)


# Pre-render every prompt to ``static/<key>.mp3`` when the module is loaded.
for key, text in prompts.items():
    generate_audio_prompt(text, f"{key}.mp3")


# Spoken word(s) -> the symbol they stand for.
# The longer phrase "at the rate" is listed before "at"; the insertion order
# is preserved in case a consumer applies the replacements sequentially.
SYMBOL_MAPPING = {
    "at the rate": "@",
    "at": "@",
    "dot": ".",
    "underscore": "_",
    "hash": "#",
    "plus": "+",
    "dash": "-",
    "comma": ",",
    "space": " ",
}


def convert_to_wav(input_path, output_path):
    """Re-encode the audio at *input_path* as a 16 kHz mono WAV file.

    Args:
        input_path: Path of the source audio file.
        output_path: Path where the WAV file is written.

    Raises:
        Exception: If loading, resampling or exporting fails. The message
            starts with "Audio conversion failed:" and the underlying error
            is attached as ``__cause__``.
    """
    try:
        audio = AudioSegment.from_file(input_path)
        audio = audio.set_frame_rate(16000).set_channels(1)
        audio.export(output_path, format="wav")
    except Exception as e:
        # Chain the original error so its traceback is not lost.
        raise Exception(f"Audio conversion failed: {str(e)}") from e


def is_silent_audio(audio_path):
    """Return True when pydub finds no non-silent span in the WAV file.

    Args:
        audio_path: Path of a WAV file.

    Returns:
        True if ``detect_nonsilent`` reports no ranges, False otherwise.
    """
    audio = AudioSegment.from_wav(audio_path)
    # The threshold is relative: 16 dB below the clip's own average level.
    nonsilent_parts = detect_nonsilent(
        audio,
        min_silence_len=500,
        silence_thresh=audio.dBFS - 16,
    )
    return not nonsilent_parts


# Module-level Salesforce client used by the request handlers. It is bound to
# None first so the name exists even when the connection attempt fails.
sf = None

try:
    print("Attempting to connect to Salesforce...")
    # Credentials are read from the environment instead of being committed to
    # source. NOTE: the values that used to be hard-coded here were exposed in
    # plain text and should be rotated.
    sf = Salesforce(
        username=os.environ["SF_USERNAME"],
        password=os.environ["SF_PASSWORD"],
        security_token=os.environ["SF_SECURITY_TOKEN"],
    )
    print("Connected to Salesforce successfully!")
except Exception as e:
    # Best effort: the app still starts; handlers fail while sf is None.
    print(f"Failed to connect to Salesforce: {str(e)}")


@app.route('/login', methods=['POST'])
def login():
    """Look up a customer by email and phone number.

    Reads ``email`` and ``phone_number`` from the JSON request body and
    queries ``Customer_Login__c`` for a matching record.

    Returns:
        200 with ``success``, ``message``, ``user_id`` and ``name`` on a match,
        400 when a field is missing, 401 when no record matches, and
        500 when the Salesforce call fails.
    """
    # silent=True yields None for a missing or malformed JSON body; treat
    # anything that is not a JSON object as an empty payload.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    email = data.get('email')
    phone_number = data.get('phone_number')

    if not email or not phone_number:
        return jsonify({'error': 'Missing email or phone number'}), 400

    try:
        # format_soql quotes and escapes the values, so request data cannot
        # break out of the string literals in the query.
        query = format_soql(
            "SELECT Id, Name FROM Customer_Login__c "
            "WHERE Email__c = {} AND Phone_Number__c = {} LIMIT 1",
            str(email),
            str(phone_number),
        )
        result = sf.query(query)

        if result['totalSize'] > 0:
            user_data = result['records'][0]
            return jsonify({
                'success': True,
                'message': 'Login successful',
                'user_id': user_data['Id'],
                'name': user_data['Name'],
            }), 200

        return jsonify({'error': 'Invalid email or phone number. User not found'}), 401

    except requests.exceptions.RequestException as req_error:
        return jsonify({'error': f'Salesforce connection error: {str(req_error)}'}), 500
    except Exception as e:
        return jsonify({'error': f'Unexpected error: {str(e)}'}), 500


@app.route("/submit", methods=["POST"])
def submit():
    """Register a new customer in Salesforce.

    Reads ``name``, ``email`` and ``phone`` from the JSON request body and
    creates a ``Customer_Login__c`` record unless one with the same email and
    phone number already exists.

    Returns:
        200 with ``success`` and ``user_id`` when the record is created,
        400 when a field is missing, 409 when the user already exists, and
        500 when the Salesforce call fails or returns no id.
    """
    # silent=True yields None for a missing or malformed JSON body; treat
    # anything that is not a JSON object as an empty payload.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    name = data.get('name')
    email = data.get('email')
    phone = data.get('phone')

    if not name or not email or not phone:
        return jsonify({'error': 'Missing data'}), 400

    try:
        # format_soql quotes and escapes the values, so request data cannot
        # break out of the string literals in the query.
        query = format_soql(
            "SELECT Id FROM Customer_Login__c "
            "WHERE Email__c = {} AND Phone_Number__c = {} LIMIT 1",
            str(email),
            str(phone),
        )
        existing_user = sf.query(query)

        if existing_user['totalSize'] > 0:
            return jsonify({'error': 'User already exists'}), 409

        customer_login = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone,
        })

        if customer_login.get('id'):
            return jsonify({'success': True, 'user_id': customer_login['id']}), 200

        return jsonify({'error': 'Failed to create record'}), 500

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route("/")
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    return render_template("index.html")


# Shared Whisper pipeline, created on first use instead of on every request.
_asr_pipeline = None


def _get_asr_pipeline():
    """Return the shared speech-recognition pipeline, building it if needed."""
    global _asr_pipeline
    if _asr_pipeline is None:
        _asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device=0 if torch.cuda.is_available() else -1,
            config=config,
        )
    return _asr_pipeline


@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Transcribe the uploaded ``audio`` file with Whisper.

    Returns:
        200 with ``text`` (stripped and capitalized transcription),
        400 when no file is uploaded or no speech is detected, and
        500 when saving, conversion or recognition fails.
    """
    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]

    try:
        # A per-request temporary directory keeps concurrent uploads from
        # overwriting each other and removes the files when the block exits.
        with tempfile.TemporaryDirectory() as workdir:
            input_audio_path = os.path.join(workdir, "temp_input.wav")
            output_audio_path = os.path.join(workdir, "temp.wav")
            audio_file.save(input_audio_path)

            convert_to_wav(input_audio_path, output_audio_path)

            if is_silent_audio(output_audio_path):
                return jsonify({"error": "No speech detected. Please try again."}), 400

            recognizer = _get_asr_pipeline()
            transcribed_text = recognizer(output_audio_path)["text"].strip().capitalize()

        return jsonify({"text": transcribed_text})

    except Exception as e:
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500


if __name__ == "__main__":
    # Run the app under waitress, listening on all interfaces at port 7860.
    serve(app, host="0.0.0.0", port=7860)