Spaces:
Runtime error
Runtime error
| import os | |
| import pickle | |
| import re | |
| import time | |
| from typing import List, Union | |
| from urllib.parse import urlparse, urljoin | |
| import faiss | |
| import requests | |
| from PyPDF2 import PdfReader | |
| from bs4 import BeautifulSoup | |
| from langchain import OpenAI, LLMChain | |
| from langchain.agents import ConversationalAgent | |
| from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser | |
| from langchain.prompts import BaseChatPromptTemplate | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.docstore.document import Document | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.schema import AgentAction, AgentFinish, HumanMessage | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.vectorstores.faiss import FAISS | |
# URL recorded as the "source" metadata for every page read from the book PDF.
book_url = 'https://g.co/kgs/2VFC7u'
# Local path of the PDF that is read page by page for embedding.
book_file = "Book.pdf"
# Index page of the website whose links are crawled for embedding content.
url = 'https://makerlab.illinois.edu/'
# On-disk cache of the built search index: the pickled store ...
pickle_file = "open_ai.pkl"
# ... and the raw FAISS index written next to it.
index_file = "open_ai.index"
# Shared LLM handle used by the retrieval chain and by both agent builders.
gpt_3_5 = OpenAI(model_name='gpt-3.5-turbo',temperature=0)
# Embedding model handed to FAISS.from_texts when the index is built.
embeddings = OpenAIEmbeddings()
# (question, answer) pairs passed to the retrieval chain; rebound by generate_answer.
chat_history = []
# Conversation memory given to the agent executors under the "chat_history" key.
memory = ConversationBufferWindowMemory(memory_key="chat_history")
# Search index used by generate_answer; populated by get_search_index().
gpt_3_5_index = None
class CustomOutputParser(AgentOutputParser):
    """Turn raw LLM text into either a finished reply or a tool invocation."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Classify ``llm_output`` as a final reply or as the next tool call.

        Args:
            llm_output: The complete text produced by the LLM for this step.

        Returns:
            ``AgentFinish`` when the text contains ``AI:`` or ``Final Answer:``
            (the text after the last occurrence becomes the output), otherwise
            an ``AgentAction`` built from the ``Action`` / ``Action Input`` pair.

        Raises:
            ValueError: If no finish marker and no action pair is found.
        """
        # "AI:" marks a reply given without tools and is checked first;
        # "Final Answer:" marks the end of a tool-assisted run.
        for marker in ("AI:", "Final Answer:"):
            if marker in llm_output:
                reply = llm_output.split(marker)[-1].strip()
                return AgentFinish(return_values={"output": reply}, log=llm_output)

        # No finish marker: the text has to name a tool and its input.
        found = re.search(r"Action: (.*?)[\n]*Action Input:[\s]*(.*)", llm_output, re.DOTALL)
        if found is None:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")

        tool_name, raw_input = found.group(1), found.group(2)
        return AgentAction(
            tool=tool_name.strip(),
            tool_input=raw_input.strip(" ").strip('"'),
            log=llm_output,
        )
# Set up a prompt template
class CustomPromptTemplate(BaseChatPromptTemplate):
    """Chat prompt that renders the agent scratchpad and tool list into one message."""

    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]

    def format_messages(self, **kwargs) -> List[HumanMessage]:
        """Render the template into a single human message.

        Args:
            **kwargs: Template variables. Must contain ``intermediate_steps``,
                an iterable of ``(AgentAction, observation)`` pairs; it is
                removed from the variables and replaced by ``agent_scratchpad``.
                ``tools`` and ``tool_names`` are filled in from ``self.tools``.

        Returns:
            A one-element list holding the formatted prompt as a ``HumanMessage``.

        Raises:
            KeyError: If ``intermediate_steps`` is missing, or the template
                references a variable that was not supplied.
        """
        # Get the intermediate steps (AgentAction, Observation tuples)
        intermediate_steps = kwargs.pop("intermediate_steps")
        # Replay each earlier step as "<action log> / Observation / Thought:"
        # so the model continues from where it stopped.
        kwargs["agent_scratchpad"] = "".join(
            f"{action.log}\nObservation: {observation}\nThought: "
            for action, observation in intermediate_steps
        )
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join(f"{tool.name}: {tool.description}" for tool in self.tools)
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join(tool.name for tool in self.tools)
        formatted = self.template.format(**kwargs)
        return [HumanMessage(content=formatted)]
