|
import os |
|
import torch |
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
from PyPDF2 import PdfReader |
|
import gradio as gr |
|
from datasets import Dataset, load_from_disk |
|
from sentence_transformers import SentenceTransformer |
|
import numpy as np |
|
|
|
|
|
def extract_text_from_pdf(pdf_path): |
|
text = "" |
|
with open(pdf_path, "rb") as f: |
|
reader = PdfReader(f) |
|
for page in reader.pages: |
|
text += page.extract_text() |
|
return text |
|
|
|
|
|
model_name = "scb10x/llama-3-typhoon-v1.5x-8b-instruct" |
|
tokenizer = AutoTokenizer.from_pretrained(model_name) |
|
model = AutoModelForCausalLM.from_pretrained(model_name) |
|
|
|
|
|
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') |
|
|
|
|
|
pdf_path = "/home/user/app/TOPF 2564.pdf" |
|
pdf_text = extract_text_from_pdf(pdf_path) |
|
passages = [{"title": "", "text": line} for line in pdf_text.split('\n') if line.strip()] |
|
|
|
|
|
embeddings = embedding_model.encode([passage["text"] for passage in passages]) |
|
|
|
|
|
dataset = Dataset.from_dict({"title": [p["title"] for p in passages], "text": [p["text"] for p in passages], "embeddings": embeddings.tolist()}) |
|
|
|
|
|
dataset_path = "/home/user/app/rag_document_dataset" |
|
index_path = "/home/user/app/rag_document_index" |
|
|
|
|
|
os.makedirs(dataset_path, exist_ok=True) |
|
os.makedirs(index_path, exist_ok=True) |
|
|
|
|
|
dataset.save_to_disk(dataset_path) |
|
dataset = load_from_disk(dataset_path) |
|
|
|
|
|
def add_faiss_index(dataset, column): |
|
import faiss |
|
embeddings = np.array(dataset[column]) |
|
dim = embeddings.shape[1] |
|
index = faiss.IndexFlatL2(dim) |
|
index.add(embeddings) |
|
dataset.add_faiss_index(column=column) |
|
return dataset |
|
|
|
dataset = add_faiss_index(dataset, column="embeddings") |
|
dataset.save(index_path) |
|
|
|
|
|
def retrieve(query): |
|
|
|
query_embedding = embedding_model.encode([query]) |
|
scores, samples = dataset.get_nearest_examples("embeddings", query_embedding, k=5) |
|
retrieved_passages = " ".join([sample["text"] for sample in samples]) |
|
return retrieved_passages |
|
|
|
|
|
def answer_question(question, context): |
|
retrieved_context = retrieve(question) |
|
inputs = tokenizer(question + " " + retrieved_context, return_tensors="pt") |
|
input_ids = inputs["input_ids"] |
|
attention_mask = inputs["attention_mask"] |
|
|
|
|
|
outputs = model.generate(input_ids=input_ids, attention_mask=attention_mask) |
|
answer = tokenizer.decode(outputs[0], skip_special_tokens=True) |
|
return answer |
|
|
|
|
|
def ask(question): |
|
return answer_question(question, pdf_text) |
|
|
|
demo = gr.Interface( |
|
fn=ask, |
|
inputs=gr.inputs.Textbox(lines=2, placeholder="Ask something..."), |
|
outputs="text", |
|
title="Document QA with RAG", |
|
description="Ask questions based on the provided document." |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |
|
|