"""Pinecone vector-store management for an uploaded-document QA app.

Responsibilities:
- initialise the Pinecone client and (if missing) the index at import time,
- build an Azure OpenAI embedding model and a token-based text splitter,
- provide Gradio-facing helpers to add, list, and delete indexed files.
"""

import ast
import json
import os
import shutil
from typing import List, Optional

import gradio as gr
import pinecone
import PyPDF2
from pydantic import Field
from tqdm import tqdm

from langchain.embeddings import OpenAIEmbeddings
from langchain.load.serializable import Serializable
from langchain.text_splitter import TokenTextSplitter
from langchain.vectorstores import Pinecone

from config import (
    EMBEDDING_API_BASE,
    EMBEDDING_API_KEY,
    EMBEDDING_DEPLOYMENT_ID,
    INDEX_NAME,
    OPENAI_API_TYPE,
    OPENAI_API_VERSION,
    PINECONE_API_KEY,
    PINECONE_ENVIRONMENT,
    SAVE_DIR,
)

# Initialise the Pinecone client once, at import time.
pinecone.init(
    api_key=PINECONE_API_KEY,  # find at app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # next to api key in console
)

# Azure OpenAI embedding model; chunk_size=16 is the per-request batch size
# for the Azure endpoint, not the text chunk size.
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,
)

# Token-based splitter used to chunk documents before embedding.
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)

# Create the index on first run; dimension 1536 matches the OpenAI
# ada-style embedding size used above.
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(INDEX_NAME, metric="cosine", dimension=1536)
    print(f"Index {INDEX_NAME} created successfully")

index = pinecone.Index(INDEX_NAME)


class Document(Serializable):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""

    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source,
    relationships to other documents, etc.)."""


def delete_all():
    """Remove every saved file from SAVE_DIR and wipe the Pinecone index.

    Returns:
        (gr.update, str, gr.Files.update): cleared file-chooser choices,
        a status message, and a reset file widget — Gradio callback outputs.
    """
    for entry in os.listdir(SAVE_DIR):
        os.remove(os.path.join(SAVE_DIR, entry))
    index.delete(delete_all=True)
    message = "Delete all files successfully"
    return gr.update(choices=[]), message, gr.Files.update(None)


def delete_file(files_src):
    """Delete the given file names from disk and their vectors from the index.

    Args:
        files_src: list of file names (relative to SAVE_DIR) to remove.

    Returns:
        (gr.update, str, gr.Files.update): refreshed file-chooser choices,
        a status message, and a reset file widget — Gradio callback outputs.
    """
    removed = []
    for entry in files_src:
        os.remove(os.path.join(SAVE_DIR, entry))
        removed.append(entry)
    # Drop every vector whose metadata ties it to one of the deleted files.
    # NOTE(review): assumes vectors were stored with a "document_id"
    # metadata field matching the file name — confirm against ingestion.
    _filter = {"document_id": {"$in": removed}}
    index.delete(filter=_filter)
    message = f"Delete {len(files_src)} files successfully"
    return gr.update(choices=os.listdir(SAVE_DIR)), message, gr.Files.update(None)


def upload_file():
    """Return a langchain Pinecone vectorstore over the existing index."""
    vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings)
    print(f"Load files from existing {INDEX_NAME}")
    return vectorstore


def handle_upload_file(files):
    """Ingest newly uploaded files into the Pinecone index.

    Args:
        files: Gradio file objects (each exposes a ``.name`` path), or None.

    Returns:
        str: a status message describing what happened.
    """
    documents = get_documents(files)
    if len(documents) > 0:
        Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME)
        message = f"Add files to {INDEX_NAME} successfully"
    else:
        # Nothing new to ingest — everything was already indexed.
        message = f"Load files from existing {INDEX_NAME}"
    print(message)
    return message


def update_file():
    """Load scraped Facebook posts from ``data.json`` and index them.

    The JSON file holds a string containing a dict literal mapping ids to
    ``{"content": ..., "post_url": ...}`` records, hence the extra
    ``ast.literal_eval`` after ``json.load``.

    Returns:
        str: a status message (only set when at least one record was added).
    """
    with open('data.json') as json_file:
        data = json.load(json_file)
    datas = ast.literal_eval(data)
    texts = []
    for _, record in datas.items():
        texts.append(
            Document(
                page_content=record["content"],
                metadata={"source": record["post_url"]},
            )
        )
    if len(texts) > 0:
        Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME)
        message = f"Add facebook data to {INDEX_NAME} successfully"
        return message


def get_documents(file_src):
    """Copy uploaded files into SAVE_DIR and load them as split Documents.

    Args:
        file_src: Gradio file objects (each exposes a ``.name`` path), or None.

    Returns:
        list: token-split langchain documents for every file not already
        present in SAVE_DIR; files that fail to load are skipped.
    """
    documents = []
    if file_src is None:
        return documents
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in os.listdir(SAVE_DIR):
            # Already ingested on a previous upload — skip it.
            continue
        shutil.copy(filepath, os.path.join(SAVE_DIR, filename))
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filepath})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader
                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader
                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader
                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception:
            import traceback
            traceback.print_exc()
            # BUG FIX: the original fell through with `texts` undefined,
            # raising NameError (or re-splitting a previous file's texts).
            continue
        documents.extend(text_splitter.split_documents(texts))
    return documents


if __name__ == "__main__":
    # BUG FIX: upload_file() takes no arguments; the original passed a
    # file-name list, which raised TypeError on every run.
    upload_file()