|
import json
import os
import time
from urllib.parse import parse_qs, urlparse

import PyPDF2
import pytesseract
import requests
import speech_recognition as sr
import streamlit as st
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from googlesearch import search
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from PIL import Image
from sentence_transformers import SentenceTransformer
from youtube_transcript_api import YouTubeTranscriptApi

from chat_client import chat
|
|
|
# Pull variables from a local .env file into the environment before reading them.
load_dotenv()

# Base URL of the remote endpoint used both to download prompts/documents
# (GET) and to log question/answer pairs (POST).
URL_APP_SCRIPT = os.getenv('URL_APP_SCRIPT')
if not URL_APP_SCRIPT:
    # Without this check the concatenations below fail with an opaque
    # "unsupported operand type(s) for +: 'NoneType' and 'str'".
    raise RuntimeError(
        "Environment variable URL_APP_SCRIPT is not set: "
        "define it in the environment or in the .env file."
    )

# Identifier passed to the endpoint in both query strings below.
ID_FOGLIO = '1cLw9q70BsPmxMBj9PIzgXtq6sm3X-GVBVnOB5wE8jr8'
URL_PROMPT = URL_APP_SCRIPT + '?IdFoglio=' + ID_FOGLIO
URL_DOCUMENTI = URL_APP_SCRIPT + '?IdSecondoFoglio=' + ID_FOGLIO

# First [user, assistant] exchange used to seed the chat history.
SYSTEM_PROMPT = ["Sei BonsiAI e mi aiuterai nelle mie richieste (Parla in ITALIANO)", "Esatto, sono BonsiAI. Di cosa hai bisogno?"]

# Display name -> model identifier handed to the chat client.
CHAT_BOTS = {"Mixtral 8x7B v0.1": "mistralai/Mixtral-8x7B-Instruct-v0.1"}

# Built-in prompt option that is merged in front of the remotely loaded ones.
option_personalizzata = {
    'Personalizzata': {
        'systemRole': 'Tu sei BONSI AI, il mio assistente personale della scuola superiore del Bonsignori. Aiutami in base alle mie esigenze',
        'systemStyle': 'Firmati sempre come BONSI AI. (scrivi in italiano)',
        'instruction': '',
        'tipo': '',
        'RAG': False,
    }
}


st.set_page_config(page_title="Bonsi A.I.", page_icon="🏫")
|
|
|
def init_state() :
    """Seed ``st.session_state`` and, once per session, download remote data.

    Every key listed in ``defaults`` is created only when it is missing, so
    values already stored in the session are never overwritten.  While
    ``loaded_data`` is still false, the prompt options and the documents are
    fetched from ``URL_PROMPT`` / ``URL_DOCUMENTI`` and stored under
    ``options`` and ``documenti``.

    Raises:
        requests.RequestException: if a download times out or the server
            answers with an HTTP error status.
    """
    # Built on every call so that mutable defaults are never shared between sessions.
    defaults = {
        "messages": [],
        "temp": 0.8,
        "history": [SYSTEM_PROMPT],
        "top_k": 5,
        "repetion_penalty": 1,  # key spelling kept as-is: it is part of the session contract
        "chat_bot": "Mixtral 8x7B v0.1",
        "loaded_data": False,
        "split": 30,
        "enable_history": True,
        "audio_bytes": False,
        "cerca_online": False,
        "numero_siti": 3,
        "numero_generazioni": 1,
        "testo_documenti": '',
        "uploaded_files": None,
        "urls": [""] * 5,
        # Label shown in the UI -> value passed as ``tbs`` to the web search.
        "tbs_options": {
            "Sempre": "0",
            "Ultimo anno": "qdr:y",
            "Ultimo mese": "qdr:m",
            "Ultima settimana": "qdr:w",
            "Ultimo giorno": "qdr:d",
        },
    }
    for chiave, valore in defaults.items():
        if chiave not in st.session_state:
            st.session_state[chiave] = valore

    if st.session_state.loaded_data:
        return

    place = st.empty()
    with place:
        with st.status("Caricamento in corso...", expanded=True) as status:
            st.write("Inizializzazione Ambiente")
            time.sleep(1)

            st.write("Inizializzazione Prompt")
            # A timeout keeps a stalled endpoint from freezing the app forever;
            # raise_for_status surfaces HTTP errors instead of a JSON decode failure.
            risposta_prompt = requests.get(URL_PROMPT, timeout=30)
            risposta_prompt.raise_for_status()
            options = risposta_prompt.json()

            st.write("Inizializzazione Documenti")
            risposta_documenti = requests.get(URL_DOCUMENTI, timeout=30)
            risposta_documenti.raise_for_status()
            documenti = risposta_documenti.json()

            # Remote options win over the built-in one when the keys collide.
            st.session_state.options = {**option_personalizzata, **options}
            st.session_state.documenti = documenti
            st.session_state.loaded_data = True
            status.update(label="Caricamento Completato", state="complete", expanded=False)
    # Remove the loading widget once everything is in place.
    place.empty()
