# NOTE: web-page scrape artifacts ("Spaces:", "Runtime error", file size,
# line-number gutter) removed from the top of this file; they were not source code.
import pinecone
import os
import json
import ast
import PyPDF2
import shutil
import gradio as gr
from tqdm import tqdm
from pydantic import Field
from typing import List, Optional
from langchain.load.serializable import Serializable
from langchain.vectorstores import Pinecone
from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, SAVE_DIR
from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import TokenTextSplitter
# initialize pinecone client with credentials from config
pinecone.init(
    api_key=PINECONE_API_KEY,  # find at app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # next to api key in console
)
# Azure embedding model definition
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16  # batch size per embedding request -- presumably an Azure limit; TODO confirm
)
# Token-based splitter: ~500-token chunks with 30-token overlap between chunks.
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)
# Create the index on first run. NOTE(review): dimension=1536 looks like the
# OpenAI ada-002 embedding size -- confirm it matches EMBEDDING_DEPLOYMENT_ID.
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(
        INDEX_NAME,
        metric="cosine",
        dimension=1536
    )
    print(f"Index {INDEX_NAME} created successfully")
# Handle to the (existing or just-created) index, used by the delete helpers below.
index = pinecone.Index(INDEX_NAME)
class Document(Serializable):
    """Class for storing a piece of text and associated metadata.

    Instances are passed to ``Pinecone.from_documents`` and
    ``text_splitter.split_documents`` elsewhere in this module.
    """
    page_content: str
    """String text."""
    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source, relationships to other
    documents, etc.).
    """
def delete_all():
    """Remove every saved file from SAVE_DIR and wipe all vectors from the index.

    Returns:
        Tuple of (gradio dropdown update with empty choices, status message,
        gradio Files component reset).
    """
    for entry in os.listdir(SAVE_DIR):
        os.remove(os.path.join(SAVE_DIR, entry))
    # Drop every vector in the Pinecone index in one call.
    index.delete(delete_all=True)
    # Fix: original message had a typo ("succesfully").
    message = "Delete all files successfully"
    return gr.update(choices=[]), message, gr.Files.update(None)
def delete_file(files_src):
    """Delete the selected files from SAVE_DIR and their vectors from the index.

    Args:
        files_src: Iterable of file names as stored in SAVE_DIR.

    Returns:
        Tuple of (gradio dropdown update with remaining files, status message,
        gradio Files component reset).
    """
    file_names = []
    for name in files_src:
        os.remove(os.path.join(SAVE_DIR, name))
        file_names.append(name)
    # Vectors are assumed to carry a `document_id` metadata field equal to the
    # file name -- TODO confirm against the ingestion path.
    _filter = {"document_id": {"$in": file_names}}
    index.delete(filter=_filter)
    # Fix: original message had a typo ("succesfully").
    message = f"Delete {len(file_names)} files successfully"
    return gr.update(choices=os.listdir(SAVE_DIR)), message, gr.Files.update(None)
def upload_file(files=None):
    """Load a vectorstore backed by the existing Pinecone index.

    Args:
        files: Ignored. Accepted for backward compatibility: the ``__main__``
            guard calls this function with a file list, which raised
            ``TypeError`` against the original zero-argument signature.

    Returns:
        A langchain ``Pinecone`` vectorstore for ``INDEX_NAME``.
    """
    vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings)
    print(f"Load files from existing {INDEX_NAME}")
    return vectorstore
def handle_upload_file(files):
    """Split newly uploaded files into documents and add them to the index.

    Args:
        files: List of uploaded file objects (each exposing a ``.name`` path),
            as produced by the gradio upload widget; passed to get_documents.

    Returns:
        A status message describing whether anything was indexed.
    """
    documents = get_documents(files)
    if len(documents) > 0:
        Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME)
        # Fix: original message had a typo ("sucessfully").
        message = f"Add files to {INDEX_NAME} successfully"
    else:
        # Nothing new to ingest (all files were already present or unreadable).
        message = f"Load files from existing {INDEX_NAME}"
    print(message)
    return message
def update_file():
    """Load scraped facebook posts from data.json and add them to the index.

    data.json is expected to hold a JSON-encoded string whose content is a
    Python-literal dict of ``{key: {"content": ..., "post_url": ...}}``
    (hence the json.load followed by ast.literal_eval).

    Returns:
        A status message.
    """
    with open('data.json') as json_file:
        data = json.load(json_file)
    # The JSON payload is itself a stringified dict; literal_eval parses it
    # without executing arbitrary code.
    datas = ast.literal_eval(data)
    texts = []
    for k, v in datas.items():
        content = v["content"]
        post_url = v["post_url"]
        texts.append(Document(page_content=content, metadata={"source": post_url}))
    if len(texts) > 0:
        Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME)
        # Fix: original message had a typo ("sucessfully").
        message = f"Add facebook data to {INDEX_NAME} successfully"
    else:
        # Fix: original left `message` unbound here, raising UnboundLocalError
        # on return when data.json contained no posts.
        message = "No facebook data found in data.json"
    return message
def get_documents(file_src):
    """Read uploaded files, split them into chunks, and return Document lists.

    Files whose names already exist in SAVE_DIR are skipped (treated as
    previously ingested); new files are copied into SAVE_DIR first. PDFs are
    read with PyPDF2; .docx/.pptx use the unstructured loaders; anything else
    is loaded as UTF-8 text.

    Args:
        file_src: List of uploaded file objects (each with a ``.name`` path),
            or None.

    Returns:
        List of Document chunks produced by the module-level text_splitter.
    """
    documents = []
    if file_src is None:
        return documents
    # Hoist the directory listing out of the loop; track additions so a
    # duplicate name later in file_src is still skipped (matches the original's
    # per-iteration os.listdir behavior).
    existing = set(os.listdir(SAVE_DIR))
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in existing:
            continue
        shutil.copy(filepath, os.path.join(SAVE_DIR, filename))
        existing.add(filename)
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filepath})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader
                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader
                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader
                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception:
            import traceback
            traceback.print_exc()
            # Fix: the original fell through after printing, so `texts` was
            # either unbound (NameError) or stale chunks from the previous
            # file. Skip the unreadable file instead.
            continue
        texts = text_splitter.split_documents(texts)
        documents.extend(texts)
    return documents
if __name__ == "__main__":
    # Fix: the original passed a file-name list to upload_file(), which takes
    # no arguments and loads the existing index -- that call raised TypeError
    # at startup. Call it without arguments.
    upload_file()