|
import os |
|
import streamlit as st |
|
from datasets import load_dataset |
|
import chromadb |
|
import string |
|
|
|
from openai import OpenAI |
|
|
|
import numpy as np |
|
import pandas as pd |
|
|
|
from scipy.spatial.distance import cosine |
|
|
|
from typing import Dict, List |
|
|
|
def merge_dataframes(dataframes):
    """
    Stacks several DataFrames and keeps only the question-answering columns.

    Args:
        dataframes: The DataFrames to stack; each one must provide the
            'context', 'questions' and 'answers' columns.

    Returns:
        A single DataFrame with a fresh 0..n-1 index whose columns are
        'context', 'questions' and 'answers', in that order.
    """
    wanted_columns = ['context', 'questions', 'answers']

    # ignore_index=True discards the per-frame indices and renumbers the rows.
    stacked = pd.concat(dataframes, ignore_index=True)

    return stacked[wanted_columns]
|
|
|
def call_chatgpt(prompt: str) -> str:
    """
    Sends a single-turn prompt to the OpenAI chat completions endpoint.

    Args:
        prompt: The text submitted as the user message.

    Returns:
        The content of the first choice in the completion response.

    Raises:
        KeyError: If the OPENAI_API_KEY environment variable is not set.
    """
    # A fixed system message followed by the caller's prompt as the user turn.
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ]

    api = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    reply = api.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=conversation,
    )

    return reply.choices[0].message.content
|
|
|
def openai_text_embedding(prompt: str) -> List[float]:
    """
    Requests an embedding vector for a piece of text from the OpenAI API.

    Uses the same client-based API as `call_chatgpt`. The previous
    implementation referenced the module name `openai`, which this file never
    imports (only the `OpenAI` class is imported), and relied on the legacy
    `openai.Embedding.create` entry point.

    Args:
        prompt: The text to embed.

    Returns:
        The embedding of `prompt` as a list of floats, produced by the
        "text-embedding-ada-002" model.

    Raises:
        KeyError: If the OPENAI_API_KEY environment variable is not set.
    """
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

    response = client.embeddings.create(
        input=prompt,
        model="text-embedding-ada-002",
    )

    # A single input string yields exactly one entry in `data`.
    return response.data[0].embedding
|
|
|
def calculate_sts_openai_score(sentence1: str, sentence2: str) -> float:
    """
    Scores how similar two sentences are using their OpenAI embeddings.

    Args:
        sentence1: The first sentence.
        sentence2: The second sentence.

    Returns:
        The cosine similarity of the two embeddings, computed as one minus
        SciPy's cosine distance.
    """
    # Embed in argument order, then convert each result to a NumPy array.
    vector_a, vector_b = (
        np.asarray(openai_text_embedding(text)) for text in (sentence1, sentence2)
    )

    # scipy's `cosine` is a distance; subtracting from 1 turns it into a similarity.
    return 1 - cosine(vector_a, vector_b)
|
|
|
def add_dist_score_column(
    dataframe: pd.DataFrame, sentence: str,
) -> pd.DataFrame:
    """
    Scores every stored question against a sentence and returns the best five.

    Note that the 'stsopenai' column is written onto the DataFrame that was
    passed in, so the caller's frame is modified.

    Args:
        dataframe: A DataFrame with a 'questions' column.
        sentence: The sentence every question is compared with.

    Returns:
        The first five rows of `dataframe` after sorting by 'stsopenai' in
        descending order.
    """

    def _similarity_to_sentence(question) -> float:
        # Questions are stringified before being embedded.
        return calculate_sts_openai_score(str(question), sentence)

    dataframe["stsopenai"] = dataframe["questions"].apply(_similarity_to_sentence)

    ranked = dataframe.sort_values(by="stsopenai", ascending=False)

    return ranked.head(5)
|
|
|
def convert_to_list_of_dict(df: pd.DataFrame) -> List[Dict[str, str]]:
    """
    Turns a question/answer DataFrame into a flat list of chat messages.

    Every row contributes two dictionaries, in this order: a user message
    holding the row's question and an assistant message holding the row's
    answer. Each dictionary has the keys 'role' and 'content'.

    Args:
        df: A pandas DataFrame with columns named 'questions' and 'answers'.

    Returns:
        A list with two dictionaries per row of `df`
        ({"role": "user", ...} followed by {"role": "assistant", ...});
        an empty list when `df` has no rows.

    Raises:
        KeyError: If `df` lacks a 'questions' or 'answers' column.
    """
    messages: List[Dict[str, str]] = []

    # Walk the two columns in lockstep instead of using iterrows(), which
    # builds a Series per row and may coerce values to a common dtype.
    for question, answer in zip(df["questions"], df["answers"]):
        messages.append({"role": "user", "content": question})
        messages.append({"role": "assistant", "content": answer})

    return messages
|
|
|
# Retrieved entries whose distance is below this value are preferred as
# context for the answer (see the filtering step in the chat handler).
special_threshold = 0.3

# How many stored questions are fetched from the collection per user query.
n_results = 3

# --- Sidebar: introduction, topic picker and reset button ---
st.sidebar.markdown(
    "This is a chatbot to help you learn more about Youth Spirit Artworks!"
)

# The chosen topic decides which dataset is loaded further down.
domain = st.sidebar.selectbox(
    "What do you want to learn about?",
    (
        "About YSA",
        "Our Team and Youth Leaders",
        "Tiny House Village",
        "Qualify/Apply for Village",
        "YSA Supporters",
    ),
)

clear_button = st.sidebar.button("Clear Conversation", key="clear")

# Pressing the button empties the stored chat history for this session.
if clear_button:
    st.session_state.messages = []