|
|
|
def read_text_from_file(file):
    """Extract plain text from an uploaded file.

    The extension of ``file.name`` (compared case-insensitively) selects the
    strategy: ``.txt`` is decoded as UTF-8, ``.pdf`` is read page by page with
    PyPDF2, anything else is opened as an image and passed to Tesseract OCR.

    Args:
        file: binary file-like object exposing ``name`` and ``read()``.

    Returns:
        The extracted text, or ``""`` when the file cannot be read as an image
        (a notice is written to the page in that case).
    """
    text = ""
    nome = file.name.lower()
    if nome.endswith(".txt"):
        # Undecodable bytes become U+FFFD instead of aborting the whole upload.
        text = file.read().decode("utf-8", errors="replace")
    elif nome.endswith(".pdf"):
        pdf_reader = PyPDF2.PdfReader(file)
        # extract_text() may yield None for pages without a text layer.
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
    else:
        try:
            image = Image.open(file)
            text = pytesseract.image_to_string(image)
        except Exception:
            # Best effort: unsupported formats are reported, not fatal.
            st.write(f"Non è possibile leggere il testo dal file '{file.name}'.")
    return text
|
|
|
def sidebar():
    """Render the sidebar and mirror every widget value into ``st.session_state``."""

    def retrieval_settings() :
        """Prompt, document, upload and web-search controls.

        Writes, among others: ``selected_option``, ``instruction``,
        ``systemRole``, ``systemStyle``, ``rag_enabled``, ``testo_documenti``,
        ``cerca_online``, ``urls``, ``tbs_value`` and ``numero_siti``.
        """
        st.markdown("# Impostazioni Prompt")
        st.session_state.selected_option_key = st.selectbox('Azione', list(st.session_state.options.keys()))
        st.session_state.selected_option = st.session_state.options.get(st.session_state.selected_option_key, {})
        opzione = st.session_state.selected_option
        # .get() instead of ["tipo"]: an option without that field must not crash the page.
        tipo = opzione.get('tipo', '')

        if tipo == 'DOCUMENTO':
            st.session_state.selected_documento_key = st.selectbox('Documento', list(st.session_state.documenti.keys()))
            st.session_state.selected_documento = st.session_state.documenti.get(st.session_state.selected_documento_key, {})
            # The document text is expected under instruction -> 'Testo';
            # fall back to an empty instruction when the entry is malformed.
            istruzione_doc = st.session_state.selected_documento.get('instruction')
            if isinstance(istruzione_doc, dict):
                st.session_state.instruction = istruzione_doc.get('Testo', '')
            else:
                st.session_state.instruction = ''
            st.session_state.split = st.slider(label="Pagine Suddivisione", min_value=1, max_value=30, value=30, help='Se il documento ha 100 pagine e suddivido per 20 pagine elaborerà la risposta 5 volte. Più alto è il numero e meno volte elaborerà ma la risposta sarà più imprecisa')
        else:
            st.session_state.instruction = opzione.get('instruction', '')

        # The option provides the initial text; the user may edit it in place.
        st.session_state.systemRole = opzione.get('systemRole', '')
        st.session_state.systemRole = st.text_area("Descrizione", st.session_state.systemRole, help='Ruolo del chatbot e descrizione dell\'azione che deve svolgere')
        st.session_state.systemStyle = opzione.get('systemStyle', '')
        st.session_state.systemStyle = st.text_area("Stile", st.session_state.systemStyle, help='Descrizione dello stile utilizzato per generare il testo')
        st.session_state.rag_enabled = tipo == 'RAG'

        if st.session_state.selected_option_key == 'Decreti':
            st.session_state.top_k = st.slider(label="Documenti da ricercare", min_value=1, max_value=20, value=4, disabled=not st.session_state.rag_enabled)
            st.session_state.decreti_escludere = st.multiselect(
                'Decreti da escludere',
                [
                    '23.10.2 destinazione risorse residue pnrr DGR 1051-2023_Destinazione risorse PNRR Duale.pdf',
                    '23.10.25 accompagnatoria Circolare Inail assicurazione.pdf',
                    '23.10.26 circolare Inail assicurazione.pdf',
                    '23.10.3 FAQ in attesa di avviso_.pdf',
                    '23.11.2 avviso 24_24 Decreto 17106-2023 Approvazione Avviso IeFP 2023-2024.pdf',
                    '23.5.15 decreto linee inclusione x enti locali.pdf',
                    '23.6.21 Circolare+esplicativa+DGR+312-2023.pdf',
                    '23.7.3 1° Decreto R.L. 23_24 .pdf',
                    '23.9 Regolamento_prevenzione_bullismo_e_cyberbullismo__Centro_Bonsignori.pdf',
                    '23.9.1 FAQ inizio anno formativo.pdf',
                    '23.9.15 DECRETO VERIFICHE AMMINISTR 15-09-23.pdf',
                    '23.9.4 modifica decreto GRS.pdf',
                    '23.9.8 Budget 23_24.pdf',
                    '24.10.2022 DECRETO loghi N.15176.pdf',
                    'ALLEGATO C_Scheda Supporti al funzionamento.pdf',
                    'ALLEGATO_ B_ Linee Guida.pdf',
                    'ALLEGATO_A1_PEI_INFANZIA.pdf',
                    'ALLEGATO_A2_PEI_PRIMARIA.pdf',
                    'ALLEGATO_A3_PEI_SEC_1_GRADO.pdf',
                    'ALLEGATO_A4_PEI_SEC_2_GRADO.pdf',
                    'ALLEGATO_C_1_Tabella_Fabbisogni.pdf',
                    'Brand+Guidelines+FSE+.pdf',
                    'Decreto 20797 del 22-12-2023_Aggiornamento budget PNRR.pdf',
                    'Decreto 20874 del 29-12-2023 Avviso IeFP PNRR 2023-2024_file unico.pdf',
                ],
                [])

        st.session_state.uploaded_files = st.file_uploader("Importa file", accept_multiple_files=True)
        # Rebuilt from scratch on every run so removed uploads disappear from the prompt.
        st.session_state.testo_documenti = ''
        for uploaded_file in st.session_state.uploaded_files or []:
            text_doc = read_text_from_file(uploaded_file)
            st.session_state.testo_documenti += text_doc
        print(st.session_state.testo_documenti)

        st.markdown("---")
        st.markdown("# Ricerca Online")
        st.session_state.cerca_online = st.toggle("Attivata", value=False)
        with st.popover("Siti Specifici", disabled=not st.session_state.cerca_online,use_container_width=True):
            st.markdown("#### Inserisci Siti Web ")
            for i in range(5):
                st.session_state.urls[i] = st.text_input(f"URL Sito {i+1}", placeholder='Sito Web...', help='è possibile specificare anche il link di un video Youtube, in tal caso verrà restituita la trascrizione del video')

        # Period and result count only apply to a free web search, so they are
        # disabled as soon as the first explicit URL is filled in.
        ricerca_libera_disattiva = (not st.session_state.cerca_online) or (st.session_state.urls[0] != "")
        st.session_state.selected_tbs = st.selectbox("Periodo:", list(st.session_state.tbs_options.keys()), disabled=ricerca_libera_disattiva)
        st.session_state.tbs_value = st.session_state.tbs_options[st.session_state.selected_tbs]
        st.session_state.numero_siti = st.slider(label="Risultati", min_value = 1, max_value=20, value=3, disabled=ricerca_libera_disattiva)

        st.markdown("---")

    def model_settings():
        """Model choice and generation parameters.

        Writes ``chat_bot``, ``numero_generazioni``, ``enable_history``,
        ``temp`` and ``max_tokens``.
        """
        st.markdown("# Impostazioni Modello")
        st.session_state.chat_bot = st.sidebar.radio('Modello:', list(CHAT_BOTS))
        st.session_state.numero_generazioni = st.slider(label="Generazioni", min_value = 1, max_value=10, value=1)
        st.session_state.enable_history = st.toggle("Storico Messaggi", value=True)
        st.session_state.temp = st.slider(label="Creatività", min_value=0.0, max_value=1.0, step=0.1, value=0.9)
        st.session_state.max_tokens = st.slider(label="Lunghezza Output", min_value = 2, max_value=2048, step= 32, value=1024)

    with st.sidebar:
        retrieval_settings()
        model_settings()
        st.markdown("""> **Creato da Matteo Bergamelli **""")
