Spaces:
Runtime error
Runtime error
import pinecone | |
import os | |
import PyPDF2 | |
import gradio as gr | |
from tqdm import tqdm | |
from pydantic import Field | |
from langchain.load.serializable import Serializable | |
# from langchain.vectorstores import Pinecone | |
from custom_vectordb import Pinecone | |
from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, CONNECTION_STRING, CONTAINER_NAME, NAME_SPACE_1, NAME_SPACE_2 | |
from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.text_splitter import TokenTextSplitter | |
from azure.storage.blob import BlobServiceClient | |
# initialize pinecone | |
pinecone.init( | |
api_key=PINECONE_API_KEY, # find at app.pinecone.io | |
environment=PINECONE_ENVIRONMENT, # next to api key in console | |
) | |
# Azure embedding model definition | |
embeddings = OpenAIEmbeddings( | |
deployment=EMBEDDING_DEPLOYMENT_ID, | |
openai_api_key=EMBEDDING_API_KEY, | |
openai_api_base=EMBEDDING_API_BASE, | |
openai_api_type=OPENAI_API_TYPE, | |
openai_api_version=OPENAI_API_VERSION, | |
chunk_size=16 | |
) | |
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30) | |
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes(): | |
pinecone.create_index( | |
INDEX_NAME, | |
metric="cosine", | |
dimension=1536 | |
) | |
print(f"Index {INDEX_NAME} created successfully") | |
index = pinecone.Index(INDEX_NAME) | |
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING) | |
class Document(Serializable): | |
"""Class for storing a piece of text and associated metadata.""" | |
page_content: str | |
"""String text.""" | |
metadata: dict = Field(default_factory=dict) | |
"""Arbitrary metadata about the page content (e.g., source, relationships to other | |
documents, etc.). | |
""" | |
# def update_fb(): | |
# with open('data.json') as json_file: | |
# data = json.load(json_file) | |
# datas = ast.literal_eval(data) | |
# texts = [] | |
# for k, v in datas.items(): | |
# content = v["content"].split("-----")[0] + "\nimage_link: " + str(v["image"]) | |
# post_url = v["post_url"] | |
# texts.append(Document(page_content=content, metadata={"source": post_url})) | |
# if len(texts)>0: | |
# Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_2) | |
# message = f"Add facebook data to space {NAME_SPACE_2} in {INDEX_NAME} sucessfully" | |
# return message | |
def upload_files_blob(file_path): | |
file_name = os.path.basename(file_path) | |
blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=file_name) | |
with open(file_path,'rb') as data: | |
blob_client.upload_blob(data) | |
print(f"Uploaded {file_name}.") | |
def load_files_blob(): | |
container_client = blob_service_client.get_container_client(CONTAINER_NAME) | |
files_name = [] | |
for blob in container_client.list_blobs(): | |
files_name.append(blob.name) | |
return files_name | |
def delete_blob(blob_name): | |
# Get container client | |
container_client = blob_service_client.get_container_client(CONTAINER_NAME) | |
container_client.delete_blob(blob_name) | |
print(f"Deleted {blob_name}") | |
def delete_all(): | |
container_client = blob_service_client.get_container_client(CONTAINER_NAME) | |
blob_list = container_client.list_blobs() | |
for blob in blob_list: | |
container_client.delete_blob(blob.name) | |
index.delete(delete_all=True, namespace=NAME_SPACE_1) | |
message = f"Delete all files in space {NAME_SPACE_1} succesfully" | |
return gr.update(choices=[]), message, gr.Files.update(None) | |
def delete_file(files_src): | |
file_name = [] | |
for files in files_src: | |
delete_blob(files) | |
file_name.append(files) | |
_filter = {"source": {"$in": file_name}} | |
index.delete(filter=_filter, namespace=NAME_SPACE_1) | |
message = f"Delete {len(files_src)} files in space {NAME_SPACE_1} files succesfully" | |
available_files = load_files_blob() | |
return gr.update(choices=available_files), message, gr.Files.update(None) | |
def upload_file(check_box): | |
if check_box: | |
namespace = NAME_SPACE_1 | |
else: | |
namespace = NAME_SPACE_2 | |
vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings, namespace=namespace) | |
print(f"Load files from space {namespace} in {INDEX_NAME}") | |
return vectorstore | |
def handle_upload_file(files): | |
documents = get_documents(files) | |
if len(documents)>0: | |
Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_1) | |
message = f"Add files to space {NAME_SPACE_1} in {INDEX_NAME} sucessfully" | |
print(message) | |
else: | |
message = f"Load files from space existing {NAME_SPACE_1} in {INDEX_NAME}" | |
print(message) | |
return message | |
def get_documents(file_src): | |
documents = [] | |
if file_src is None: | |
return documents | |
available_files = load_files_blob() | |
for file in file_src: | |
filepath = file.name | |
filename = os.path.basename(filepath) | |
file_type = os.path.splitext(filename)[1] | |
if filename in available_files: | |
continue | |
else: | |
upload_files_blob(filepath) | |
try: | |
if file_type == ".pdf": | |
pdftext = "" | |
with open(filepath, "rb") as pdfFileObj: | |
pdf_reader = PyPDF2.PdfReader(pdfFileObj) | |
for page in tqdm(pdf_reader.pages): | |
pdftext += page.extract_text() | |
texts = [Document(page_content=pdftext, metadata={"source": filename})] | |
elif file_type == ".docx": | |
from langchain.document_loaders import UnstructuredWordDocumentLoader | |
loader = UnstructuredWordDocumentLoader(filepath) | |
texts = loader.load() | |
elif file_type == ".pptx": | |
from langchain.document_loaders import UnstructuredPowerPointLoader | |
loader = UnstructuredPowerPointLoader(filepath) | |
texts = loader.load() | |
else: | |
from langchain.document_loaders import TextLoader | |
loader = TextLoader(filepath, "utf8") | |
texts = loader.load() | |
except Exception as e: | |
import traceback | |
traceback.print_exc() | |
texts = text_splitter.split_documents(texts) | |
documents.extend(texts) | |
return documents | |
if __name__ == "__main__": | |
upload_file(["STANDARD_SOFTWARE LIFECYCLES.pdf"]) |