# NOTE(review): the lines "Spaces:" / "Runtime error" below were stray page
# text (Hugging Face Spaces status output) pasted into the source and are not
# valid Python; commented out to keep the module importable.
# Spaces: Runtime error / Runtime error
import pinecone | |
import os | |
import json | |
import ast | |
import PyPDF2 | |
import shutil | |
import gradio as gr | |
from tqdm import tqdm | |
from pydantic import Field | |
from typing import List, Optional | |
from langchain.load.serializable import Serializable | |
from langchain.vectorstores import Pinecone | |
from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, SAVE_DIR | |
from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.text_splitter import TokenTextSplitter | |
# initialize pinecone | |
pinecone.init( | |
api_key=PINECONE_API_KEY, # find at app.pinecone.io | |
environment=PINECONE_ENVIRONMENT, # next to api key in console | |
) | |
# Azure embedding model definition | |
embeddings = OpenAIEmbeddings( | |
deployment=EMBEDDING_DEPLOYMENT_ID, | |
openai_api_key=EMBEDDING_API_KEY, | |
openai_api_base=EMBEDDING_API_BASE, | |
openai_api_type=OPENAI_API_TYPE, | |
openai_api_version=OPENAI_API_VERSION, | |
chunk_size=16 | |
) | |
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30) | |
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes(): | |
pinecone.create_index( | |
INDEX_NAME, | |
metric="cosine", | |
dimension=1536 | |
) | |
print(f"Index {INDEX_NAME} created successfully") | |
index = pinecone.Index(INDEX_NAME) | |
class Document(Serializable): | |
"""Class for storing a piece of text and associated metadata.""" | |
page_content: str | |
"""String text.""" | |
metadata: dict = Field(default_factory=dict) | |
"""Arbitrary metadata about the page content (e.g., source, relationships to other | |
documents, etc.). | |
""" | |
def delete_all(): | |
for files in os.listdir(SAVE_DIR): | |
os.remove(os.path.join(SAVE_DIR, files)) | |
index.delete(delete_all=True) | |
message = "Delete all files succesfully" | |
return gr.update(choices=[]), message, gr.Files.update(None) | |
def delete_file(files_src): | |
file_name = [] | |
for files in files_src: | |
os.remove(os.path.join(SAVE_DIR, files)) | |
file_name.append(files) | |
_filter = {"document_id": {"$in": file_name}} | |
index.delete(filter=_filter) | |
message = f"Delete {len(files_src)} files succesfully" | |
return gr.update(choices=os.listdir(SAVE_DIR)), message, gr.Files.update(None) | |
def upload_file(): | |
vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings) | |
print(f"Load files from existing {INDEX_NAME}") | |
return vectorstore | |
def handle_upload_file(files): | |
documents = get_documents(files) | |
if len(documents)>0: | |
Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME) | |
message = f"Add files to {INDEX_NAME} sucessfully" | |
print(message) | |
else: | |
message = f"Load files from existing {INDEX_NAME}" | |
print(message) | |
return message | |
def update_file(): | |
with open('data.json') as json_file: | |
data = json.load(json_file) | |
datas = ast.literal_eval(data) | |
texts = [] | |
for k, v in datas.items(): | |
content = v["content"] | |
post_url = v["post_url"] | |
texts.append(Document(page_content=content, metadata={"source": post_url})) | |
if len(texts)>0: | |
Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME) | |
message = f"Add facebook data to {INDEX_NAME} sucessfully" | |
return message | |
def get_documents(file_src): | |
documents = [] | |
if file_src is None: | |
return documents | |
for file in file_src: | |
filepath = file.name | |
filename = os.path.basename(filepath) | |
file_type = os.path.splitext(filename)[1] | |
if filename in os.listdir(SAVE_DIR): | |
continue | |
else: | |
shutil.copy(filepath, os.path.join(SAVE_DIR, filename)) | |
try: | |
if file_type == ".pdf": | |
pdftext = "" | |
with open(filepath, "rb") as pdfFileObj: | |
pdf_reader = PyPDF2.PdfReader(pdfFileObj) | |
for page in tqdm(pdf_reader.pages): | |
pdftext += page.extract_text() | |
texts = [Document(page_content=pdftext, metadata={"source": filepath})] | |
elif file_type == ".docx": | |
from langchain.document_loaders import UnstructuredWordDocumentLoader | |
loader = UnstructuredWordDocumentLoader(filepath) | |
texts = loader.load() | |
elif file_type == ".pptx": | |
from langchain.document_loaders import UnstructuredPowerPointLoader | |
loader = UnstructuredPowerPointLoader(filepath) | |
texts = loader.load() | |
else: | |
from langchain.document_loaders import TextLoader | |
loader = TextLoader(filepath, "utf8") | |
texts = loader.load() | |
except Exception as e: | |
import traceback | |
traceback.print_exc() | |
texts = text_splitter.split_documents(texts) | |
documents.extend(texts) | |
return documents | |
if __name__ == "__main__": | |
upload_file(["STANDARD_SOFTWARE LIFECYCLES.pdf"]) |