|
|
|
|
|
# Hugging Face dataset repository backing each sidebar topic.
_DATASET_REPOS = {
    "About YSA": "KeshavRa/About_YSA_Database",
    "Our Team and Youth Leaders": "KeshavRa/Our_Team_Youth_Leaders_Database",
    "Tiny House Village": "KeshavRa/Tiny_House_Village_Database",
    "Qualify/Apply for Village": "KeshavRa/Qualify_Apply_For_Village_Database",
    "YSA Supporters": "KeshavRa/YSA_Supporters_Database",
}

# Load the dataset for the selected topic; nothing is loaded for an
# unrecognised topic, exactly as with a chain of equality checks.
_repo_id = _DATASET_REPOS.get(domain)
if _repo_id is not None:
    dataset = load_dataset(_repo_id)
|
|
|
initial_input = "Tell me about YSA"

# In-memory Chroma client that holds the searchable copy of the questions.
client = chromadb.Client()

# Build a 20-character collection name: a 10-digit number followed by 10
# random uppercase letters/digits.
# NOTE(review): presumably the name is randomised so that a script rerun
# never asks for a collection name that already exists — confirm.
#
# The bounds are given as integers and the dtype is pinned to int64: the
# upper bound does not fit into a 32-bit integer, which is NumPy's default
# integer on some platforms and makes the call fail there.
random_number: int = int(
    np.random.randint(low=10**9, high=10**10, dtype=np.int64)
)

random_string: str = "".join(
    np.random.choice(list(string.ascii_uppercase + string.digits), size=10)
)

combined_string: str = f"{random_number}{random_string}"

collection = client.create_collection(combined_string)
|
|
|
st.title("Youth Spirit Artworks Chatbot")

# Seed the per-session keys the first time the script runs for a session;
# on later reruns the stored values are left untouched.
#   messages      -> chat history as a list of {"role", "content"} dicts
#   curr_database -> marker for the database the history belongs to
for _state_key, _initial_value in (("messages", []), ("curr_database", None)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
# Introductory assistant message for the "About YSA" topic.
_ABOUT_YSA_INTRO = """
On this page, you can learn about what YSA does, how YSA was started, the advisory board, and the programs we offer.

Examples

--> What is the purpose of Youth Spirit Artworks?

--> Who created YSA?

--> What is the Advisory Board for Youth Spirit Artworks?

--> What are the three empowerment-focused program areas of YSA?
"""

# Opening message per sidebar topic, keyed by the exact selectbox label.
# NOTE(review): every entry except "About YSA" still holds a one-letter
# placeholder — real introductions appear to be pending.
init_messages = {
    "About YSA": _ABOUT_YSA_INTRO,
    "Our Team and Youth Leaders": " b ",
    "Tiny House Village": " c ",
    "Qualify/Apply for Village": " d ",
    "YSA Supporters": " e ",
}
|
|
|
|
|
# Copy every question of the selected dataset into the Chroma collection so
# user queries can be matched against them. Ids are the row positions as
# strings, which lets the chat handler map hits back to dataset rows.
with st.spinner("Loading, please be patient with us ... 🙏"):
    # Read the column once instead of once for the length and once for the
    # documents.
    train_questions = dataset["train"]["questions"]
    collection.add(
        ids=[str(i) for i in range(len(train_questions))],
        documents=train_questions,
        metadatas=[{"type": "support"} for _ in train_questions],
    )

# Start a fresh conversation whenever the selected topic differs from the one
# the stored history belongs to (this includes the very first run, where
# `curr_database` is still None).
#
# The topic label is what gets stored and compared. Calling `.equals(...)` on
# the stored value failed on the first run because the value is None there,
# and the freshly loaded dataset object is a new object on every rerun.
if st.session_state.curr_database != domain:
    st.session_state.messages = [
        {"role": "assistant", "content": init_messages[domain]}
    ]
    st.session_state.curr_database = domain
|
|
|
|
|
# Streamlit reruns the script on every interaction, so the stored
# conversation is drawn again from session state each time.
for past_message in st.session_state.messages:
    st.chat_message(past_message["role"]).markdown(past_message["content"])
|
|
|
|
|
# Handle one chat turn: show the user's message, retrieve the closest stored
# questions, ask the model to answer from their answers, and show the reply.
if prompt := st.chat_input("Tell me about YSA"):
    st.chat_message("user").markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})

    question = prompt

    # Nearest stored questions for the user's text.
    results = collection.query(query_texts=question, n_results=n_results)

    # Collection ids are stringified row positions, so they convert straight
    # back into dataset row indices.
    idx = [int(i) for i in results["ids"][0]]

    # Read each column once; indexing dataset["train"][...] inside the list
    # comprehensions would re-read the whole column for every retrieved id.
    train_split = dataset["train"]
    all_questions = train_split["questions"]
    all_answers = train_split["answers"]

    ref = pd.DataFrame(
        {
            "idx": idx,
            "questions": [all_questions[i] for i in idx],
            "answers": [all_answers[i] for i in idx],
            "distances": results["distances"][0],
        }
    )

    # Prefer only the close matches; when none is under the threshold, fall
    # back to everything that was retrieved.
    filtered_ref = ref[ref["distances"] < special_threshold]
    final_ref = ref if filtered_ref.empty else filtered_ref
    ref_from_db_search = final_ref["answers"].str.cat(sep=" ")

    engineered_prompt = f"""
        Based on the context: {ref_from_db_search},
        answer the user question: {question}.
    """

    answer = call_chatgpt(engineered_prompt)
    response = answer

    with st.chat_message("assistant"):
        st.markdown(response)
        # Let the user inspect the rows the answer was grounded on.
        with st.expander("See reference:"):
            st.table(final_ref)

    st.session_state.messages.append({"role": "assistant", "content": response})