# Chat_QnA_v2 / utils.py
import datetime
import os
import json
import gradio as gr
from prompts.llm import qa_prompt_template
from prompts.condense_llm import condense_template
from config import HISTORY_DIR


def web_citation(inputs, results, custom_websearch=False):
    """Fetch each search result, summarise it for `inputs`, and build citation links.

    `results` is expected to be a list of dicts with a 'link' key (and a
    'title' key when `custom_websearch` is False).
    """
    import requests
    from bs4 import BeautifulSoup
    from chains.summary import WebSummary

    reference_results = []
    display_append = []
    for idx, result in enumerate(results):
        try:
            head = requests.head(result['link'])
            if "text/html" in head.headers['Content-Type']:
                html_response = requests.get(result['link'])
                soup = BeautifulSoup(html_response.content, "html.parser")
                if not custom_websearch:
                    title = result["title"]
                else:
                    title = soup.find_all('title')[0].get_text()
                try:
                    web_summary = WebSummary()
                    text = soup.get_text()
                    # strip leading and trailing whitespace on each line
                    lines = (line.strip() for line in text.splitlines())
                    # break multi-headlines into a line each
                    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
                    # drop blank lines
                    text = '\n'.join(chunk for chunk in chunks if chunk)
                    summary = web_summary.predict(question=inputs, doc=text)
                    print("Can access", result['link'])
                except Exception:
                    summary = ""
                    print("Cannot access", result['link'])
                reference_results.append([summary, result['link']])
                display_append.append(
                    f'<a href="{result["link"]}" target="_blank">{idx + 1}.&nbsp;{title}</a>'
                )
        except Exception:
            continue
    return reference_results, display_append


def get_auth():
    """Return a list of (username, password) tuples read from auth.json, if it exists."""
    if os.path.exists("auth.json"):
        auth_list = []
        with open("auth.json", "r", encoding='utf-8') as f:
            auth = json.load(f)
        for _ in auth:
            if auth[_]["username"] and auth[_]["password"]:
                auth_list.append((auth[_]["username"], auth[_]["password"]))
        return auth_list
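# Illustrative auth.json shape consumed by get_auth() (keys and values are
# made-up placeholders, not taken from this repo):
# {
#     "user1": {"username": "alice", "password": "alice-password"},
#     "user2": {"username": "bob", "password": "bob-password"}
# }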


def transcribe(current_model, audio):
    return current_model.audio_response(audio)


def related_question(current_model):
    return current_model.related_question()


def history_file_path(username, file_name):
    now = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    dirname = os.path.join(HISTORY_DIR, username, now)
    os.makedirs(dirname, exist_ok=True)
    history_path = os.path.join(dirname, f"{file_name}.json")
    return history_path
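# Illustrative layout produced above (names are placeholders):
#   HISTORY_DIR/<username>/<YYYY-MM-DD_HH-MM-SS>/<file_name>.json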


def get_history_names(plain=False, user_name=""):
    from cosmos_db import query_item
    items = query_item(user_name)
    files = [item["id"] for item in items]
    if plain:
        return files
    else:
        return gr.update(choices=files)


def load_lasted_file_username(username):
    """Return the path of the user's most recent history directory, or None."""
    if username not in os.listdir(HISTORY_DIR):
        return None
    date_time_list = []
    for filename in os.listdir(os.path.join(HISTORY_DIR, username)):
        date_time_list.append(datetime.datetime.strptime(filename[:19], '%Y-%m-%d_%H-%M-%S'))
    latest_time = max(date_time_list)
    latest_file = latest_time.strftime('%Y-%m-%d_%H-%M-%S')
    return os.path.join(HISTORY_DIR, username, latest_file)


def load_chat_history(current_model, file_name):
    return current_model.load_history(file_name)


def save_chat_history(current_model, chatbot, file_name):
    return current_model.save_history(chatbot, file_name)


def predict(chatbot, model, inputs, upload_files_btn, custom_websearch, local_db):
    response_iter = model.inference(inputs=inputs, chatbot=chatbot, streaming=True,
                                    upload_files_btn=upload_files_btn,
                                    custom_websearch=custom_websearch,
                                    qa_prompt_template=qa_prompt_template,
                                    local_db=local_db,
                                    condense_prompt_template=condense_template)
    for response in response_iter:
        yield response


def set_user_identifier(current_model, *args):
    return current_model.set_user_identifie(*args)


def retry(chatbot, model, upload_files_btn, custom_websearch, local_db):
    model.delete_last_conversation()
    if len(chatbot) > 0:
        inputs = chatbot[-1][0]
        response_iter = model.inference(inputs=inputs, chatbot=chatbot, streaming=True,
                                        upload_files_btn=upload_files_btn,
                                        custom_websearch=custom_websearch,
                                        qa_prompt_template=qa_prompt_template,
                                        local_db=local_db,
                                        condense_prompt_template=condense_template)
        for response in response_iter:
            yield response


def reset(current_model):
    return current_model.reset_conversation()


def delete_chat_history(current_model, file_name):
    return current_model.delete_history(file_name)


def delete_first_conversation(current_model):
    return current_model.delete_first_conversation()


def delete_last_conversation(current_model, chatbot):
    if len(chatbot) > 0:
        chatbot.pop()
    current_model.delete_last_conversation()
    return chatbot


def add_source_numbers(lst, source_name="Source", use_source=True):
    if use_source:
        return [f'[{idx + 1}]\t "{item[0]}"\n{source_name}: {item[1]}' for idx, item in enumerate(lst)]
    else:
        return [f'[{idx + 1}]\t "{item}"' for idx, item in enumerate(lst)]
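# Illustrative call with made-up values:
# add_source_numbers([("some snippet", "https://example.com")])
# -> ['[1]\t "some snippet"\nSource: https://example.com']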


def add_details(lst, lst_src):
    nodes = []
    for txt, src in zip(lst, lst_src):
        nodes.append(
            f"<details><summary>{txt[:4]}{src}</summary><p>{txt[4:]}</p></details>"
        )
    return nodes