|
|
|
|
|
|
|
import os |
|
import sys |
|
|
|
import pymongo |
|
from clearml import PipelineDecorator |
|
from dotenv import load_dotenv |
|
from langchain_community.embeddings import OllamaEmbeddings |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
from qdrant_client import QdrantClient |
|
from qdrant_client.http.models import Distance, PointStruct, VectorParams |
|
|
|
|
|
# Load ClearML credentials from the environment (.env in CWD, or the project
# root as a fallback for containerized runs where CWD differs).
try:
    load_dotenv(override=True)
except Exception:
    load_dotenv(sys.path[1] + "/.env", override=True)

CLEARML_WEB_HOST = os.getenv("CLEARML_WEB_HOST")
CLEARML_API_HOST = os.getenv("CLEARML_API_HOST")
CLEARML_FILES_HOST = os.getenv("CLEARML_FILES_HOST")
CLEARML_API_ACCESS_KEY = os.getenv("CLEARML_API_ACCESS_KEY")
# Bug fix: ClearML's documented variable name is CLEARML_API_SECRET_KEY (the
# original read the misspelled "CLEARML_API_SECRETKEY").  Keep the misspelled
# name as a fallback so existing .env files keep working.
CLEARML_API_SECRET_KEY = os.getenv("CLEARML_API_SECRET_KEY") or os.getenv(
    "CLEARML_API_SECRETKEY"
)
|
|
|
|
|
# Bug fix: return_values was a single string "links, resultTypes, texts";
# ClearML expects one list entry per returned value.
@PipelineDecorator.component(cache=False, return_values=["links", "resultTypes", "texts"])
def retreiveDocuments():
    """Fetch every document from every collection of the Mongo "twin" database.

    Returns:
        links: the "link" field of each document, in retrieval order.
        resultTypes: the "type" field of each document (used downstream as the
            Qdrant collection name).
        texts: the raw "content" field of each document.
    """
    links = []
    resultTypes = []
    texts = []

    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    DATABASE_HOST = os.getenv("DATABASE_HOST")
    # Use the client as a context manager so the connection is closed
    # (the original leaked the MongoClient).
    with pymongo.MongoClient(DATABASE_HOST) as mongoHost:
        mongoDatabase = mongoHost["twin"]
        for collection in mongoDatabase.list_collection_names():
            for result in mongoDatabase[collection].find():
                links.append(result["link"])
                resultTypes.append(result["type"])
                texts.append(result["content"])
    return links, resultTypes, texts
|
|
|
|
|
@PipelineDecorator.component(cache=False, return_values=["cleanTexts"])
def cleanDocuments(texts):
    """Strip every character outside printable ASCII from each document text.

    Keeps only code points 32 (space) through 126 (tilde); newlines, tabs and
    all non-ASCII characters are removed.
    """

    def _printable_ascii_only(document):
        # Single pass: keep a char iff its code point is in [32, 126].
        return "".join(filter(lambda ch: 32 <= ord(ch) <= 126, document))

    return [_printable_ascii_only(document) for document in texts]
|
|
|
|
|
@PipelineDecorator.component(cache=False, return_values=["chunks", "chunkNums"])
def chunkDocuments(texts):
    """Split each document into overlapping ~500-char chunks.

    Returns:
        chunks: every chunk string, documents concatenated in order.
        chunkNums: the per-document index of each chunk (resets to 0 at the
            start of every new document — downstream code uses this reset to
            detect document boundaries).
    """
    chunks = []
    chunkNums = []
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=20,
        length_function=len,
        is_separator_regex=False,
    )
    for text in texts:
        for chunkNum, chunk in enumerate(text_splitter.split_text(text)):
            # Bug fix: the original appended text_splitter.split_text(chunk)
            # (a list) instead of the chunk string itself, so downstream
            # embed_documents() and the Qdrant "text" payload received lists.
            chunks.append(chunk)
            chunkNums.append(chunkNum)
    return chunks, chunkNums
|
|
|
|
|
@PipelineDecorator.component(cache=False, return_values=["embeddings"])
def embedChunks(chunks):
    """Embed every chunk with a local Ollama model and return the vectors.

    When USE_DOCKER is "True" the Ollama daemon is reached through
    host.docker.internal; otherwise the client library's default URL is used.
    """
    MODEL = "llama3.2"
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)

    # Choose the constructor kwargs once instead of duplicating the call.
    extraArgs = {}
    if os.getenv("USE_DOCKER") == "True":
        extraArgs["base_url"] = "http://host.docker.internal:11434"
    embeddingsModel = OllamaEmbeddings(model=MODEL, **extraArgs)
    return embeddingsModel.embed_documents(chunks)
|
|
|
|
|
|
|
@PipelineDecorator.component(cache=False)
def storeEmbeddings(embeddings, links, resultTypes, chunks, chunkNums):
    """Upsert one Qdrant point per chunk into the collection named by its
    document's type ("Github" or "Document").

    A chunkNum of 0 marks the first chunk of the next document, which is how
    the parallel links/resultTypes lists (one entry per document) are advanced.
    """
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    USE_QDRANT_CLOUD = os.getenv("USE_QDRANT_CLOUD")
    QDRANT_CLOUD_URL = os.getenv("QDRANT_CLOUD_URL")
    QDRANT_APIKEY = os.getenv("QDRANT_APIKEY")
    if USE_QDRANT_CLOUD == "True":
        qClient = QdrantClient(url=QDRANT_CLOUD_URL, api_key=QDRANT_APIKEY)
    else:
        # NOTE(review): the non-cloud branch also connects to QDRANT_CLOUD_URL
        # (just without the API key) — confirm this is intentional.
        qClient = QdrantClient(url=QDRANT_CLOUD_URL)

    # Make sure both target collections exist before upserting.
    for collectionName in ("Github", "Document"):
        if not qClient.collection_exists(collectionName):
            qClient.create_collection(
                collection_name=collectionName,
                vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
            )

    documentIndex = -1
    for chunkIndex, chunkNum in enumerate(chunkNums):
        if chunkNum == 0:
            # First chunk of a new document: advance the per-document lists.
            documentIndex += 1
        qClient.upsert(
            collection_name=resultTypes[documentIndex],
            wait=True,
            points=[
                PointStruct(
                    id=chunkIndex,
                    vector=embeddings[chunkIndex],
                    payload={
                        "link": links[documentIndex],
                        "type": resultTypes[documentIndex],
                        "chunk": chunkNum,
                        "text": chunks[chunkIndex],
                    },
                )
            ],
        )
|
|
|
|
|
|
|
@PipelineDecorator.pipeline(
    name="Feature Pipeline",
    project="RAG LLM",
    version="0.3",
)
def main():
    """Run the feature pipeline: retrieve → clean → chunk → embed → store."""
    documentLinks, documentTypes, documentTexts = retreiveDocuments()
    cleanedTexts = cleanDocuments(documentTexts)
    documentChunks, documentChunkNums = chunkDocuments(cleanedTexts)
    chunkEmbeddings = embedChunks(documentChunks)
    storeEmbeddings(
        chunkEmbeddings,
        documentLinks,
        documentTypes,
        documentChunks,
        documentChunkNums,
    )
|
|
|
|
|
if __name__ == "__main__":
    # Execute all pipeline steps in this process (no ClearML agent queue).
    PipelineDecorator.run_locally()
    main()
|
|