# Spaces status: Runtime error
# (page-header residue captured together with this file; not part of the module)
import datetime | |
import os | |
import json | |
import gradio as gr | |
from prompts.llm import qa_prompt_template | |
from prompts.condense_llm import condense_template | |
from config import HISTORY_DIR | |
def web_citation(inputs, results, custom_websearch=False):
    """Fetch each search result, summarise the page, and build citation links.

    Results whose page cannot be fetched, or whose ``Content-Type`` header
    does not mention ``text/html``, are skipped. A failure while summarising
    keeps the result but leaves its summary empty.

    Args:
        inputs: The user's question; passed to ``WebSummary.predict`` as
            ``question``.
        results: Iterable of search-result mappings. Each must provide
            ``"link"``; ``"title"`` is read when ``custom_websearch`` is false.
        custom_websearch: When true, the title is taken from the page's
            ``<title>`` element (falling back to the link when the page has
            none) instead of ``result["title"]``.

    Returns:
        A ``(reference_results, display_append)`` tuple: a list of
        ``[summary, link]`` pairs and a list of HTML anchor strings, one entry
        each per result that was kept.
    """
    import html

    import requests
    from bs4 import BeautifulSoup
    from chains.summary import WebSummary

    # Without a timeout a single unresponsive host blocks the whole request.
    request_timeout = 10  # seconds

    reference_results = []
    display_append = []
    for idx, result in enumerate(results):
        try:
            link = result['link']
            head = requests.head(link, timeout=request_timeout)
            # A missing header is treated the same as a non-HTML response.
            if "text/html" not in head.headers.get('Content-Type', ''):
                continue
            html_response = requests.get(link, timeout=request_timeout)
            soup = BeautifulSoup(html_response.content, "html.parser")
            if not custom_websearch:
                title = result["title"]
            else:
                title_tag = soup.find('title')
                title = title_tag.get_text() if title_tag is not None else link
        except Exception as err:
            # Best effort: one bad result must not abort the others.
            print("Skipping search result:", repr(err))
            continue

        try:
            web_summary = WebSummary()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            # Break multi-headlines into a line each. Headlines are separated
            # by runs of spaces, so split on a double space; a single space
            # would put every word on its own line.
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            # Drop blank lines.
            text = '\n'.join(chunk for chunk in chunks if chunk)
            summary = web_summary.predict(question=inputs, doc=text)
            print("Can access", link)
        except Exception as err:
            summary = ""
            print("Cannot access ", link, repr(err))

        reference_results.append([summary, link])
        # Title and link come from the web, so escape them before they are
        # embedded in markup.
        display_append.append(
            f'<a href="{html.escape(str(link), quote=True)}" target="_blank">'
            f'{idx + 1}. {html.escape(str(title))}</a>'
        )
    return reference_results, display_append
def get_auth():
    """Load login credentials from ``auth.json`` in the working directory.

    The file is read as a JSON object whose values are objects holding
    ``"username"`` and ``"password"``. Entries where either field is missing
    or empty are skipped instead of raising ``KeyError``.

    Returns:
        A list of ``(username, password)`` tuples, or ``None`` when
        ``auth.json`` does not exist.
    """
    if not os.path.exists("auth.json"):
        return None

    with open("auth.json", "r", encoding='utf-8') as f:
        auth = json.load(f)

    auth_list = []
    for entry in auth.values():
        username = entry.get("username")
        password = entry.get("password")
        if username and password:
            auth_list.append((username, password))
    return auth_list
def transcribe(current_model, audio):
    """Pass ``audio`` to the model's ``audio_response`` and return its result."""
    response = current_model.audio_response(audio)
    return response
def related_question(current_model):
    """Return whatever the model's ``related_question`` method produces."""
    suggestions = current_model.related_question()
    return suggestions
def history_file_path(username, file_name):
    """Create a timestamped history folder and return a JSON path inside it.

    The folder is ``HISTORY_DIR/<username>/<YYYY-mm-dd_HH-MM-SS>`` (current
    local time) and is created if it does not already exist.

    Args:
        username: Sub-folder of ``HISTORY_DIR`` to use.
        file_name: Base name of the file; ``.json`` is appended.

    Returns:
        The path ``<folder>/<file_name>.json``. The file itself is not created.
    """
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    target_dir = os.path.join(HISTORY_DIR, username, timestamp)
    os.makedirs(target_dir, exist_ok=True)
    return os.path.join(target_dir, "{}.json".format(file_name))
def get_history_names(plain=False, user_name=""):
    """List the ids of the items ``query_item`` returns for ``user_name``.

    Args:
        plain: When true, return the ids as a plain list; otherwise wrap them
            in ``gr.update(choices=...)``.
        user_name: Value passed to ``cosmos_db.query_item``.

    Returns:
        A list of ``item["id"]`` values, or a ``gr.update`` carrying them as
        ``choices``.
    """
    from cosmos_db import query_item

    files = []
    for item in query_item(user_name):
        files.append(item["id"])

    if not plain:
        return gr.update(choices=files)
    return files
