Spaces:
Running
Running
import time | |
import os | |
import streamlit as st | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
from googleapiclient.discovery import build | |
from google.oauth2.service_account import Credentials | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain.prompts import PromptTemplate | |
from langchain.memory import ConversationBufferWindowMemory | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain_together import Together | |
from footer import footer | |
# Google Drive API setup | |
SCOPES = ["https://www.googleapis.com/auth/drive.readonly"] | |
SERVICE_ACCOUNT_FILE = "data/credentials.json" # Path to your Google API credentials file | |
FOLDER_ID = "1LZIx-1tt_GormpU8nF_I2WL88Oxa9juU" # Replace with your Google Drive folder ID | |
def authenticate_drive(): | |
"""Authenticate and return the Google Drive API service.""" | |
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES) | |
return build("drive", "v3", credentials=creds) | |
from fuzzywuzzy import process | |
def search_drive_file(file_name): | |
"""Search for a file by name in the specified Google Drive folder using fuzzy matching.""" | |
service = authenticate_drive() | |
try: | |
# Get all files in the folder | |
query = f"'{FOLDER_ID}' in parents and trashed=false" | |
results = service.files().list(q=query, fields="files(id, name)").execute() | |
files = results.get("files", []) | |
# Debug: Print all file names for inspection | |
st.write("Available files:", [f['name'] for f in files]) | |
# Perform fuzzy matching to find the best match | |
file_names = [f['name'] for f in files] | |
best_match, score = process.extractOne(file_name, file_names) | |
if score >= 75: # Threshold for a match | |
matched_file = next(f for f in files if f['name'] == best_match) | |
st.write(f"Match found: {matched_file['name']} (Score: {score})") | |
return [matched_file] | |
else: | |
st.warning(f"No close matches found for '{file_name}'. Try rephrasing or checking the folder manually.") | |
return [] | |
except Exception as e: | |
st.error(f"An error occurred: {e}") | |
return [] | |
# Set the Streamlit page configuration and theme | |
st.set_page_config(page_title="In-Legal-IPC", layout="wide") | |
# Display the logo image with blur shadow | |
col1, col2, col3 = st.columns([1, 30, 1]) | |
with col2: | |
st.markdown( | |
""" | |
<style> | |
.blur-shadow { | |
box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.5); | |
border-radius: 10px; | |
} | |
</style> | |
<div class="blur-shadow"> | |
<img src="https://raw.githubusercontent.com/shiv4321/Images/refs/heads/main/Banner.png" alt="Banner" width="100%" height="600px"> | |
</div> | |
""", | |
unsafe_allow_html=True | |
) | |
def hide_hamburger_menu(): | |
st.markdown(""" | |
<style> | |
#MainMenu {visibility: hidden;} | |
footer {visibility: hidden;} | |
</style> | |
""", unsafe_allow_html=True) | |
hide_hamburger_menu() | |
# Initialize session state for messages and memory | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
if "memory" not in st.session_state: | |
st.session_state.memory = ConversationBufferWindowMemory(k=5, memory_key="chat_history", return_messages=True) | |
def load_embeddings(): | |
"""Load and cache the embeddings model.""" | |
return HuggingFaceEmbeddings(model_name="law-ai/InLegalBERT") | |
embeddings = load_embeddings() | |
db = FAISS.load_local("ipc_embed_db", embeddings, allow_dangerous_deserialization=True) | |
db_retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 3}) | |
# Define the prompt template | |
prompt_template = """ | |
<s>[INST] | |
As a legal chatbot specializing in the Indian Penal Code, you are tasked with providing highly accurate and contextually appropriate responses. Ensure your answers meet these criteria: | |
- Respond in a bullet-point format to clearly delineate distinct aspects of the legal query. | |
- Each point should accurately reflect the breadth of the legal provision in question, avoiding over-specificity unless directly relevant to the user's query. | |
- Clarify the general applicability of the legal rules or sections mentioned, highlighting any common misconceptions or frequently misunderstood aspects. | |
- Limit responses to essential information that directly addresses the user's question, providing concise yet comprehensive explanations. | |
- Avoid assuming specific contexts or details not provided in the query, focusing on delivering universally applicable legal interpretations unless otherwise specified. | |
- Conclude with a brief summary that captures the essence of the legal discussion and corrects any common misinterpretations related to the topic. | |
CONTEXT: {context} | |
CHAT HISTORY: {chat_history} | |
QUESTION: {question} | |
ANSWER: | |
- [Detail the first key aspect of the law, ensuring it reflects general application] | |
- [Provide a concise explanation of how the law is typically interpreted or applied] | |
- [Correct a common misconception or clarify a frequently misunderstood aspect] | |
- [Detail any exceptions to the general rule, if applicable] | |
- [Include any additional relevant information that directly relates to the user's query] | |
</s>[INST] | |
""" | |
prompt = PromptTemplate(template=prompt_template, | |
input_variables=['context', 'question', 'chat_history']) | |
api_key = os.getenv('TOGETHER_API_KEY') | |
llm = Together(model="mistralai/Mixtral-8x22B-Instruct-v0.1", temperature=0.5, max_tokens=1024) #together_api_key="") | |
qa = ConversationalRetrievalChain.from_llm(llm=llm, memory=st.session_state.memory, retriever=db_retriever, combine_docs_chain_kwargs={'prompt': prompt}) | |
def extract_answer(full_response): | |
"""Extracts the answer from the LLM's full response by removing the instructional text.""" | |
answer_start = full_response.find("Response:") | |
if answer_start != -1: | |
answer_start += len("Response:") | |
answer_end = len(full_response) | |
return full_response[answer_start:answer_end].strip() | |
return full_response | |
def reset_conversation(): | |
st.session_state.messages = [] | |
st.session_state.memory.clear() | |
# Function to create a PDF | |
def create_pdf(content): | |
pdf_filename = "legal_letter.pdf" | |
c = canvas.Canvas(pdf_filename, pagesize=letter) | |
width, height = letter | |
c.drawString(100, height - 100, content) | |
c.save() | |
return pdf_filename | |
# Add links to multiple PDFs just above the chat input | |
st.markdown("<h3 class='underline'>Useful PDFs</h3>", unsafe_allow_html=True) | |
col1, col2 = st.columns(2) # Create two columns for better alignment | |
with col1: | |
if st.button("Commercial Court Rules and Forms π", key="ccrf", help="Open PDF", use_container_width=True): | |
st.markdown("[Open PDF](https://drive.google.com/file/d/198SC1mKipJ7WQXGN-5uc8qkNV5rLxVlT/view?usp=sharing)", unsafe_allow_html=True) | |
if st.button("Bail-Bond π", key="bb", help="Open PDF", use_container_width=True): | |
st.markdown("[Open PDF](https://drive.google.com/file/d/1Eju14MgFFME3nUknjwlbU8C9nrQoeM1v/view?usp=drive_link)", unsafe_allow_html=True) | |
with col2: | |
if st.button("Inspection Form π", key="if", help="Open PDF", use_container_width=True): | |
st.markdown("[Open PDF](https://drive.google.com/file/d/17FT5Pmgp4bgf31tFyQRMNVnoRuVlQ2zi/view?usp=sharing)", unsafe_allow_html=True) | |
if st.button("Additional PDF π", key="apdf", help="Open PDF", use_container_width=True): | |
st.markdown("[Open PDF](https://drive.google.com/file/d/1LY1-R9chmd_I7Tf3iC4jNZ5dHRFFkjaV/view?usp=sharing)", unsafe_allow_html=True) | |
# Add the new message below the PDF section | |
st.markdown("<p style='text-align: center; font-size: 16px; color: #555;'>If you need any application form, please mention 'form' at the end.</p>", unsafe_allow_html=True) | |
# Add CSS for the button styling | |
st.markdown(""" | |
<style> | |
.stButton button { | |
background-color: #ADD8E6; | |
color: black; | |
font-size: 16px; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Display previous messages | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.write(message["content"]) | |
# Initialize session state variables | |
if "show_reset" not in st.session_state: | |
st.session_state.show_reset = False | |
# Chat input area | |
input_prompt = st.chat_input("Say something...") | |
if input_prompt: | |
with st.chat_message("user"): | |
st.markdown(f"**You:** {input_prompt}") | |
# Enable the reset button after receiving input | |
st.session_state.show_reset = True | |
if "form" in input_prompt.lower() or "document" in input_prompt.lower(): | |
with st.spinner("Searching Google Drive..."): | |
# Call the updated search function | |
search_results = search_drive_file(input_prompt) | |
if search_results: | |
# Generate response for found files | |
response = "π Document(s) found! Click below to view:" | |
for file in search_results: | |
response += f"\n- [{file['name']}](https://drive.google.com/file/d/{file['id']}/view)" | |
st.session_state.messages.append({"role": "assistant", "content": response}) | |
st.write(response) | |
else: | |
# If no results, provide an alternative message | |
response = ( | |
"β οΈ No matching documents found. " | |
"Please check the spelling or explore the folder directly: " | |
f"[Google Drive Folder](https://drive.google.com/drive/folders/{FOLDER_ID})" | |
) | |
st.session_state.messages.append({"role": "assistant", "content": response}) | |
st.write(response) | |
else: | |
# Handle general questions | |
with st.chat_message("assistant"): | |
with st.spinner("Thinking π‘..."): | |
try: | |
# Validate the input before invoking the QA chain | |
if not input_prompt.strip(): | |
st.warning("β οΈ Input cannot be empty!") | |
else: | |
result = qa.invoke(input=input_prompt) | |
answer = result["answer"].strip() | |
# Simulate typing effect for the response | |
message_placeholder = st.empty() | |
full_response = ( | |
"β οΈ **_Gentle reminder: We strive for precision, but please double-check._**\n\n" | |
) | |
for chunk in answer.split(): | |
full_response += chunk + " " | |
time.sleep(0.02) # Simulating typing | |
message_placeholder.markdown(full_response + " |", unsafe_allow_html=True) | |
st.session_state.messages.append({"role": "assistant", "content": answer}) | |
except Exception as e: | |
# Handle unexpected errors during QA invocation | |
error_message = f"β οΈ **_Error: An unexpected issue occurred: {str(e)}._**" | |
st.error(error_message) | |
st.session_state.messages.append({"role": "assistant", "content": error_message}) | |
# Reset button | |
if st.session_state.show_reset: | |
if st.button('ποΈ Reset All Chat', on_click=reset_conversation): | |
st.rerun() # Updated from st.experimental_rerun | |
footer() | |