|
|
|
def audioRec():
    """Record a voice message, transcribe it and submit it through the chat input.

    The recording is written to ``./AUDIO.wav``, transcribed in Italian with
    the Google recognizer of ``speech_recognition`` and then injected into the
    page's chat textarea by a small script rendered via ``st.components.v1.html``.
    Recognition failures are printed and otherwise ignored.
    """
    st.session_state.audio_bytes = audio_recorder(text='', icon_size="3x")
    if not st.session_state.audio_bytes:
        return

    with open("./AUDIO.wav", "wb") as file:
        file.write(st.session_state.audio_bytes)

    wav = sr.AudioFile("./AUDIO.wav")
    with wav as source:
        recognizer_instance = sr.Recognizer()
        recognizer_instance.pause_threshold = 3.0
        audio = recognizer_instance.listen(source)
        print("Ok! sto ora elaborando il messaggio!")

    try:
        text = recognizer_instance.recognize_google(audio, language="it-IT")
        print(text)
        # Serialise the transcript as a JS string literal: raw interpolation
        # broke the script (and allowed injection) whenever the text contained
        # a quote, a backslash or a newline.  "</" is split so the transcript
        # can never close the surrounding <script> element.
        testo_js = json.dumps(text).replace("</", "<\\/")
        js = f"""
            <script>
                var chatInput = parent.document.querySelector('textarea[data-testid="stChatInput"]');
                var nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLTextAreaElement.prototype, "value").set;
                nativeInputValueSetter.call(chatInput, {testo_js});
                var event = new Event('input', {{ bubbles: true}});
                chatInput.dispatchEvent(event);
                var sendChat = parent.document.getElementsByClassName("st-emotion-cache-1621d17")[0]
                sendChat.click();
                var x = parent.document.querySelector('[title="st.iframe"]');
                x.style.display = "none";
            </script>
            """
        st.components.v1.html(js)
    except Exception as e:
        # Best effort: an unintelligible recording must not break the page.
        print(e)
