import streamlit as st
import pandas as pd
from openai import OpenAI
from PyPDF2 import PdfReader
from PIL import Image


# source: eagle0504/document-search-q-series
def read_and_textify_advanced(files, chunk_size):
    """
    Reads PDF files and extracts text from each page, split into sentence chunks.

    Each page's text is split on "." and regrouped into chunks of at most
    'chunk_size' sentences. Empty fragments (for example the one produced by
    a trailing period) are dropped so no chunk consists of punctuation only.

    Args:
        files: An iterable of uploaded PDF files (anything PdfReader accepts).
        chunk_size (int): The maximum number of sentences per text segment.

    Returns:
        A list of strings, one per chunk. A page from which no text could be
        extracted contributes a single empty-string placeholder.
    """
    text_list = []  # List to store extracted text segments

    for file in files:
        pdf_reader = PdfReader(file)

        for page in pdf_reader.pages:
            text = page.extract_text()
            if text:
                # Drop blank fragments so a trailing "." does not become a
                # chunk of its own or produce a doubled period.
                sentences = [s for s in text.split(".") if s.strip()]
                for start in range(0, len(sentences), chunk_size):
                    chunk = ".".join(sentences[start:start + chunk_size]) + "."
                    text_list.append(chunk.strip())
            else:
                # If no text extracted, still add a placeholder
                text_list.append("")
            page.clear()  # Clear the page object (optional, for memory management)

    return text_list


def get_questions(context, instructions) -> str:
    """
    Given a text context, generates questions using the OpenAI chat API.

    Relies on the module-level name 'client', which the "2) Create CSVs"
    step assigns before calling this function.

    Args:
    - context: A string representing the context for which questions should be generated.
    - instructions: A string telling the model how the questions should be written.

    Returns:
    - A string containing the question(s) generated by the API, or an empty
      string if the request failed.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": f"{instructions}\n\nText: {context}\n\nQuestions:\n"}
            ]
        )
        # Extract question text from the response
        return response.choices[0].message.content
    except Exception as e:
        # Print the error message and return an empty string if there was an error
        print(e)
        return ""


def get_answers(row, instructions) -> str:
    """
    Given a dataframe row containing context and questions, generates an answer
    using the OpenAI chat API.

    Relies on the module-level name 'client', which the "2) Create CSVs"
    step assigns before calling this function.

    Args:
    - row: A pandas dataframe row containing 'context' and 'questions' columns.
    - instructions: A string telling the model how the answers should be written.

    Returns:
    - A string containing the answer generated by the API, or an empty string
      if the request failed.
    """
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": f"{instructions}\n\nText: {row.context}\n\nQuestions:\n{row.questions}\n\nAnswers:\n"}
            ]
        )
        # Extract answer text from the response
        return response.choices[0].message.content
    except Exception as e:
        # Print the error message and return an empty string if there was an error
        print(e)
        return ""


def _reset_with_error(message):
    """Clear the session, store 'message' as the error to display, and rerun."""
    st.session_state.clear()
    st.session_state.error = message
    st.rerun()


def _reset_with_success(payload):
    """Clear the session, store 'payload' as the success result, and rerun."""
    st.session_state.clear()
    st.session_state.success = payload
    st.rerun()


def _render_reset_button():
    """Show a Reset button that clears the session and reruns when clicked."""
    if st.button('Reset'):
        st.session_state.clear()
        st.rerun()


def _render_csv_download(message):
    """Show 'message' plus a download button for the (csv, length) success payload."""
    csv_bytes, row_count = st.session_state.success
    st.success(message)
    st.download_button(
        label=f"Download CSV: length = {row_count}",
        data=csv_bytes,
        file_name='questions_answers.csv',
        mime='text/csv',
    )
    _render_reset_button()


# Contents of the requirements.txt shown to the user in step 5.
_REQUIREMENTS_TXT = '''
openai
scipy
streamlit
chromadb
datasets
'''


def _build_chatbot_app(organization_name, domain_info) -> str:
    """
    Render the source code of the generated chatbot app.py.

    Args:
    - organization_name: Name shown in the generated app's title and prompts.
    - domain_info: A list of dicts with 'link', 'name', 'purpose' and
      'instructions' keys, one per dataset.

    Returns:
    - A string containing the complete app.py source.
    """
    # Both values are embedded via repr() so that quotes or other special
    # characters typed by the user cannot produce syntactically invalid code.
    return f"""
