# NEW CODE
import os
import json

import streamlit as st
from langchain_chroma import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_groq import ChatGroq
from vectorize_documents import embeddings  # Same embeddings used to build the index
import speech_recognition as sr
from deep_translator import GoogleTranslator

# Set up working directory and API configuration
working_dir = os.path.dirname(os.path.abspath(__file__))
with open(f"{working_dir}/config.json") as config_file:
    config_data = json.load(config_file)
os.environ["GROQ_API_KEY"] = config_data["GROQ_API_KEY"]
# Streamlit session state initialization
def initialize_session_state():
    if "chat_history" not in st.session_state:
        st.session_state["chat_history"] = []
    if "vectorstore" not in st.session_state:
        st.session_state["vectorstore"] = setup_vectorstore()
    if "chain" not in st.session_state:
        st.session_state["chain"] = chat_chain(st.session_state["vectorstore"])
# Vectorstore setup
def setup_vectorstore():
    # Reuse the embeddings imported from vectorize_documents so that query-time
    # vectors match the ones the index was built with.
    vectorstore = Chroma(
        persist_directory=f"{working_dir}/vector_db_dir",
        embedding_function=embeddings
    )
    return vectorstore
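
# Assumes vectorize_documents.py has already built and persisted a Chroma index
# at vector_db_dir; a minimal (hypothetical) build step would look like:
#   from langchain_chroma import Chroma
#   Chroma.from_documents(chunks, embeddings, persist_directory=f"{working_dir}/vector_db_dir")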
# Chat chain setup
def chat_chain(vectorstore):
    llm = ChatGroq(
        model="llama-3.1-70b-versatile",
        temperature=0  # Low temperature to reduce hallucinations
    )
    retriever = vectorstore.as_retriever()
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        output_key="answer",  # Required because the chain returns multiple output keys
        return_messages=True
    )
    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        memory=memory,
        verbose=True,
        return_source_documents=True  # The UI below checks source_documents
    )
    return chain
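
# With return_source_documents=True, invoking the chain returns a dict roughly
# shaped like:
#   {"answer": "...", "source_documents": [Document(...), ...], "chat_history": [...]}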
# Transcription function
def transcribe_audio(selected_language):
    try:
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            st.write("🎤 Listening... Please ask your question.")
            try:
                audio = recognizer.listen(source, timeout=5)
                query = recognizer.recognize_google(audio, language=selected_language)
                st.write(f"**🗣️ You said:** {query}")
                return query
            except sr.WaitTimeoutError:
                st.error("⏳ You didn't speak in time. Please try again.")
            except sr.UnknownValueError:
                st.error("❌ Sorry, could not understand the audio. Please try again.")
            except sr.RequestError as e:
                st.error(f"⚠️ Error with speech recognition service: {e}")
    except AttributeError:
        st.error("❌ Microphone or PyAudio not available. Please check installation.")
    except OSError as e:
        st.error(f"⚠️ Audio input error: {e}")
    return None
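
# Note: sr.Microphone requires the PyAudio package (backed by the PortAudio
# system library); if it is missing, constructing Microphone raises the
# AttributeError handled above.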
# Translation functions
def translate_to_english(text, source_lang):
    if source_lang == "en":  # Skip translation if the language is English
        return text
    return GoogleTranslator(source=source_lang, target="en").translate(text)

def translate_from_english(text, target_lang):
    if target_lang == "en":  # Skip translation if the language is English
        return text
    return GoogleTranslator(source="en", target=target_lang).translate(text)
# Streamlit UI
initialize_session_state()
st.markdown(
    """
    <style>
    .main-title {
        font-size: 36px;
        color: #FF8C00;
        font-weight: bold;
    }
    .sub-title {
        font-size: 24px;
        color: #FF8C00;
    }
    .icon {
        font-size: 50px;
        color: #FF8C00;
    }
    </style>
    """,
    unsafe_allow_html=True
)
st.markdown('<div class="icon">📖</div>', unsafe_allow_html=True)
st.markdown('<div class="main-title">Bhagavad Gita & Yoga Sutras Query Assistant</div>', unsafe_allow_html=True)
st.markdown('<div class="sub-title">Ask questions and explore timeless wisdom</div>', unsafe_allow_html=True)
# Language support
indian_languages = {
    "English": "en",
    "Assamese": "as",
    "Bengali": "bn",
    "Gujarati": "gu",
    "Hindi": "hi",
    "Kannada": "kn",
    "Kashmiri": "ks",
    "Konkani": "kok",
    "Malayalam": "ml",
    "Manipuri": "mni",
    "Marathi": "mr",
    "Nepali": "ne",
    "Odia": "or",
    "Punjabi": "pa",
    "Sanskrit": "sa",
    "Santali": "sat",
    "Sindhi": "sd",
    "Tamil": "ta",
    "Telugu": "te",
    "Urdu": "ur",
    "Bodo": "brx",
    "Dogri": "doi",
    "Maithili": "mai",
    "Tulu": "tcy",
    "Bhili/Bhilodi": "bhi",
    "Khasi": "kha",
    "Garo": "grt",
    "Mizo": "lus",
    "Sora": "srb",
    "Ho": "hoc",
    "Kurukh": "kru",
    "Korwa": "kfa",
    "Gondi": "gon"
}
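
# Caveat: Google speech recognition and GoogleTranslator support only a subset
# of these codes; widely used ones like "hi", "ta", or "bn" work, but rarer
# codes such as "srb" or "kfa" may be rejected by either service.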
selected_language = st.selectbox("Select your language:", options=list(indian_languages.keys()))
language_code = indian_languages[selected_language]

# User-friendly input selection
st.markdown("### How would you like to ask your question?")
input_mode = st.radio("Choose input method:", ("Voice", "Typing"))
user_query = None  # Initialize the variable so it is always defined

if input_mode == "Voice":
    st.write("Click the button below to speak your question:")
    if st.button("🎤 Use Voice Input"):
        user_query = transcribe_audio(language_code)
        if user_query:
            user_query = translate_to_english(user_query, language_code)
else:
    user_query = st.text_input("Type your question here:")
    if user_query:
        user_query = translate_to_english(user_query, language_code)
if user_query:  # Always defined thanks to the initialization above
    with st.spinner("Getting answer..."):
        response = st.session_state["chain"].invoke({"question": user_query})
        relevant_content = response.get("source_documents")
        if not relevant_content:
            st.markdown("### ❌ **No Answer Available:**")
            st.write("The system does not have sufficient information to answer this question.")
        else:
            answer = response.get("answer")
            translated_answer = translate_from_english(answer, language_code)
            st.markdown("### ✅ **Answer:**")
            st.write(translated_answer)
# OLD CODE IS BELOW
'''import os
import json
import streamlit as st
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from vectorize_documents import embeddings  # Import embeddings from the vectorization script
import speech_recognition as sr  # For voice recognition

# Set up working directory and API configuration
working_dir = os.path.dirname(os.path.abspath(__file__))
config_data = json.load(open(f"{working_dir}/config.json"))
os.environ["GROQ_API_KEY"] = config_data["GROQ_API_KEY"]

def setup_vectorstore():
    persist_directory = f"{working_dir}/vector_db_dir"
    vectorstore = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings
    )
    return vectorstore

def chat_chain(vectorstore):
    from langchain_groq import ChatGroq  # Import the LLM class
    llm = ChatGroq(
        model="llama-3.1-70b-versatile",  # Replace with your LLM of choice
        temperature=0  # Set low temperature to reduce hallucinations
    )
    retriever = vectorstore.as_retriever()  # Retrieve relevant chunks
    memory = ConversationBufferMemory(
        llm=llm,
        output_key="answer",
        memory_key="chat_history",
        return_messages=True
    )
    # Build the conversational retrieval chain
    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",  # Define how documents are combined
        memory=memory,
        verbose=True,
        return_source_documents=True
    )
    return chain

def transcribe_audio(selected_language):
    """Function to capture and transcribe audio in the selected language."""
    try:
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            st.write("🎤 Listening... Please ask your question.")
            try:
                audio = recognizer.listen(source, timeout=5)  # 5 seconds to start speaking
                query = recognizer.recognize_google(audio, language=selected_language)  # Transcribe audio in selected language
                st.write(f"**🗣️ You said:** {query}")
                return query
            except sr.WaitTimeoutError:
                st.error("⏳ You didn't speak in time. Please try again.")
            except sr.UnknownValueError:
                st.error("❌ Sorry, could not understand the audio. Please try again.")
            except sr.RequestError as e:
                st.error(f"⚠️ Error with speech recognition service: {e}")
    except AttributeError:
        st.error("❌ Microphone or PyAudio not available. Please check installation.")
    except OSError as e:
        st.error(f"⚠️ Audio input error: {e}")
    return None

# Streamlit UI
st.markdown(
    """
    <style>
    .main-title {
        font-size: 36px;
        color: #FF8C00;
        font-weight: bold;
    }
    .sub-title {
        font-size: 24px;
        color: #FF8C00;
    }
    .icon {
        font-size: 50px;
        color: #FF8C00;
    }
    </style>
    """,
    unsafe_allow_html=True
)
st.markdown('<div class="icon">📖</div>', unsafe_allow_html=True)
st.markdown('<div class="main-title">Bhagavad Gita & Yoga Sutras Query Assistant</div>', unsafe_allow_html=True)
st.markdown('<div class="sub-title">Ask questions and explore timeless wisdom</div>', unsafe_allow_html=True)

vectorstore = setup_vectorstore()
chain = chat_chain(vectorstore)

# User input options
st.write("You can either type your question or use voice search:")
st.markdown("### 📝 Type your query or 🎙️ Use voice search")

# Multilingual support: Select language for voice input
language_options = {
    "English": "en-US",
    "Hindi": "hi-IN",
    "Spanish": "es-ES",
    "French": "fr-FR",
    "German": "de-DE"
}
selected_language = st.selectbox("Select your language for voice search:", options=list(language_options.keys()))
language_code = language_options[selected_language]

if st.button("🎙️ Use Voice Search"):
    user_query = transcribe_audio(language_code)
else:
    user_query = st.text_input("Ask a question about the Bhagavad Gita or Yoga Sutras:")

if user_query:
    # Use `__call__` to get all outputs as a dictionary
    response = chain({"question": user_query})
    answer = response.get("answer", "No answer found.")
    source_documents = response.get("source_documents", [])
    st.markdown("### ✅ **Answer:**")
    st.write(answer)
    st.markdown("### 📚 **Source Documents:**")
    for doc in source_documents:
        st.write(doc)
'''