|
|
|
def header() :
    """Draw the page title and a collapsible description of the assistant."""
    descrizione = """BonsiAI Chat è un ChatBot personalizzato basato su un database vettoriale, funziona secondo il principio della Generazione potenziata da Recupero (RAG).
        La sua funzione principale ruota attorno alla gestione di un ampio repository di documenti BonsiAI e fornisce agli utenti risposte in linea con le loro domande.
        Questo approccio garantisce una risposta più precisa sulla base della richiesta degli utenti."""

    st.title("Bonsi A.I.", anchor=False)
    riquadro = st.expander("Cos'è BonsiAI?")
    with riquadro:
        st.info(descrizione)
|
|
|
def chat_box() :
    """Replay the stored conversation, one chat bubble per saved message."""
    for messaggio in st.session_state.messages:
        bolla = st.chat_message(messaggio["role"])
        with bolla:
            st.markdown(messaggio["content"])
|
|
|
def formattaPrompt(prompt, systemRole, systemStyle, instruction):
    """Assemble the JSON-shaped text prompt sent to the model.

    The four values are interpolated verbatim (nothing is escaped).  When the
    online search is active, a sentence telling the model to rely on the
    supplied material is appended to the system role.

    Returns:
        The prompt as a single multi-line string.
    """
    ruolo = systemRole
    if st.session_state.cerca_online:
        ruolo = ruolo + '. Ti ho fornito una lista di materiali nelle instruction. Devi rispondere sulla base delle informazioni fonrnite!'

    testo_prompt = f'''
    {{
        "input": {{
            "role": "system",
            "content": "{ruolo}",
            "style": "{systemStyle} "
        }},
        "messages": [
            {{
                "role": "instructions",
                "content": "{instruction} ({systemStyle})"
            }},
            {{
                "role": "user",
                "content": "{prompt}"
            }}
        ]
    }}
    '''
    return testo_prompt
