import os
import re
import uuid
import datetime
import mimetypes
from pathlib import Path
from collections import deque
from urllib.parse import urlparse

import requests
import tiktoken
import openai
from bs4 import BeautifulSoup

from langchain.embeddings import OpenAIEmbeddings, SentenceTransformerEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import WebBaseLoader, TextLoader, Docx2txtLoader, PyMuPDFLoader

from whatsapp_chat_custom import WhatsAppChatLoader


HTTP_URL_PATTERN = r'^http[s]*://.+'

mimetypes.init()
media_files = tuple(x for x in mimetypes.types_map if mimetypes.types_map[x].split('/')[0] in ['image', 'video', 'audio'])
filter_strings = ['/email-protection#']


def getOaiCreds(key):
    if key:
        return {'service': 'openai',
                'oai_key': key
                }
    else:
        return {}


def getWxCreds(key, p_id):
    if key and p_id:
        return {'service': 'watsonx',
                'credentials': {"url": "https://us-south.ml.cloud.ibm.com", "apikey": key},
                'project_id': p_id
                }
    else:
        return {}


def getPersonalBotApiKey():
    if os.getenv("OPENAI_API_KEY"):
        return getOaiCreds(os.getenv("OPENAI_API_KEY"))
    elif os.getenv("WX_API_KEY") and os.getenv("WX_PROJECT_ID"):
        return getWxCreds(os.getenv("WX_API_KEY"), os.getenv("WX_PROJECT_ID"))
    else:
        return {}


def get_hyperlinks(url):
    """Return all href values found on an HTML page, or an empty list on failure."""
    try:
        reqs = requests.get(url)
        if not reqs.headers.get('Content-Type', '').startswith("text/html") or 400 <= reqs.status_code < 600:
            return []
        soup = BeautifulSoup(reqs.text, 'html.parser')
    except Exception as e:
        print(e)
        return []

    hyperlinks = []
    for link in soup.find_all('a', href=True):
        hyperlinks.append(link.get('href'))

    return hyperlinks


def get_domain_hyperlinks(local_domain, url):
    """Return hyperlinks on url that stay within local_domain, normalized to absolute URLs."""
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # Absolute links: keep only those pointing at the same domain.
        if re.search(HTTP_URL_PATTERN, link):
            url_obj = urlparse(link)
            if url_obj.netloc.replace('www.', '') == local_domain.replace('www.', ''):
                clean_link = link

        # Relative links: skip anchors, query strings and mailto links; resolve the rest.
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith(("#", '?', 'mailto:')):
                continue

            if 'wp-content/uploads' in url:
                clean_link = url + "/" + link
            else:
                clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            clean_link = clean_link.strip().rstrip('/').replace('/../', '/')

            if not any(x in clean_link for x in filter_strings):
                clean_links.append(clean_link)

    return list(set(clean_links))


def crawl(url, local_domain, prog=None):
    """Crawl pages within local_domain starting at url; stop after 100 unique URLs."""
    queue = deque([url])
    seen = set([url])

    while queue:
        url_pop = queue.pop()

        for link in get_domain_hyperlinks(local_domain, url_pop):
            if link not in seen:
                queue.append(link)
                seen.add(link)
                if len(seen) >= 100:
                    return seen
        if prog is not None: prog(1, desc=f'Crawling: {url_pop}')

    return seen


def ingestURL(documents, url, crawling=True, prog=None):
    url = url.rstrip('/')
    local_domain = urlparse(url).netloc
    if not (local_domain and url.startswith('http')):
        return documents
    print('Loading URL', url)
    if crawling:
        if prog is not None: prog(0, desc=f'Crawling: {url}')
        links = crawl(url, local_domain, prog)
        if prog is not None: prog(1, desc=f'Crawling: {url}')
    else:
        links = set([url])

    # Separate PDF links from regular pages and drop image/video/audio links.
    c_links, pdf_links = [], []
    for x in links:
        if x.endswith('.pdf'):
            pdf_links.append(x)
        elif not x.endswith(media_files):
            c_links.append(x)

    if prog is not None: prog(0.5, desc=f'Ingesting: {url}')
    if c_links:
        loader = WebBaseLoader(list(c_links))
        documents.extend(loader.load())

    for pdf_link in list(pdf_links):
        loader = PyMuPDFLoader(pdf_link)
        doc = loader.load()
        for x in doc:
            x.metadata['source'] = loader.source
        documents.extend(doc)

    return documents


def ingestFiles(documents, files_list, prog=None):
    for fPath in files_list:
        doc = None
        if fPath.endswith('.pdf'):
            doc = PyMuPDFLoader(fPath).load()
        elif fPath.endswith('.txt') and 'WhatsApp Chat with' not in fPath:
            doc = TextLoader(fPath).load()
        elif fPath.endswith(('.doc', '.docx')):
            doc = Docx2txtLoader(fPath).load()
        elif 'WhatsApp Chat with' in fPath and fPath.endswith('.csv'):
            doc = WhatsAppChatLoader(fPath).load()
        else:
            pass

        if doc is not None and doc[0].page_content:
            if prog is not None: prog(1, desc='Loaded file: ' + fPath.rsplit('/', 1)[-1])
            print('Loaded file:', fPath)
            documents.extend(doc)
    return documents


def data_ingestion(inputDir=None, file_list=[], url_list=[], prog=None):
    documents = []

    if inputDir is not None:
        files = [str(x) for x in Path(inputDir).glob('**/*')]
        documents = ingestFiles(documents, files)
    if file_list:
        documents = ingestFiles(documents, file_list, prog)

    if url_list:
        for url in url_list:
            documents = ingestURL(documents, url, prog=prog)

    # Normalize whitespace in page content, except for WhatsApp chat exports.
    for x in documents:
        if 'WhatsApp Chat with' not in x.metadata['source']:
            x.page_content = x.page_content.strip().replace('\n', ' ').replace('\\n', ' ').replace('  ', ' ')

    return documents


def split_docs(documents):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=250)
    docs = text_splitter.split_documents(documents)
    return docs


def getSourcesFromMetadata(metadata, sourceOnly=True, sepFileUrl=True):
    """Build numbered, human-readable source lists from document metadata dicts."""
    setSrc = set()
    for x in metadata:
        metadataText = ''
        if x is not None:
            # Keep only the file name for local files; keep URLs as-is.
            source = x['source']
            source = source.rsplit('/', 1)[-1] if 'http' not in source else source
            notSource = []
            for k, v in x.items():
                if v is not None and k != 'source' and k in ['page', 'title']:
                    notSource.extend([f"{k}: {v}"])
            metadataText = ', '.join([f'source: {source}'] + notSource) if sourceOnly == False else source
            setSrc.add(metadataText)

    if sepFileUrl:
        src_files = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted([x for x in setSrc if 'http' not in x], key=str.casefold))])
        src_urls = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted([x for x in setSrc if 'http' in x], key=str.casefold))])

        src_files = 'Files:\n' + src_files if src_files else ''
        src_urls = 'URLs:\n' + src_urls if src_urls else ''
        newLineSep = '\n\n' if src_files and src_urls else ''

        return src_files + newLineSep + src_urls, len(setSrc)
    else:
        src_docs = '\n'.join([f"{i+1}) {x}" for i, x in enumerate(sorted(list(setSrc), key=str.casefold))])
        return src_docs, len(setSrc)


def getEmbeddingFunc(creds):
    if creds.get('service') == 'openai':
        embeddings = OpenAIEmbeddings(openai_api_key=creds.get('oai_key', 'Null'))
    elif creds.get('service') == 'watsonx':
        # For watsonx credentials, embeddings are computed locally with a sentence-transformers model.
        embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    else:
        raise Exception('Error: Invalid or None Credentials')
    return embeddings


def getVsDict(embeddingFunc, docs, vsDict={}):
    # Create a new Chroma client (with a unique persist directory) on first use.
    if vsDict.get('chromaClient') is None:
        vsDict['chromaDir'] = './vecstore/' + str(uuid.uuid1())
        vsDict['chromaClient'] = Chroma(embedding_function=embeddingFunc, persist_directory=vsDict['chromaDir'])

    # Empty the existing collection before adding the new documents.
    if vsDict['chromaClient']._collection.count() > 0:
        vsDict['chromaClient'].delete(vsDict['chromaClient'].get()['ids'])

    vsDict['chromaClient'].add_documents(docs)
    print('vectorstore count:', vsDict['chromaClient']._collection.count(), 'at', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    return vsDict


def localData_vecStore(embKey={}, inputDir=None, file_list=[], url_list=[], vsDict={}):
    documents = data_ingestion(inputDir, file_list, url_list)
    if not documents:
        raise Exception('Error: No Documents Found')
    docs = split_docs(documents)

    embeddings = getEmbeddingFunc(embKey)

    vsDict_hd = getVsDict(embeddings, docs, vsDict)

    src_str = getSourcesFromMetadata(vsDict_hd['chromaClient'].get()['metadatas'])
    src_str = str(src_str[1]) + ' source document(s) successfully loaded in vector store.' + '\n\n' + src_str[0]
    print(src_str)
    return vsDict_hd


def num_tokens_from_string(string, encoding_name = "cl100k_base"): |
|
"""Returns the number of tokens in a text string.""" |
|
encoding = tiktoken.get_encoding(encoding_name) |
|
num_tokens = len(encoding.encode(string)) |
|
return num_tokens |
|
|
|
|
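

# Minimal usage sketch (illustrative only, not part of the module's API): the
# directory and URL below are hypothetical placeholders, and this assumes an
# OPENAI_API_KEY (or WX_API_KEY + WX_PROJECT_ID) is set in the environment so
# getPersonalBotApiKey() can return usable credentials.
if __name__ == '__main__':
    creds = getPersonalBotApiKey()
    vsDict = localData_vecStore(
        embKey=creds,
        inputDir='./documents',            # hypothetical folder with PDF/TXT/DOCX files
        url_list=['https://example.com'],  # hypothetical site to crawl and ingest
    )
    print(num_tokens_from_string('How many tokens is this sentence?'))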