# Chat_QnA_v2 / vector_db.py
import pinecone
import os
import PyPDF2
import gradio as gr
from tqdm import tqdm
from pydantic import Field
from langchain.load.serializable import Serializable
# from langchain.vectorstores import Pinecone
from custom_vectordb import Pinecone
from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, CONNECTION_STRING, CONTAINER_NAME, NAME_SPACE_1, NAME_SPACE_2
from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import TokenTextSplitter
from azure.storage.blob import BlobServiceClient
# initialize pinecone
pinecone.init(
    api_key=PINECONE_API_KEY,  # find at app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # next to api key in console
)
# Azure embedding model definition
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,
)
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)
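# Example (sketch): the splitter above yields roughly 500-token chunks with a
# 30-token overlap; split_text returns a list of chunk strings. The sample
# text below is purely illustrative.
#
#   chunks = text_splitter.split_text("A long passage of text to be embedded ...")
#   print(len(chunks))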
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(
        INDEX_NAME,
        metric="cosine",
        dimension=1536,
    )
    print(f"Index {INDEX_NAME} created successfully")
index = pinecone.Index(INDEX_NAME)
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
class Document(Serializable):
"""Class for storing a piece of text and associated metadata."""
page_content: str
"""String text."""
metadata: dict = Field(default_factory=dict)
"""Arbitrary metadata about the page content (e.g., source, relationships to other
documents, etc.).
"""
# def update_fb():
#     with open('data.json') as json_file:
#         data = json.load(json_file)
#     datas = ast.literal_eval(data)
#     texts = []
#     for k, v in datas.items():
#         content = v["content"].split("-----")[0] + "\nimage_link: " + str(v["image"])
#         post_url = v["post_url"]
#         texts.append(Document(page_content=content, metadata={"source": post_url}))
#     if len(texts) > 0:
#         Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_2)
#     message = f"Added facebook data to space {NAME_SPACE_2} in {INDEX_NAME} successfully"
#     return message
def upload_files_blob(file_path):
    file_name = os.path.basename(file_path)
    blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=file_name)
    with open(file_path, 'rb') as data:
        blob_client.upload_blob(data)
    print(f"Uploaded {file_name}.")
def load_files_blob():
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    files_name = []
    for blob in container_client.list_blobs():
        files_name.append(blob.name)
    return files_name
def delete_blob(blob_name):
    # Get container client
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    container_client.delete_blob(blob_name)
    print(f"Deleted {blob_name}")
def delete_all():
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    blob_list = container_client.list_blobs()
    for blob in blob_list:
        container_client.delete_blob(blob.name)
    index.delete(delete_all=True, namespace=NAME_SPACE_1)
    message = f"Deleted all files in space {NAME_SPACE_1} successfully"
    return gr.update(choices=[]), message, gr.Files.update(None)
def delete_file(files_src):
    file_name = []
    for files in files_src:
        delete_blob(files)
        file_name.append(files)
    _filter = {"source": {"$in": file_name}}
    index.delete(filter=_filter, namespace=NAME_SPACE_1)
    message = f"Deleted {len(files_src)} files in space {NAME_SPACE_1} successfully"
    available_files = load_files_blob()
    return gr.update(choices=available_files), message, gr.Files.update(None)
def upload_file(check_box):
    if check_box:
        namespace = NAME_SPACE_1
    else:
        namespace = NAME_SPACE_2
    vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings, namespace=namespace)
    print(f"Load files from space {namespace} in {INDEX_NAME}")
    return vectorstore
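# Example (sketch): querying the vectorstore returned by upload_file().
# similarity_search is the standard LangChain vectorstore method; the query
# string and k value below are purely illustrative.
#
#   store = upload_file(check_box=True)  # True -> NAME_SPACE_1
#   for doc in store.similarity_search("What is a software lifecycle?", k=4):
#       print(doc.metadata.get("source"), doc.page_content[:100])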
def handle_upload_file(files):
    documents = get_documents(files)
    if len(documents) > 0:
        Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_1)
        message = f"Added files to space {NAME_SPACE_1} in {INDEX_NAME} successfully"
        print(message)
    else:
        message = f"Loaded files from existing space {NAME_SPACE_1} in {INDEX_NAME}"
        print(message)
    return message
def get_documents(file_src):
    documents = []
    if file_src is None:
        return documents
    available_files = load_files_blob()
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in available_files:
            # Already stored in blob storage, so skip re-indexing it
            continue
        else:
            upload_files_blob(filepath)
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filename})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader
                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader
                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader
                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception as e:
            import traceback
            traceback.print_exc()
            # Loading failed, so `texts` is not set for this file; skip it
            continue
        texts = text_splitter.split_documents(texts)
        documents.extend(texts)
    return documents
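# Example (sketch): end-to-end ingestion outside the Gradio UI. get_documents()
# expects objects exposing a `.name` attribute holding a local path (as Gradio's
# file component provides), so a small stand-in class is used here; the class
# name and file path are illustrative.
#
#   class _LocalFile:
#       def __init__(self, name):
#           self.name = name
#
#   docs = get_documents([_LocalFile("docs/handbook.pdf")])
#   if docs:
#       Pinecone.from_documents(docs, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_1)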
if __name__ == "__main__":
    upload_file(["STANDARD_SOFTWARE LIFECYCLES.pdf"])