# visual-rag-tool / tool.py
import os
from smolagents import Tool
from dotenv import load_dotenv
load_dotenv()
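
# VisualRAGTool indexes PDF pages as images with a ColQwen2 vision retriever
# and answers queries by sending the top-k retrieved pages to an OpenAI chat
# model (gpt-4o-mini by default).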
class VisualRAGTool(Tool):
name = "visual_rag"
description = """Performs a RAG query on your internal PDF documents and returns the generated text response."""
inputs = {
"query": {
"type": "string",
"description": "The query to perform. This should be semantically close to your target documents.",
},
"k": {
"type": "number",
"description": "The number of documents to retrieve.",
"default": 1,
"nullable": True,
},
"api_key": {
"type": "string",
"description": "The OpenAI API key to use for the query. If not provided, the key will be taken from the OPENAI_KEY environment variable.",
"nullable": True,
}
}
output_type = "string"
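
    # ColQwen2 checkpoint used to embed pages and queries; the OpenAI key
    # falls back to the OPENAI_KEY environment variable loaded above.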
model_name: str = "vidore/colqwen2-v1.0"
api_key: str = os.getenv("OPENAI_KEY")
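
    # Lightweight container pairing a rendered PDF page image with its
    # metadata (document title, page id, generated context).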
class Page:
from typing import Optional, Dict, Any
from PIL import Image
image: Image.Image
metadata: Optional[Dict[str, Any]] = None
def __init__(self, image, metadata=None):
self.image = image
self.metadata = metadata
@property
def caption(self):
if self.metadata is None:
return None
return f"Document: {self.metadata.get('doc_title')}, Context: {self.metadata.get('context')}"
def __hash__(self):
return hash(self.image)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_initialized = False
    def _init_models(self, model_name: str) -> None:
        import torch
        from colpali_engine.models import ColQwen2, ColQwen2Processor
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # flash_attention_2 requires the flash-attn package (and a CUDA GPU);
        # fall back to the default attention implementation otherwise.
        try:
            import flash_attn  # noqa: F401
            attn_implementation = "flash_attention_2"
        except ImportError:
            attn_implementation = None
        self.model = ColQwen2.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            attn_implementation=attn_implementation,
        ).eval()
        self.processor = ColQwen2Processor.from_pretrained(model_name)
    def setup(self):
        """
        Loads the ColQwen2 model and processor and initializes the in-memory
        page and embedding stores. Expensive, so it runs once on first use.
        """
self._init_models(self.model_name)
self.embds = []
self.pages = []
self.is_initialized = True
def _encode_image_to_base64(self, image):
"""Encodes a PIL image to a base64 string."""
from io import BytesIO
import base64
buffered = BytesIO()
image.save(buffered, format="JPEG")
return base64.b64encode(buffered.getvalue()).decode("utf-8")
def _build_query(self, query: str, pages: list) -> list:
"""Builds the query for OpenAI based on the pages and the query."""
messages = []
messages.append({"type": "text", "text": "PDF pages:\n"})
for page in pages:
capt = page.caption
if capt is not None:
messages.append({
"type": "text",
"text": capt
})
messages.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{self._encode_image_to_base64(page.image)}"
},
})
messages.append({"type": "text", "text": f"Query:\n{query}"})
return messages
    def query_openai(self, query, pages, api_key=None, system_prompt=None, model="gpt-4o-mini"):
        """Calls an OpenAI chat model (gpt-4o-mini by default) with the query and page images."""
from smolagents import ChatMessage
system_prompt = system_prompt or \
"""You are a smart assistant designed to answer questions about a PDF document.
You are given relevant information in the form of PDF pages preceded by their metadata: document title, page identifier, surrounding context.
Use them to construct a short response to the question, and cite your sources in the following format: (document, page number).
If it is not possible to answer using the provided pages, do not attempt to provide an answer and simply say the answer is not present within the documents.
Give detailed and extensive answers, only containing info in the pages you are given.
You can answer using information contained in plots and figures if necessary.
Answer in the same language as the query."""
api_key = api_key or self.api_key
        if api_key and api_key.startswith("sk-"):
try:
from openai import OpenAI
client = OpenAI(api_key=api_key.strip())
response = client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": system_prompt
},
{
"role": "user",
"content": self._build_query(query, pages)
}
],
max_tokens=500,
)
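                # Wrap the raw OpenAI response in a smolagents ChatMessage so
                # callers can uniformly read .content.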
message = ChatMessage.from_dict(
response.choices[0].message.model_dump(include={"role", "content", "tool_calls"})
)
message.raw = response
return message
            except Exception as e:
                # Return a ChatMessage on failure too, so callers can always read .content.
                return ChatMessage(role="assistant", content=f"OpenAI API connection failure: {e}. Verify the provided key is correct (sk-***).")
        return ChatMessage(role="assistant", content="Enter your OpenAI API key to get a custom response.")
def _extract_contexts(self, images, api_key, window=10) -> list:
"""Extracts context from images."""
from pqdm.threads import pqdm
CONTEXT_SYSTEM_PROMPT = \
"""You are a smart assistant designed to extract context of PDF pages.
Give concise answers, only containing info in the pages you are given.
You can answer using information contained in plots and figures if necessary."""
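        # Contextual preprocessing: summarize sliding windows of pages so each
        # page can carry a short description of its surrounding document.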
try:
            # Split the document into consecutive windows of `window` pages so
            # that contexts[i] below maps to window_contexts[i // window].
            args = [
                {
                    'query': "Give the general context about these pages. Give the context in the same language as the documents.",
                    'pages': [self.Page(image=im) for im in images[i:i + window]],
                    'api_key': api_key,
                    'system_prompt': CONTEXT_SYSTEM_PROMPT,
                } for i in range(0, len(images), window)
            ]
window_contexts = pqdm(args, self.query_openai, n_jobs=8, argument_type='kwargs')
contexts = []
for i in range(len(images)):
context = window_contexts[i//window].content
contexts.append(context)
except Exception as e:
print(f"Error extracting contexts: {e}")
contexts = [None for _ in range(len(images))]
# Ensure that the number of contexts is equal to the number of images
assert len(contexts) == len(images)
return contexts
def _preprocess_file(self, file: str, contextualize: bool = True, api_key: str = None, window: int = 10) -> list:
"""Converts a file to images and extracts metadata."""
from pdf2image import convert_from_path
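        # Note: pdf2image relies on the poppler utilities being installed.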
title = file.split("/")[-1]
images = convert_from_path(file, thread_count=4)
if contextualize and api_key:
contexts = self._extract_contexts(images, api_key, window=window)
else:
contexts = [None for _ in range(len(images))]
metadatas = [{'doc_title': title, 'page_id': i, 'context': contexts[i]} for i in range(len(images))]
return [self.Page(image=img, metadata=metadata) for img, metadata in zip(images, metadatas)]
def preprocess(self, files: list, contextualize: bool = True, api_key: str = None, window: int = 10) -> list:
"""Preprocesses the files and extracts metadata."""
pages = [page for file in files for page in self._preprocess_file(file, contextualize=contextualize, api_key=api_key, window=window)]
print(f"Example metadata:\n{pages[0].metadata.get('context')}")
return pages
def compute_embeddings(self, pages) -> list:
"""Embeds the images using the model."""
"""Example script to run inference with ColPali (ColQwen2)"""
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
        # Run inference over the document pages in batches.
dataloader = DataLoader(
pages,
batch_size=4,
shuffle=False,
collate_fn=lambda x: self.processor.process_images([p.image for p in x]).to(self.device),
)
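        # Each page yields a multi-vector embedding (one vector per image-patch
        # token), kept as a matrix for late-interaction scoring at query time.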
embds = []
for batch_doc in tqdm(dataloader):
with torch.no_grad():
batch_doc = {k: v.to(self.device) for k, v in batch_doc.items()}
embeddings_doc = self.model(**batch_doc)
embds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
return embds
def index(self, files: list, contextualize: bool = True, api_key: str = None, overwrite_db: bool = False) -> int:
"""Indexes the uploaded files."""
if not self.is_initialized:
self.setup()
print("Converting files...")
# Convert files to images and extract metadata
pgs = self.preprocess(files, contextualize=contextualize, api_key=api_key or self.api_key)
# Embed the images
embds = self.compute_embeddings(pgs)
# Overwrite the database if necessary
if overwrite_db:
self.pages = []
self.embds = []
# Extend the pages
self.pages.extend(pgs)
# Extend the datasets
self.embds.extend(embds)
print(f"Extracted and indexed {len(pgs)} images from {len(files)} files.")
return len(embds)
def retrieve(self, query: str, k: int) -> list:
"""Retrieve the top k documents based on the query."""
import torch
k = min(k, len(self.embds))
qs = []
with torch.no_grad():
batch_query = self.processor.process_queries([query]).to(self.model.device)
embeddings_query = self.model(**batch_query)
qs.extend(list(torch.unbind(embeddings_query.to("cpu"))))
        # Score with ColPali's late-interaction (MaxSim) similarity between
        # query token embeddings and each page's patch embeddings.
scores = self.processor.score(qs, self.embds, device=self.device)[0]
top_k_idx = scores.topk(k).indices.tolist()
print("Top Scores:")
[print(f"Page {self.pages[idx].metadata.get('page_id')}: {scores[idx]}") for idx in top_k_idx]
# Get the top k results
results = [self.pages[idx] for idx in top_k_idx]
return results
def generate_answer(self, query: str, docs: list, api_key: str = None):
"""Generates an answer based on the query and the retrieved documents."""
RAG_SYSTEM_PROMPT = \
""" You are a smart assistant designed to answer questions about a PDF document.
You are given relevant information in the form of PDF pages preceded by their metadata: document title, page identifier, surrounding context.
Use them to construct a response to the question, and cite your sources.
Use the following citation format:
"Some information from a first document [1, p.Page Number]. Some information from the same first document but at a different page [1, p.Page Number]. Some more information from another document [2, p.Page Number].
...
Sources:
[1] Document Title
[2] Another Document Title"
You can answer using information contained in plots and figures if necessary.
If it is not possible to answer using the provided pages, do not attempt to provide an answer and simply say the answer is not present within the documents.
Give detailed answers, only containing info in the pages you are given.
Answer in the same language as the query."""
result = self.query_openai(query, docs, api_key or self.api_key, system_prompt=RAG_SYSTEM_PROMPT)
return result
def search(self, query: str, k: int = 1, api_key: str = None) -> tuple:
"""Searches for the most relevant pages based on the query."""
print(f"Searching for query: {query}")
# Retrieve the top k documents
context = self.retrieve(query, k)
# Generate response from GPT-4o-mini
rag_answer = self.generate_answer(
query=query,
docs=context,
api_key=api_key
)
return context, rag_answer.content
def forward(self, query: str, k: int = 1, api_key: str = None) -> str:
assert isinstance(query, str), "Your search query must be a string"
# Retrieve the top k documents and generate response
return self.search(
query=query,
k=k,
api_key=api_key
)[1]
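
# A minimal usage sketch. The PDF path and query below are placeholders; an
# OPENAI_KEY must be set in the environment (or a key passed explicitly) for
# contextualization and answer generation to work.
if __name__ == "__main__":
    tool = VisualRAGTool()
    # Index a document: pages are rendered, contextualized, and embedded.
    tool.index(["docs/report.pdf"], contextualize=True)
    # Ask a question; the top-3 pages are retrieved and sent to the LLM.
    print(tool.forward("What are the key findings?", k=3))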