def get_search_index():
    """Populate the module-level ``gpt_3_5_index`` from the disk cache or by rebuilding.

    The cache is used only when both cache files exist and the pickle file is
    non-empty; otherwise ``create_index()`` builds a fresh index.
    """
    global gpt_3_5_index
    cache_ready = (
        os.path.isfile(pickle_file)
        and os.path.isfile(index_file)
        and os.path.getsize(pickle_file) > 0
    )
    if not cache_ready:
        gpt_3_5_index = create_index()
        return
    # NOTE: unpickling runs code stored in the file; only load caches this app wrote.
    with open(pickle_file, "rb") as cached:
        gpt_3_5_index = pickle.load(cached)
def create_index():
    """Build the search index from freshly fetched chunks and cache it on disk.

    Returns:
        The search index produced by ``search_index_from_docs``.
    """
    built = search_index_from_docs(create_chunk_documents())
    # Persist the raw FAISS index and the pickled store side by side.
    faiss.write_index(built.index, index_file)
    with open(pickle_file, "wb") as sink:
        pickle.dump(built, sink)
    return built
def create_chunk_documents():
    """Fetch all source documents and split them into chunks for embedding.

    Empty chunks are dropped. Chunks of 1000 characters or more are passed
    through the splitter once more and replaced by the resulting pieces.

    The result is built as a new list. The earlier approach removed from and
    extended the list while looping over it, which skipped elements, kept
    oversized chunks next to their re-split copies, and could loop forever on
    a chunk that cannot be split any further.

    Returns:
        A list of chunk documents with non-empty ``page_content``.
    """
    sources = fetch_data_for_embeddings(url, book_file, book_url)
    splitter = CharacterTextSplitter(separator=" ", chunk_size=800, chunk_overlap=0)

    cleaned_chunks = []
    for chunk in splitter.split_documents(sources):
        # Treat a missing text like an empty one so the size report cannot fail.
        text = chunk.page_content or ""
        print("Size of chunk: " + str(len(text) + len(chunk.metadata)))
        if not text:
            print("removing chunk: " + text)
            continue
        if len(text) >= 1000:
            print("splitting document")
            pieces = [
                piece
                for piece in splitter.split_documents([chunk])
                if piece.page_content
            ]
            # Re-split only once: if nothing usable comes back, keep the
            # original chunk rather than losing its content.
            cleaned_chunks.extend(pieces or [chunk])
        else:
            cleaned_chunks.append(chunk)
    return cleaned_chunks
def fetch_data_for_embeddings(url, book_file, book_url):
    """Collect the website documents followed by the book documents.

    Args:
        url: Index URL passed to ``get_website_data``.
        book_file: Path of the PDF passed to ``get_document_data``.
        book_url: Source URL passed to ``get_document_data``.

    Returns:
        The list returned by ``get_website_data``, extended in place with the
        book documents.
    """
    website_docs = get_website_data(url)
    book_docs = get_document_data(book_file, book_url)
    website_docs += book_docs
    return website_docs
def get_website_data(index_url):
    """Return the content of the pages linked from ``index_url``.

    The hrefs found on the index page are resolved and filtered by
    ``get_links`` before their content is downloaded.
    """
    candidate_links = get_links(index_url, get_paths(index_url))
    return get_content_from_links(candidate_links, index_url)
def get_content_from_links(links, index_url):
    """Download every distinct link under ``index_url`` and return its text.

    Args:
        links: Iterable of absolute URLs; duplicates are fetched only once.
        index_url: Only links starting with this prefix are downloaded.

    Returns:
        A list of ``Document`` objects whose ``page_content`` is the page text
        and whose metadata records the link under ``"source"``. Pages that
        answer with an error status are skipped.

    Raises:
        requests.RequestException: If a request fails or times out.
    """
    content_list = []
    for link in set(links):
        if not link.startswith(index_url):
            continue
        # A timeout keeps one unresponsive page from stalling the whole crawl.
        response = requests.get(link, timeout=30)
        # Pause after every request so the site is not flooded.
        time.sleep(1)
        if not response.ok:
            # Error pages would otherwise be embedded as if they were content.
            print("skipping " + link + " (HTTP " + str(response.status_code) + ")")
            continue
        soup = BeautifulSoup(response.content, "html.parser")
        # Get page content
        content = soup.get_text(separator="\n")
        # Get page metadata
        metadata = {"source": link}
        content_list.append(Document(page_content=content, metadata=metadata))
    return content_list
