import torch
from flask import Flask, render_template, request, jsonify
import json
import os
from transformers import pipeline, AutoConfig
from gtts import gTTS, gTTSError
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import time
from waitress import serve
from simple_salesforce import Salesforce
import requests

app = Flask(__name__)

# Make sure the static/ directory used for voice prompts and temporary audio exists.
os.makedirs("static", exist_ok=True)

# Use the GPU when available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Whisper model configuration; the added "timeout" value is intended to allow
# more time for the model download.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update({"timeout": 60})


def generate_audio_prompt(text, filename):
    """Generate a spoken prompt with gTTS and save it under static/, retrying on gTTS errors."""
    try:
        tts = gTTS(text)
        tts.save(os.path.join("static", filename))
    except gTTSError as e:
        print(f"Error: {e}")
        print("Retrying after 5 seconds...")
        time.sleep(5)
        generate_audio_prompt(text, filename)


# Pre-generate the voice prompts played by the UI.
prompts = {
    "welcome": "Welcome to Biryani Hub.",
    "ask_name": "Tell me your name.",
    "ask_email": "Please provide your email address.",
    "thank_you": "Thank you for registration."
}

for key, text in prompts.items():
    generate_audio_prompt(text, f"{key}.mp3")


SYMBOL_MAPPING = {
    "at the rate": "@",
    "at": "@",
    "dot": ".",
    "underscore": "_",
    "hash": "#",
    "plus": "+",
    "dash": "-",
    "comma": ",",
    "space": " "
}
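# NOTE: SYMBOL_MAPPING is meant for converting spoken symbol names (e.g. "at",
# "dot") when parsing dictated email addresses, but it is not referenced
# anywhere else in this file yet.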


def convert_to_wav(input_path, output_path):
    """Convert the uploaded audio to 16 kHz mono WAV for transcription."""
    try:
        audio = AudioSegment.from_file(input_path)
        audio = audio.set_frame_rate(16000).set_channels(1)
        audio.export(output_path, format="wav")
    except Exception as e:
        print(f"Error: {str(e)}")
        raise Exception(f"Audio conversion failed: {str(e)}")


def is_silent_audio(audio_path):
    """Return True when no non-silent segment is detected in the WAV file."""
    audio = AudioSegment.from_wav(audio_path)
    nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS - 16)
    print(f"Detected nonsilent parts: {nonsilent_parts}")
    return len(nonsilent_parts) == 0


# Connect to Salesforce. The credentials are hard-coded here; they should be
# moved to environment variables or a secrets store before deployment.
try:
    print("Attempting to connect to Salesforce...")
    sf = Salesforce(username='[email protected]', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q')
    print("Connected to Salesforce successfully!")
except Exception as e:
    print(f"Failed to connect to Salesforce: {str(e)}")


@app.route('/login', methods=['POST'])
def login():
    data = request.json

    name = data.get('name')
    email = data.get('email')
    phone_number = data.get('phone_number')

    if not name or not email or not phone_number:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        customer_login = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone_number
        })
        return jsonify({'success': True, 'id': customer_login['id']}), 200
    except Exception as e:
        return jsonify({'error': f'Failed to create record in Salesforce: {str(e)}'}), 500


@app.route("/submit", methods=["POST"])
def submit():
    data = request.json
    name = data.get('name')
    email = data.get('email')
    phone = data.get('phone')

    if not name or not email or not phone:
        return jsonify({'error': 'Missing data'}), 400

    try:
        customer_login = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone
        })

        if customer_login.get('id'):
            return jsonify({'success': True})
        else:
            return jsonify({'error': 'Failed to create record'}), 500

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route("/")
def index():
    return render_template("index.html")

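
# /transcribe: accept an uploaded audio file, convert it to WAV, check for
# silence, run Whisper speech recognition, parse name/email/phone from the
# transcript, and store the result in Salesforce.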
@app.route("/transcribe", methods=["POST"]) |
|
def transcribe(): |
|
if "audio" not in request.files: |
|
print("No audio file provided") |
|
return jsonify({"error": "No audio file provided"}), 400 |
|
|
|
audio_file = request.files["audio"] |
|
input_audio_path = os.path.join("static", "temp_input.wav") |
|
output_audio_path = os.path.join("static", "temp.wav") |
|
audio_file.save(input_audio_path) |
|
|
|
try: |
|
|
|
convert_to_wav(input_audio_path, output_audio_path) |
|
|
|
|
|
if is_silent_audio(output_audio_path): |
|
return jsonify({"error": "No speech detected. Please try again."}), 400 |
|
else: |
|
print("Audio contains speech, proceeding with transcription.") |
|
|
|
|
|
result = None |
|
retry_attempts = 3 |
|
for attempt in range(retry_attempts): |
|
try: |
|
result = pipeline("automatic-speech-recognition", model="openai/whisper-small", device=0 if torch.cuda.is_available() else -1, config=config) |
|
print(f"Transcribed text: {result['text']}") |
|
break |
|
except requests.exceptions.ReadTimeout: |
|
print(f"Timeout occurred, retrying attempt {attempt + 1}/{retry_attempts}...") |
|
time.sleep(5) |
|
|
|
if result is None: |
|
return jsonify({"error": "Unable to transcribe audio after retries."}), 500 |
|
|
|
transcribed_text = result["text"].strip().capitalize() |
|
print(f"Transcribed text: {transcribed_text}") |
|
|
|
|
|
parts = transcribed_text.split() |
|
name = parts[0] if len(parts) > 0 else "Unknown Name" |
|
email = parts[1] if '@' in parts[1] else "[email protected]" |
|
phone_number = parts[2] if len(parts) > 2 else "0000000000" |
|
print(f"Parsed data - Name: {name}, Email: {email}, Phone Number: {phone_number}") |
|
|
|
|
|
salesforce_response = create_salesforce_record(name, email, phone_number) |
|
|
|
|
|
print(f"Salesforce record creation response: {salesforce_response}") |
|
|
|
|
|
if "error" in salesforce_response: |
|
print(f"Error creating record in Salesforce: {salesforce_response['error']}") |
|
return jsonify(salesforce_response), 500 |
|
|
|
|
|
return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response}) |
|
|
|
except Exception as e: |
|
print(f"Error in transcribing or processing: {str(e)}") |
|
return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500 |


if __name__ == "__main__":
    serve(app, host="0.0.0.0", port=7860)