Spaces:
Sleeping
Sleeping
import torch | |
from flask import Flask, render_template, request, jsonify | |
import os | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from waitress import serve | |
from flask import Flask, render_template, request, jsonify, redirect, url_for, session | |
from flask_session import Session # Import the Session class | |
from flask.sessions import SecureCookieSessionInterface # Import the class | |
from salesforce import get_salesforce_connection | |
import os | |
# --- Application bootstrap ---------------------------------------------------
# Everything in this section runs at import time: the Flask app is created,
# the Salesforce connection is opened and session handling is configured.
print("Starting app...")
app = Flask(__name__)
print("Flask app initialized.")

# Open the Salesforce connection up front via the project-local helper.
sf = get_salesforce_connection()
print("Salesforce connection established.")

# Secret key used by Flask to sign session cookies.
# NOTE(review): the fallback value is a hard-coded secret committed to source;
# set SECRET_KEY in the environment for any real deployment.
app.secret_key = os.getenv("SECRET_KEY", "sSSjyhInIsUohKpG8sHzty2q")

# Session configuration.
app.config["SESSION_TYPE"] = "filesystem"  # Flask-Session storage backend
app.config["SESSION_COOKIE_SECURE"] = True  # only send the cookie over HTTPS
app.config["SESSION_COOKIE_SAMESITE"] = "None"  # allow cross-site cookies

# Initialize Flask-Session on the app.
Session(app)
print("Session interface configured.")

# NOTE(review): this assignment replaces the session interface installed by
# Session(app) above, so sessions end up as signed cookies rather than
# filesystem-backed. Kept as in the original; confirm which one is intended.
app.session_interface = SecureCookieSessionInterface()
print("Session interface configured.")

# The app object is deliberately NOT re-created here. A second
# `app = Flask(__name__)` at this point threw away the secret key, the
# cookie settings and the session interface configured above.
# Speech recognition model: whisper-small, chosen for faster processing.
# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

asr_model = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-small",
    device=-1 if device != "cuda" else 0,  # pipeline convention: -1 means CPU
)
def generate_audio_prompt(text, filename):
    """Synthesize *text* as English speech and save it as ``static/<filename>``.

    Args:
        text: The sentence to convert to speech.
        filename: File name (not a full path) to write inside ``static``.
    """
    target_path = os.path.join("static", filename)
    gTTS(text=text, lang="en").save(target_path)
# Voice prompts, keyed by the base name of the MP3 file each one is saved to.
prompts = dict(
    welcome="Welcome to Biryani Hub.",
    ask_name="Tell me your name.",
    ask_email="Please provide your email address.",
    thank_you="Thank you for registration.",
)

# Render every prompt to static/<key>.mp3 when the module is imported.
for key, text in prompts.items():
    generate_audio_prompt(text, key + ".mp3")
# Spoken words mapped to the symbol each one stands for.
# Insertion order is kept as written: the longer phrase "at the rate"
# comes before the shorter "at".
SYMBOL_MAPPING = dict(
    (
        ("at the rate", "@"),
        ("at", "@"),
        ("dot", "."),
        ("underscore", "_"),
        ("hash", "#"),
        ("plus", "+"),
        ("dash", "-"),
        ("comma", ","),
        ("space", " "),
    )
)
def convert_to_wav(input_path, output_path):
    """Convert the audio file at *input_path* to a 16 kHz mono WAV file.

    Args:
        input_path: Path of the source audio file, loaded with
            ``AudioSegment.from_file``.
        output_path: Path the converted WAV file is written to.

    Raises:
        RuntimeError: If loading, resampling or exporting fails. The message
            starts with "Audio conversion failed: " and the original
            exception is attached as ``__cause__``.
    """
    try:
        audio = AudioSegment.from_file(input_path)
        # Resample to 16 kHz and downmix to a single channel.
        audio = audio.set_frame_rate(16000).set_channels(1)
        audio.export(output_path, format="wav")
    except Exception as e:
        # RuntimeError is still caught by callers using `except Exception`;
        # `from e` keeps the underlying failure attached for debugging.
        raise RuntimeError(f"Audio conversion failed: {e}") from e
def is_silent_audio(audio_path):
    """Return True when the WAV file at *audio_path* has no non-silent part.

    Silence is judged relative to the clip's own average loudness: anything
    16 dB below ``audio.dBFS`` that lasts at least 500 ms counts as silence.

    Args:
        audio_path: Path to a WAV file.

    Returns:
        True if no non-silent range was found, or if the clip is empty or
        all-zero; False otherwise.
    """
    audio = AudioSegment.from_wav(audio_path)
    loudness = audio.dBFS

    # An empty or all-zero clip has a loudness of -inf. pydub's silence
    # detection appears to report clips shorter than `min_silence_len` as
    # entirely non-silent, so without this guard an empty recording would be
    # treated as speech. NOTE(review): confirm against the installed pydub.
    if len(audio) == 0 or loudness == float("-inf"):
        return True

    nonsilent_parts = detect_nonsilent(
        audio,
        min_silence_len=500,
        silence_thresh=loudness - 16,
    )
    return not nonsilent_parts
# Register the view with the app; without a route it is never reachable.
@app.route("/")
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    return render_template("index.html")
# NOTE(review): the URL is assumed from the function name; confirm it matches
# the endpoint the frontend posts to.
@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Transcribe an uploaded audio file to English text.

    Expects a multipart upload with the file under the ``audio`` field.

    Returns:
        A JSON response:
        - ``{"text": ...}`` on success,
        - ``{"error": ...}`` with status 400 when no file was sent or no
          speech was detected,
        - ``{"error": ...}`` with status 500 when conversion or recognition
          raised an exception.
    """
    # Local import so this block does not depend on the file's import header.
    import tempfile

    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]

    # Work in a per-request temporary directory. Fixed file names under
    # `static/` let concurrent requests overwrite each other, left uploads
    # behind, and put them in a publicly served directory.
    with tempfile.TemporaryDirectory() as work_dir:
        input_audio_path = os.path.join(work_dir, "temp_input.wav")
        output_audio_path = os.path.join(work_dir, "temp.wav")
        audio_file.save(input_audio_path)

        try:
            # Normalize the upload to WAV.
            convert_to_wav(input_audio_path, output_audio_path)

            # Skip recognition when there is nothing to recognize.
            if is_silent_audio(output_audio_path):
                return jsonify({"error": "No speech detected. Please try again."}), 400

            # Run the Whisper ASR pipeline, forcing English.
            result = asr_model(output_audio_path, generate_kwargs={"language": "en"})
            transcribed_text = result["text"].strip().capitalize()
            return jsonify({"text": transcribed_text})
        except Exception as e:
            # Request boundary: report the failure to the client as JSON.
            return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500
# Script entry point: serve the app with waitress on all interfaces, port 7860.
if __name__ == "__main__":
    serve(
        app,
        port=7860,
        host="0.0.0.0",
    )