def get_paths(index_url):
    """Return the set of ``href`` values of all anchors on the page at ``index_url``.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    # A timeout prevents the index fetch from blocking forever.
    index_data = requests.get(index_url, timeout=30).content
    soup = BeautifulSoup(index_data, "html.parser")
    return {anchor.get('href') for anchor in soup.find_all('a', href=True)}
def get_links(index_url, paths):
    """Resolve ``paths`` against ``index_url`` and keep the crawlable ones.

    Args:
        index_url: Base URL that relative paths are joined to.
        paths: Iterable of href values (relative or absolute).

    Returns:
        A list of absolute URLs, in input order, whose scheme is http or https
        and whose host does not contain "squarespace".
    """
    def is_crawlable(candidate):
        # Only web pages, and nothing served from a squarespace host.
        parts = urlparse(candidate)
        return parts.scheme in ("http", "https") and "squarespace" not in parts.netloc

    resolved = (urljoin(index_url, path) for path in paths)
    return [candidate for candidate in resolved if is_crawlable(candidate)]
def get_document_data(book_file, book_url):
    """Read the PDF at ``book_file`` and return one document per page.

    Args:
        book_file: Path of the PDF to read.
        book_url: Value stored under ``"source"`` in every document's metadata.

    Returns:
        A list of ``Document`` objects, one per page, in page order.
    """
    with open(book_file, 'rb') as pdf_handle:
        return [
            Document(page_content=page.extract_text(), metadata={"source": book_url})
            for page in PdfReader(pdf_handle).pages
        ]
def search_index_from_docs(source_chunks):
    """Create a FAISS store from the text and metadata of ``source_chunks``."""
    texts = [doc.page_content for doc in source_chunks]
    metadatas = [doc.metadata for doc in source_chunks]
    return FAISS.from_texts(texts, embeddings, metadatas=metadatas)
def get_qa_chain(gpt_3_5_index):
    """Build a conversational retrieval chain on top of ``gpt_3_5_index``.

    Args:
        gpt_3_5_index: Search index; its ``as_retriever()`` supplies the retriever.

    Returns:
        A ``ConversationalRetrievalChain`` that also returns its source documents.
    """
    print(f"index: {gpt_3_5_index!s}")
    retriever = gpt_3_5_index.as_retriever()
    return ConversationalRetrievalChain.from_llm(
        gpt_3_5,
        chain_type="stuff",
        get_chat_history=get_chat_history,
        retriever=retriever,
        return_source_documents=True,
        verbose=True,
    )
def get_chat_history(inputs) -> str:
    """Render ``(human, ai)`` pairs as newline-separated ``Human:`` / ``AI:`` lines."""
    return "\n".join(f"Human:{human}\nAI:{ai}" for human, ai in inputs)
def generate_answer(question) -> str:
    """Answer ``question`` with the retrieval chain and append the sources used.

    Args:
        question: The user's question.

    Returns:
        The chain's answer followed by ``"\\nSOURCES: "`` and the distinct
        source values of the retrieved documents, joined by ``",\\n"`` in
        retrieval order.
    """
    global chat_history
    gpt_3_5_chain = get_qa_chain(gpt_3_5_index)
    result = gpt_3_5_chain(
        {"question": question, "chat_history": chat_history, "vectordbkwargs": {"search_distance": 0.8}})
    print("REsult: " + str(result))
    # NOTE(review): the history is replaced, not appended to, so only the most
    # recent exchange reaches the next call. Confirm this is intended.
    chat_history = [(question, result["answer"])]
    # dict.fromkeys de-duplicates while keeping first-seen order. A set would
    # make the order of the SOURCES list vary from run to run.
    unique_sources = dict.fromkeys(
        document.metadata['source'] for document in result['source_documents']
    )
    return result['answer'] + '\nSOURCES: ' + ',\n'.join(unique_sources)
def get_agent_chain(prompt, tools):
    """Wrap a ``ConversationalAgent`` driven by ``prompt`` in an executor.

    Args:
        prompt: Prompt used for the agent's LLM chain.
        tools: Tools given to both the agent and the executor.

    Returns:
        An ``AgentExecutor`` that uses the module-level ``memory``.
    """
    agent = ConversationalAgent(
        llm_chain=LLMChain(llm=gpt_3_5, prompt=prompt),
        tools=tools,
        verbose=True,
    )
    # NOTE(review): `intermediate_steps` is forwarded unchanged; confirm the
    # executor actually recognises this keyword.
    return AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        memory=memory,
        intermediate_steps=True,
    )
def get_prompt_and_tools():
    """Build the conversational agent prompt together with its tools.

    Returns:
        A ``(prompt, tools)`` tuple: the prompt created by
        ``ConversationalAgent.create_prompt`` and the list from ``get_tools()``.
    """
    opening = """Have a conversation with a human, answering the following questions as best you can.
