import os
import sys
from operator import itemgetter

from clearml import PipelineDecorator
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.llms import Ollama
from qdrant_client import QdrantClient

try:
    load_dotenv(override=True)
except Exception:
    load_dotenv(sys.path[1] + "/.env", override=True)
CLEARML_WEB_HOST = os.getenv("CLEARML_WEB_HOST")
CLEARML_API_HOST = os.getenv("CLEARML_API_HOST")
CLEARML_FILES_HOST = os.getenv("CLEARML_FILES_HOST")
CLEARML_API_ACCESS_KEY = os.getenv("CLEARML_API_ACCESS_KEY")
CLEARML_API_SECRET_KEY = os.getenv("CLEARML_API_SECRET_KEY")


@PipelineDecorator.component(cache=False, return_values=["newQuery"])
def queryExpansion(query):
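    """Rewrite the user query from a different perspective with a local Ollama model (query expansion)."""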
    MODEL = "llama3.2"
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    USE_DOCKER = os.getenv("USE_DOCKER")
    # Inside Docker, reach the host machine's Ollama server via host.docker.internal.
    if USE_DOCKER == "True":
        model = Ollama(model=MODEL, base_url="http://host.docker.internal:11434")
    else:
        model = Ollama(model=MODEL)

    template = """
    Rewrite the prompt. The new prompt must offer a different perspective.
    Do not change the meaning. Output only the rewritten prompt with no introduction.

    Prompt: {prompt}
    """
    prompt = PromptTemplate.from_template(template)
    chain = {"prompt": itemgetter("prompt")} | prompt | model
    return chain.invoke({"prompt": query})


@PipelineDecorator.component(cache=False, return_values=["codingQuestion"])
def selfQuerying(query):
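    """Ask the model whether the query requires code in the answer; returns "1" if it does, "0" otherwise."""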
    MODEL = "llama3.2"
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    USE_DOCKER = os.getenv("USE_DOCKER")
    if USE_DOCKER == "True":
        model = Ollama(model=MODEL, base_url="http://host.docker.internal:11434")
    else:
        model = Ollama(model=MODEL)

    template = """
    You are an AI assistant. You must determine if the prompt requires code as the answer.
    Output a 1 if it does or a 0 if it does not, and nothing else.

    Prompt: {prompt}
    """
    prompt = PromptTemplate.from_template(template)
    chain = {"prompt": itemgetter("prompt")} | prompt | model
    return chain.invoke({"prompt": query})


@PipelineDecorator.component(cache=False, return_values=["results1", "results2"])
def filteredVectorSearch(query, newQuery, codingQuestion):
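    """Embed the original and the expanded query and run a top-10 similarity search for each against the relevant Qdrant collection."""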
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    USE_QDRANT_CLOUD = os.getenv("USE_QDRANT_CLOUD")
    QDRANT_CLOUD_URL = os.getenv("QDRANT_CLOUD_URL")
    QDRANT_APIKEY = os.getenv("QDRANT_APIKEY")
    # The API key is only required for Qdrant Cloud; a self-hosted instance is reached by URL alone.
    if USE_QDRANT_CLOUD == "True":
        qClient = QdrantClient(url=QDRANT_CLOUD_URL, api_key=QDRANT_APIKEY)
    else:
        qClient = QdrantClient(url=QDRANT_CLOUD_URL)

    MODEL = "llama3.2"
    USE_DOCKER = os.getenv("USE_DOCKER")
    if USE_DOCKER == "True":
        embeddingsModel = OllamaEmbeddings(model=MODEL, base_url="http://host.docker.internal:11434")
    else:
        embeddingsModel = OllamaEmbeddings(model=MODEL)

    # Coding questions are answered from the "Github" collection, everything else from "Document".
    relatedCollection = "Document"
    if codingQuestion == "1":
        relatedCollection = "Github"

    results1 = qClient.search(
        collection_name=relatedCollection,
        query_vector=embeddingsModel.embed_query(query),
        limit=10,
    )
    results2 = qClient.search(
        collection_name=relatedCollection,
        query_vector=embeddingsModel.embed_query(newQuery),
        limit=10,
    )
    return results1, results2


@PipelineDecorator.component(cache=False, return_values=["results"])
def collectingResults(results1, results2):
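    """Merge the two search result lists into a single candidate pool for reranking."""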
    return results1 + results2


@PipelineDecorator.component(cache=False, return_values=["topTexts"])
def reranking(results):
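    """Keep the three highest-scoring unique hits and concatenate their payload texts into one context string."""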
    # Rank the combined hits by similarity score and keep the top three unique ones.
    ranked = sorted(results, key=lambda result: result.score, reverse=True)
    seenIds = set()
    topTexts = ""
    for result in ranked:
        if result.id in seenIds:
            continue
        seenIds.add(result.id)
        topTexts += result.payload["text"]
        if len(seenIds) == 3:
            break
    return topTexts


@PipelineDecorator.component(cache=False, return_values=["prompt"])
def buildingPrompt(codingQuestion):
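    """Select the prompt template: code generation for coding questions, grounded Q&A otherwise."""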
    if codingQuestion == "1":
        template = """
        Write code for the following question given the related coding document below.

        Document: {document}
        Question: {question}
        """
    else:
        template = """
        Answer the question based on the document below. If you can't answer the question, reply "I don't know".

        Document: {document}
        Question: {question}
        """
    return PromptTemplate.from_template(template)


@PipelineDecorator.component(cache=False, return_values=["answer"])
def obtainingAnswer(query, prompt, topTexts):
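    """Run the retrieved context and the original query through the selected prompt and the Ollama model."""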
    MODEL = "llama3.2"
    try:
        load_dotenv(override=True)
    except Exception:
        load_dotenv(sys.path[1] + "/.env", override=True)
    USE_DOCKER = os.getenv("USE_DOCKER")
    if USE_DOCKER == "True":
        model = Ollama(model=MODEL, base_url="http://host.docker.internal:11434")
    else:
        model = Ollama(model=MODEL)

    chain = (
        {"document": itemgetter("document"), "question": itemgetter("question")}
        | prompt
        | model
    )
    # Return the answer so the component actually exposes its declared "answer" output.
    return chain.invoke({"document": topTexts, "question": query})


@PipelineDecorator.pipeline(
    name="Inference Pipeline",
    project="RAG LLM",
    version="0.1",
)
def main():
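    """Run the full RAG inference flow for a sample query: expand, classify, retrieve, merge, rerank, prompt, answer."""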
    query = "What operating system was ROS written for?"
    newQuery = queryExpansion(query)
    codingQuestion = selfQuerying(query)
    results1, results2 = filteredVectorSearch(query, newQuery, codingQuestion)
    results = collectingResults(results1, results2)
    topTexts = reranking(results)
    prompt = buildingPrompt(codingQuestion)
    return obtainingAnswer(query, prompt, topTexts)


if __name__ == "__main__":
    # Run the pipeline controller and all steps locally instead of enqueuing them to ClearML agents.
    PipelineDecorator.run_locally()
    main()