"""Document-ingestion utilities for a RAG app.

Uploads user files to an Azure Blob Storage container, parses and chunks
them, and indexes their embeddings in a Pinecone vector store. Also exposes
Gradio-facing helpers to list and delete ingested files.
"""

import os

import gradio as gr
import pinecone
import PyPDF2
from azure.storage.blob import BlobServiceClient
from langchain.embeddings import OpenAIEmbeddings
from langchain.load.serializable import Serializable
from langchain.text_splitter import TokenTextSplitter
from pydantic import Field
from tqdm import tqdm

# from langchain.vectorstores import Pinecone
from custom_vectordb import Pinecone
from config import (
    PINECONE_API_KEY,
    PINECONE_ENVIRONMENT,
    INDEX_NAME,
    CONNECTION_STRING,
    CONTAINER_NAME,
    NAME_SPACE_1,
    NAME_SPACE_2,
    EMBEDDING_API_BASE,
    EMBEDDING_API_KEY,
    OPENAI_API_TYPE,
    OPENAI_API_VERSION,
    EMBEDDING_DEPLOYMENT_ID,
)

# Initialize the Pinecone client once at import time.
pinecone.init(
    api_key=PINECONE_API_KEY,  # find at app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # next to api key in console
)

# Azure OpenAI embedding model definition.
# chunk_size=16 caps how many texts are embedded per API request.
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,
)

# ~500-token chunks with a 30-token overlap between consecutive chunks.
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)

# Create the index on first run. dimension=1536 matches the OpenAI
# ada-002 embedding size; presumably EMBEDDING_DEPLOYMENT_ID is that
# model -- TODO confirm against the Azure deployment.
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(INDEX_NAME, metric="cosine", dimension=1536)
    print(f"Index {INDEX_NAME} created successfully")

index = pinecone.Index(INDEX_NAME)
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)


class Document(Serializable):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""
    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source, relationships to other documents, etc.). 
    """


def upload_files_blob(file_path):
    """Upload the local file at *file_path* to the configured blob container.

    The blob is named after the file's basename.
    """
    file_name = os.path.basename(file_path)
    blob_client = blob_service_client.get_blob_client(
        container=CONTAINER_NAME, blob=file_name
    )
    with open(file_path, "rb") as data:
        blob_client.upload_blob(data)
    print(f"Uploaded {file_name}.")


def load_files_blob():
    """Return the names of all blobs currently in the container."""
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    return [blob.name for blob in container_client.list_blobs()]


def delete_blob(blob_name):
    """Delete a single blob from the container."""
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    container_client.delete_blob(blob_name)
    print(f"Deleted {blob_name}")


def delete_all():
    """Delete every blob and wipe namespace NAME_SPACE_1 from the index.

    Returns:
        A (dropdown update, status message, file-widget reset) tuple for Gradio.
    """
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    for blob in container_client.list_blobs():
        container_client.delete_blob(blob.name)
    # Remove all vectors in this namespace in one call.
    index.delete(delete_all=True, namespace=NAME_SPACE_1)
    message = f"Delete all files in space {NAME_SPACE_1} succesfully"
    return gr.update(choices=[]), message, gr.Files.update(None)


def delete_file(files_src):
    """Delete the given blobs and their vectors from NAME_SPACE_1.

    Args:
        files_src: iterable of blob names to remove.

    Returns:
        A (dropdown update, status message, file-widget reset) tuple for Gradio.
    """
    deleted = []
    for name in files_src:
        delete_blob(name)
        deleted.append(name)
    # Vectors were stored with their file name in the 'source' metadata
    # field (see get_documents), so delete by metadata filter.
    _filter = {"source": {"$in": deleted}}
    index.delete(filter=_filter, namespace=NAME_SPACE_1)
    message = f"Delete {len(files_src)} files in space {NAME_SPACE_1} files succesfully"
    available_files = load_files_blob()
    return gr.update(choices=available_files), message, gr.Files.update(None)


def upload_file(check_box):
    """Return a vectorstore over NAME_SPACE_1 (truthy) or NAME_SPACE_2.

    Args:
        check_box: truthy selects NAME_SPACE_1, falsy selects NAME_SPACE_2.
    """
    namespace = NAME_SPACE_1 if check_box else NAME_SPACE_2
    vectorstore = Pinecone.from_existing_index(
        INDEX_NAME, embeddings, namespace=namespace
    )
    print(f"Load files from space {namespace} in {INDEX_NAME}")
    return vectorstore


def handle_upload_file(files):
    """Ingest newly uploaded files into NAME_SPACE_1.

    Returns:
        A human-readable status message (also printed).
    """
    documents = get_documents(files)
    if documents:
        Pinecone.from_documents(
            documents, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_1
        )
        message = f"Add files to space {NAME_SPACE_1} in {INDEX_NAME} sucessfully"
    else:
        message = f"Load files from space existing {NAME_SPACE_1} in {INDEX_NAME}"
    print(message)
    return message


def get_documents(file_src):
    """Parse and chunk uploaded files into Document objects.

    Files already present in blob storage are skipped. New files are
    uploaded to the container first, then parsed by extension
    (.pdf / .docx / .pptx, anything else as plain text) and split into
    token chunks.

    Args:
        file_src: iterable of Gradio file objects (each has a ``.name``
            path attribute), or None.

    Returns:
        List of chunked documents ready for embedding; files that fail
        to parse are skipped.
    """
    documents = []
    if file_src is None:
        return documents
    available_files = load_files_blob()
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in available_files:
            # Already ingested on a previous run.
            continue
        upload_files_blob(filepath)
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filename})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader

                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader

                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader

                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception:
            import traceback

            traceback.print_exc()
            # BUG FIX: in the original, 'texts' was unbound after a parse
            # failure, raising NameError on the split below. Skip the
            # broken file instead (it was already uploaded to blob storage).
            continue
        documents.extend(text_splitter.split_documents(texts))
    return documents


if __name__ == "__main__":
    # NOTE(review): upload_file expects a boolean check_box; a non-empty
    # list is truthy, so this loads the NAME_SPACE_1 vectorstore.
    upload_file(["STANDARD_SOFTWARE LIFECYCLES.pdf"])