"""Helper functions for web citations, auth loading, chat history and model calls."""

import datetime
import os
import json

import gradio as gr

from prompts.llm import qa_prompt_template
from prompts.condense_llm import condense_template
from config import HISTORY_DIR

# Timestamp layout used for the per-session history directory names.
_TIMESTAMP_FORMAT = '%Y-%m-%d_%H-%M-%S'


def web_citation(inputs, results, custom_websearch=False, timeout=10):
    """Summarize the HTML pages behind search results and build citations.

    Args:
        inputs: Question forwarded to ``WebSummary.predict`` as ``question``.
        results: Iterable of mappings. Each must provide ``'link'``; when
            ``custom_websearch`` is false it must also provide ``'title'``.
        custom_websearch: When true, the title is taken from the page's first
            ``<title>`` element instead of ``result["title"]``.
        timeout: Seconds to wait for each HTTP request before giving up on
            that result.

    Returns:
        A tuple ``(reference_results, display_append)``. ``reference_results``
        holds ``[summary, link]`` pairs and ``display_append`` holds one
        display string per cited page. Results that are not ``text/html`` or
        that fail while being fetched are skipped.
    """
    import requests
    from bs4 import BeautifulSoup
    from chains.summary import WebSummary

    reference_results = []
    display_append = []
    for idx, result in enumerate(results):
        try:
            link = result['link']
            head = requests.head(link, timeout=timeout)
            if "text/html" not in head.headers['Content-Type']:
                continue

            html_response = requests.get(link, timeout=timeout)
            soup = BeautifulSoup(html_response.content, "html.parser")
            if custom_websearch:
                title = soup.find_all('title')[0].get_text()
            else:
                title = result["title"]

            # Summarizing is best effort: a failure still cites the page,
            # just with an empty summary.
            try:
                web_summary = WebSummary()
                text = soup.get_text()
                lines = (line.strip() for line in text.splitlines())
                # Break multi-headlines into a line each.
                # NOTE(review): the common form of this snippet splits on two
                # spaces; confirm the single space here is intended.
                chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
                # Drop blank lines.
                text = '\n'.join(chunk for chunk in chunks if chunk)
                summary = web_summary.predict(question=inputs, doc=text)
                print("Can access", link)
            except Exception:
                summary = ""
                print("Cannot access ", link)

            reference_results.append([summary, link])
            # NOTE(review): this template looks like it once carried markup
            # around the title; verify against the intended display format.
            display_append.append(
                f'{idx + 1}. \n{title}'
            )
        except Exception:
            # Any failure for one result (network, missing key, missing
            # header, missing <title>) only skips that result.
            continue
    return reference_results, display_append


def get_auth():
    """Load ``(username, password)`` pairs from ``auth.json``.

    Returns:
        A list of ``(username, password)`` tuples for every entry whose
        username and password are both truthy, or ``None`` when ``auth.json``
        does not exist in the current working directory.
    """
    if not os.path.exists("auth.json"):
        return None
    with open("auth.json", "r", encoding='utf-8') as f:
        auth = json.load(f)
    return [
        (entry["username"], entry["password"])
        for entry in auth.values()
        if entry["username"] and entry["password"]
    ]


def transcribe(current_model, audio):
    """Return ``current_model.audio_response(audio)``."""
    return current_model.audio_response(audio)


def related_question(current_model):
    """Return ``current_model.related_question()``."""
    return current_model.related_question()


def history_file_path(username, file_name):
    """Create a timestamped history directory and return a JSON path in it.

    The directory is ``HISTORY_DIR/<username>/<current local time>`` and is
    created if missing; the returned path is ``<file_name>.json`` inside it.
    """
    now = datetime.datetime.now().strftime(_TIMESTAMP_FORMAT)
    dirname = os.path.join(HISTORY_DIR, username, now)
    os.makedirs(dirname, exist_ok=True)
    return os.path.join(dirname, f"{file_name}.json")


def get_history_names(plain=False, user_name=""):
    """Return the ``"id"`` of every item from ``query_item(user_name)``.

    Returns the ids as a plain list when ``plain`` is true, otherwise wrapped
    in ``gr.update(choices=...)``.
    """
    from cosmos_db import query_item

    files = [item["id"] for item in query_item(user_name)]
    if plain:
        return files
    return gr.update(choices=files)


