from flask import Flask, request, jsonify, render_template
from dotenv import load_dotenv
from groq import Groq
import os
import uuid
from gtts import gTTS
import io
import base64
import speech_recognition as sr
import tempfile
import json

try:
    import pyaudio
except ImportError:
    print("Warning: PyAudio not available, speech functionality will be limited")

# Initialize Flask app
app = Flask(__name__, static_folder='static')

# Load environment variables
load_dotenv()

# Groq API Configuration
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
client = Groq(api_key=GROQ_API_KEY)
MODEL = "llama3-70b-8192"

# Initialize speech recognition
recognizer = sr.Recognizer()

# Store conversation history (in memory, per process)
conversations = {}

def load_base_prompt():
    try:
        with open("base_prompt.txt", "r") as file:
            return file.read().strip()
    except FileNotFoundError:
        print("Error: base_prompt.txt file not found.")
        return "You are a helpful assistant for language learning."

# Load the base prompt
base_prompt = load_base_prompt()

def chat_with_groq(user_message, conversation_id=None):
    try:
        # Get conversation history or create new
        messages = conversations.get(conversation_id, [])
        if not messages:
            messages.append({"role": "system", "content": base_prompt})

        # Add user message
        messages.append({"role": "user", "content": user_message})

        # Get completion from Groq
        completion = client.chat.completions.create(
            model=MODEL,
            messages=messages,
            temperature=0.1,
            max_tokens=1024
        )

        # Add assistant's response to history
        assistant_message = completion.choices[0].message.content.strip()
        messages.append({"role": "assistant", "content": assistant_message})

        # Update conversation history
        if conversation_id:
            conversations[conversation_id] = messages

        return assistant_message
    except Exception as e:
        print(f"Error in chat_with_groq: {str(e)}")
        return f"I apologize, but I'm having trouble responding right now. Error: {str(e)}"

def text_to_speech(text):
    try:
        tts = gTTS(text=text, lang='en')
        audio_io = io.BytesIO()
        tts.write_to_fp(audio_io)
        audio_io.seek(0)
        return audio_io
    except Exception as e:
        print(f"Error in text_to_speech: {str(e)}")
        return None

def speech_to_text(audio_file):
    try:
        # Save the uploaded audio to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_audio:
            audio_file.save(temp_audio.name)

        # Use SpeechRecognition to convert speech to text
        with sr.AudioFile(temp_audio.name) as source:
            # Adjust recognition settings
            recognizer.dynamic_energy_threshold = True
            recognizer.energy_threshold = 4000

            # Record the entire audio file
            audio = recognizer.record(source)

        # Perform recognition via Google's web speech API
        text = recognizer.recognize_google(audio, language='en-US')
        return text
    except sr.UnknownValueError:
        return "Could not understand audio"
    except sr.RequestError as e:
        return f"Could not request results; {str(e)}"
    except Exception as e:
        print(f"Error in speech_to_text: {str(e)}")
        return None
    finally:
        # Clean up the temporary file
        try:
            os.unlink(temp_audio.name)
        except OSError:
            pass

# NOTE: the route decorators below were missing from this file; the paths
# ('/', '/chat', '/voice') are assumed and may differ from the original app.
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/chat', methods=['POST'])  # path assumed, see note above
def chat():
    try:
        data = request.get_json()
        user_message = data.get('message', '')
        conversation_id = data.get('conversation_id', str(uuid.uuid4()))

        if not user_message:
            return jsonify({'error': 'No message provided'}), 400

        # Get response from Groq
        response = chat_with_groq(user_message, conversation_id)

        # Generate voice response
        audio_io = text_to_speech(response)

        result = {
            'response': response,
            'conversation_id': conversation_id
        }
        if audio_io:
            audio_base64 = base64.b64encode(audio_io.getvalue()).decode('utf-8')
            result['voice_response'] = audio_base64

        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/voice', methods=['POST'])  # path assumed, see note above
def handle_voice():
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file provided'}), 400

        audio_file = request.files['audio']
        conversation_id = request.form.get('conversation_id', str(uuid.uuid4()))

        # Save the audio file temporarily with a .wav extension
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_audio:
            audio_file.save(temp_audio.name)

        # Use FFmpeg to convert the audio to 16 kHz mono 16-bit PCM
        output_path = temp_audio.name + '_converted.wav'
        os.system(f'ffmpeg -i "{temp_audio.name}" -acodec pcm_s16le -ac 1 -ar 16000 "{output_path}"')

        try:
            # Use the converted file for speech recognition
            with sr.AudioFile(output_path) as source:
                audio = recognizer.record(source)
                text = recognizer.recognize_google(audio)

            if not text:
                return jsonify({'error': 'Could not transcribe audio'}), 400

            # Get response from Groq
            response = chat_with_groq(text, conversation_id)

            # Generate voice response
            audio_io = text_to_speech(response)

            result = {
                'text': text,
                'response': response,
                'conversation_id': conversation_id
            }
            if audio_io:
                audio_base64 = base64.b64encode(audio_io.getvalue()).decode('utf-8')
                result['voice_response'] = audio_base64

            return jsonify(result)
        finally:
            # Clean up temporary files
            try:
                os.remove(temp_audio.name)
                os.remove(output_path)
            except OSError:
                pass
    except sr.UnknownValueError:
        return jsonify({'error': 'Could not understand audio'}), 400
    except sr.RequestError as e:
        return jsonify({'error': f'Could not request results: {str(e)}'}), 400
    except Exception as e:
        print(f"Error in handle_voice: {str(e)}")
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)
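
# Example request (a sketch; assumes the '/chat' route restored above and a
# server running locally on port 7860):
#
#   curl -X POST http://localhost:7860/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hello, can you help me practice English?"}'
#
# The JSON response carries 'response', 'conversation_id' and, when gTTS
# succeeds, a base64-encoded MP3 under 'voice_response'.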