# Chat_QnA_v2 / vector_db.py
import pinecone
import os
import json
import ast
import PyPDF2
import shutil
import gradio as gr
from tqdm import tqdm
from pydantic import Field
from langchain.load.serializable import Serializable
from langchain.vectorstores import Pinecone
from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, SAVE_DIR
from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import TokenTextSplitter
# Initialize Pinecone (pre-v3 pinecone-client API)
pinecone.init(
    api_key=PINECONE_API_KEY,  # find at app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # next to the API key in the console
)
# Azure embedding model definition
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,  # Azure OpenAI accepts at most 16 inputs per embedding request
)
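
# Quick dimension check (a sketch; assumes the deployment serves
# text-embedding-ada-002, whose vectors are 1536-dimensional and match the
# index dimension created below):
# assert len(embeddings.embed_query("hello")) == 1536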
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(
        INDEX_NAME,
        metric="cosine",
        dimension=1536,
    )
    print(f"Index {INDEX_NAME} created successfully")
index = pinecone.Index(INDEX_NAME)
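
# Optional sanity check (a sketch; assumes the index is reachable):
# print(index.describe_index_stats())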

class Document(Serializable):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""
    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source, relationships to other
    documents, etc.).
    """

def delete_all():
    """Remove every saved file and clear all vectors from the index."""
    for files in os.listdir(SAVE_DIR):
        os.remove(os.path.join(SAVE_DIR, files))
    index.delete(delete_all=True)
    message = "Deleted all files successfully"
    return gr.update(choices=[]), message, gr.Files.update(None)

def delete_file(files_src):
    """Delete the selected files from disk and their vectors from the index."""
    file_name = []
    for files in files_src:
        os.remove(os.path.join(SAVE_DIR, files))
        file_name.append(files)
    # Assumes vectors were stored with a `document_id` metadata field.
    _filter = {"document_id": {"$in": file_name}}
    index.delete(filter=_filter)
    message = f"Deleted {len(files_src)} files successfully"
    return gr.update(choices=os.listdir(SAVE_DIR)), message, gr.Files.update(None)

def upload_file():
    """Return a vectorstore backed by the existing Pinecone index."""
    vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings)
    print(f"Load files from existing {INDEX_NAME}")
    return vectorstore
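
# Usage sketch (the query string and k are illustrative assumptions):
# vectorstore = upload_file()
# docs = vectorstore.similarity_search("software lifecycle", k=4)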

def handle_upload_file(files):
    """Embed newly uploaded files and add them to the Pinecone index."""
    documents = get_documents(files)
    if len(documents) > 0:
        Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME)
        message = f"Added files to {INDEX_NAME} successfully"
        print(message)
    else:
        message = f"Load files from existing {INDEX_NAME}"
        print(message)
    return message
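
# Hypothetical data.json shape, inferred from update_file below (the keys are
# this app's convention): a JSON string whose parsed value maps post ids to
# {"content": ..., "post_url": ...} dicts.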

def update_file():
    """Load Facebook post data from data.json and add it to the index."""
    with open("data.json") as json_file:
        data = json.load(json_file)
    # data.json holds a stringified dict, so parse it into a real one.
    datas = ast.literal_eval(data)
    texts = []
    for k, v in datas.items():
        content = v["content"]
        post_url = v["post_url"]
        texts.append(Document(page_content=content, metadata={"source": post_url}))
    if len(texts) > 0:
        Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME)
    message = f"Added facebook data to {INDEX_NAME} successfully"
    return message

def get_documents(file_src):
    """Load uploaded files, split them into chunks, and return Document objects."""
    documents = []
    if file_src is None:
        return documents
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in os.listdir(SAVE_DIR):
            # Skip files that were already uploaded.
            continue
        shutil.copy(filepath, os.path.join(SAVE_DIR, filename))
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filepath})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader
                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader
                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader
                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception:
            import traceback
            traceback.print_exc()
            continue  # skip unreadable files instead of reusing a stale `texts`
        texts = text_splitter.split_documents(texts)
        documents.extend(texts)
    return documents

if __name__ == "__main__":
    # upload_file() takes no arguments; it loads the existing index.
    upload_file()