def load_lasted_file_username(username):
    """Return the path of the most recent history entry for ``username``.

    Entries under ``HISTORY_DIR/<username>`` are ordered by the timestamp in
    the first 19 characters of their name (``YYYY-mm-dd_HH-MM-SS``). Entries
    whose name does not start with such a timestamp are ignored.

    Args:
        username: Name of the user's folder inside ``HISTORY_DIR``.

    Returns:
        ``HISTORY_DIR/<username>/<latest timestamp>``, or ``None`` when
        ``HISTORY_DIR`` is missing, the user has no folder, or the folder
        holds no timestamped entry.
    """
    timestamp_format = '%Y-%m-%d_%H-%M-%S'

    try:
        known_users = os.listdir(HISTORY_DIR)
    except FileNotFoundError:
        # No history has ever been written.
        return None
    if username not in known_users:
        return None

    date_time_list = []
    for filename in os.listdir(os.path.join(HISTORY_DIR, username)):
        try:
            date_time_list.append(
                datetime.datetime.strptime(filename[:19], timestamp_format)
            )
        except ValueError:
            # Stray entry (e.g. an OS metadata file); not a history folder.
            continue

    if not date_time_list:
        return None
    lasted_file = max(date_time_list).strftime(timestamp_format)
    return os.path.join(HISTORY_DIR, username, lasted_file)
def load_chat_history(current_model, file_name):
    """Delegate to the model's ``load_history`` with ``file_name``."""
    loaded = current_model.load_history(file_name)
    return loaded
def save_chat_history(current_model, chatbot, file_name):
    """Delegate to the model's ``save_history`` with ``chatbot`` and ``file_name``."""
    outcome = current_model.save_history(chatbot, file_name)
    return outcome
def predict(chatbot, model, inputs, upload_files_btn, custom_websearch, local_db):
    """Yield each response produced by ``model.inference`` for ``inputs``.

    The call is made with ``streaming=True`` and the module-level
    ``qa_prompt_template`` and ``condense_template`` prompts; every other
    argument is forwarded unchanged under the same keyword name.
    """
    inference_kwargs = dict(
        inputs=inputs,
        chatbot=chatbot,
        streaming=True,
        upload_files_btn=upload_files_btn,
        custom_websearch=custom_websearch,
        qa_prompt_template=qa_prompt_template,
        local_db=local_db,
        condense_prompt_template=condense_template,
    )
    yield from model.inference(**inference_kwargs)
def set_user_identifier(current_model, *args):
    """Forward ``args`` to the model's ``set_user_identifie`` method.

    NOTE(review): the delegated method name lacks the trailing "r" of this
    wrapper's own name. Confirm which spelling the model class defines
    before renaming either side.
    """
    # Method name kept exactly as the call site has always spelled it.
    handler = current_model.set_user_identifie
    return handler(*args)
def retry(chatbot, model, upload_files_btn, custom_websearch, local_db):
    """Re-run inference for the last user message and yield the responses.

    The model's last conversation turn is deleted first. The message to
    replay is the first element of the last ``chatbot`` entry. When
    ``chatbot`` is empty there is nothing to replay and the generator ends
    without yielding.

    Args:
        chatbot: Chat history; each entry is indexed with ``[0]`` to obtain
            the user's message.
        model: Object providing ``delete_last_conversation`` and ``inference``.
        upload_files_btn: Forwarded to ``model.inference`` unchanged.
        custom_websearch: Forwarded to ``model.inference`` unchanged.
        local_db: Forwarded to ``model.inference`` unchanged.

    Yields:
        Each response produced by ``model.inference``.
    """
    model.delete_last_conversation()
    if not chatbot:
        # Without this guard ``inputs`` would be unbound below.
        return

    inputs = chatbot[-1][0]
    yield from model.inference(
        inputs=inputs,
        chatbot=chatbot,
        streaming=True,
        upload_files_btn=upload_files_btn,
        custom_websearch=custom_websearch,
        qa_prompt_template=qa_prompt_template,
        local_db=local_db,
        condense_prompt_template=condense_template,
    )
def reset(current_model):
    """Delegate to the model's ``reset_conversation`` and return its result."""
    outcome = current_model.reset_conversation()
    return outcome
def delete_chat_history(current_model, file_name):
    """Delegate to the model's ``delete_history`` with ``file_name``."""
    outcome = current_model.delete_history(file_name)
    return outcome
def delete_first_conversation(current_model):
    """Delegate to the model's ``delete_first_conversation`` and return its result."""
    outcome = current_model.delete_first_conversation()
    return outcome
def delete_last_conversation(current_model, chatbot):
    """Drop the newest ``chatbot`` entry and the model's last conversation.

    ``chatbot`` is modified in place (left untouched when already empty) and
    the same object is returned.
    """
    if chatbot:
        del chatbot[-1]
    current_model.delete_last_conversation()
    return chatbot
def add_source_numbers(lst, source_name="Source", use_source=True):
    """Prefix each item with a 1-based ``[n]`` marker and quote its text.

    Args:
        lst: Items to number. With ``use_source`` each item is indexed as
            ``item[0]`` (text) and ``item[1]`` (source); otherwise the item
            itself is the text.
        source_name: Label written before the source on the second line.
        use_source: Whether to append the ``<source_name>: <source>`` line.

    Returns:
        A list of formatted strings, one per item, in input order.
    """
    numbered = []
    for position, item in enumerate(lst, start=1):
        if use_source:
            entry = f'[{position}]\t "{item[0]}"\n{source_name}: {item[1]}'
        else:
            entry = f'[{position}]\t "{item}"'
        numbered.append(entry)
    return numbered
def add_details(lst, lst_src):
    """Wrap each text/source pair in an HTML ``<details>`` element.

    The first four characters of the text, followed by the source, form the
    ``<summary>``; the rest of the text goes in the ``<p>`` body. Pairing
    stops at the shorter of the two inputs.

    NOTE(review): the four-character prefix is presumably a ``[n]`` marker
    plus tab, as produced by ``add_source_numbers`` -- confirm with callers.
    """
    return [
        f"<details><summary>{body[:4]}{origin}</summary><p>{body[4:]}</p></details>"
        for body, origin in zip(lst, lst_src)
    ]