|
|
|
@st.cache_resource(show_spinner=False)
def _carica_db_decreti():
    """Build the embedding model and open the persisted Chroma store once.

    Cached as a shared resource: previously both objects were re-created on
    every single question, which reloads the sentence-transformer each time.
    """
    embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    return Chroma(persist_directory='./DB_Decreti', embedding_function=embedding)


def gen_augmented_prompt(prompt, top_k) :
    """Retrieve the ``top_k`` passages most similar to ``prompt``.

    Args:
        prompt: the user question used as similarity query.
        top_k: number of passages to retrieve.

    Returns:
        ``(context, links)`` where ``context`` is the concatenation of the
        passages (newlines flattened to spaces, each followed by three
        newlines) and ``links`` is a list of ``(reference, passage)`` tuples;
        ``reference`` is the source path without the original folder prefix
        plus the page number.
    """
    db = _carica_db_decreti()
    docs = db.similarity_search(prompt, k=top_k)

    # Folder prefix stored in the metadata when the index was built.
    NomeCartellaOriginariaDB = 'Documenti_2\\'
    links = []
    passaggi = []
    for doc in docs:
        testo = doc.page_content.replace('\n', ' ')
        passaggi.append(testo + '\n\n\n')
        reference = doc.metadata["source"].replace(NomeCartellaOriginariaDB, '') + ' (Pag. ' + str(doc.metadata["page"]) + ')'
        links.append((reference, testo))
    context = ''.join(passaggi)
    return context, links
|
|
|
def _youtube_video_id(url):
    """Return the video id of a YouTube URL, or ``None`` if ``url`` is not one."""
    # urlparse only fills netloc when a scheme is present.
    parsed = urlparse(url if "://" in url else "https://" + url)
    host = parsed.netloc.lower()
    segmenti = [s for s in parsed.path.split("/") if s]
    if host == "youtu.be" or host.endswith(".youtu.be"):
        return segmenti[0] if segmenti else None
    if host == "youtube.com" or host.endswith(".youtube.com"):
        valori = parse_qs(parsed.query).get("v")
        if valori:
            return valori[0]
        if len(segmenti) >= 2 and segmenti[0] in ("shorts", "embed", "live"):
            return segmenti[1]
    return None


def get_search_results_int(url):
    """Download one source and return its title, description and text.

    YouTube links yield the video transcript (used as both description and
    body); any other URL is fetched over HTTP and parsed as HTML.

    Args:
        url: address of the page or video.

    Returns:
        Dict with the string keys ``title``, ``description``, ``url`` and
        ``body``.  On any failure the error is printed and a dict with four
        empty strings is returned, so callers can always concatenate the values.
    """
    result = {'title': '', 'description': '', 'url': '', 'body': ''}
    try:
        # Parsing the query string (instead of splitting on "=") keeps the id
        # correct when the link carries extra parameters such as &t= or &list=.
        video_id = _youtube_video_id(url)
        if video_id:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            body_content = " ".join([segment["text"] for segment in transcript])
            print(video_id)
            print(body_content)
            result = {'title': 'Video Youtube', 'description': body_content, 'url': url, 'body': body_content}
        else:
            # A timeout keeps one unresponsive site from blocking the whole answer.
            response = requests.get(url, timeout=15)
            soup = BeautifulSoup(response.text, 'html.parser')
            # title.string is None when <title> has nested markup.
            title = (soup.title.string if soup.title else None) or "N/A"
            meta = soup.find('meta', attrs={'name': 'description'})
            # A description tag without a content attribute must not discard the page.
            description = (meta.get('content') if meta else None) or "N/A"
            corpo = soup.find('body')
            body_content = corpo.get_text() if corpo else "N/A"
            result = {'title': title, 'description': description, 'url': url, 'body': body_content}
    except Exception as e:
        # Best effort by design: a failing source is logged and skipped.
        print(f"Error fetching data from {url}: {e}")
    return result