def load_lasted_file_username(username):
    """Return the path of the newest timestamped entry for ``username``.

    Entries under ``HISTORY_DIR/<username>`` are ordered by the timestamp in
    the first 19 characters of their name; entries that do not start with a
    valid timestamp are ignored.

    Returns:
        ``HISTORY_DIR/<username>/<latest timestamp>``, or ``None`` when the
        history directory is missing, the user has no entry in it, or no
        entry carries a parsable timestamp.
    """
    if not os.path.isdir(HISTORY_DIR) or username not in os.listdir(HISTORY_DIR):
        return None

    timestamps = []
    for filename in os.listdir(os.path.join(HISTORY_DIR, username)):
        try:
            timestamps.append(
                datetime.datetime.strptime(filename[:19], _TIMESTAMP_FORMAT)
            )
        except ValueError:
            # Not a timestamped entry (e.g. a stray file); skip it.
            continue
    if not timestamps:
        return None

    latest = max(timestamps).strftime(_TIMESTAMP_FORMAT)
    return os.path.join(HISTORY_DIR, username, latest)


def load_chat_history(current_model, file_name):
    """Return ``current_model.load_history(file_name)``."""
    return current_model.load_history(file_name)


def save_chat_history(current_model, chatbot, file_name):
    """Return ``current_model.save_history(chatbot, file_name)``."""
    return current_model.save_history(chatbot, file_name)


def _stream_inference(model, inputs, chatbot, upload_files_btn, custom_websearch, local_db):
    """Call ``model.inference`` in streaming mode with the module's prompt templates."""
    return model.inference(
        inputs=inputs,
        chatbot=chatbot,
        streaming=True,
        upload_files_btn=upload_files_btn,
        custom_websearch=custom_websearch,
        qa_prompt_template=qa_prompt_template,
        local_db=local_db,
        condense_prompt_template=condense_template,
    )


def predict(chatbot, model, inputs, upload_files_btn, custom_websearch, local_db):
    """Yield each response produced by ``model.inference`` for ``inputs``."""
    yield from _stream_inference(
        model, inputs, chatbot, upload_files_btn, custom_websearch, local_db
    )


def set_user_identifier(current_model, *args):
    """Forward ``*args`` to the model's user-identifier setter."""
    # NOTE(review): `set_user_identifie` may be a typo for
    # `set_user_identifier`; kept as-is because the model class is defined
    # elsewhere. Confirm against that class before renaming.
    return current_model.set_user_identifie(*args)


def retry(chatbot, model, upload_files_btn, custom_websearch, local_db):
    """Drop the model's last conversation and re-run the last user input.

    The input is taken from ``chatbot[-1][0]``. When ``chatbot`` is empty
    there is nothing to re-run, so the generator finishes without yielding.
    """
    model.delete_last_conversation()
    if not chatbot:
        return
    yield from _stream_inference(
        model, chatbot[-1][0], chatbot, upload_files_btn, custom_websearch, local_db
    )


def reset(current_model):
    """Return ``current_model.reset_conversation()``."""
    return current_model.reset_conversation()


def delete_chat_history(current_model, file_name):
    """Return ``current_model.delete_history(file_name)``."""
    return current_model.delete_history(file_name)


def delete_first_conversation(current_model):
    """Return ``current_model.delete_first_conversation()``."""
    return current_model.delete_first_conversation()


def delete_last_conversation(current_model, chatbot):
    """Pop the last chatbot entry, drop the model's last conversation, return ``chatbot``."""
    if len(chatbot) > 0:
        chatbot.pop()
    current_model.delete_last_conversation()
    return chatbot


def add_source_numbers(lst, source_name="Source", use_source=True):
    """Prefix every item with a 1-based ``[n]`` marker.

    Args:
        lst: Items to number. With ``use_source`` each item is indexed as
            ``item[0]`` (quoted text) and ``item[1]`` (source); otherwise the
            item itself is quoted.
        source_name: Label placed before the source on the second line.
        use_source: Whether to append the ``<source_name>: <source>`` line.

    Returns:
        A list of formatted strings, one per item.
    """
    if use_source:
        return [
            f'[{idx + 1}]\t "{item[0]}"\n{source_name}: {item[1]}'
            for idx, item in enumerate(lst)
        ]
    return [f'[{idx + 1}]\t "{item}"' for idx, item in enumerate(lst)]


def add_details(lst, lst_src):
    """Combine each text with its source into one display string.

    Each text is split after its first four characters: that prefix and the
    source go on one line, the remainder of the text on a later line. Pairs
    are formed with ``zip``, so extra items in the longer list are ignored.
    """
    # NOTE(review): the template looks like it once wrapped these parts in
    # markup; verify against the intended display format.
    return [
        f"\n{txt[:4]}{src}\n\n{txt[4:]}\n\n"
        for txt, src in zip(lst, lst_src)
    ]