import streamlit as st |
from chat import ChatSession, ChatSessionSchema, Role |
from app import react_agent |
from mongoengine import connect |
from config import settings |
import requests |
from tools.menu_tools_utils import get_data_store_id |
import speech_recognition as sr |
st.set_page_config(layout="wide") |
connect(settings.DB_NAME, host=settings.DB_URI, alias="default") |
print("Database connection established!!") |
def speech_to_text(): |
r = sr.Recognizer() |
with sr.Microphone() as mic: |
st.write("Listening for your query...") |
r.adjust_for_ambient_noise(mic, duration=1) |
audio = r.listen(mic) |
try: |
text = r.recognize_google(audio) |
st.success(f"Recognized: {text}") |
return text |
except sr.UnknownValueError: |
st.error("Sorry, I could not understand the audio.") |
return "" |
except sr.RequestError as e: |
st.error( |
f"Could not request results from Google Speech Recognition service; {e}" |
) |
return "" |
col1, col2 = st.columns([1, 3]) |
with col2: |
st.title("Restaurant Order Bot") |
if "session_id" not in st.session_state: |
session = ChatSession() |
session.save() |
st.session_state["session_id"] = str(session.id) |
if "conversation" not in st.session_state: |
st.session_state["conversation"] = [] |
user_query = st.text_input("Ask something:") |
col3, col4, col5 = st.columns([3, 3, 3]) |
with col3: |
if st.button("🎤 Speak your query"): |
recognized_text = speech_to_text() |
if recognized_text: |
user_query = recognized_text |
st.session_state["conversation"].append( |
{"role": "user", "text": user_query} |
) |
chat_schema = ChatSessionSchema( |
session_id=st.session_state["session_id"], query=user_query |
) |
chat_session = ChatSession.objects(id=chat_schema.session_id).first() |
chat_history = chat_session.get_last_messages() |
response = react_agent.handle_query( |
session_id=chat_schema.session_id, |
query=chat_schema.query, |
chat_history=chat_history, |
) |
chat_session.add_message_with_metadata( |
role=Role.USER.value, content=chat_schema.query |
) |
chat_session.add_message_with_metadata( |
role=Role.MODEL.value, content=response |
) |
if response: |
st.session_state["conversation"].append( |
{"role": "bot", "text": response} |
) |
else: |
st.session_state["conversation"].append( |
{ |
"role": "bot", |
"text": "Error: Could not get a response from the server.", |
} |
) |
with col4: |
if st.button("Submit"): |
if user_query: |
st.session_state["conversation"].append( |
{"role": "user", "text": user_query} |
) |
chat_schema = ChatSessionSchema( |
session_id=st.session_state["session_id"], query=user_query |
) |
chat_session = ChatSession.objects(id=chat_schema.session_id).first() |
chat_history = chat_session.get_last_messages() |
response = react_agent.handle_query( |
session_id=chat_schema.session_id, |
query=chat_schema.query, |
chat_history=chat_history, |
) |
chat_session.add_message_with_metadata( |
role=Role.USER.value, content=chat_schema.query |
) |
chat_session.add_message_with_metadata( |
role=Role.MODEL.value, content=response |
) |
if response: |
st.session_state["conversation"].append( |
{"role": "bot", "text": response} |
) |
else: |
st.session_state["conversation"].append( |
{ |
"role": "bot", |
"text": "Error: Could not get a response from the server.", |
} |
) |
with col5: |
if st.button("Clear Chat"): |
st.session_state["conversation"] = [] |
user_message_style = """ |
background-color: #d1e7dd; |
padding: 10px; |
border-radius: 10px; |
margin-bottom: 10px; |
color: #0f5132; |
text-align: left; |
""" |
bot_message_style = """ |
background-color: #f8d7da; |
padding: 10px; |
border-radius: 10px; |
margin-bottom: 10px; |
color: #842029; |
text-align: left; |
""" |
for message in st.session_state["conversation"]: |
if message["role"] == "user": |
st.markdown( |
f"<div style='{user_message_style}'><strong>You:</strong> {message['text']}</div>", |
unsafe_allow_html=True, |
) |
else: |
st.markdown( |
f"<div style='{bot_message_style}'><strong>Bot:</strong> {message['text']}</div>", |
unsafe_allow_html=True, |
) |
with col1: |
st.title("Menu") |
response = get_data_store_id(settings.STORE_ID, settings.BRAND_ID) |
menu_data = response |
blank_image_url = "https://via.placeholder.com/1x1/FFFFFF/FFFFFF" |
with st.container(): |
if menu_data.get("success"): |
for category in menu_data["data"]: |
with st.expander(category["name"], expanded=False): |
for item in category.get("itemsData", []): |
try: |
img_url = item.get("img_url", None) |
if img_url: |
st.image(img_url, width=100) |
else: |
st.image(blank_image_url, width=100) |
except Exception as e: |
st.image(blank_image_url, width=100) |
st.write(f"**{item['title']}**: {item['price']} Dirham AED") |
st.write(item["description"]) |
st.write("---") |
else: |
st.write("Failed to load menu data.") |