|
|
|
def get_search_results(query, top_k):
    """Collect page data either from the user's URLs or from a web search.

    When the first URL slot in the session is filled, only the non-empty
    entries among the five slots are fetched; otherwise ``query`` is searched
    online, limited to ``top_k`` results and to the selected time period.
    """
    siti_utente = st.session_state.urls
    if siti_utente[0] != "":
        return [get_search_results_int(sito) for sito in siti_utente[:5] if sito != ""]

    trovati = search(query, num=top_k, stop=top_k, tbs=st.session_state.tbs_value)
    return [get_search_results_int(sito) for sito in trovati]
|
|
|
def gen_online_prompt(prompt, top_k) :
    """Turn the online search results into a context string plus source list.

    Returns:
        ``(context, links)``: ``context`` joins title, description and body
        (newlines replaced by dots) of each result, each followed by a dashed
        separator; ``links`` holds ``(numbered title, description + url)`` tuples.
    """
    separatore = '\n\n------------------------------------------------------------'
    sezioni = []
    fonti = []
    for posizione, pagina in enumerate(get_search_results(prompt, top_k), start=1):
        corpo = pagina['body'].replace('\n', '.')
        sezioni.append(''.join((pagina['title'], '\n', pagina['description'], '\n', '\n\n', corpo, separatore)))
        etichetta = ''.join((str(posizione), '. ', pagina['title']))
        dettaglio = ''.join((pagina['description'], '\n\n', pagina['url']))
        fonti.append((etichetta, dettaglio))
    return ''.join(sezioni), fonti
|
|
|
def generate_chat_stream(prompt) :
    """Start a generation for ``prompt`` using the settings stored in the session.

    History, selected model, temperature and maximum output length are read
    from ``st.session_state``; the value returned by ``chat`` is passed through.
    """
    stato = st.session_state
    return chat(
        prompt,
        stato.history,
        chat_client=CHAT_BOTS[stato.chat_bot],
        temperature=stato.temp,
        max_new_tokens=stato.max_tokens,
    )
|
|
|
def inserisci_istruzioni(prompt_originale):
    """Fill ``st.session_state.instruction`` with retrieved context.

    The online search runs first (if enabled), then the decree retrieval (if
    enabled); when both are active the second result replaces the first.

    Returns:
        The list of sources produced by the last retrieval that ran, or an
        empty list when none is enabled.
    """
    stato = st.session_state
    fonti = []

    if stato.cerca_online:
        with st.spinner("Ricerca Online...."):
            time.sleep(1)
            stato.instruction, fonti = gen_online_prompt(prompt=prompt_originale, top_k=stato.numero_siti)

    if stato.rag_enabled :
        with st.spinner("Ricerca nei Decreti...."):
            time.sleep(1)
            stato.instruction, fonti = gen_augmented_prompt(prompt=prompt_originale, top_k=stato.top_k)

    # Short visual cue before the answer starts streaming.
    with st.spinner("Generazione in corso...") :
        time.sleep(1)

    return fonti
|
|
|
def stream_handler(chat_stream, placeholder) :
    """Render a token stream progressively and return the complete text.

    Each chunk exposes its text as ``chunk.token.text``; the ``</s>`` marker is
    skipped.  While streaming, the partial answer is shown followed by a cursor
    glyph; at the end the final text is rendered without it.
    """
    risposta = ''
    for chunk in chat_stream :
        pezzo = chunk.token.text
        if pezzo == '</s>' :
            continue
        risposta = risposta + pezzo
        placeholder.markdown(risposta + "▌")

    placeholder.markdown(risposta)
    return risposta
