from textutils import ParagraphDocumentProcessor, DocumentProcessor from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer, AutoModel import ollama import faiss import os import csv import re import pandas as pd from datetime import datetime class RAGPipeline: def __init__(self, model_name: str = "flaollama", model_orig: str = "mistral", docprocessor = ParagraphDocumentProcessor(), sentence_transformer_name: str = 'paraphrase-multilingual-MiniLM-L12-v2', numero_frammenti = 10 ) : self.model_name = model_name self.model_orig = model_orig self.docprocessor = docprocessor self.sentence_transformer_name = sentence_transformer_name self.sentence_transformer_model = SentenceTransformer( self.sentence_transformer_name ) self.numero_frammenti = numero_frammenti self.documenti = [] self.files_pdf =[] self.indice =False self.timestamp = datetime.now().strftime("%Y%m%d%H%M%S") ##attrributi_frammenti contiene una lista di frammenti con attribui es: ## ## ## self.attributi_frammenti = [] #elenco di dizionari di tutti i frammenti ##LISTA DI FRAMMENTI INDICIZZATI (lista di testi) self. frammenti_indicizzati = [] #sonoi testi dei vari frammenti #print(f"NUMERODI FRAMMENTIIIII {self.numero_frammenti} param {numero_frammenti}") @staticmethod def dump_excel(dizionario, filename ): """Salva un dizionario in un file Excel accodando i dati se il file esiste.""" file_esiste = os.path.isfile(filename) # Converti il dizionario in un DataFrame con una sola riga df_nuova_riga = pd.DataFrame([dizionario]) if file_esiste: # Carica il file Excel esistente df_esistente = pd.read_excel(filename, engine='openpyxl') # Concatenazione dei DataFrame df_finale = pd.concat([df_esistente, df_nuova_riga], ignore_index=True) else: # Se il file non esiste, il DataFrame finale è la nuova riga df_finale = df_nuova_riga # Salva il DataFrame finale nel file Excel df_finale.to_excel(filename, index=False, engine='openpyxl') def crea_indice(self): # Converte i documenti in vettori docId = 0 fraId = 0 for documento in self.documenti: frammenti = self.docprocessor.scomponi_in_frammenti(documento, self.numero_frammenti ) for frammento in frammenti: dizionario_frammenti = { 'timestamp': self.timestamp, 'id': f"{docId}-{fraId}", "documento": docId, "frammento": fraId, "nomefile": os.path.basename(self.files_pdf[docId]), 'testo_frammento':frammento } self.attributi_frammenti.append( dizionario_frammenti ) self.frammenti_indicizzati.append(frammento) fraId = fraId +1 docId = docId +1 self.doc_embeddings = self.sentence_transformer_model.encode(self.frammenti_indicizzati) # Creazione dell'indice Faiss dimension = self.doc_embeddings.shape[1] self.indice = faiss.IndexFlatL2(dimension) # Indice L2 (distanza euclidea) self.indice.add(self.doc_embeddings) def aggiungi_file_pdf(self, filename: str) : text= self.docprocessor.estrai_da_pdf(filename) self.documenti.append(text) self.files_pdf.append(filename) class Retriever: def __init__(self, indice , sentence_transformer_model , query : str, documenti =[], frammenti_indicizzati = [], #tutti i frammenti ?? attributi_frammenti = [] ##elenco attrivuti frammenti ): self.indice = indice self.sentence_transformer = sentence_transformer_model self.query = query self.documenti = documenti self.frammenti_indicizzati = frammenti_indicizzati self.attributi_frammenti = attributi_frammenti self.passaggi_rilevanti = [] ## documenti rilevanti per la query (recuperati) self.attributi_rilevanti = [] ## atteributi dei frammenti rilevanti def esegui_query(self, top_k = 5): # Embedding della query query_embedding = self.sentence_transformer.encode([self.query]) # Recupero dei documenti più simili distances, indices = self.indice.search(query_embedding, top_k) # documenti rilevanti e passaggi rilevanti self.passaggi_rilevanti = [self.frammenti_indicizzati[j] for j in indices[0]] #frammenti rilevanti self.attributi_rilevanti = [self.attributi_frammenti[j] for j in indices[0]] #passaggi rilevanti class ChatBot: def pulisci_risposta(self, response: str): retval=re.sub(r".*?", "", response, flags=re.DOTALL).strip() retval = retval.replace("\n", " ").replace("\t", " ").replace("|", " ") return retval def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str: raise NotImplementedError("Questo metodo deve essere implementato nelle sottoclassi.") def generate(self, relevant_docs = [], attributi_frammenti_rilevanti = [], query="", istruzioni :str = None ) -> str: raise NotImplementedError("Questo metodo deve essere implementato nelle sottoclassi.") class OllamaChatbot(ChatBot): def __init__(self, model_name: str = "flaollama", model_orig: str = "mistral" , model_system=( """Sei un esperto di diritto amministrativo che deve eseguire il controllo di regolarità amministrativa su un atto amministrativo di un comune italiano. Ti verranno forniti un atto amministrativo (determinazione dirigenziale) ed eventuali allegati, questi sono forniti come frammenti rilevanti. Utilizza solamente i frammenti che ti verranno inviati. Rispondi in Italiano usando al massimo 50 parole. Basati esclusivamente sul seguente testo: """ ), dump_filename="dump.csv" ): self.model_name = model_name self.model_orig = model_orig self.model_system = model_system self.dump_filename = dump_filename ollama.create( model=model_name, from_=model_orig, system = model_system ) def chat(self, domanda: str, istruzioni: str = None, frammenti =[]) -> str: prompt = f"ISTRUZIONI: {istruzioni}\n\nCONTESTO:\n" + "\n".join(frammenti) + f"\n\nDOMANDA: {domanda}" response = ollama.chat(model=self.model_orig, messages=[ {"role": "user", "content": prompt} ], options= {'num_ctx' : 4096 }) return response["message"]["content"] def generate(self, relevant_docs = [], attributi_frammenti_rilevanti = [], query="", istruzioni :str = None ): i = 0 #print (f"DIMESIONE FILE {len(relevant_files)}") #print (f"DIMESIONE TESTI {len(relevant_docs)}") prompt="" for documento in relevant_docs: prompt += f"{relevant_docs[i]} " i = i+1 #"{context}\n\nDomanda: {query}" if istruzioni is not None: query = query + " Istruzioni: " + istruzioni prompt +=f"\n\nDomanda:{query} \n\n" #print(prompt) rersponse = ollama.generate( model=self.model_name, prompt=prompt , options= {'num_ctx' : 4096 } ) return rersponse['response']