import gradio as gr
import torch
import unicodedata
import re
import numpy as np
from pathlib import Path
from transformers import AutoTokenizer, AutoModel
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.preprocessing import normalize as sk_normalize
import chromadb
import joblib
import pickle
import scipy.sparse
import textwrap
import os
# --------------------------- CONFIG -----------------------------------
DB_DIR = Path("./chroma_db_greekbertChatbotVol106")
ASSETS_DIR = Path("./assets")
STATIC_PDF_DIR = Path("./static_pdfs")
STATIC_PDF_DIR_NAME = "static_pdfs"
COL_NAME = "dataset14_grbert_charword"
MODEL_NAME = "sentence-transformers/paraphrase-xlm-r-multilingual-v1"
CHUNK_SIZE = 512
ALPHA_BASE = 0.2
ALPHA_LONGQ = 0.5
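# The ALPHA_* constants control the semantic/lexical blend in
# hybrid_search_gradio() further down:
#   score = alpha * semantic_similarity + (1 - alpha) * lexical_similarity
# ALPHA_BASE applies to short queries; for long queries (> 30 preprocessed
# words) ALPHA_LONGQ shifts the weight toward the embedding similarity.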
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Running on device: {DEVICE}")
# ----------------------- PRE-/POST HELPERS ----------------------------
def strip_acc(s: str) -> str:
    """Remove diacritics (e.g. the Greek tonos) via NFD decomposition."""
    return ''.join(ch for ch in unicodedata.normalize('NFD', s)
                   if not unicodedata.combining(ch))

STOP = {"σχετικο", "σχετικα", "με", "και"}

def preprocess(txt: str) -> str:
    """Lowercase, strip accents, drop non-alphanumeric characters and stopwords."""
    txt = strip_acc(txt.lower())
    txt = re.sub(r"[^a-zα-ω0-9 ]", " ", txt)
    txt = re.sub(r"\s+", " ", txt).strip()
    return " ".join(w for w in txt.split() if w not in STOP)
def cls_embed(texts, tok, model):
    """Embed texts as the mean of the last four layers' [CLS] vectors, L2-normalized."""
    out = []
    enc = tok(texts, padding=True, truncation=True,
              max_length=CHUNK_SIZE, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        hs = model(**enc, output_hidden_states=True).hidden_states
        # Average the last 4 hidden layers, then take the [CLS] position.
        cls = torch.stack(hs[-4:], 0).mean(0)[:, 0, :]
        cls = torch.nn.functional.normalize(cls, p=2, dim=1)
        out.append(cls.cpu())
    return torch.cat(out).numpy()
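# Because cls_embed() L2-normalizes its output, cosine similarity between a
# query and a chunk reduces to a dot product. The search code below assumes
# the Chroma collection was created with cosine distance (hnsw:space="cosine"),
# so it can recover similarity as 1 - distance.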
# ---------------------- LOAD MODELS & DATA (Μία φορά κατά την εκκίνηση) -------------------- | |
print("⏳ Loading Model and Tokenizer...") | |
try: | |
tok = AutoTokenizer.from_pretrained(MODEL_NAME) | |
model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval() | |
print("✓ Model and tokenizer loaded.") | |
except Exception as e: | |
print(f"CRITICAL ERROR loading model/tokenizer: {e}") | |
raise | |
print("⏳ Loading TF-IDF vectorizers and SPARSE matrices...") | |
try: | |
char_vec = joblib.load(ASSETS_DIR / "char_vectorizer.joblib") | |
word_vec = joblib.load(ASSETS_DIR / "word_vectorizer.joblib") | |
X_char = scipy.sparse.load_npz(ASSETS_DIR / "X_char_sparse.npz") | |
X_word = scipy.sparse.load_npz(ASSETS_DIR / "X_word_sparse.npz") | |
print("✓ TF-IDF components loaded (sparse matrices).") | |
print(f" → X_char shape: {X_char.shape}, type: {type(X_char)}") | |
print(f" → X_word shape: {X_word.shape}, type: {type(X_word)}") | |
except Exception as e: | |
print(f"CRITICAL ERROR loading TF-IDF components: {e}") | |
raise | |
print("⏳ Loading chunk data (pre_chunks, raw_chunks, ids, metas)...") | |
try: | |
with open(ASSETS_DIR / "pre_chunks.pkl", "rb") as f: | |
pre_chunks = pickle.load(f) | |
with open(ASSETS_DIR / "raw_chunks.pkl", "rb") as f: | |
raw_chunks = pickle.load(f) | |
with open(ASSETS_DIR / "ids.pkl", "rb") as f: | |
ids = pickle.load(f) | |
with open(ASSETS_DIR / "metas.pkl", "rb") as f: | |
metas = pickle.load(f) | |
print(f"✓ Chunk data loaded. Total chunks from ids: {len(ids):,}") | |
if not all([pre_chunks, raw_chunks, ids, metas]): | |
print("WARNING: One or more chunk data lists are empty!") | |
except Exception as e: | |
print(f"CRITICAL ERROR loading chunk data: {e}") | |
raise | |
print("⏳ Connecting to ChromaDB...") | |
try: | |
client = chromadb.PersistentClient(path=str(DB_DIR.resolve())) | |
col = client.get_collection(COL_NAME) | |
print(f"✓ Connected to ChromaDB. Collection '{COL_NAME}' count: {col.count()}") | |
if col.count() == 0: | |
print("WARNING: ChromaDB collection is empty or not found correctly!") | |
except Exception as e: | |
print(f"CRITICAL ERROR connecting to ChromaDB or getting collection: {e}") | |
print(f"Attempted DB path for PersistentClient: {str(DB_DIR.resolve())}") | |
print("Ensure the ChromaDB directory structure is correct in your Hugging Face Space repository.") | |
raise | |
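# Optional speed-up (an assumption that chunk ids are unique, which the data
# layout implies): looking ids up with ids.index(...) inside the result loop
# is O(len(ids)) per hit. Building the reverse map once at startup makes each
# lookup O(1).
id_to_idx = {chunk_id: i for i, chunk_id in enumerate(ids)}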
# ---------------------- HYBRID SEARCH (core logic) ---------------------------------
def hybrid_search_gradio(query, k=5):
    if not query.strip():
        return "Παρακαλώ εισάγετε μια ερώτηση."
    if not ids:
        return "Σφάλμα: Τα δεδομένα αναζήτησης (ids) δεν έχουν φορτωθεί. Επικοινωνήστε με τον διαχειριστή."

    q_pre = preprocess(query)
    words = q_pre.split()
    alpha = ALPHA_LONGQ if len(words) > 30 else ALPHA_BASE

    # Chunks whose preprocessed text contains the whole preprocessed query verbatim.
    exact_ids_set = {ids[i] for i, t in enumerate(pre_chunks) if q_pre in t}

    q_emb_np = cls_embed([q_pre], tok, model)
    q_emb_list = q_emb_np.tolist()
    try:
        sem_results = col.query(
            query_embeddings=q_emb_list,
            n_results=min(k * 30, len(ids)),
            include=["distances", "metadatas", "documents"]
        )
    except Exception as e:
        print(f"ERROR during ChromaDB query: {e}")
        return "Σφάλμα κατά τη σημασιολογική αναζήτηση."

    # Assumes the collection uses cosine distance, so similarity = 1 - distance.
    sem_sims = {doc_id: 1 - dist for doc_id, dist in zip(sem_results["ids"][0], sem_results["distances"][0])}
    # Lexical similarity: character- and word-level TF-IDF against the whole corpus.
    q_char_sparse = char_vec.transform([q_pre])
    q_char_normalized = sk_normalize(q_char_sparse)
    char_sim_scores = (q_char_normalized @ X_char.T).toarray().flatten()

    q_word_sparse = word_vec.transform([q_pre])
    q_word_normalized = sk_normalize(q_word_sparse)
    word_sim_scores = (q_word_normalized @ X_word.T).toarray().flatten()
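    # Note: these dot products equal cosine similarities only if the rows of
    # X_char / X_word were L2-normalized when the matrices were built offline;
    # the query side is normalized here via sk_normalize.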
    # Blend: 0.85 char / 0.15 word (char n-grams presumably tolerate Greek
    # inflection and spelling variants better than whole words).
    lex_sims = {}
    for idx, (c_score, w_score) in enumerate(zip(char_sim_scores, word_sim_scores)):
        if c_score > 0 or w_score > 0:
            if idx < len(ids):
                lex_sims[ids[idx]] = 0.85 * c_score + 0.15 * w_score
            else:
                print(f"Warning: Lexical score index {idx} out of bounds for ids list (len: {len(ids)}).")
    # Combine: weighted sum of semantic and lexical scores; chunks containing
    # the query verbatim are pinned to the top with score 1.0.
    all_chunk_ids_set = set(sem_sims.keys()) | set(lex_sims.keys()) | exact_ids_set
    scored = []
    for chunk_id_key in all_chunk_ids_set:
        s = alpha * sem_sims.get(chunk_id_key, 0.0) + \
            (1 - alpha) * lex_sims.get(chunk_id_key, 0.0)
        if chunk_id_key in exact_ids_set:
            s = 1.0
        scored.append((chunk_id_key, s))
    scored.sort(key=lambda x: x[1], reverse=True)
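    # Worked example (assumed numbers): with alpha = 0.2, a chunk with
    # semantic similarity 0.80 and lexical similarity 0.30 scores
    # 0.2 * 0.80 + 0.8 * 0.30 = 0.40, i.e. lexical overlap dominates
    # the ranking for short queries.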
    hits_output = []
    seen_doc_main_ids = set()
    for chunk_id_val, score_val in scored:
        idx_in_lists = id_to_idx.get(chunk_id_val)
        if idx_in_lists is None:
            print(f"Warning: chunk_id '{chunk_id_val}' from search results not found in global 'ids' list. Skipping.")
            continue

        doc_meta = metas[idx_in_lists]
        doc_main_id = doc_meta['id']
        # Deduplicate: keep only the best-scoring chunk per source document.
        if doc_main_id in seen_doc_main_ids:
            continue

        original_url_from_meta = doc_meta.get('url', '#')
        # *** START OF MODIFIED/NEW CODE FOR PDF DEBUGGING ***
        pdf_serve_url = "#"
        pdf_filename_display = "N/A"
        pdf_filename_extracted = None  # initialization

        if original_url_from_meta and original_url_from_meta != '#':
            pdf_filename_extracted = os.path.basename(original_url_from_meta)
            print(f"--- Debug: Original URL: {original_url_from_meta}, Initial Extracted filename: {pdf_filename_extracted}")

            # --- TEMPORARY CODE FOR AN ASCII FILENAME TEST (enable by uncommenting) ---
            # TARGET_ORIGINAL_FILENAME_FOR_TEST = "6ΑΤΘ469Β7Η-963.pdf"  # the original Greek filename that was renamed
            # ASCII_TEST_FILENAME = "testfileGR.pdf"                    # the new ASCII name placed in static_pdfs
            #
            # if pdf_filename_extracted == TARGET_ORIGINAL_FILENAME_FOR_TEST:
            #     print(f"--- INFO: ASCII Filename Test Active ---")
            #     print(f"--- Original filename was: {pdf_filename_extracted}")
            #     print(f"--- Temporarily using: {ASCII_TEST_FILENAME} for linking and checking existence.")
            #     pdf_filename_extracted = ASCII_TEST_FILENAME
            # --- END OF TEMPORARY ASCII TEST CODE ---

            if pdf_filename_extracted and pdf_filename_extracted.lower().endswith(".pdf"):
                potential_pdf_path_on_server = STATIC_PDF_DIR / pdf_filename_extracted
                print(f"--- Debug: Final pdf_filename_extracted to check: {pdf_filename_extracted}")
                print(f"--- Debug: Checking for PDF at server path: {potential_pdf_path_on_server.resolve()}")

                if potential_pdf_path_on_server.exists() and potential_pdf_path_on_server.is_file():
                    print(f"--- Debug: Path.exists() and Path.is_file() are TRUE for {potential_pdf_path_on_server.resolve()}. Attempting to open...")
                    try:
                        # Try opening the file in binary read mode and reading a single byte.
                        with open(potential_pdf_path_on_server, "rb") as f_test_access:
                            f_test_access.read(1)
                        print(f"--- Debug: Successfully opened and read a byte from: {potential_pdf_path_on_server.resolve()}")
                        pdf_serve_url = f"/file/{STATIC_PDF_DIR_NAME}/{pdf_filename_extracted}"
                        pdf_filename_display = pdf_filename_extracted
                    except Exception as e_file_access:
                        print(f"!!! CRITICAL ERROR trying to open/read file {potential_pdf_path_on_server.resolve()}: {e_file_access}")
                        pdf_filename_display = "Error accessing file content"  # shown to the user instead of a link
                else:
                    print(f"--- Debug: Path.exists() or Path.is_file() is FALSE for {potential_pdf_path_on_server.resolve()}")
                    pdf_filename_display = "File not found by script"
            else:
                if not pdf_filename_extracted:  # basename (or the ASCII test) produced an empty name
                    print("--- Debug: pdf_filename_extracted is empty or None after os.path.basename or ASCII test.")
                else:  # the extracted name lacks a .pdf extension
                    print(f"--- Debug: Extracted filename '{pdf_filename_extracted}' does not end with .pdf")
                pdf_filename_display = "Not a valid PDF link"
        else:  # original_url_from_meta was empty or '#'
            print(f"--- Debug: No valid original_url_from_meta found. URL was: '{original_url_from_meta}'")
            pdf_filename_display = "No source URL"
        # *** END OF MODIFIED/NEW CODE FOR PDF DEBUGGING ***
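        # Note (assumption about the Gradio version in use): the file-serving
        # route differs across Gradio releases. Some versions expose local
        # files as "/file=<path>" rather than "/file/<path>", and Gradio 5
        # prefixes routes with "/gradio_api". If links 404 even though the
        # debug prints above succeed, the URL scheme is the first thing to check.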
        hits_output.append({
            "score": score_val,
            "title": doc_meta.get('title', 'N/A'),
            "snippet": raw_chunks[idx_in_lists][:500] + " ...",
            "original_url_meta": original_url_from_meta,
            "pdf_serve_url": pdf_serve_url,
            "pdf_filename_display": pdf_filename_display
        })
        seen_doc_main_ids.add(doc_main_id)

        if len(hits_output) >= k:
            break
    if not hits_output:
        return "Δεν βρέθηκαν σχετικά αποτελέσματα."

    output_md = f"Βρέθηκαν **{len(hits_output)}** σχετικά αποτελέσματα:\n\n"
    for hit in hits_output:
        output_md += f"### {hit['title']} (Score: {hit['score']:.3f})\n"
        snippet_wrapped = textwrap.fill(hit['snippet'].replace("\n", " "), width=100)
        output_md += f"**Απόσπασμα:** {snippet_wrapped}\n"
        if hit['pdf_serve_url'] and hit['pdf_serve_url'] != '#':
            output_md += f"**Πηγή (PDF):** <a href='{hit['pdf_serve_url']}' target='_blank'>{hit['pdf_filename_display']}</a>\n"
        elif hit['original_url_meta'] and hit['original_url_meta'] != '#':
            output_md += f"**Πηγή (αρχικό):** [{hit['original_url_meta']}]({hit['original_url_meta']})\n"
        output_md += "---\n"

    # TEMPORARY ADDITION FOR TESTING A TXT FILE LINK
    output_md += "\n\n---\n**Δοκιμαστικός Σύνδεσμος Κειμένου:** <a href='/file/static_pdfs/test_text_file.txt' target='_blank'>Άνοιγμα test_text_file.txt</a>\n"
    return output_md
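# Quick smoke test for local runs (assumed query; left commented out so it
# does not fire on Space startup):
# print(hybrid_search_gradio("άδεια ειδικού σκοπού", k=3))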
# ---------------------- GRADIO INTERFACE -----------------------------------
print("🚀 Launching Gradio Interface...")

iface = gr.Interface(
    fn=hybrid_search_gradio,
    inputs=gr.Textbox(lines=3, placeholder="Γράψε την ερώτησή σου εδώ...", label="Ερώτηση προς τον βοηθό:"),
    outputs=gr.Markdown(label="Απαντήσεις από τα έγγραφα:", rtl=False, sanitize_html=False),
    title="🏛️ Ελληνικό Chatbot Υβριδικής Αναζήτησης (v1.0.9)",  # version bump for tracking
    description="Πληκτρολογήστε την ερώτησή σας για αναζήτηση στα διαθέσιμα έγγραφα. Η αναζήτηση συνδυάζει σημασιολογική ομοιότητα (GreekBERT) και ομοιότητα λέξεων/χαρακτήρων (TF-IDF).\nΧρησιμοποιεί το μοντέλο: sentence-transformers/paraphrase-xlm-r-multilingual-v1.\nΤα PDF ανοίγουν σε νέα καρτέλα.",
    allow_flagging="never",
    # Each example must supply exactly one value per input component; the
    # interface has a single Textbox, so k keeps its default of 5.
    examples=[
        ["Ποια είναι τα μέτρα για τον κορονοϊό;"],
        ["Πληροφορίες για άδεια ειδικού σκοπού"],
        ["Τι προβλέπεται για τις μετακινήσεις εκτός νομού;"]
    ],
)
if __name__ == '__main__':
    # Variant 2: STATIC_PDF_DIR is defined at the top of the file as Path("./static_pdfs").
    # allowed_paths whitelists that directory so Gradio is permitted to serve files from it.
    iface.launch(allowed_paths=[str(STATIC_PDF_DIR.resolve())])