|
|
|
def show_source(links) :
    """List the sources of an answer inside a collapsible panel.

    Args:
        links: iterable of ``(reference, text)`` pairs; underscores in the
            reference are shown as spaces.
    """
    pannello = st.expander("Mostra fonti")
    with pannello :
        for reference, testo in links:
            intestazione = '##### ' + reference.replace('_', ' ')
            st.info(intestazione + '\n\n' + testo)
|
|
|
# Build the page top to bottom. The order matters: the later calls read
# session keys that the earlier ones create.
init_state()  # session defaults and first-run download of options/documents
sidebar()     # settings widgets; stores the user's choices in the session
header()      # title and description panel
chat_box()    # replay of the messages saved so far
|
|
|
def split_text(text, chunk_size):
    """Cut ``text`` into consecutive pieces of at most ``chunk_size`` characters.

    An empty text is treated as a single space, so the result always contains
    at least one piece; a ``chunk_size`` below 100 is replaced by 60000.
    """
    contenuto = ' ' if text == '' else text
    passo = 60000 if chunk_size < 100 else chunk_size
    return [contenuto[inizio:inizio + passo] for inizio in range(0, len(contenuto), passo)]
|
|
|
# Main interaction: runs only when the user has just submitted a message.
if prompt := st.chat_input("Chatta con BonsiAI..."):
    prompt_originale = prompt

    # Retrieval (online and/or decrees) fills st.session_state.instruction.
    links = inserisci_istruzioni(prompt_originale)
    st.session_state.instruction+= ' \n\n' + st.session_state.testo_documenti

    # Long instructions are answered piece by piece; "&&" in the system role
    # separates several roles that are each run over every piece.
    instruction_suddivise = split_text(st.session_state.instruction, st.session_state.split*2000)
    ruolo_originale = st.session_state.systemRole
    ruoli_divisi = ruolo_originale.split("&&")
    # .get() instead of ["tipo"]: an option without that field must not crash here.
    tipo_documento = st.session_state.selected_option.get('tipo', '') == 'DOCUMENTO'

    parte=1
    i=1
    risposta_completa = ''
    for ruolo_singolo in ruoli_divisi:
        for instruction_singola in instruction_suddivise:
            for numgen in range(1, st.session_state.numero_generazioni+1):
                if i==1:
                    # The question is shown once, before the first answer.
                    # (The former ": Parte n" suffix could never be appended here.)
                    st.chat_message("user").markdown(prompt_originale)
                i+=1

                prompt = formattaPrompt(prompt_originale, ruolo_singolo, st.session_state.systemStyle, instruction_singola)
                print('------------------------------------------------------------------------------------')
                print(prompt)
                st.session_state.messages.append({"role": "user", "content": prompt_originale})

                chat_stream = generate_chat_stream(prompt)
                with st.chat_message("assistant"):
                    placeholder = st.empty()
                    full_response = stream_handler(chat_stream, placeholder)
                    if st.session_state.rag_enabled or st.session_state.cerca_online:
                        show_source(links)
                    if tipo_documento:
                        with st.expander("Mostra Documento") :
                            st.info('##### ' + st.session_state.selected_documento_key + ' (Parte ' + str(parte) +')'+ '\n\n\n' + instruction_singola)

                parte+=1
                st.session_state.messages.append({"role": "assistant", "content": full_response})
                risposta_completa = risposta_completa + '\n' + full_response

    # Only the last answer enters the model history; an empty pair is stored
    # when the user switched the history off.
    if st.session_state.enable_history:
        st.session_state.history.append([prompt_originale, full_response])
    else:
        st.session_state.history.append(['', ''])
    st.success('Generazione Completata')

    # Log the exchange remotely. This is auxiliary: a slow or unreachable
    # endpoint must not break the page after the answer was already shown.
    payload = {"domanda": prompt_originale, "risposta": risposta_completa}
    json_payload = json.dumps(payload)
    try:
        response = requests.post(URL_APP_SCRIPT, data=json_payload, timeout=30)
    except requests.RequestException as e:
        print(f"Error logging the conversation to {URL_APP_SCRIPT}: {e}")