import os
import streamlit as st
from datasets import load_dataset
import chromadb
import string

from openai import OpenAI

import numpy as np
import pandas as pd

from scipy.spatial.distance import cosine

from typing import Dict, List

def merge_dataframes(dataframes):
    # Concatenate the list of dataframes
    combined_dataframe = pd.concat(dataframes, ignore_index=True)

    # Ensure that the resulting dataframe only contains the columns "context", "questions", "answers"
    combined_dataframe = combined_dataframe[['context', 'questions', 'answers']]

    return combined_dataframe

def call_chatgpt(prompt: str, directions: str) -> str:
    '''
    Uses the OpenAI API to generate an AI response to a prompt.
    Args:
        prompt: A string representing the prompt to send to the OpenAI API.
    Returns:
        A string representing the AI's generated response.
    '''

    # Use the OpenAI API to generate a response based on the input prompt.
    client = OpenAI(api_key = os.environ["OPENAI_API_KEY"])

    completion = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=[
            {{"role": "system", "content": directions}},
            {{"role": "user", "content": prompt}}
        ]
    )

    # Extract the text from the first (and only) choice in the response output.
    ans = completion.choices[0].message.content

    # Return the generated AI response.
    return ans

def openai_text_embedding(prompt: str) -> List[float]:
    client = OpenAI(api_key = os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(input=prompt, model="text-embedding-ada-002")
    return response.data[0].embedding

def calculate_sts_openai_score(sentence1: str, sentence2: str) -> float:
    # Compute sentence embeddings
    embedding1 = openai_text_embedding(sentence1)  # Flatten the embedding array
    embedding2 = openai_text_embedding(sentence2)  # Flatten the embedding array

    # Convert to array
    embedding1 = np.asarray(embedding1)
    embedding2 = np.asarray(embedding2)

    # Calculate cosine similarity between the embeddings
    similarity_score = 1 - cosine(embedding1, embedding2)

    return similarity_score

def add_dist_score_column(
    dataframe: pd.DataFrame,
    sentence: str,
) -> pd.DataFrame:
    dataframe["stsopenai"] = dataframe["questions"].apply(
        lambda x: calculate_sts_openai_score(str(x), sentence)
    )

    sorted_dataframe = dataframe.sort_values(by="stsopenai", ascending=False)
    return sorted_dataframe.iloc[:5, :]

def convert_to_list_of_dict(df: pd.DataFrame) -> List[Dict[str, str]]:
    '''
    Reads in a pandas DataFrame and produces a list of dictionaries with two keys each, 'question' and 'answer.'
    Args:
        df: A pandas DataFrame with columns named 'questions' and 'answers'.
    Returns:
        A list of dictionaries, with each dictionary containing a 'question' and 'answer' key-value pair.
    '''

    # Initialize an empty list to store the dictionaries
    result = []

    # Loop through each row of the DataFrame
    for index, row in df.iterrows():
        # Create a dictionary with the current question and answer
        qa_dict_quest = {{"role": "user", "content": row["questions"]}}
        qa_dict_ans = {{"role": "assistant", "content": row["answers"]}}

        # Add the dictionary to the result list
        result.append(qa_dict_quest)
        result.append(qa_dict_ans)

    # Return the list of dictionaries
    return result

organization_name = {organization_name!r}

domain_info = {domain_info!r}

st.sidebar.markdown(f"This is a chatbot to help you learn more about {{organization_name}}")

domain = st.sidebar.selectbox("Select a topic", [d["name"] for d in domain_info])

special_threshold = 0.3

n_results = 3

clear_button = st.sidebar.button("Clear Conversation", key="clear")

if clear_button:
    st.session_state.messages = []
    st.session_state.curr_domain = ""

for d in domain_info:
    if domain == d['name']:
        dataset = load_dataset(d['link'])

initial_input = f"Tell me about {{organization_name}}"

# Initialize a new client for ChromeDB.
client = chromadb.Client()

# Generate a random number between 1 billion and 10 billion.
random_number: int = np.random.randint(low=1e9, high=1e10)

# Generate a random string consisting of 10 uppercase letters and digits.
random_string: str = "".join(
    np.random.choice(list(string.ascii_uppercase + string.digits), size=10)
)

# Combine the random number and random string into one identifier.
combined_string: str = f"{{random_number}}{{random_string}}"

# Create a new collection in ChromeDB with the combined string as its name.
collection = client.create_collection(combined_string)

st.title(f"{{organization_name}} Chatbot")

# Initialize chat history
if "messages" not in st.session_state:
    st.session_state.messages = []

if "curr_domain" not in st.session_state:
    st.session_state.curr_domain = ""

init_messages = {{}}
for d in domain_info:
    init_messages[d['name']] = d['purpose']

chatbot_instructions = {{}}
for d in domain_info:
    chatbot_instructions[d['name']] = d['instructions']

# Embed and store the first N supports for this demo
with st.spinner("Loading, please be patient with us ... 🙏"):
    L = len(dataset["train"]["questions"])

    collection.add(
        ids=[str(i) for i in range(0, L)],  # IDs are just strings
        documents=dataset["train"]["questions"],  # Enter questions here
        metadatas=[{{"type": "support"}} for _ in range(0, L)],
    )

if st.session_state.curr_domain != domain:
    st.session_state.messages = []

    init_message = init_messages[domain]
    st.session_state.messages.append({{"role": "assistant", "content": init_message}})

    st.session_state.curr_domain = domain

# Display chat messages from history on app rerun
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# React to user input
if prompt := st.chat_input(f"Tell me about {{organization_name}}"):
    # Display user message in chat message container
    st.chat_message("user").markdown(prompt)
    # Add user message to chat history
    st.session_state.messages.append({{"role": "user", "content": prompt}})

    question = prompt

    results = collection.query(query_texts=question, n_results=n_results)

    idx = results["ids"][0]
    idx = [int(i) for i in idx]
    ref = pd.DataFrame(
        {{
            "idx": idx,
            "questions": [dataset["train"]["questions"][i] for i in idx],
            "answers": [dataset["train"]["answers"][i] for i in idx],
            "distances": results["distances"][0],
        }}
    )
    # special_threshold = st.sidebar.slider('How old are you?', 0, 0.6, 0.1) # 0.3
    # special_threshold = 0.3
    filtered_ref = ref[ref["distances"] < special_threshold]
    if filtered_ref.shape[0] > 0:
        # st.success("There are highly relevant information in our database.")
        ref_from_db_search = filtered_ref["answers"].str.cat(sep=" ")
        final_ref = filtered_ref
    else:
        # st.warning(
        #     "The database may not have relevant information to help your question so please be aware of hallucinations."
        # )
        ref_from_db_search = ref["answers"].str.cat(sep=" ")
        final_ref = ref

    engineered_prompt = f'''
        Based on the context: {{ref_from_db_search}},
        answer the user question: {{question}}.
    '''

    directions = chatbot_instructions[domain]

    answer = call_chatgpt(engineered_prompt, directions)

    response = answer

    # Display assistant response in chat message container
    with st.chat_message("assistant"):
        st.markdown(response)
        with st.expander("See reference:"):
            st.table(final_ref)
    # Add assistant response to chat history
    st.session_state.messages.append({{"role": "assistant", "content": response}})
"""


st.set_page_config(page_title="ChatbotGuide", layout="wide")
st.title("Chatbot Guide")

# Define the options in the dropdown menu
app_options = [
    "1) Scrape PDFs",
    "2) Create CSVs",
    "3) Merge CSVs",
    "4) Upload Datasets",
    "5) Create Chatbot",
]

# Sidebar dropdown for selecting the application
selected_app = st.sidebar.selectbox("Select Step (1-5)", app_options)

# Clear session state when switching apps
if 'last_selected_app' in st.session_state:
    if st.session_state.last_selected_app != selected_app:
        st.session_state.clear()

st.session_state.last_selected_app = selected_app

# Defaults are re-created on every run because the helpers above wipe the
# whole session before storing an error or a success payload.
if 'submit' not in st.session_state:
    st.session_state.submit = False
if 'error' not in st.session_state:
    st.session_state.error = ""
if 'success' not in st.session_state:
    st.session_state.success = None

if selected_app == "1) Scrape PDFs":
    st.write("1. Go to your organizations webpage")
    image = Image.open('Example1.png')
    st.image(image, caption="Example for Step 1",use_column_width=True)
    st.divider()

    st.write("2. Choose an section in the webpage")
    image = Image.open('Example2.png')
    st.image(image, caption="Example for Step 2",use_column_width=True)
    st.divider()

    # st.write("3. Copy all text on the page")
    # image = Image.open('Example3.png')
    # st.image(image, caption="Example for Step 3",use_column_width=True)
    # st.divider()

    # st.write("4. Open a new google doc")
    # image = Image.open('Example4.png')
    # st.image(image, caption="Example for Step 4",use_column_width=True)
    # st.divider()

elif selected_app == "2) Create CSVs":
    if st.session_state.error != "":
        st.error(st.session_state.error)

    if st.session_state.success is not None:
        _render_csv_download("Success! Download the Q/A pairs below / Click reset to upload more PDFs")
    else:
        uploaded_files = st.file_uploader("Upload PDFs Here", type="pdf", accept_multiple_files=True)
        question_protocol = st.text_input("Provide instructions for how questions should be generated", "Write a question based on the text")
        answer_protocol = st.text_input("Provide instructions for how answers should be generated", "Write an answer based on the text")
        sentence_chunks = st.number_input("Number sentences per Q/A pair", value=2, step=1, min_value=1, max_value=3)
        openai_api_key = st.text_input("Enter your OpenAI API key", type="password")

        if st.button("Submit"):
            st.session_state.submit = True

        if st.session_state.submit:
            if not uploaded_files:
                _reset_with_error("Please upload at least 1 PDF")

            with st.spinner("Loading, please be patient with us ... 🙏"):
                # Test the API key with a minimal request. 'client' is a
                # module-level name that get_questions/get_answers read.
                try:
                    client = OpenAI(api_key=openai_api_key)
                    client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[
                            {"role": "system", "content": "You are a helpful assistant."},
                            {"role": "user", "content": "Say this is a test"}
                        ]
                    )
                except Exception:
                    api_key_ok = False
                else:
                    api_key_ok = True

            if not api_key_ok:
                _reset_with_error("OpenAI API key is invalid")

            with st.spinner("Loading, please be patient with us ... 🙏"):
                textify_output = read_and_textify_advanced(uploaded_files, sentence_chunks)

                # Skip blank-page placeholders: there is nothing to ask about.
                contexts = [chunk for chunk in textify_output if chunk]
                if not contexts:
                    _reset_with_error("No text could be extracted from the uploaded PDFs")

                # Building from a dict keeps the 'context' column even in
                # edge cases where a bare list would yield zero columns.
                df = pd.DataFrame({'context': contexts})

                question_protocol = question_protocol or "Write questions based on the text"
                df['questions'] = df['context'].apply(lambda context: get_questions(context, question_protocol))

                answer_protocol = answer_protocol or "Write answers based on the text"
                df['answers'] = df.apply(lambda row: get_answers(row, answer_protocol), axis=1)

                df = df.drop('context', axis=1)
                length = len(df)
                csv = df.to_csv(index=False).encode('utf-8')

                _reset_with_success((csv, length))

elif selected_app == "3) Merge CSVs":
    if st.session_state.error != "":
        st.error(st.session_state.error)

    if st.session_state.success is not None:
        _render_csv_download("Success! Download the merged CSV with Q/A pairs below / Reset to merge more CSVs")
    else:
        uploaded_files = st.file_uploader("Upload CSV files to merge", accept_multiple_files=True, type="csv")

        if st.button("Submit"):
            st.session_state.submit = True

        if st.session_state.submit:
            if len(uploaded_files) < 2:
                _reset_with_error("Please upload at least 2 CSVs to merge")

            frames = []
            for file in uploaded_files:
                # An empty or malformed upload is reported like a CSV with
                # the wrong columns instead of crashing the app.
                try:
                    frame = pd.read_csv(file)
                except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError):
                    frame = None

                if frame is None or not {"questions", "answers"}.issubset(frame.columns):
                    _reset_with_error("Please upload CSVs that have been generated from 2) Create CSVs")

                frames.append(frame[["questions", "answers"]])

            df = pd.concat(frames, ignore_index=True)
            length = len(df)
            csv = df.to_csv(index=False).encode('utf-8')

            _reset_with_success((csv, length))

elif selected_app == "4) Upload Datasets":
    st.markdown("Go to this [google colab link](https://colab.research.google.com/drive/1eCpk9HUoCKZb--tiNyQSHFW2ojoaA35m) to get started")

elif selected_app == "5) Create Chatbot":
    if st.session_state.error != "":
        st.error(st.session_state.error)

    if st.session_state.success is not None:
        requirements_text, app_text = st.session_state.success
        st.success("Success! Copy/paste the requirements.txt and app.py files into your HuggingFace Space")
        st.write('requirements.txt')
        st.code(requirements_text, language='python')
        st.write('app.py')
        st.code(app_text, language='python')
        _render_reset_button()
    else:
        organization_name = st.text_input("What is the name of your organization", "")
        num_domains = st.number_input("How many datasets do you have uploaded", value=1, step=1, min_value=1, max_value=10)
        st.divider()

        # NOTE(review): the "Example: ..." texts are pre-filled values, not
        # placeholders, so the missing-info check below only triggers when
        # the user clears a field — confirm whether that is intended.
        domain_info = []
        for i in range(num_domains):
            domain_link = st.text_input(f"Please enter link to dataset {i+1} with the format username/dataset_name", "Example: KeshavRa/About_YSA_Database")
            domain_name = st.text_input(f"What should domain {i+1} be called in the chatbot itself", "Example: About YSA")
            domain_purpose = st.text_area(f"What is the purpose of domain {i+1}, provide example questions (this will be visible to users of the chatbot)", 'Example: On this page, you can learn about what YSA does, how YSA was started, the advisory board, and the programs we offer.\n\nExample Questions\n\n--> What is the purpose of Youth Spirit Artworks?\n\n--> Who created YSA?\n\n--> What is the Advisory Board for Youth Spirit Artworks?\n\n--> What are the three empowerment-focused program areas of YSA?')
            domain_instructions = st.text_input(f"What baseline instructions/specifications should be sent to ChatGPT to answer questions in domain {i+1}", "Example: You are an assistant to help the user learn more about Youth Spirit Artworks")
            domain_info.append({
                "link": domain_link,
                "name": domain_name,
                "purpose": domain_purpose,
                "instructions": domain_instructions,
            })
            st.divider()

        if st.button("Submit"):
            st.session_state.submit = True

        if st.session_state.submit:
            if organization_name == "":
                _reset_with_error("Please enter an organization name")

            # (dict key, message prefix) in the order problems are reported.
            required_fields = (
                ("link", "link to"),
                ("name", "name for"),
                ("purpose", "purpose for"),
                ("instructions", "instructions for"),
            )
            missing_info = []
            for number, domain in enumerate(domain_info, start=1):
                for key, prefix in required_fields:
                    if domain[key] == "":
                        missing_info.append(f"{prefix} domain {number}")

            if missing_info:
                _reset_with_error("Missing Info: " + ", ".join(missing_info))

            app = _build_chatbot_app(organization_name, domain_info)
            _reset_with_success((_REQUIREMENTS_TXT, app))