# ChatbotGuide / app.py
import streamlit as st
import pandas as pd
from openai import OpenAI
from PyPDF2 import PdfReader
from PIL import Image
# source: eagle0504/document-search-q-series
def read_and_textify_advanced(files, chunk_size):
"""
Reads PDF files and extracts text from each page, breaking the text into specified segments.
This function iterates over a list of uploaded PDF files, extracts text from each page,
and compiles a list of texts and corresponding source information, segmented into smaller parts
of approximately 'chunk_size' words each.
Args:
files (List[st.uploaded_file_manager.UploadedFile]): A list of uploaded PDF files.
chunk_size (int): The number of words per text segment. Default is 50.
Returns: A list of strings, where each string is a segment of text extracted from a PDF page.
"""
text_list = [] # List to store extracted text segments
# Iterate over each file
for file in files:
pdfReader = PdfReader(file) # Create a PDF reader object
# Iterate over each page in the PDF
for i in range(len(pdfReader.pages)):
pageObj = pdfReader.pages[i] # Get the page object
text = pageObj.extract_text() # Extract text from the page
if text:
                # Split the page text into sentences and group them into
                # chunks of approximately 'chunk_size' sentences each
                sentences = text.split(".")
                for j in range(0, len(sentences), chunk_size):
                    # Join the next 'chunk_size' sentences back into one segment
                    chunk = ".".join(sentences[j:j+chunk_size]) + '.'
                    chunk = chunk.strip()
                    text_list.append(chunk)
else:
# If no text extracted, still add a placeholder
text_list.append("")
pageObj.clear() # Clear the page object (optional, for memory management)
return text_list
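# A minimal usage sketch of the chunking above (assumption: run outside Streamlit on a local
# file named example.pdf; PdfReader also accepts any binary file-like object):
#
#   from io import BytesIO
#   with open("example.pdf", "rb") as f:
#       segments = read_and_textify_advanced([BytesIO(f.read())], chunk_size=2)
#   # With chunk_size=2, each element of segments holds roughly two sentences from one page.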
def get_questions(context, instructions) -> str:
"""
Given a text context, generates a list of questions using OpenAI's GPT-3 API.
Args:
- context: A string representing the context for which questions should be generated.
Returns:
- A string containing the question generated by the API.
"""
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"{instructions}\n\nText: {context}\n\nQuestions:\n"}
]
)
# Extract question text from the response
question_text = response.choices[0].message.content
return question_text
    except Exception:
        # Return an empty string if there was an error
        return ""
def get_answers(row, instructions) -> str:
"""
Given a dataframe row containing context and questions, generates an answer using OpenAI's GPT-3 API.
Args:
- row: A pandas dataframe row containing 'context' and 'questions' columns.
Returns:
- A string containing the answer generated by the API.
"""
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"{instructions}\n\nText: {row.context}\n\nQuestions:\n{row.questions}\n\nAnswers:\n"}
]
)
# Extract answer text from the response
answer_text = response.choices[0].message.content
return answer_text
except Exception as e:
# Print the error message and return an empty string if there was an error
        print(e)
return ""
st.set_page_config(page_title="ChatbotGuide", layout="wide")
st.title("Chatbot Guide")
# Define the options in the dropdown menu
app_options = [
"1) Scrape PDFs",
"2) Create CSVs",
"3) Merge CSVs",
"4) Upload Datasets",
"5) Create Chatbot"
]
# Sidebar dropdown for selecting the application
selected_app = st.sidebar.selectbox("Select Step (1-5)", app_options)
# Clear session state when switching apps
if 'last_selected_app' in st.session_state:
if st.session_state.last_selected_app != selected_app:
st.session_state.clear()
st.session_state.last_selected_app = selected_app
if 'submit' not in st.session_state:
st.session_state.submit = False
if 'error' not in st.session_state:
st.session_state.error = ""
if 'success' not in st.session_state:
st.session_state.success = None
if selected_app == "1) Scrape PDFs":
st.write("1. Go to your organizations webpage")
image = Image.open('Example1.png')
st.image(image, caption="Example for Step 1",use_column_width=True)
st.divider()
st.write("2. Choose an section in the webpage")
image = Image.open('Example2.png')
st.image(image, caption="Example for Step 2",use_column_width=True)
st.divider()
# st.write("3. Copy all text on the page")
# image = Image.open('Example3.png')
# st.image(image, caption="Example for Step 3",use_column_width=True)
# st.divider()
# st.write("4. Open a new google doc")
# image = Image.open('Example4.png')
# st.image(image, caption="Example for Step 4",use_column_width=True)
# st.divider()
if selected_app == "2) Create CSVs":
if st.session_state.error != "":
st.error(st.session_state.error)
if st.session_state.success != None:
st.success("Success! Download the Q/A pairs below / Click reset to upload more PDFs")
st.download_button(
label=f"Download CSV: length = {st.session_state.success[1]}",
data=st.session_state.success[0],
file_name='questions_answers.csv',
mime='text/csv',
)
if st.button('Reset'):
st.session_state.clear()
st.rerun()
else:
uploaded_files = st.file_uploader("Upload PDFs Here", type="pdf", accept_multiple_files=True)
question_protocol = st.text_input("Provide instructions for how questions should be generated", "Write a question based on the text")
answer_protocol = st.text_input("Provide instructions for how answers should be generated", "Write an answer based on the text")
        sentence_chunks = st.number_input("Number of sentences per Q/A pair", value=2, step=1, min_value=1, max_value=3)
openai_api_key = st.text_input("Enter your OpenAI API key", type="password")
submit = st.button("Submit")
if submit:
st.session_state.submit = True
if st.session_state.submit:
if uploaded_files:
client = OpenAI(api_key=openai_api_key)
with st.spinner("Loading, please be patient with us ... πŸ™"):
                    # Test the API key with a minimal request before doing any real work
                    try:
                        response = client.chat.completions.create(
                            model="gpt-4o-mini",
                            messages=[
                                {"role": "system", "content": "You are a helpful assistant."},
                                {"role": "user", "content": "Say this is a test"}
                            ]
                        )
                    except Exception:
                        st.session_state.clear()
                        st.session_state.error = "OpenAI API key is invalid"
                        st.rerun()
with st.spinner("Loading, please be patient with us ... πŸ™"):
textify_output = read_and_textify_advanced(uploaded_files, sentence_chunks)
df = pd.DataFrame(textify_output)
df.columns = ['context']
if question_protocol == "":
question_protocol = "Write questions based on the text"
df['questions'] = df.apply(lambda row: get_questions(row['context'], question_protocol), axis=1)
if answer_protocol == "":
answer_protocol = "Write answers based on the text"
df['answers'] = df.apply(lambda row: get_answers(row, answer_protocol), axis=1)
df = df.drop('context', axis=1)
length = len(df)
csv = df.to_csv(index=False).encode('utf-8')
st.session_state.clear()
st.session_state.success = (csv, length)
st.rerun()
else:
st.session_state.clear()
st.session_state.error = "Please upload at least 1 PDF"
st.rerun()
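# Sketch of the CSV this step produces (the placeholder rows are hypothetical; only the two
# column names matter to the later steps):
#
#   questions,answers
#   "<question generated from chunk 1>","<answer generated from chunk 1>"
#   "<question generated from chunk 2>","<answer generated from chunk 2>"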
if selected_app == "3) Merge CSVs":
if st.session_state.error != "":
st.error(st.session_state.error)
if st.session_state.success != None:
st.success("Success! Download the merged CSV with Q/A pairs below / Reset to merge more CSVs")
st.download_button(
label=f"Download CSV: length = {st.session_state.success[1]}",
data=st.session_state.success[0],
file_name='questions_answers.csv',
mime='text/csv',
)
if st.button('Reset'):
st.session_state.clear()
st.rerun()
else:
uploaded_files = st.file_uploader("Upload CSV files to merge", accept_multiple_files=True, type="csv")
submit = st.button("Submit")
if submit:
st.session_state.submit = True
if st.session_state.submit:
if len(uploaded_files) > 1:
dfs = []
for file in uploaded_files:
df = pd.read_csv(file)
if "questions" in df.columns and "answers" in df.columns:
df = df[["questions", "answers"]]
dfs.append(df)
else:
st.session_state.clear()
st.session_state.error = "Please upload CSVs that have been generated from 1) Create CSV"
st.rerun()
df = pd.concat(dfs, ignore_index=True)
length = len(df)
csv = df.to_csv(index=False).encode('utf-8')
st.session_state.clear()
st.session_state.success = (csv, length)
st.rerun()
else:
st.session_state.clear()
st.session_state.error = "Please upload at least 2 CSVs to merge"
st.rerun()
if selected_app == "4) Upload Datasets":
st.markdown("Go to this [google colab link](https://colab.research.google.com/drive/1eCpk9HUoCKZb--tiNyQSHFW2ojoaA35m) to get started")
if selected_app == "5) Create Chatbot":
if st.session_state.error != "":
st.error(st.session_state.error)
if st.session_state.success != None:
st.success("Success! Copy/paste the requirements.txt and app.py files into your HuggingFace Space")
st.write('requirements.txt')
st.code(st.session_state.success[0], language='python')
st.write('app.py')
st.code(st.session_state.success[1], language='python')
if st.button('Reset'):
st.session_state.clear()
st.rerun()
else:
        organization_name = st.text_input("What is the name of your organization?", "")
        num_domains = st.number_input("How many datasets have you uploaded?", value=1, step=1, min_value=1, max_value=10)
st.divider()
domain_info = []
for i in range(num_domains):
            domain_link = st.text_input(f"Please enter the link to dataset {i+1} in the format username/dataset_name", "Example: KeshavRa/About_YSA_Database")
            domain_name = st.text_input(f"What should domain {i+1} be called in the chatbot itself?", "Example: About YSA")
domain_purpose = st.text_area(f"What is the purpose of domain {i+1}, provide example questions (this will be visible to users of the chatbot)", 'Example: On this page, you can learn about what YSA does, how YSA was started, the advisory board, and the programs we offer.\n\nExample Questions\n\n--> What is the purpose of Youth Spirit Artworks?\n\n--> Who created YSA?\n\n--> What is the Advisory Board for Youth Spirit Artworks?\n\n--> What are the three empowerment-focused program areas of YSA?')
domain_instructions = st.text_input(f"What baseline instructions/specifications should be sent to ChatGPT to answer questions in domain {i+1}", "Example: You are an assistant to help the user learn more about Youth Spirit Artworks")
domain = {"link": domain_link, "name": domain_name, "purpose": domain_purpose, "instructions": domain_instructions}
domain_info.append(domain)
st.divider()
submit = st.button("Submit")
if submit:
st.session_state.submit = True
if st.session_state.submit:
if organization_name == "":
st.session_state.clear()
st.session_state.error = "Please enter an organization name"
st.rerun()
missing_info = []
for i in range(len(domain_info)):
if domain_info[i]['link'] == "":
missing_info.append(f"link to domain {i+1}")
if domain_info[i]['name'] == "":
missing_info.append(f"name for domain {i+1}")
if domain_info[i]['purpose'] == "":
missing_info.append(f"purpose for domain {i+1}")
if domain_info[i]['instructions'] == "":
missing_info.append(f"instructions for domain {i+1}")
if missing_info:
error = "Missing Info: "
for info in missing_info:
error += (info + ', ')
st.session_state.clear()
st.session_state.error = error
st.rerun()
requirements = '''
openai
scipy
streamlit
chromadb
datasets
'''
app = f"""
import os
import streamlit as st
from datasets import load_dataset
import chromadb
import string
from openai import OpenAI
import numpy as np
import pandas as pd
from scipy.spatial.distance import cosine
from typing import Dict, List
def merge_dataframes(dataframes):
# Concatenate the list of dataframes
combined_dataframe = pd.concat(dataframes, ignore_index=True)
# Ensure that the resulting dataframe only contains the columns "context", "questions", "answers"
combined_dataframe = combined_dataframe[['context', 'questions', 'answers']]
return combined_dataframe
def call_chatgpt(prompt: str, directions: str) -> str:
    '''
    Uses the OpenAI API to generate an AI response to a prompt.
    Args:
        prompt: A string representing the user prompt to send to the OpenAI API.
        directions: A string of system instructions that steer the assistant's answers.
    Returns:
        A string representing the AI's generated response.
    '''
# Use the OpenAI API to generate a response based on the input prompt.
client = OpenAI(api_key = os.environ["OPENAI_API_KEY"])
completion = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
messages=[
{{"role": "system", "content": directions}},
{{"role": "user", "content": prompt}}
]
)
# Extract the text from the first (and only) choice in the response output.
ans = completion.choices[0].message.content
# Return the generated AI response.
return ans
def openai_text_embedding(prompt: str) -> List[float]:
    # Use the openai>=1.0 client interface (the legacy openai.Embedding API is not available here)
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(input=prompt, model="text-embedding-ada-002")
    return response.data[0].embedding
def calculate_sts_openai_score(sentence1: str, sentence2: str) -> float:
# Compute sentence embeddings
embedding1 = openai_text_embedding(sentence1) # Flatten the embedding array
embedding2 = openai_text_embedding(sentence2) # Flatten the embedding array
# Convert to array
embedding1 = np.asarray(embedding1)
embedding2 = np.asarray(embedding2)
# Calculate cosine similarity between the embeddings
similarity_score = 1 - cosine(embedding1, embedding2)
return similarity_score
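# Note: cosine similarity here is 1 - cosine_distance, i.e. dot(a, b) / (norm(a) * norm(b));
# values near 1 mean the two sentences have very similar embeddings, values near 0 mean unrelated.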
def add_dist_score_column(
dataframe: pd.DataFrame, sentence: str,
) -> pd.DataFrame:
dataframe["stsopenai"] = dataframe["questions"].apply(
lambda x: calculate_sts_openai_score(str(x), sentence)
)
sorted_dataframe = dataframe.sort_values(by="stsopenai", ascending=False)
return sorted_dataframe.iloc[:5, :]
def convert_to_list_of_dict(df: pd.DataFrame) -> List[Dict[str, str]]:
    '''
    Reads a pandas DataFrame and produces a list of chat-style message dictionaries,
    alternating a "user" message (the question) with an "assistant" message (the answer).
    Args:
        df: A pandas DataFrame with columns named 'questions' and 'answers'.
    Returns:
        A list of dictionaries, each with "role" and "content" keys.
    '''
# Initialize an empty list to store the dictionaries
result = []
# Loop through each row of the DataFrame
for index, row in df.iterrows():
# Create a dictionary with the current question and answer
qa_dict_quest = {{"role": "user", "content": row["questions"]}}
qa_dict_ans = {{"role": "assistant", "content": row["answers"]}}
# Add the dictionary to the result list
result.append(qa_dict_quest)
result.append(qa_dict_ans)
# Return the list of dictionaries
return result
domain_info = {domain_info}
st.sidebar.markdown('''This is a chatbot to help you learn more about {organization_name}''')
domain = st.sidebar.selectbox("Select a topic", [d["name"] for d in domain_info])
special_threshold = 0.3
n_results = 3
clear_button = st.sidebar.button("Clear Conversation", key="clear")
if clear_button:
st.session_state.messages = []
st.session_state.curr_domain = ""
for d in domain_info:
if domain == d['name']:
dataset = load_dataset(d['link'])
initial_input = "Tell me about {organization_name}"
# Initialize a new client for ChromaDB.
client = chromadb.Client()
# Generate a random number between 1 billion and 10 billion.
random_number: int = np.random.randint(low=1e9, high=1e10)
# Generate a random string consisting of 10 uppercase letters and digits.
random_string: str = "".join(
np.random.choice(list(string.ascii_uppercase + string.digits), size=10)
)
# Combine the random number and random string into one identifier.
combined_string: str = f"{{random_number}}{{random_string}}"
# Create a new collection in ChromaDB with the combined string as its name.
collection = client.create_collection(combined_string)
st.title("{organization_name} Chatbot")
# Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = []
if "curr_domain" not in st.session_state:
st.session_state.curr_domain = ""
init_messages = {{}}
for d in domain_info:
init_messages[d['name']] = d['purpose']
chatbot_instructions = {{}}
for d in domain_info:
chatbot_instructions[d['name']] = d['instructions']
# Embed and store the first N supports for this demo
with st.spinner("Loading, please be patient with us ... πŸ™"):
L = len(dataset["train"]["questions"])
collection.add(
ids=[str(i) for i in range(0, L)], # IDs are just strings
documents=dataset["train"]["questions"], # Enter questions here
metadatas=[{{"type": "support"}} for _ in range(0, L)],
)
if st.session_state.curr_domain != domain:
st.session_state.messages = []
init_message = init_messages[domain]
st.session_state.messages.append({{"role": "assistant", "content": init_message}})
st.session_state.curr_domain = domain
# Display chat messages from history on app rerun
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# React to user input
if prompt := st.chat_input("Tell me about {organization_name}"):
# Display user message in chat message container
st.chat_message("user").markdown(prompt)
# Add user message to chat history
st.session_state.messages.append({{"role": "user", "content": prompt}})
question = prompt
results = collection.query(query_texts=question, n_results=n_results)
idx = results["ids"][0]
idx = [int(i) for i in idx]
ref = pd.DataFrame(
{{
"idx": idx,
"questions": [dataset["train"]["questions"][i] for i in idx],
"answers": [dataset["train"]["answers"][i] for i in idx],
"distances": results["distances"][0],
}}
)
# special_threshold = st.sidebar.slider('How old are you?', 0, 0.6, 0.1) # 0.3
# special_threshold = 0.3
filtered_ref = ref[ref["distances"] < special_threshold]
if filtered_ref.shape[0] > 0:
# st.success("There are highly relevant information in our database.")
ref_from_db_search = filtered_ref["answers"].str.cat(sep=" ")
final_ref = filtered_ref
else:
# st.warning(
# "The database may not have relevant information to help your question so please be aware of hallucinations."
# )
ref_from_db_search = ref["answers"].str.cat(sep=" ")
final_ref = ref
engineered_prompt = f'''
Based on the context: {{ref_from_db_search}},
answer the user question: {{question}}.
'''
directions = chatbot_instructions[domain]
answer = call_chatgpt(engineered_prompt, directions)
response = answer
# Display assistant response in chat message container
with st.chat_message("assistant"):
st.markdown(response)
with st.expander("See reference:"):
st.table(final_ref)
# Add assistant response to chat history
st.session_state.messages.append({{"role": "assistant", "content": response}})
"""
st.session_state.clear()
st.session_state.success = (requirements, app)
st.rerun()