import json
import os
import re

import openai
from langchain.prompts import PromptTemplate

from config import TIMEOUT_STREAM, HISTORY_DIR
from vector_db import upload_file
from callback import StreamingGradioCallbackHandler
from queue import SimpleQueue, Empty, Queue
from threading import Thread
from utils import add_source_numbers, add_details, web_citation, get_history_names
from chains.custom_chain import CustomConversationalRetrievalChain
from langchain.chains import LLMChain
from chains.azure_openai import CustomAzureOpenAI
from config import OPENAI_API_TYPE, OPENAI_API_VERSION, OPENAI_API_KEY, OPENAI_API_BASE, API_KEY, \
    DEPLOYMENT_ID, MODEL_ID
from cosmos_db import upsert_item, read_item, delete_items, query_items


class OpenAIModel:
    def __init__(
        self,
        llm_model_name,
        condense_model_name,
        prompt_template="",
        temperature=0.0,
        top_p=1.0,
        n_choices=1,
        stop=None,
        presence_penalty=0,
        frequency_penalty=0,
        user=None
    ):
        self.llm_model_name = llm_model_name
        self.condense_model_name = condense_model_name
        self.prompt_template = prompt_template
        self.temperature = temperature
        self.top_p = top_p
        self.n_choices = n_choices
        self.stop = stop
        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.history = []
        self.user_identifier = user

    def set_user_identifier(self, new_user_identifier):
        self.user_identifier = new_user_identifier

    def format_prompt(self, qa_prompt_template, condense_prompt_template):
        # Build LangChain prompt templates for the QA step and the question-condensing step
        qa_prompt = PromptTemplate(template=qa_prompt_template,
                                   input_variables=["question", "chat_history", "context"])
        condense_prompt = PromptTemplate(template=condense_prompt_template,
                                         input_variables=["question", "chat_history"])
        return qa_prompt, condense_prompt

    def memory(self, inputs, outputs, last_k=3):
        # last_k: keep only the last k conversation turns
        if len(self.history) >= last_k:
            self.history.pop(0)
        self.history.append((inputs, outputs))

    def reset_conversation(self):
        self.history = []
        return []

    def delete_first_conversation(self):
        if self.history:
            self.history.pop(0)

    def delete_last_conversation(self):
        if len(self.history) > 0:
            self.history.pop()

    def save_history(self, chatbot, file_name):
        message = upsert_item(self.user_identifier, file_name, self.history, chatbot)
        return message

    def load_history(self, file_name):
        items = read_item(self.user_identifier, file_name)
        return items['id'], items['chatbot']

    def delete_history(self, file_name):
        message = delete_items(self.user_identifier, file_name)
        return message, get_history_names(False, self.user_identifier), []

    def audio_response(self, audio):
        # Transcribe the uploaded audio file with the OpenAI Audio API
        with open(audio, 'rb') as media_file:
            response = openai.Audio.transcribe(
                api_key=API_KEY,
                model=MODEL_ID,
                file=media_file
            )
        return response["text"], None

    def inference(self, inputs, chatbot, streaming=False, upload_files_btn=False,
                  custom_websearch=False, local_db=False, **kwargs):
        if upload_files_btn or local_db:
            status_text = "Indexing files to vector database"
            yield chatbot, status_text
            vectorstore = upload_file(upload_files_btn)

            qa_prompt, condense_prompt = self.format_prompt(**kwargs)

            job_done = object()  # sentinel signalling that processing is done
            q = SimpleQueue()
            # Defaults for the non-streaming path so the LLM call below never sees undefined names
            timeout = None
            streaming_callback = []
            if streaming:
                timeout = TIMEOUT_STREAM
                streaming_callback = [StreamingGradioCallbackHandler(q)]

            # Define llm model
            llm = CustomAzureOpenAI(deployment_name=DEPLOYMENT_ID,
                                    openai_api_type=OPENAI_API_TYPE,
                                    openai_api_base=OPENAI_API_BASE,
                                    openai_api_version=OPENAI_API_VERSION,
                                    openai_api_key=OPENAI_API_KEY,
                                    temperature=self.temperature,
                                    model_kwargs={"top_p": self.top_p},
                                    streaming=streaming,
                                    callbacks=streaming_callback,
                                    request_timeout=timeout)
            condense_llm = CustomAzureOpenAI(deployment_name=self.condense_model_name,
                                             openai_api_type=OPENAI_API_TYPE,
                                             openai_api_base=OPENAI_API_BASE,
                                             openai_api_version=OPENAI_API_VERSION,
                                             openai_api_key=OPENAI_API_KEY,
                                             temperature=self.temperature)

            status_text = "Request URL: " + OPENAI_API_BASE
            yield chatbot, status_text

            # Create a function to call - this will run in a thread.
            # The response queue carries the chain's final result back to this generator.
            response_queue = SimpleQueue()

            def task():
                # Conversational retrieval chain: condense the question, retrieve, then answer
                qa = CustomConversationalRetrievalChain.from_llm(
                    llm,
                    vectorstore.as_retriever(search_type="similarity_score_threshold",
                                             search_kwargs={"score_threshold": 0.75}),
                    condense_question_llm=condense_llm,
                    verbose=True,
                    condense_question_prompt=condense_prompt,
                    combine_docs_chain_kwargs={"prompt": qa_prompt},
                    return_source_documents=True)
                # Query with the user input and the chat history
                response = qa({"question": inputs, "chat_history": self.history})
                response_queue.put(response)
                q.put(job_done)

            thread = Thread(target=task)
            thread.start()

            # Stream tokens from the callback queue into the last chatbot message
            chatbot.append((inputs, ""))
            content = ""
            while True:
                try:
                    next_token = q.get(block=True)
                    if next_token is job_done:
                        break
                    content += next_token
                    chatbot[-1] = (chatbot[-1][0], content)
                    yield chatbot, status_text
                except Empty:
                    continue

            # Add citation info to the response
            response = response_queue.get()
            relevant_docs = response["source_documents"]
            if len(relevant_docs) == 0:
                display_append = ""
            else:
                if upload_files_btn:
                    reference_results = [d.page_content for d in relevant_docs]
                    reference_sources = [d.metadata["source"] for d in relevant_docs]
                    display_append = add_details(reference_results, reference_sources)
                    display_append = '