Always try to use Vectorstore first.
Your name is Makerlab Bot because you are a personal assistant of Makerlab. You have access to the following tools:"""
    closing = """Begin! If you use any tool, ALWAYS return a "SOURCES" part in your answer"
{chat_history}
Question: {input}
{agent_scratchpad}
SOURCES:"""
    tools = get_tools()
    prompt = ConversationalAgent.create_prompt(
        tools,
        prefix=opening,
        suffix=closing,
        input_variables=["input", "chat_history", "agent_scratchpad"],
    )
    return prompt, tools
def get_tools():
    """Return the agent's tool list: a single "Vectorstore" tool backed by ``generate_answer``."""
    vectorstore_tool = Tool(
        name="Vectorstore",
        func=generate_answer,
        description="useful for when you need to answer questions about the Makerlab or 3D Printing.",
        # The tool's output is returned to the user as-is.
        return_direct=True,
    )
    return [vectorstore_tool]
def get_custom_agent(prompt, tools):
    """Wrap an ``LLMSingleActionAgent`` using ``CustomOutputParser`` in an executor.

    Args:
        prompt: Prompt used for the agent's LLM chain.
        tools: Tools the agent may call; their names form ``allowed_tools``.

    Returns:
        An ``AgentExecutor`` that uses the module-level ``memory``.
    """
    agent = LLMSingleActionAgent(
        llm_chain=LLMChain(llm=gpt_3_5, prompt=prompt),
        output_parser=CustomOutputParser(),
        # Stop generating before the model invents its own observation.
        stop=["\nObservation:"],
        allowed_tools=[tool.name for tool in tools],
    )
    # NOTE(review): `intermediate_steps` is forwarded unchanged; confirm the
    # executor actually recognises this keyword.
    return AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        memory=memory,
        intermediate_steps=True,
    )
def get_prompt_and_tools_for_custom_agent():
    """Build the prompt and tool list for the custom single-action agent.

    Returns:
        A ``(prompt, tools)`` tuple: a ``CustomPromptTemplate`` wrapping the
        template below, and the list from ``get_tools()``.
    """
    # The reply format described here ("AI:", "Final Answer:", "Action:" /
    # "Action Input:") uses the same markers that CustomOutputParser.parse
    # searches for.
    template = """
Have a conversation with a human, answering the following questions as best you can.
Always try to use Vectorstore first.
Your name is Makerlab Bot because you are a personal assistant of Makerlab. You have access to the following tools:
{tools}
To answer for the new input, use the following format:
New Input: the input question you must answer
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question. SOURCES: the sources referred to find the final answer
When you have a response to say to the Human and DO NOT need to use a tool:
1. DO NOT return "SOURCES" if you did not use any tool.
2. You MUST use this format:
```
Thought: Do I need to use a tool? No
AI: [your response here]
```
Begin! Remember to speak as a personal assistant when giving your final answer.
ALWAYS return a "SOURCES" part in your answer, if you used any tool.
Previous conversation history:
{chat_history}
New input: {input}
{agent_scratchpad}
SOURCES:"""
    tools = get_tools()
    prompt = CustomPromptTemplate(
        template=template,
        tools=tools,
        # `agent_scratchpad`, `tools` and `tool_names` are left out because
        # CustomPromptTemplate.format_messages generates them itself.
        # `intermediate_steps` is listed because format_messages consumes it.
        input_variables=["input", "intermediate_steps", "chat_history"]
    )
    return prompt, tools